An individual op should perform relatively simple tasks, such as:
Deriving a dataset from other datasets
Executing a database query
Initiating a Spark job in a remote cluster
Querying an API and storing the result in a data warehouse
Sending an email or Slack message
The computational core of an asset definition is an op. Collections of ops can also be assembled to create a graph.
Ops support a variety of useful features for data orchestration, such as:
Flexible execution strategies: Painlessly transition from development to production with ops, as they are sealed units of logic independent of execution strategy. Collections of ops - called graphs - can be bound via jobs to an appropriate executor for single-process execution or distribution across a cluster.
Pluggable external systems: If your data pipeline interfaces with external systems, you may want to use local substitutes during development over a cloud-based production system. Dagster provides resources as an abstraction layer for this purpose.
Ops can be written against abstract resources (e.g. database), with resource definitions later bound at the job level. Op logic thus stays decoupled from any particular implementation of an external system.
Input and output management: Ops have defined inputs and outputs, analogous to the arguments and return value(s) of a Python function. An input or output can be annotated with a Dagster type for arbitrarily complex runtime validation. Outputs can additionally be tagged with an IO Manager to manage storage of the associated data in between ops. This enables easy swapping of I/O strategy depending on the execution environment, as well as efficient caching of data intermediates.
Configuration: Operations in a data pipeline are often parameterized by both upstream data (e.g. a stream of database records) and configuration parameters independent of upstream data (e.g. a "chunk size" of incoming records to operate on). Define configuration parameters by providing an associated config schema to the op.
Event streams: Ops emit a stream of events during execution. Certain events are emitted by default - such as indicating the start of an op's execution - but op authors are additionally given access to an event API.
This can be used to report data asset creation or modification (AssetMaterialization), the result of a data quality check (ExpectationResult), or other arbitrary information. Event streams can be visualized in the Dagster UI. This rich log of execution facilitates debugging, inspection, and real-time monitoring of running jobs.
Testability: The properties that enable flexible execution of ops also facilitate versatile testing. Ops can be tested in isolation or as part of a pipeline. Further, the resource API allows external systems (e.g. databases) to be stubbed or substituted as needed.
OpExecutionContext: An object exposing Dagster system APIs for resource access, logging, and more. It can be injected into an op by specifying context as the first argument of the compute function.
Each op has a set of inputs and outputs, which define the data it consumes and produces. Inputs and outputs are used to define dependencies between ops and to pass data between ops.
Both input and output definitions have a few important properties:
They are named.
They are optionally typed. These types are validated at runtime.
(Advanced) They can be linked to an IOManager, which defines how the output or input is stored and loaded. See the IO manager concept page for more info.
Inputs are passed as arguments to an op's compute_fn. The value of an input can be passed from the output of another op, or stubbed (hardcoded) using config.
The most common way to define inputs is just to add arguments to the decorated function:
@op
def my_input_op(abc, xyz):
    pass
An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways:
The upstream output that the input depends on has been successfully emitted and stored.
The input was stubbed through config.
You can use a Dagster Type to provide a function that validates an op's input every time the op runs. In this case, you use a dictionary of Ins corresponding to the decorated function arguments.
from dagster import DagsterType, In, op

MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    pass
Return type annotations can be used directly on ops. For a single output, the return annotation will be used directly for type checking.
from dagster import op


@op
def return_annotation_op() -> int:
    return 5
If there are multiple outputs, a tuple annotation can be specified. Each inner type of the tuple annotation should correspond to an output in the op.
from typing import Tuple

from dagster import Out, op


@op(out={"int_output": Out(), "str_output": Out()})
def my_multiple_output_annotation_op() -> Tuple[int, str]:
    return (5, "foo")
Outputs are expected to follow the order they are specified in the op's out dictionary. In the above example, the int output corresponds to int_output, and the str output corresponds to str_output.
Note that if you would like to specify a single tuple output and still utilize type annotations, this can be done by providing either a single Out to the op, or none.
from typing import Tuple

from dagster import op


@op
def my_single_tuple_output_op() -> Tuple[int, str]:
    return (5, "foo")  # Will be viewed as one output
While many use cases can be served using built-in Python annotations, Output and DynamicOutput objects unlock additional functionality. Check out the docs on Op Outputs to learn more.
Ops in Dagster can specify a config schema which makes them configurable and parameterizable at execution time. The configuration system is explained in detail in the Config schema documentation.
Op functions can specify an annotated config parameter for the op's configuration. The config class, which subclasses Config (which wraps pydantic.BaseModel) specifies the configuration schema for the op. Op configuration can be used to specify op behavior at runtime, making ops more flexible and reusable.
For example, we can define an op where the API endpoint it queries is defined through its configuration:
import requests
from dagster import Config, op


class MyOpConfig(Config):
    api_endpoint: str


@op
def my_configurable_op(config: MyOpConfig):
    data = requests.get(f"{config.api_endpoint}/data").json()
    return data
When writing an op, users can optionally provide a first parameter, context. When this parameter is supplied, Dagster will supply a context object to the body of the op. The context provides access to system information like loggers and the current run id. See OpExecutionContext for the full list of properties accessible from the op context.
For example, to access the logger and log an info message:
from dagster import OpExecutionContext, op


@op
def context_op(context: OpExecutionContext):
    context.log.info(f"My run ID is {context.run_id}")
Ops are used within a job or graph. You can also execute a single op, usually within a test context, by directly invoking it. More information can be found at Testing ops.
You may find the need to create utilities that help generate ops. In most cases, you should parameterize op behavior by adding op configuration. Reach for the op factory pattern only if you need to vary the arguments to the @op decorator or OpDefinition themselves, since those cannot be modified based on op configuration.
To create an op factory, you define a function that returns an OpDefinition, either directly or by decorating a function with the op decorator.
from dagster import In, Nothing, op


def my_op_factory(
    name="default_name",
    ins=None,
    **kwargs,
):
    """
    Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.

    Returns:
        function: The new op.
    """

    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        # Op logic here
        pass

    return my_inner_op