Tutorial, part six: Using Dagster to save and load your data#

You've written a data pipeline using assets that runs every hour. Currently, you are manually storing and loading your data using the json and pandas libraries at the beginning and end of each asset. Dagster provides I/O managers that help separate your business logic from I/O by handling reading and writing data to storage for you. I/O managers allow you to reduce boilerplate code and easily change where your data is stored. I/O managers can be a powerful tool in some situations, but a poor fit for others. By the end of this section you will understand how I/O managers work and how to choose if they are the right tool for you and your data pipelines.

By the end of this section, you will:

  • Store your files in a file system using an I/O manager
  • Store your data frames as tables in a database
  • Learn about I/O managers
  • Understand when to use I/O managers

What are I/O managers?#

I/O stands for input and output. I/O managers are Dagster objects that control how Dagster reads and writes data to specific external services, such as Snowflake or Amazon Web Services (AWS) S3.

They manage input by reading an asset from where it’s stored and loading it into memory to be used by a dependent asset. For example, most_frequent_words needs topstories. Therefore, you could use an I/O manager to load the topstories data into memory so that the most_frequent_words asset can use it.

I/O managers control output by writing the assets to the location configured. By the end of this section, you’ll configure an I/O manager to write the topstories DataFrame into a database as a table.


Step 1: Writing files to storage#

Now, you'll use an I/O manager to read and write your data. For this tutorial, you’ll be writing to a directory in your file system by using the FilesystemIOManager, which is bundled with the core dagster package.

In __init__.py, add the following snippet anywhere above the def = Definitions(...) line:

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    define_asset_job,
    load_assets_from_modules,
    FilesystemIOManager,  # Update the imports at the top of the file to also include this
)

hackernews_job = define_asset_job("hackernews_job", selection=AssetSelection.all())

hackernews_schedule = ScheduleDefinition(
    job=hackernews_job,
    cron_schedule="0 * * * *",  # every hour
)

io_manager = FilesystemIOManager(
    base_dir="data",  # Path is built relative to where `dagster dev` is run
)

Next, update your Definitions (shown below) to attach the I/O manager to your code location by defining it as a resource.

defs = Definitions(
    assets=all_assets,
    schedules=[hackernews_schedule],
    resources={
        "io_manager": io_manager,
    },
)
About resources
I/O managers are a special type of resource, a Dagster concept for connecting to external services. I/O managers are resources made specifically to control reading and writing to external services. Refer to the Resources documentation for more information.

Now, you will modify your three assets topstory_ids, topstories, and most_frequent_words to use the I/O manager to read and write data rather than direct calls to the json and pandas libraries. To do this, you will:

  • Return your data from each asset, rather than manually writing it to the file system. The I/O manager will now save the data.
  • Modify how you define dependencies between assets. To use the I/O manager to load the data associated with an upstream asset into memory, you use the upstream asset's name as the name of one of the arguments to the downstream asset's function.

Modify topstory_ids to use an I/O manager:

from typing import Dict, List  # add imports to the top of `assets.py`


@asset
def topstory_ids() -> List:  # modify return type signature
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    top_new_story_ids = requests.get(newstories_url).json()[:100]

    return (
        top_new_story_ids  # return top_new_story_ids and the I/O manager will save it
    )

Modify topstories to use an I/O manager:

@asset  # remove deps parameter
def topstories(
    context: AssetExecutionContext,
    topstory_ids: List,  # add topstory_ids as a function argument
) -> pd.DataFrame:  # modify the return type signature
    logger = get_dagster_logger()

    # remove manually loading topstory_ids

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    # remove manually saving df

    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )

    return df  # return df and the I/O manager will save it

Modify most_frequent_words to use an I/O manager:

@asset  # remove deps parameter
def most_frequent_words(
    context: AssetExecutionContext,
    topstories: pd.DataFrame,  # add topstories as a function argument
) -> Dict:  # modify the return type signature
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    # remove manually loading topstory_ids

    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    plt.figure(figsize=(10, 6))
    plt.bar(list(top_words.keys()), list(top_words.values()))
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    # remove manually saving top_words

    context.add_output_metadata(metadata={"plot": MetadataValue.md(md_content)})

    return top_words  # return top_words and the I/O manager will save it

To see the effect of this change, reload your code location in the Dagster UI and materialize your assets. When you click on an asset, you’ll see the path metadata pointing to the data directory on your computer.

This is a short example of the value of I/O managers. With I/O managers, you can save your assets to places like AWS S3, Google Cloud Storage, or Azure Blob Storage. For example, to store your assets in AWS S3, all you need to do is replace FilesystemIOManager in __init__.py with Dagster-provided S3PickleIOManager.

In addition to file storage systems, I/O managers can connect to any service from which data can be read or written, such as databases.


Step 2: Writing to databases#

