DagsterDocs

A Single-Op Job #

You can find the code for this example on Github

Ops and Jobs #

Dagster's core abstractions are ops and jobs.

Ops are individual units of computation that we wire together to form jobs.

In this section, we'll cover how to define a simple job with a single op, and then execute it.

The Cereal Dataset #

Our job will operate on a simple but scary CSV dataset, cereal.csv, which contains nutritional facts about 80 breakfast cereals.

Hello, Op! #

Let's write our first Dagster op and save it as hello_cereal.py.

A op is a unit of computation in a job. Typically, you'll define ops by annotating ordinary Python functions with the @op decorator.

Our first op does three things: downloads a csv of cereal data, reads it into a list of dictionaries which each represent a row in the CSV, and logs the number of rows it finds.

import requests
import csv
from dagster import job, op


@op
def hello_cereal(context):
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    cereals = [row for row in csv.DictReader(lines)]
    context.log.info(f"Found {len(cereals)} cereals")

    return cereals

In this simple case, our op takes no arguments except for the context in which it executes (provided by the Dagster framework as the first argument to ops who specify it), and also returns no outputs. Don't worry, we'll soon encounter ops that are much more dynamic.

Hello, Job! #

To execute our op, we'll embed it in an equally simple job. A job is a set of ops arranged into a DAG of computation. You'll typically define jobs by annotating ordinary Python functions with the @job decorator.

@job
def hello_cereal_job():
    hello_cereal()

Here you'll see that we call hello_cereal(). This call doesn't actually execute the op. Within the bodies of functions decorated with @job, we use function calls to indicate the dependency structure of the op making up the job. Here, we indicate that the execution of hello_cereal doesn't depend on any other ops by calling it with no arguments.

Executing Our First Job #

Assuming you’ve saved this job as hello_cereal.py, you can execute it via any of three different mechanisms:

Dagit #

To visualize your job (which only has one node) in Dagit, from the directory in which you've saved the job file, just run:

dagit -f hello_cereal.py

You'll see output like

Loading repository... Serving on http://127.0.0.1:3000

You should be able to navigate to http://127.0.0.1:3000 in your web browser and view your job. It isn't very interesting yet, because it only has one node.

hello_cereal_figure_one.png

Click on the "Launchpad" tab and you'll see the view below.

hello_cereal_figure_two.png

The large upper left pane is empty here, but, in jobs with parameters, this is where you'll be able to edit job configuration on the fly.

Click the "Launch Execution" button on the bottom right to execute this plan directly from Dagit. A new window should open, and you'll see a much more structured view of the stream of Dagster events start to appear in the left-hand pane.

If you have pop-up blocking enabled, you may need to tell your browser to allow pop-ups from 127.0.0.1—or, just navigate to the "Runs" tab to see this, and every run of your job.

hello_cereal_figure_three.png

In this view, you can filter and search through the logs corresponding to your job run.

Dagster CLI #

From the directory in which you've saved the job file, just run:

dagster job execute -f hello_cereal.py

You'll see the full stream of events emitted by Dagster appear in the console, including our call to the logging machinery, which will look like:

2021-02-05 08:50:25 - dagster - INFO - system - ce5d4576-2569-44ff-a14a-51010eea5329 - hello_cereal - Found 77 cereals

Success!

Python API #

If you'd rather execute your jobs as a script, you can do that without using the Dagster CLI at all. Just add a few lines to hello_cereal.py

if __name__ == "__main__":
    result = hello_cereal_job.execute_in_process()

Now you can just run:

python hello_cereal.py