Dagster Pipes subprocess reference

This reference shows usage of Dagster Pipes with other entities in the Dagster system. For a step-by-step walkthrough, refer to the Dagster Pipes tutorial.


Specifying environment variables and extras

When launching the subprocess, you may want to make environment variables or additional parameters available in the external process. Extras are arbitrary, user-defined parameters made available on the context object in the external process.

In the external code, you can access extras via the PipesContext object:

import os

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # get all extras provided by Dagster asset
    print(context.extras)
    # get the value of an extra
    print(context.get_extra("foo"))
    # get env var
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

Working with @asset_check

Sometimes you may want to report a data quality check result instead of materializing an asset. This is useful when the asset's data quality checks are defined with @asset_check.

From the external code, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check. Note that asset_key in this case is required, and must match the asset key defined in @asset_check:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # get the Dagster Pipes context
    context = PipesContext.get()
    # report the asset check result back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        # convert to a plain Python bool so the value is JSON serializable
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

Working with multi-assets

Sometimes, you may invoke a single call to an API that results in multiple tables being updated, or you may have a single script that computes multiple assets. In these cases, you can use Dagster Pipes to report back on multiple assets at once.

Note: when working with multi-assets, PipesContext.report_asset_materialization may only be called once per unique asset key. If called more than once, an error similar to the following will surface:

Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it

To avoid this, set the asset_key parameter on each call to PipesContext.report_asset_materialization, using a different key for each asset:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame(
        {"order_id": [1, 2, 3], "item_id": [432, 878, 102], "user_id": ["a", "b", "a"]}
    )
    total_orders = len(orders_df)
    total_users = orders_df["user_id"].nunique()

    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster. asset_key is required when there are multiple assets
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": total_orders}
    )
    context.report_asset_materialization(
        asset_key="users", metadata={"total_users": total_users}
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

Passing custom data

Sometimes you may want to pass data back from the external process for use in the orchestration code, for purposes other than reporting directly to Dagster, such as creating an output. This example uses custom messages to create an I/O managed output that is returned from the asset.

In the external code, we send messages using report_custom_message. The message can be any data that is JSON serializable.

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    # get the Dagster Pipes context
    context = PipesContext.get()

    # compute the full orders data
    orders = pd.DataFrame(
        {
            "order_id": [1, 2, 3],
            "item_id": [321, 654, 987],
            "order_details": [..., ..., ...],  # imagine large data,
            # and more columns
        }
    )

    # send a smaller table to be I/O managed by Dagster and passed to downstream assets
    summary_table = pd.DataFrame(orders[["order_id", "item_id"]])
    context.report_custom_message(summary_table.to_dict())

    context.report_asset_materialization(metadata={"total_orders": len(orders)})


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()