Lakehouse with Pandas and Pyspark

Lakehouse is an experimental API built on top of Dagster's core abstractions that makes it easy to define computations in terms of the data assets that they produce. In the last example, we demonstrated how to use Lakehouse to transform data assets into a core Dagster pipeline. In this example, we'll be demonstrating how we can use Lakehouse to construct a Dagster pipeline with multiple compute options. Users unfamiliar with lakehouse should first view the first lakehouse tutorial here.

Different computable assets will often ingest data in different ways. Let's say we have a table of temperature samples collected in 5 minute increments, and we want to compute the high temperature for each day represented in the table. In addition, we want to compute the difference between the high temperature of consecutive days. This pipeline will ingest a csv file as a pandas dataframe, and outputted the computed high temperatures again as a csv file. Then, we want to ingest the computed high temperatures as a Spark Dataframe, and then output the differences back to csv files. First, we would need to utilize a storage mechanism that is digestible by Spark. In this case, that means representing tables as directories of csv files as opposed to a single csv file. From these directories of csv files, we must be able to compute both pandas dataframes and spark dataframes. How do we utilize both the pandas conversion and the spark conversion?

Luckily, Lakehouse provides a way to compose different types of storage. To see how, let's first define our storage for converting between a folder of csv files and pandas dataframe.

In the below code examples, note that PandasDF refers to the pandas dataframe class, import pandas.Dataframe as PandasDF, and SparkDF refers to the spark dataframe class, import spark.Dataframe as SparkDF.

Data Assets

We'll use Assets to define each of the tables.

assets.py
"""Asset definitions for the multi_type_lakehouse example."""
import pandas as pd
from lakehouse import Column, computed_table, source_table
from pandas import DataFrame as PandasDF
from pyarrow import date32, float64, string
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f

sfo_q2_weather_sample_table = source_table(
    path=("sfo_q2_weather_sample",),
    columns=[Column("tmpf", float64()), Column("valid_date", string())],
)


@computed_table(
    input_assets=[sfo_q2_weather_sample_table],
    columns=[Column("valid_date", date32()), Column("max_tmpf", float64())],
)
def daily_temperature_highs_table(sfo_q2_weather_sample: PandasDF) -> PandasDF:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@computed_table(
    input_assets=[daily_temperature_highs_table],
    columns=[Column("valid_date", date32()), Column("max_tmpf", float64())],
)
def daily_temperature_high_diffs_table(daily_temperature_highs: SparkDF) -> SparkDF:
    """Computes the difference between each day's high and the previous day's high"""
    window = Window.orderBy("valid_date")
    return daily_temperature_highs.select(
        "valid_date",
        (
            daily_temperature_highs["max_tmpf"]
            - f.lag(daily_temperature_highs["max_tmpf"]).over(window)
        ).alias("day_high_diff"),
    )

sfo_q2_weather_sample_table represents our base temperature table. Passing in "filesystem" for the storage_key argument indicates that this asset is stored locally. The path argument gives the path to the data asset itself.

daily_temperature_highs_table represents our computed high temperatures. We explicitly define the dependency on the original table by passing sfo_q2_weather_sample_table as the value for the input_deps argument.

We have an additional table daily_temperature_high_diffs_table that represents the difference between the high temperature of consecutive days. We use the input_assets parameter to make explicit the dependency on daily_temperature_highs_table.

Storage

The ingestion of csv files is a bit different this time, because our specification requires ingesting a folder of csv files as opposed to a single csv file.

lakehouse_def.py
class LocalFileSystem:
    def __init__(self, config):
        self._root = config["root"]

    def get_fs_path(self, path: Tuple[str, ...]) -> str:
        return os.path.join(self._root, *(path[:-1]), path[-1])


local_filesystem_config_schema = {"root": StringSource}


