Ask AI

Ops#

Ops are the core unit of computation in Dagster.

An individual op should perform relatively simple tasks, such as:

  • Deriving a dataset from other datasets
  • Executing a database query
  • Initiating a Spark job in a remote cluster
  • Querying an API and storing the result in a data warehouse
  • Sending an email or Slack message

The computational core of a software-defined asset is an op. Collections of ops can also be assembled to create a graph.

ops

Ops support a variety of useful features for data orchestration, such as:

  • Flexible execution strategies: Painlessly transition from development to production with ops, as they are sealed units of logic independent of execution strategy. Collections of ops - called graphs - can be bound via jobs to an appropriate executor for single-process execution or distribution across a cluster.

  • Pluggable external systems: If your data pipeline interfaces with external systems, you may want to use local substitutes during development over a cloud-based production system. Dagster provides resources as an abstraction layer for this purpose.

    Ops can be written against abstract resources (e.g. database), with resource definitions later bound at the job level. Op logic can thus remain uncoupled to any particular implementation of an external system.

  • Input and output management: Ops have defined inputs and outputs, analogous to the arguments and return value(s) of a Python function. An input or output can be annotated with a Dagster type for arbitrarily complex runtime validation. Outputs can additionally be tagged with an IO Manager to manage storage of the associated data in between ops. This enables easy swapping of I/O strategy depending on the execution environment, as well as efficient caching of data intermediates.

  • Configuration: Operations in a data pipeline are often parameterized by both upstream data (e.g. a stream of database records) and configuration parameters independent of upstream data (e.g. a "chunk size" of incoming records to operate on). Define configuration parameters by providing an associated config schema to the op.

  • Event streams: Ops emit a stream of events during execution. Certain events are emitted by default - such as indicating the start of an op's execution - but op authors are additionally given access to an event API.

    This can be used to report data asset creation or modification (AssetMaterialization), the result of a data quality check (ExpectationResult), or other arbitrary information. Event streams can be visualized in the Dagster UI. This rich log of execution facilitates debugging, inspection, and real-time monitoring of running jobs.

  • Testability: The properties that enable flexible execution of ops also facilitate versatile testing. Ops can be tested in isolation or as part of a pipeline. Further, the resource API allows external systems (e.g. databases) to be stubbed or substituted as needed.


Relevant APIs#

NameDescription
@opA decorator used to define ops. Returns an OpDefinition. The decorated function is called the "compute function".
InAn input to an op. Defined on the ins argument to the @op decorator.
OutAn output of an op. Defined on the out argument to the @op decorator.
OpExecutionContextAn object exposing Dagster system APIs for resource access, logging, and more. Can be injected into an op by specifying context as the first argument of the compute function.
OpDefinitionClass for ops. You will rarely want to instantiate this class directly. Instead, you should use the @op.

Defining an op#

To define an op, use the @op decorator. The decorated function is called the compute_fn.

@op
def my_op():
    return "hello"

Inputs and Outputs#

Each op has a set of inputs and outputs, which define the data it consumes and produces. Inputs and outputs are used to define dependencies between ops and to pass data between ops.

Both definitions have a few important properties:

  • They are named.
  • They are optionally typed. These types are validated at runtime.
  • (Advanced) They can be linked to an IOManager, which defines how the output or input is stored and loaded. See the IO manager concept page for more info.

Inputs#

Inputs are passed as arguments to an op's compute_fn. The value of an input can be passed from the output of another op, or stubbed (hardcoded) using config.

The most common way to define inputs is just to add arguments to the decorated function:

@op
def my_input_op(abc, xyz):
    pass

An op only starts to execute once all of its inputs have been resolved. Inputs can be resolved in two ways:

  • The upstream output that the input depends on has been successfully emitted and stored.
  • The input was stubbed through config.

You can use a Dagster Type to provide a function that validates an op's input every time the op runs. In this case, you use a dictionary of Ins corresponding to the decorated function arguments.

MyDagsterType = DagsterType(
    type_check_fn=lambda _, value: value % 2 == 0, name="MyDagsterType"
)


@op(ins={"abc": In(dagster_type=MyDagsterType)})
def my_typed_input_op(abc):
    pass

Outputs#

Outputs are yielded from an op's compute_fn. By default, all ops have a single output called "result".

When you have one output, you can return the output value directly.

@op
def my_output_op():
    return 5

To define multiple outputs, or to use a different output name than "result", you can provide a dictionary of Outs to the @op decorator.

@op(out={"first_output": Out(), "second_output": Out()})
def my_multi_output_op():
    return 5, 6

Return type annotations can be used directly on ops. For a single output, the return annotation will be used directly for type checking.

from dagster import op


@op
def return_annotation_op() -> int:
    return 5

If there are multiple outputs, a tuple annotation can be specified. Each inner type of the tuple annotation should correspond to an output in the op.

from dagster import op
from typing import Tuple


@op(out={"int_output": Out(), "str_output": Out()})
def my_multiple_output_annotation_op() -> Tuple[int, str]:
    return (5, "foo")

Outputs are expected to follow the order they are specified in the op's out dictionary. In the above example, the int output corresponds to int_output, and the str output corresponds to str_output.

Note that if you would like to specify a single tuple output and still utilize type annotations, this can be done by providing either a single Out to the op, or none.

from dagster import op
from typing import Tuple


@op
def my_single_tuple_output_op() -> Tuple[int, str]:
    return (5, "foo")  # Will be viewed as one output

Like inputs, outputs can also have Dagster Types.

While many use cases can be served using built-in python annotations, Output and DynamicOutput objects unlock additional functionality. Check out the docs on Op Outputs to learn more.

Op Configuration#

Ops in Dagster can specify a config schema which makes them configurable and parameterizable at execution time. The configuration system is explained in detail in the Config schema documentation.

Op functions can specify an annotated config parameter for the op's configuration. The config class, which subclasses Config (which wraps pydantic.BaseModel) specifies the configuration schema for the op. Op configuration can be used to specify op behavior at runtime, making ops more flexible and reusable.

For example, we can define an op where the API endpoint it queries is defined through its configuration:

from dagster import Config


class MyOpConfig(Config):
    api_endpoint: str


@op
def my_configurable_op(config: MyOpConfig):
    data = requests.get(f"{config.api_endpoint}/data").json()
    return data

Op Context#

When writing an op, users can optionally provide a first parameter, context. When this parameter is supplied, Dagster will supply a context object to the body of the op. The context provides access to system information like loggers and the current run id. See OpExecutionContext for the full list of properties accessible from the op context.

For example, to access the logger and log a info message:

@op
def context_op(context: OpExecutionContext):
    context.log.info(f"My run ID is {context.run_id}")

Using an op#

Ops are used within a job or graph. You can also execute a single op, usually within a test context, by directly invoking it. More information can be found at Testing ops.

Patterns#

Op Factory#

You may find the need to create utilities that help generate ops. In most cases, you should parameterize op behavior by adding op configuration. You should reach for this pattern if you find yourself needing to vary the arguments to the @op decorator or OpDefinition themselves, since they cannot be modified based on op configuration.

To create an op factory, you define a function that returns an OpDefinition, either directly or by decorating a function with the op decorator.

def my_op_factory(
    name="default_name",
    ins=None,
    **kwargs,
):
    """Args:
        name (str): The name of the new op.
        ins (Dict[str, In]): Any Ins for the new op. Default: None.

    Returns:
        function: The new op.
    """

    @op(name=name, ins=ins or {"start": In(Nothing)}, **kwargs)
    def my_inner_op(**kwargs):
        # Op logic here
        pass

    return my_inner_op