Python Operations
Python functions run batch processing on a chain to generate new chain values. A function takes fields from one or more rows of the data and outputs new fields. Functions run at scale on multiple workers and processes.
Any Python function works as an operation. The classes below are useful to implement a "stateful"
operation where a plain function is insufficient, such as when additional setup() or teardown()
steps need to happen before or after the processing function runs.
UDFBase
Bases: AbstractUDF
Base class for stateful user-defined functions.
Any class that inherits from it must have a process() method that takes input
params from one or more rows in the chain and produces the expected output.
Optionally, the class may include these methods:
- setup() to run code on each worker before process() is called.
- teardown() to run code on each worker after process() completes.
Example
import datachain as dc
import open_clip
class ImageEncoder(dc.Mapper):
def __init__(self, model_name: str, pretrained: str):
self.model_name = model_name
self.pretrained = pretrained
def setup(self):
self.model, _, self.preprocess = (
open_clip.create_model_and_transforms(
self.model_name, self.pretrained
)
)
def process(self, file) -> list[float]:
img = file.get_value()
img = self.preprocess(img).unsqueeze(0)
emb = self.model.encode_image(img)
return emb[0].tolist()
(
dc.read_storage(
"gs://datachain-demo/fashion-product-images/images", type="image"
)
.limit(5)
.map(
ImageEncoder("ViT-B-32", "laion2b_s34b_b79k"),
params=["file"],
output={"emb": list[float]},
)
.show()
)
Source code in datachain/lib/udf.py
verbose_name
property
Returns the name of the function or class that implements the UDF.
hash
Creates SHA hash of this UDF function. It takes into account function, inputs and outputs.
For function-based UDFs, hashes self._func. For class-based UDFs, hashes the process method.
When include_body=False, the function body is excluded (identity-only:
module + qualname + defaults). Lambdas always include their
bytecode since they share the name '
Source code in datachain/lib/udf.py
process
Processing function that needs to be defined by user
setup
Initialization process executed on each worker before processing begins. This is needed for tasks like pre-loading ML models prior to scoring.
teardown
Teardown process executed on each process/worker after processing ends. This is needed for tasks like closing connections to end-points.