In data engineering, a common asset is tabular data. These are often seen as database tables or DataFrames in-memory. In Dagster, I/O managers define the relationship between how data is saved as a table in the database and what in-memory data format to represent the data as.

Dagster includes out-of-the-box I/O managers for common databases, such as DuckDB. When a DataFrame is returned in an asset’s definition, it is translated into a database-compatible format. The I/O manager then runs a SQL query to write the data into the database.

Let’s modify our pipeline to store our topstories DataFrame as a table in DuckDB. We’ll use Dagster’s out-of-the-box I/O manager for DuckDB, an easy-to-set-up database that runs on your computer without any setup on your end.

Setting up I/O managers#

Adding an I/O manager for a database is similar to connecting it to file storage. The biggest difference is you must specify what type of DataFrame to use when the data is loaded into Dagster’s memory. In this tutorial, you’ll use DuckDB to store the data and Pandas DataFrames when transforming with the data.

Update your __init__.py to reflect the changes below:

from dagster_duckdb_pandas import DuckDBPandasIOManager

# Add the imports to the top
# These imports let you define how Dagster communicates with DuckDB

# Insert this section anywhere above your `defs = Definitions(...)`
database_io_manager = DuckDBPandasIOManager(database="analytics.hackernews")

# Update your Definitions
defs = Definitions(
    assets=all_assets,
    schedules=[hackernews_schedule],
    resources={
        "io_manager": io_manager,
        "database_io_manager": database_io_manager,  # Define the I/O manager here
    },
)

There are three changes in this snippet:

  • DuckDBPandasIOManager is imported
  • A DuckDB+Pandas I/O manager is defined
  • The new I/O manager is added as a resource under the key database_io_manager

Choosing an I/O manager for each asset#

You now have two I/O managers configured within a code location. By default, all assets will use the I/O manager under the io_manager key. Because you updated the io_manager key, the FilesytemIOManager could plug and play without any additional changes. Overriding and priority are given to the I/O manager key that is most specific to the asset.

To have the topstories asset store its data in DuckDB, modify its asset function in assets.py by specifying the asset’s io_manager_key to be "database_io_manager", as shown below:

@asset(
    group_name="hackernews",
    io_manager_key="database_io_manager",  # Addition: `io_manager_key` specified
)
def topstories(context: AssetExecutionContext, topstory_ids: List) -> pd.DataFrame:
    logger = get_dagster_logger()

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )

    return df

To validate that this worked, materialize your entire asset graph. The I/O manager should store the DataFrame in a table called analytics.hackernews.topstories and continue to store the IDs and most frequent words in a directory called data.

Despite the changes in where the data is stored, the developer experience hasn’t changed. I/O managers deal with reading/writing data from storage for you so you can focus on the core logic and computation.


When should you use I/O managers?#

I/O managers are a powerful tool that can simplify your code and allow you to easily modify where your assets are stored. However, they are not required, nor are they the best option, in all scenarios.

In this tutorial, using an I/O manager made sense because the contents of each asset were used in the downstream assets. For example, you needed to load the topstory_ids asset in order to compute the topstories asset. Using an I/O manager allowed you to remove repetitive reading and writing code. The storage requirements were also relatively simple, and were standardized across the assets using the same storage location. However, you could have left your pipeline as it was at the end of section five, and had a fully functional data pipeline.

Some assets do not need an I/O manager. For example, you may write an asset that executes a Snowflake query that creates a new table based on the data of another table. This asset would depend on the existing table, but does not need to load that table in memory in order to execute the query:

@asset(deps=[orders])
def returns():
    conn = get_snowflake_connection()
    conn.execute(
        "CREATE TABLE returns AS SELECT * from orders WHERE status = 'RETURNED'"
    )

Other reasons you may not want to use I/O managers:

  • You want to run a SQL query that creates or updates a table in a database (the above example).
  • Your pipeline manages I/O on its own by using other libraries/tools that write to storage.
  • You have unique storage requirements for each asset.
  • Your assets won't fit in memory (for example, a database table with billions of rows).
  • You have an existing pipeline that manages its own I/O and want to run it with Dagster with minimal code changes.
  • You simply prefer to have the reading and writing code explicitly defined in each asset.

As a general rule, if you find yourself getting tripped up writing your pipeline to use I/O managers, or you find yourself making your pipeline more complicated in order to use I/O managers, you will likely have more success not using I/O managers. In these cases you should use deps to define dependencies.


Next steps#

This tutorial section shows some of the simplest possible uses of I/O managers. However, this is the tip of the iceberg of what I/O managers can do.

Once you’ve outgrown being able to load all your data into memory in one chunk, you should learn about partitions and how they are used to load only slices of your data into memory.

We also briefly mentioned resources. A property of resources is that they can be configured depending on the use case, such as using DuckDB as your data warehouse during development and Databricks during production. Next, you'll learn how to build more robustness, reusability, and flexibility when connecting to external services by using resources.