Using Dagster Pipes, part 2: Modify external code & send information to Dagster#

This is part two of the Using Dagster Pipes tutorial.

At this point, you should have two files:

  • external_code.py which is a standalone Python script that you want to orchestrate with Dagster.
  • dagster_code.py which includes a Dagster asset and other Dagster definitions.

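In this section, you'll learn how to modify the standalone Python script to work with Dagster Pipes so that it can stream information back to Dagster. To do this, you'll:

  • Make Dagster context available in external code
  • Send log messages to Dagster
  • Send structured metadata to Dagster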


Step 1: Make Dagster context available in external code#

Getting external code to send information back to Dagster via Dagster Pipes requires adding a few lines of code:

  • Imports from dagster-pipes

  • A call that connects to Dagster Pipes: open_dagster_pipes initializes the Dagster Pipes context that can be used to stream information back to Dagster. We recommend calling this function near the entry point of a pipes session.

    The with open_dagster_pipes(): statement uses a Python context manager, which handles resource setup and cleanup around a block of code. Context managers suit tasks that need initial setup and final teardown, such as opening and closing connections. Here, the context manager initializes the Dagster Pipes connection and closes it when the block exits.

  • An instance of the Dagster Pipes context via PipesContext.get. You can access information like partition_key and asset_key via this context object. Refer to the API documentation for more information.
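
As a rough illustration of the last bullet, the sketch below reads a few fields from a context object. It assumes the context exposes asset_key and partition_key as mentioned above, plus an is_partition_step flag (an assumption to check against the API documentation). The describe_context helper, the SimpleNamespace stand-in, and the example values are hypothetical and exist only so the snippet runs without a Dagster-launched process.

```python
from types import SimpleNamespace


def describe_context(context):
    """Summarize which asset (and partition, if any) this run targets.

    `context` is expected to look like the object returned by
    PipesContext.get(); the attribute names are assumptions.
    """
    summary = {"asset_key": context.asset_key}
    # Only read partition_key when the step is actually partitioned.
    if context.is_partition_step:
        summary["partition_key"] = context.partition_key
    return summary


# Hypothetical stand-in so the sketch runs outside of Dagster.
fake_context = SimpleNamespace(
    asset_key="example_asset",
    is_partition_step=True,
    partition_key="2024-01-01",
)
print(describe_context(fake_context))
```

In a real script, the object returned by PipesContext.get() would take the place of fake_context.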

In our sample Python script, the changes would look like the following:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    print(f"processing total {total_orders} orders")


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
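
To make the setup and teardown behavior of the with block more concrete, here is a minimal standard-library sketch of a context manager. The open_session function is a hypothetical stand-in, not part of the Dagster Pipes API; it only records the order in which setup, work, and teardown happen.

```python
from contextlib import contextmanager

events = []  # records the order of setup, work, and teardown


@contextmanager
def open_session(name):
    """Hypothetical stand-in for a managed session; not the Dagster Pipes API."""
    events.append(f"open {name}")  # setup runs on entering the with block
    try:
        yield name
    finally:
        events.append(f"close {name}")  # teardown runs even if the body raises


def main():
    events.append("run main")


with open_session("demo"):
    main()

print(events)  # ['open demo', 'run main', 'close demo']
```

Because the teardown sits in a finally clause, the session is closed even when main() raises, which is the same guarantee the tutorial relies on when it wraps main() in with open_dagster_pipes():.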

Step 2: Send log messages to Dagster#

The Dagster Pipes context offers built-in logging that streams log messages back to Dagster. Instead of printing to standard output, use the context.log logger on PipesContext. In this case, we’re sending an info level log message:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    context.log.info(f"processing total {total_orders} orders")


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

The log messages then appear on the Run details page of the Dagster UI. To view only info level messages, filter by log level:

  1. Click the Levels filter next to the log filter field. This will present a dropdown of all log levels.
  2. Select the info checkbox and deselect the others. This will show only the logs marked as info level.
Send log messages to Dagster
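
The Levels filter is most useful when the external script logs at more than one level. The sketch below assumes that context.log behaves like a standard logging.Logger, so that debug and warning are available alongside info. The log_order_summary helper and its expected_minimum parameter are hypothetical, and the dry run uses a standard-library logger so the snippet runs outside of Dagster.

```python
import logging


def log_order_summary(log, total_orders, expected_minimum=1):
    """Log at a level that matches the outcome and return the level used.

    `log` can be any object with the standard logging methods; the
    assumption is that context.log can be passed in the same way.
    """
    log.debug(f"counted {total_orders} orders")
    if total_orders < expected_minimum:
        log.warning(f"only {total_orders} orders, expected at least {expected_minimum}")
        return "warning"
    log.info(f"processing total {total_orders} orders")
    return "info"


# Dry run with a standard-library logger instead of context.log.
logging.basicConfig(level=logging.DEBUG)
local_log = logging.getLogger("external_code")
log_order_summary(local_log, total_orders=2)
```

Inside a Pipes session, the call would be log_order_summary(context.log, total_orders), and the resulting messages could then be narrowed down with the Levels filter.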

Step 3: Send structured metadata to Dagster#

Sometimes, you may want information from your external code to appear as structured metadata in the Dagster UI. The Dagster Pipes context can also send structured metadata back to Dagster.

Report asset materialization#

Similar to reporting materialization metadata within the Dagster process, you can also report asset materialization back to Dagster from the external process.

In this example, we’re passing a piece of metadata named total_orders to the metadata parameter of PipesContext.report_asset_materialization. This payload will be sent from the external process back to Dagster:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

Then, total_orders will show up in the UI as structured metadata:

Report asset materialization to Dagster

This metadata will also be displayed on the Events tab of the Asset Details page in the UI:

View materialization events in asset details page
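
A metadata payload can carry more than one entry. The sketch below builds such a dictionary separately from the reporting call, which keeps it easy to inspect. It assumes that, besides plain values, a metadata entry may be written as a dictionary with raw_value and type keys; treat that form as an assumption to verify against the API documentation. The build_materialization_metadata helper and the distinct_items entry are hypothetical, and a list of dicts stands in for the tutorial's DataFrame so the snippet has no third-party dependencies.

```python
def build_materialization_metadata(orders):
    """Collect values to attach to the materialization event.

    `orders` is a list of dicts standing in for the tutorial's DataFrame.
    """
    item_ids = {order["item_id"] for order in orders}
    return {
        # Plain value, as in the tutorial example.
        "total_orders": len(orders),
        # Assumed explicit form: a raw value tagged with a metadata type.
        "distinct_items": {"raw_value": len(item_ids), "type": "int"},
    }


orders = [{"order_id": 1, "item_id": 432}, {"order_id": 2, "item_id": 878}]
metadata = build_materialization_metadata(orders)
print(metadata)
```

Inside a Pipes session, the result would be passed along as context.report_asset_materialization(metadata=metadata).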

Report asset checks#

Dagster allows you to define and execute data quality checks on assets. Refer to the Asset Checks documentation for more information.

If your asset has data quality checks defined, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

When Dagster executes the code, you’ll see an asset check event with the check result in the UI:

Report asset checks to Dagster

This check result will also be displayed on the Checks tab of the Asset Details page in the UI:

View checks in asset details page
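
The check result can also be computed in a small helper before it is reported, which makes the pass/fail logic testable on its own. The sketch below is a dependency-free illustration: evaluate_no_empty_order_check and the missing_item_ids metadata entry are hypothetical, a list of dicts stands in for the DataFrame, and it assumes that report_asset_check accepts a metadata argument alongside check_name and passed.

```python
def evaluate_no_empty_order_check(orders):
    """Build keyword arguments describing the check result.

    `orders` is a list of dicts standing in for the tutorial's DataFrame.
    """
    missing = sum(1 for order in orders if order.get("item_id") is None)
    return {
        "check_name": "no_empty_order_check",
        "passed": missing == 0,  # a plain Python bool
        "metadata": {"missing_item_ids": missing},
    }


complete = [{"order_id": 1, "item_id": 432}, {"order_id": 2, "item_id": 878}]
incomplete = [{"order_id": 3, "item_id": None}]
print(evaluate_no_empty_order_check(complete))
print(evaluate_no_empty_order_check(incomplete))
```

Inside a Pipes session, the result could be forwarded with context.report_asset_check(**evaluate_no_empty_order_check(orders)).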

Finished code#

At this point, external_code.py should look like the following:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

What's next?#

In this tutorial, you learned how to get access to the Dagster Pipes context, send log messages from the external process, and send structured events back to Dagster.

What's next? From here, you can: