Ask AI

dagstermill integration reference#

This reference provides a high-level look at working with Jupyter notebooks using the dagstermill integration library.

For a step-by-step implementation walkthrough, refer to the Using notebooks with Dagster tutorial.


Notebooks as assets#

To load a Jupyter notebook as a Dagster asset, use define_dagstermill_asset:

from dagstermill import define_dagstermill_asset

from dagster import file_relative_path

iris_kmeans_notebook = define_dagstermill_asset(
    name="iris_kmeans",
    notebook_path=file_relative_path(__file__, "../notebooks/iris-kmeans.ipynb"),
)

In this code block, we use define_dagstermill_asset to create a Dagster asset. We provide the name for the asset with the name parameter and the path to our .ipynb file with the notebook_path parameter. The resulting asset will execute our notebook and store the resulting .ipynb file in a persistent location.


Notebooks as ops#

Dagstermill also supports running Jupyter notebooks as ops. We can use define_dagstermill_op to turn a notebook into an op:

from dagstermill import ConfigurableLocalOutputNotebookIOManager, define_dagstermill_op

from dagster import file_relative_path, job

k_means_iris = define_dagstermill_op(
    name="k_means_iris",
    notebook_path=file_relative_path(__file__, "./notebooks/iris-kmeans.ipynb"),
    output_notebook_name="iris_kmeans_output",
)


@job(
    resource_defs={
        "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
    }
)
def iris_classify():
    k_means_iris()

In this code block, we use define_dagstermill_op to create an op that will execute the Jupyter notebook. We give the op the name k_means_iris, and provide the path to the notebook file. We also specify output_notebook_name=iris_kmeans_output. This means that the executed notebook will be returned in a buffered file object as one of the outputs of the op, and that output will have the name iris_kmeans_output. We then include the k_means_iris op in the iris_classify job and specify the ConfigurableLocalOutputNotebookIOManager as the output_notebook_io_manager to store the executed notebook file.


Notebook context#

If you look at one of the notebooks executed by Dagster, you'll notice that the injected-parameters cell in your output notebooks defines a variable called context. This context object mirrors the execution context object that's available in the body of any other asset or op's compute function.

As with the parameters that dagstermill injects, you can also construct a context object for interactive exploration and development by using the dagstermill.get_context API in the tagged parameters cell of your input notebook. When Dagster executes your notebook, this development context will be replaced with the injected runtime context.

You can use the development context to access asset and op config and resources, to log messages, and to yield results and other Dagster events just as you would in production. When the runtime context is injected by Dagster, none of your other code needs to change.

For instance, suppose we want to make the number of clusters (the k in k-means) configurable. We'll change our asset definition to include a config field:

from dagstermill import define_dagstermill_asset

from dagster import AssetIn, Field, Int, file_relative_path

iris_kmeans_jupyter_notebook = define_dagstermill_asset(
    name="iris_kmeans_jupyter",
    notebook_path=file_relative_path(__file__, "./notebooks/iris-kmeans.ipynb"),
    group_name="template_tutorial",
    ins={"iris": AssetIn("iris_dataset")},
    config_schema=Field(
        Int,
        default_value=3,
        is_required=False,
        description="The number of clusters to find",
    ),
)

You can also provide config_schema to define_dagstermill_op in the same way demonstrated in this code snippet.

In our notebook, we'll stub the context as follows (in the parameters cell):

import dagstermill

context = dagstermill.get_context(op_config=3)

Now we can use our config value in our estimator. In production, this will be replaced by the config value provided to the job:

estimator = sklearn.cluster.KMeans(n_clusters=context.op_config)

Results and custom materializations#

The functionality described in this section only works for notebooks run withdefine_dagstermill_op. If you'd like adding this feature todefine_dagstermill_asset to be prioritized, give this GitHub Issue a thumbs up.

If you are using define_dagstermill_op and you'd like to yield a result to be consumed downstream of a notebook, you can call yield_result with the value of the result and its name. In interactive execution, this is a no-op, so you don't need to change anything when moving from interactive exploration and development to production.

# my_notebook.ipynb
import dagstermill

dagstermill.yield_result(3, output_name="my_output")

And then:

from dagstermill import ConfigurableLocalOutputNotebookIOManager, define_dagstermill_op

from dagster import Out, file_relative_path, job, op

my_notebook_op = define_dagstermill_op(
    name="my_notebook",
    notebook_path=file_relative_path(__file__, "./notebooks/my_notebook.ipynb"),
    output_notebook_name="output_notebook",
    outs={"my_output": Out(int)},
)


@op
def add_two(x):
    return x + 2


@job(
    resource_defs={
        "output_notebook_io_manager": ConfigurableLocalOutputNotebookIOManager(),
    }
)
def my_job():
    three, _ = my_notebook_op()
    add_two(three)

Dagster events#

You can also yield Dagster events from your notebook using yield_event.

For example, if you'd like to yield a custom AssetMaterialization object (for instance, to tell the Dagster UI where you've saved a plot), you can do the following:

import dagstermill
from dagster import AssetMaterialization

dagstermill.yield_event(AssetMaterialization(asset_key="marketing_data_plotted"))