Define a Dagster asset that invokes a subprocess
This is part one of the Using Dagster Pipes tutorial. If you are looking for how to modify your existing code that is already being orchestrated by Dagster, you can jump to part 2, Modify external code.
In this part of the tutorial, you'll create a Dagster asset that, in its execution function, opens a Dagster pipes session and invokes a subprocess that executes some external code.
Step 1: Define the Dagster asset
Before getting started, make sure you have fulfilled all the prerequisites for the tutorial. You should have a standalone Python script named external_code.py, which looks like the following:
import pandas as pd


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    print(f"processing total {total_orders} orders")


if __name__ == "__main__":
    main()
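The if __name__ == "__main__": guard makes the script do its work when it is executed as a program, which is how the subprocess will run it. The stdlib-only sketch below shows that behavior using a throwaway module, demo_script. That module is hypothetical, is not part of the tutorial, and has no pandas dependency. Running the file as a program triggers main(), but importing it does not.

```python
import importlib.util
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for external_code.py, without the pandas dependency.
SCRIPT = '''
def main():
    print("main() ran")

if __name__ == "__main__":
    main()
'''


def run_as_program(path: Path) -> str:
    """Execute the file in a child interpreter, as a subprocess would."""
    completed = subprocess.run(
        [sys.executable, str(path)], capture_output=True, text=True, check=True
    )
    return completed.stdout


def import_as_module(path: Path) -> None:
    """Import the file in this process, where __name__ is 'demo_script'."""
    spec = importlib.util.spec_from_file_location("demo_script", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "demo_script.py"
    script.write_text(SCRIPT)
    print(repr(run_as_program(script)))  # 'main() ran\n'
    import_as_module(script)  # prints nothing: the guard is False on import
```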
Step 1.1: Define the asset
First, create a new file named dagster_code.py in the same directory as the external_code.py file you created earlier in the Prerequisites step.
Next, you’ll define the asset. Copy and paste the following into the file:
import shutil

from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    # Command to run: the Python executable, then the path to the external script
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    # Step 1.2 uses this command to launch the subprocess
Here’s what we did in this example:

- Created an asset named subprocess_asset
- Provided AssetExecutionContext as the context argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
- Specified a resource for the asset to use, PipesSubprocessClient. We’ll also come back to this in a little bit.
- Declared a command list cmd to run the external script. In the list:
  - First, found the path to the Python executable on the system using shutil.which("python").
  - Then, provided the file path to the file that we want to execute. In this case, it’s the external_code.py file that you created earlier.
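The command list is ordinary Python, so you can build and check it without Dagster. The stdlib-only sketch below mirrors that construction. build_command is a hypothetical helper. The os.path.join(os.path.dirname(...), ...) call stands in for file_relative_path, assuming that function resolves the script path relative to the directory of the calling file.

One caveat: shutil.which("python") returns None when no executable named python is on the PATH, which is common on systems that only provide python3. For that reason the sketch falls back to sys.executable, the interpreter currently running. That fallback is an addition of this sketch, not part of the tutorial.

```python
import os
import shutil
import sys
from typing import List


def build_command(dunder_file: str, script_name: str) -> List[str]:
    """Hypothetical helper: build [python_executable, script_path] like cmd above."""
    # shutil.which returns None if "python" is not on PATH; fall back to the
    # interpreter that is running this code.
    python: str = shutil.which("python") or sys.executable
    # Resolve the script relative to the directory containing dunder_file,
    # which is what file_relative_path(__file__, ...) is used for above.
    script_path = os.path.join(os.path.dirname(dunder_file), script_name)
    return [python, script_path]


cmd = build_command("/project/dagster_code.py", "external_code.py")
print(cmd[1])  # /project/external_code.py on POSIX systems
```

Using sys.executable directly is also a reasonable choice when Dagster and the external script share one virtual environment. The tutorial's shutil.which("python") picks whichever python appears first on the PATH.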
Step 1.2: Invoke the external code from the asset
Then, invoke a subprocess that executes the external code from the asset using the pipes_subprocess_client resource:
import shutil

from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    # Run the command in a Pipes session, wait for it to finish, and return
    # the MaterializeResult built from what the subprocess reported
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()
Let’s take a look at what this code does:

- The PipesSubprocessClient resource used by the asset exposes a run method.
- When the asset is executed, this method synchronously executes the subprocess in a Pipes session and returns a PipesClientCompletedInvocation object.
- This object has a get_materialize_result method, which you can use to access the MaterializeResult event reported by the subprocess. We'll talk about how to report events from the subprocess in the next section.
- Lastly, the asset returns that MaterializeResult as the result of the subprocess.
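"Synchronously" means the asset's execution function blocks until the child process exits. The sketch below is a plain-stdlib analogy only. It is not how PipesSubprocessClient is implemented, and it opens no Pipes session. It shows that subprocess.run behaves the same way: it waits for the child, then returns a CompletedProcess object carrying the exit code and captured output. The inline child program is a hypothetical stand-in for external_code.py that prints the same line without needing pandas.

```python
import subprocess
import sys

# Hypothetical stand-in for external_code.py: same output, no pandas dependency.
CHILD = 'orders = [1, 2]; print(f"processing total {len(orders)} orders")'


def run_blocking(code: str) -> subprocess.CompletedProcess:
    """Run Python code in a child interpreter and wait for it to finish."""
    # subprocess.run does not return until the child process has exited.
    return subprocess.run(
        [sys.executable, "-c", code], capture_output=True, text=True
    )


result = run_blocking(CHILD)
print(result.returncode)      # 0
print(result.stdout.strip())  # processing total 2 orders
```

The Dagster version differs in what you read afterward. Instead of raw output, you call get_materialize_result() to get the MaterializeResult reported through Pipes.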
Step 2: Define a Definitions object
To make the asset and subprocess resource loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster+, you’ll create a Definitions object that contains them.
Copy and paste the following to the bottom of dagster_code.py:
from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
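The key "pipes_subprocess_client" in the resources dictionary matches the name of the asset's pipes_subprocess_client parameter. Dagster uses that name to decide which resource to pass in. The sketch below is a toy, stdlib-only illustration of name-based injection using inspect.signature. inject_resources and FakeClient are hypothetical, and this is not Dagster's actual mechanism. It only shows why renaming either side without the other breaks the wiring.

```python
import inspect
from typing import Any, Callable, Dict


class FakeClient:
    """Hypothetical stand-in for PipesSubprocessClient."""


def inject_resources(fn: Callable[..., Any], resources: Dict[str, Any], context: Any) -> Any:
    """Toy injector: fill each non-context parameter of fn from resources by name."""
    kwargs = {}
    for name in inspect.signature(fn).parameters:
        if name == "context":
            continue
        if name not in resources:
            raise KeyError(f"no resource named {name!r} for parameter {name!r}")
        kwargs[name] = resources[name]
    # Assumes context is the first positional parameter, as in subprocess_asset.
    return fn(context, **kwargs)


def subprocess_asset(context: Any, pipes_subprocess_client: FakeClient) -> str:
    return type(pipes_subprocess_client).__name__


print(inject_resources(subprocess_asset, {"pipes_subprocess_client": FakeClient()}, None))
# FakeClient
```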
At this point, dagster_code.py should look like the following:
import shutil

from dagster import (
    AssetExecutionContext,
    Definitions,
    MaterializeResult,
    PipesSubprocessClient,
    asset,
    file_relative_path,
)


@asset
def subprocess_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> MaterializeResult:
    cmd = [shutil.which("python"), file_relative_path(__file__, "external_code.py")]
    return pipes_subprocess_client.run(
        command=cmd, context=context
    ).get_materialize_result()


defs = Definitions(
    assets=[subprocess_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
Step 3: Run the subprocess from the Dagster UI
In this step, you’ll execute the subprocess asset you created in earlier steps from the Dagster UI.
- In a new command line session, run the following to start the UI:

  dagster dev -f dagster_code.py

- Navigate to http://localhost:3000, where you should see the UI.
- Click Materialize, located in the top right, to run your code.
- Navigate to the Run details page, where you should see the logs for the run.
- In external_code.py, we have a print statement that writes to stdout. Dagster displays this output in the UI's raw compute log view. To see the stdout log, toggle the log section to stdout.
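The stdout toggle exists because a process has two separate output streams. print writes to stdout, while tracebacks and anything written to sys.stderr go to stderr, and the compute log view keeps the two apart. The stdlib-only sketch below shows the same separation when a parent process captures a child's output. The child program is a hypothetical example, not part of the tutorial.

```python
import subprocess
import sys

# Hypothetical child program: one line to each stream.
CHILD = (
    "import sys\n"
    "print('processing total 2 orders')\n"
    "print('something went wrong', file=sys.stderr)\n"
)

completed = subprocess.run(
    [sys.executable, "-c", CHILD], capture_output=True, text=True, check=True
)
# capture_output=True pipes each stream separately, so they never mix.
print("stdout:", completed.stdout.strip())  # stdout: processing total 2 orders
print("stderr:", completed.stderr.strip())  # stderr: something went wrong
```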
What's next?
At this point, you've created a Dagster asset that invokes an external Python script, launched the code in a subprocess, and viewed the result in the Dagster UI. Next, you'll learn how to modify your external code to work with Dagster Pipes to send information back to Dagster.