Ask AI

Building machine learning pipelines with Dagster#

In this guide, we’ll walk you through how to take your machine learning models and deploy and maintain them in production using Dagster, reliably and efficiently.

We will work through building a machine learning pipeline, including using assets for different elements, how to automate model training, and monitoring your model's drift.


Before you begin#

This guide assumes you have familiarity with machine learning concepts and several Dagster concepts, including asset definitions and jobs.


Benefits of building machine learning pipelines in Dagster#

  • Dagster makes iterating on machine learning models and testing easy, and it is designed to use during the development process.
  • Dagster has a lightweight execution model means you can access the benefits of an orchestrator, like re-executing from the middle of a pipeline and parallelizing steps while you're experimenting.
  • Dagster models data assets, not just tasks, so it understands the upstream and downstream data dependencies.
  • Dagster is a one-stop shop for both the data transformations and the models that depend on the data transformations.

Machine learning development#

If you are already using Dagster for your ETL pipelines, it is a natural progression to build out and test your models in Dagster.

For this guide, we will be using the Hacker News data demoed in the tutorial.

The machine learning model we will walk through takes the Hacker News stories and uses the titles to predict the number of comments that a story will generate. This will be a supervised model since we have the number of comments for all the previous stories.

The assets graph will look like this at the end of this guide (click to expand):

alt

Ingesting data#

First, we will create an asset that retrieves the most recent Hacker News records.

import requests
from dagster import asset
import pandas as pd


@asset
def hackernews_stories():
    # Get the max ID number from hacker news
    latest_item = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json"
    ).json()
    # Get items based on story ids from the HackerNews items endpoint
    results = []
    scope = range(latest_item - 1000, latest_item)
    for item_id in scope:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)
    # Store the results in a dataframe and filter on stories with valid titles
    df = pd.DataFrame(results)
    if len(df) > 0:
        df = df[df.type == "story"]
        df = df[~df.title.isna()]

    return df

Transforming data#

Now that we have a dataframe with all valid stories, we want to transform that data into something our machine learning model will be able to use.

The first step is taking the dataframe and splitting it into a training and test set. In some of your models, you also might choose to have an additional split for a validation set. The reason we split the data is so that we can have a test and/or a validation dataset that is independent of the training set. We can then use that dataset to see how well our model did.

from sklearn.model_selection import train_test_split
from dagster import multi_asset, AssetOut


@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def training_test_data(hackernews_stories):
    X = hackernews_stories.title
    y = hackernews_stories.descendants
    # Split the dataset to reserve 20% of records as the test set
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    return (X_train, y_train), (X_test, y_test)

Next, we will take both the training and test data subsets and tokenize the titles e.g. take the words and turn them into columns with the frequency of terms for each record to create features for the data. To do this, we will be using the training set to fit the tokenizer. In this case, we are using TfidfVectorizer and then transforming both the training and test set based on that tokenizer.

from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np


@multi_asset(
    outs={"tfidf_vectorizer": AssetOut(), "transformed_training_data": AssetOut()}
)
def transformed_train_data(training_data):
    X_train, y_train = training_data
    # Initiate and fit the tokenizer on the training data and transform the training dataset
    vectorizer = TfidfVectorizer()
    transformed_X_train = vectorizer.fit_transform(X_train)
    transformed_X_train = transformed_X_train.toarray()
    y_train = y_train.fillna(0)
    transformed_y_train = np.array(y_train)
    return vectorizer, (transformed_X_train, transformed_y_train)


@asset
def transformed_test_data(test_data, tfidf_vectorizer):
    X_test, y_test = test_data
    # Use the fitted tokenizer to transform the test dataset
    transformed_X_test = tfidf_vectorizer.transform(X_test)
    y_test = y_test.fillna(0)
    transformed_y_test = np.array(y_test)
    return transformed_X_test, transformed_y_test

We also transformed the dataframes into NumPy arrays and removed nan values to prepare the data for training.

Training the model#

At this point, we have X_train, y_train, X_test, and y_test ready to go for our model. To train our model, we can use any number of models from libraries like sklearn, TensorFlow, and PyTorch.

In our example, we will train an XGBoost model to predict a numerical value.

import xgboost as xg
from sklearn.metrics import mean_absolute_error


@asset
def xgboost_comments_model(transformed_training_data):
    transformed_X_train, transformed_y_train = transformed_training_data
    # Train XGBoost model, which is a highly efficient and flexible model
    xgb_r = xg.XGBRegressor(
        objective="reg:squarederror", eval_metric=mean_absolute_error, n_estimators=20
    )
    xgb_r.fit(transformed_X_train, transformed_y_train)
    return xgb_r


@asset
def comments_model_test_set_r_squared(transformed_test_data, xgboost_comments_model):
    transformed_X_test, transformed_y_test = transformed_test_data
    # Use the test set data to get a score of the XGBoost model
    score = xgboost_comments_model.score(transformed_X_test, transformed_y_test)
    return score

Evaluating our results#

In our model assets, we evaluated each of the models on the test data and in this case, got the score derived from comparing the predicted to actual results. Next, to predict the results, we'll create another asset that runs inference on the model more frequently than the model is re-trained.

@asset
def latest_story_comment_predictions(xgboost_comments_model, tfidf_vectorizer):
    # Get the max ID number from hacker news
    latest_item = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json"
    ).json()
    # Get items based on story ids from the HackerNews items endpoint
    results = []
    scope = range(latest_item - 100, latest_item)
    for item_id in scope:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

    df = pd.DataFrame(results)
    if len(df) > 0:
        df = df[df.type == "story"]
        df = df[~df.title.isna()]
    inference_x = df.title
    # Transform the new story titles using the existing vectorizer
    inference_x = tfidf_vectorizer.transform(inference_x)
    return xgboost_comments_model.predict(inference_x)

Depending on what the objective of your ML model is, you can use this data to set alerts, save model performance history, and trigger retraining.


Where to go from here#

  • Managing machine learning models with Dagster - This guide reviews ways to manage and maintain your machine learning (ML) models in Dagster
  • Dagster is flexible so there could be many 'right' ways to implement your machine learning pipeline and different avenues to explore
  • Dagster intergrates with MLflow that can be used to keep track of your models
  • Dagster integrates with Weights & Biases and an example which demonstrates how to use W\&B's artifacts with Dagster.