@resource(config_schema=local_filesystem_config_schema)
def pandas_df_local_filesystem_storage(init_context):
    local_fs = LocalFileSystem(init_context.resource_config)

    class Storage(AssetStorage):
        def save(self, obj: PandasDF, path: Tuple[str, ...], _resources) -> None:
            """This saves the dataframe as a CSV using the layout written and expected by Spark/Hadoop.

            E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", a directory
            will be created with two files inside it:

                /a/b/c/
                    part-00000.csv
             2       _SUCCESS
            """
            directory = local_fs.get_fs_path(path)
            os.makedirs(directory, exist_ok=True)
            open(os.path.join(directory, "_SUCCESS"), "wb").close()
            csv_path = os.path.join(directory, "part-00000.csv")
            obj.to_csv(csv_path)

        def load(self, _python_type, path: Tuple[str, ...], _resources):
            """This reads a dataframe from a CSV using the layout written and expected by Spark/Hadoop.

            E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", and that
            directory contains:

                /a/b/c/
                    part-00000.csv
                    part-00001.csv
                    _SUCCESS

            then the produced dataframe will contain the concatenated contents of the two CSV files.
            """
            fs_path = os.path.abspath(local_fs.get_fs_path(path))
            paths = glob.glob(os.path.join(fs_path, "*.csv"))
            return pd.concat(map(pd.read_csv, paths))

    return Storage()

The load method takes in all csv files within a given directory, rather than specifying a csv file or set of csv files explicitly. Analogously, the save method writes csv files to a directory in parts.

We'll do something similar for conversion between a csv and a spark datafarame:

lakehouse_def.py


@resource(config_schema=local_filesystem_config_schema)
def spark_df_local_filesystem_storage(init_context):
    local_fs = LocalFileSystem(init_context.resource_config)

    class Storage(AssetStorage):
        def save(self, obj: SparkDF, path: Tuple[str, ...], _resources):
            obj.write.format("csv").options(header="true").save(
                local_fs.get_fs_path(path), mode="overwrite"
            )

        def load(self, _python_type, path, resources):
            return (
                resources.pyspark.spark_session.read.format("csv")
                .options(header="true")
                .load(local_fs.get_fs_path(path))
            )

    return Storage()

To compose these two compute types, we utilize the multi_type_asset_storage function provided by lakehouse:

from lakehouse import multi_type_asset_storage

We can now define a composed AssetStorage:

lakehouse_def.py
local_file_system_storage = multi_type_asset_storage(
    local_filesystem_config_schema,
    {SparkDF: spark_df_local_filesystem_storage, PandasDF: pandas_df_local_filesystem_storage},
)

Finally, we construct our lakehouse:

lakehouse_def.py
def make_multi_type_lakehouse():
    dev_mode = ModeDefinition(
        resource_defs={
            "pyspark": pyspark_resource,
            "default_storage": local_file_system_storage.configured({"root": "."}),
        },
    )

    return Lakehouse(mode_defs=[dev_mode], in_memory_type_resource_keys={SparkDF: ["pyspark"]},)


multi_type_lakehouse = make_multi_type_lakehouse()

Pipeline

Once again, using the data assets and storage for handling conversion, we have completely defined our computation graph. To construct a pipeline from these assets:

pipelines.py
"""Pipeline definitions for the multi_type_lakehouse example.
"""
from multi_type_lakehouse.assets import (
    daily_temperature_high_diffs_table,
    daily_temperature_highs_table,
)
from multi_type_lakehouse.lakehouse_def import multi_type_lakehouse

computed_assets = [daily_temperature_highs_table, daily_temperature_high_diffs_table]
multi_type_lakehouse_pipeline = multi_type_lakehouse.build_pipeline_definition(
    "multi_type_lakehouse_pipeline", computed_assets,
)

Note that the assets don't have to be provided in order. Lakehouse is able to determine asset ordering by resolving input asset dependencies.

Open in a playground

Open in Gitpod

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/multi_type_lakehouse
cd multi_type_lakehouse
pip install -e .