Ask AI

Using Dagster Pipes, part 1: Define a Dagster asset that invokes a subprocess#

This is part one of the Using Dagster Pipes tutorial. If you are looking for how to modify your existing code that is already being orchestrated by Dagster, you can jump to Part 2: Modify external code.

In this part of the tutorial, you'll create a Dagster asset that, in its execution function, opens a Dagster pipes session and invokes a subprocess that executes some external code.


Step 1: Define the Dagster asset#

Before getting started, make sure you have fulfilled all the prerequisites for the tutorial. You should have a standalone Python script named external_code.py which looks like the following:

import pandas as pd


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    print(f"processing total {total_orders} orders")


if __name__ == "__main__":
    main()

Step 1.1: Define the asset#

First, create a new file named dagster_code.py in the same directory as the external_code.py file you created earlier in the Prerequisites step.

Next, you’ll define the asset. Copy and paste the following into the file:

import shutil

from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]

Here’s what we did in this example:

  • Created an asset named subprocess_asset
  • Provided AssetExecutionContext as the context argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
  • Specified a resource for the asset to use, PipesSubprocessClient. We’ll also come back to this in a little bit.
  • Declared a command list cmd to run the external script. In the list:
    • First, found the path to the Python executable on the system using shutil.which("python").
    • Then, provided the file path to the file that we want to execute. In this case, it’s the external_code.py file that you created earlier.

Step 1.2: Invoke the external code from the asset#

Then, invoke a subprocess that executes the external code from the asset using the pipes_subprocess_client resource:

import shutil

from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()

Let’s take a look at what this code does:

  • The PipesSubprocessClient resource used by the asset exposes a run method.
  • When the asset is executed, this method will synchronously execute the subprocess in in a pipes session, and it will return a PipesClientCompletedInvocation object.
  • This object contains a get_materialize_result method, which you can use to access the MaterializeResult event reported by the subprocess. We'll talk about how to report events from the subprocess in the next section.
  • Lastly, return the result of the subprocess.

Step 2: Define a Definitions object#

To make the asset and subprocess resource loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster+, you’ll create a Definitions object that contains them.

Copy and paste the following to the bottom of dagster_code.py:

from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)

At this point, dagster_code.py should look like the following:

import shutil

from dagster import (
    AssetExecutionContext,
    Definitions,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)

Step 3: Run the subprocess from the Dagster UI#

In this step, you’ll execute the subprocess asset you created in earlier steps from the Dagster UI.

  1. In a new command line session, run the following to start the UI:

    dagster dev -f dagster_code.py
    
  2. Navigate to http://localhost:3000, where you should see the UI:

    Asset in the UI
  3. Click Materialize located in the top right to run your code:

    Materialize asset
  4. Navigate to the Run details page, where you should see the logs for the run:

    Logs in the run details page
  5. In external_code.py, we have a print statement that outputs to stdout. Dagster will display these in the UI's raw compute log view. To see the stdout log, toggle the log section to stdout:

    Raw compute logs in the run details page

What's next?#

At this point, you've created a Dagster asset that invokes an external Python script, launched the code in a subprocess, and viewed the result in Dagster UI. Next, you'll learn how to modify your external code to work with Dagster Pipes to send information back to Dagster.