In this guide, we’ll walk through how to build, deploy, and maintain machine learning models in production with Dagster, reliably and efficiently.
We will work through building a machine learning pipeline, including using assets for the different elements, automating model training, and monitoring your model's drift.
This guide assumes you have familiarity with machine learning concepts and several Dagster concepts, including asset definitions and jobs.
Benefits of building machine learning pipelines in Dagster
Dagster makes it easy to iterate on and test machine learning models, and it is designed to be used during the development process.
Dagster's lightweight execution model means you can access the benefits of an orchestrator, like re-executing from the middle of a pipeline and parallelizing steps, while you're experimenting.
Dagster models data assets, not just tasks, so it understands the upstream and downstream data dependencies.
Dagster is a one-stop shop for both the data transformations and the models that depend on them.
If you are already using Dagster for your ETL pipelines, it is a natural progression to build out and test your models in Dagster.
For this guide, we will be using the Hacker News data demoed in the tutorial.
The machine learning model we will walk through takes the Hacker News stories and uses the titles to predict the number of comments that a story will generate. This will be a supervised model since we have the number of comments for all the previous stories.
By the end of this guide, the asset graph will look like this:
First, we will create an asset that retrieves the most recent Hacker News records.
import requests
from dagster import asset
import pandas as pd


@asset
def hackernews_stories():
    # Get the max ID number from hacker news
    latest_item = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
    # Get items based on story ids from the HackerNews items endpoint
    results = []
    scope = range(latest_item - 1000, latest_item)
    for item_id in scope:
        item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
        results.append(item)
    # Store the results in a dataframe and filter on stories with valid titles
    df = pd.DataFrame(results)
    if len(df) > 0:
        df = df[df.type == "story"]
        df = df[~df.title.isna()]
    return df
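If you want to sanity-check this asset locally, for example from a unit test, one option is Dagster's materialize helper. The following is a minimal sketch; the in-process invocation and variable names are illustrative rather than part of the original guide.

from dagster import materialize

# Materialize the ingestion asset in-process and inspect its output
result = materialize([hackernews_stories])
stories_df = result.output_for_node("hackernews_stories")
print(stories_df.head())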
Now that we have a dataframe with all valid stories, we want to transform that data into something our machine learning model will be able to use.
The first step is to split the dataframe into a training set and a test set. For some models, you might also choose an additional split for a validation set. We split the data so that we have a test (and/or validation) dataset that is independent of the training set, which we can then use to see how well our model performs.
from sklearn.model_selection import train_test_split
from dagster import multi_asset, AssetOut


@multi_asset(outs={"training_data": AssetOut(), "test_data": AssetOut()})
def training_test_data(hackernews_stories):
    X = hackernews_stories.title
    y = hackernews_stories.descendants
    # Split the dataset to reserve 20% of records as the test set
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    return (X_train, y_train), (X_test, y_test)
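Note that train_test_split shuffles the data randomly, so each materialization of training_test_data produces a different split. If you need the split to be reproducible across runs, one option (an assumption, not something the original pipeline does) is to pin random_state. A toy, self-contained illustration:

from sklearn.model_selection import train_test_split

# A fixed random_state makes the split deterministic across runs
X_train, X_test, y_train, y_test = train_test_split(
    ["title a", "title b", "title c", "title d", "title e"],
    [3, 0, 12, 1, 7],
    test_size=0.2,
    random_state=42,
)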
Next, we will tokenize the titles in both the training and test sets, i.e., turn the words into columns containing the frequency of each term for each record, to create features for the data. We fit the tokenizer on the training set only. In this case, we are using TfidfVectorizer, and we then transform both the training and test sets with that fitted tokenizer.
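To make the tokenization step concrete, here is a toy illustration (the example titles are made up) of what TfidfVectorizer produces: each row corresponds to a title and each column to a term, with TF-IDF weights as values.

from sklearn.feature_extraction.text import TfidfVectorizer

toy_titles = ["Show HN: a tiny side project", "Ask HN: how do you learn Rust?"]
vectorizer = TfidfVectorizer()
features = vectorizer.fit_transform(toy_titles)

print(vectorizer.get_feature_names_out())  # the terms that became columns
print(features.shape)                      # (2 titles, number of distinct terms)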
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np


@multi_asset(outs={"tfidf_vectorizer": AssetOut(), "transformed_training_data": AssetOut()})
def transformed_train_data(training_data):
    X_train, y_train = training_data
    # Initiate and fit the tokenizer on the training data and transform the training dataset
    vectorizer = TfidfVectorizer()
    transformed_X_train = vectorizer.fit_transform(X_train)
    transformed_X_train = transformed_X_train.toarray()
    y_train = y_train.fillna(0)
    transformed_y_train = np.array(y_train)
    return vectorizer, (transformed_X_train, transformed_y_train)


@asset
def transformed_test_data(test_data, tfidf_vectorizer):
    X_test, y_test = test_data
    # Use the fitted tokenizer to transform the test dataset
    transformed_X_test = tfidf_vectorizer.transform(X_test)
    y_test = y_test.fillna(0)
    transformed_y_test = np.array(y_test)
    return transformed_X_test, transformed_y_test
We also converted the data into NumPy arrays and filled NaN comment counts with 0 to prepare the data for training.
At this point, we have X_train, y_train, X_test, and y_test ready to go for our model. To train our model, we can use any number of models from libraries like sklearn, TensorFlow, and PyTorch.
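For example, a simple linear baseline from scikit-learn could be dropped in as an alternative model asset. The following sketch is hypothetical and not part of the original guide (the asset name ridge_comments_model is made up):

from sklearn.linear_model import Ridge
from dagster import asset


@asset
def ridge_comments_model(transformed_training_data):
    transformed_X_train, transformed_y_train = transformed_training_data
    # A linear baseline trained on the same TF-IDF features
    model = Ridge(alpha=1.0)
    model.fit(transformed_X_train, transformed_y_train)
    return model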
In our example, we will train an XGBoost model to predict a numerical value.
import xgboost as xg
from sklearn.metrics import mean_absolute_error


@asset
def xgboost_comments_model(transformed_training_data):
    transformed_X_train, transformed_y_train = transformed_training_data
    # Train XGBoost model, which is a highly efficient and flexible model
    xgb_r = xg.XGBRegressor(
        objective="reg:squarederror", eval_metric=mean_absolute_error, n_estimators=20
    )
    xgb_r.fit(transformed_X_train, transformed_y_train)
    return xgb_r


@asset
def comments_model_test_set_r_squared(transformed_test_data, xgboost_comments_model):
    transformed_X_test, transformed_y_test = transformed_test_data
    # Use the test set data to get a score of the XGBoost model
    score = xgboost_comments_model.score(transformed_X_test, transformed_y_test)
    return score
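If you want the evaluation score surfaced in the Dagster UI alongside the materialization, one option (an assumption, not something the original asset does) is to return it wrapped in Dagster's Output with metadata attached:

from dagster import Output, asset


@asset
def comments_model_test_set_r_squared(transformed_test_data, xgboost_comments_model):
    transformed_X_test, transformed_y_test = transformed_test_data
    score = xgboost_comments_model.score(transformed_X_test, transformed_y_test)
    # Attach the score as metadata so it appears on the asset's materialization
    return Output(score, metadata={"r_squared": float(score)})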
In these model assets, we trained the model and evaluated it on the test data; in this case, the score compares the predicted comment counts to the actual ones. Next, to generate predictions, we'll create another asset that runs inference with the model more frequently than the model is re-trained.
@asset
def latest_story_comment_predictions(xgboost_comments_model, tfidf_vectorizer):
    # Get the max ID number from hacker news
    latest_item = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
    # Get items based on story ids from the HackerNews items endpoint
    results = []
    scope = range(latest_item - 100, latest_item)
    for item_id in scope:
        item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
        results.append(item)

    df = pd.DataFrame(results)
    if len(df) > 0:
        df = df[df.type == "story"]
        df = df[~df.title.isna()]
    inference_x = df.title
    # Transform the new story titles using the existing vectorizer
    inference_x = tfidf_vectorizer.transform(inference_x)
    return xgboost_comments_model.predict(inference_x)
Depending on the objective of your ML model, you can use this data to set alerts, save model performance history, and trigger retraining.
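To automate the pipeline itself, you can wire the assets into jobs and schedules. Below is a minimal sketch assuming a daily retraining cadence and an hourly prediction cadence; the job names, cron schedules, and asset selections are illustrative choices rather than part of the original guide.

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

# Hypothetical job that re-trains the model (and refreshes its upstream data assets) daily
retraining_job = define_asset_job(
    "retraining_job",
    selection=AssetSelection.assets(xgboost_comments_model).upstream(),
)

# Hypothetical job that refreshes predictions on the latest stories hourly
prediction_job = define_asset_job(
    "prediction_job",
    selection=AssetSelection.assets(latest_story_comment_predictions),
)

defs = Definitions(
    assets=[
        hackernews_stories,
        training_test_data,
        transformed_train_data,
        transformed_test_data,
        xgboost_comments_model,
        comments_model_test_set_r_squared,
        latest_story_comment_predictions,
    ],
    schedules=[
        ScheduleDefinition(job=retraining_job, cron_schedule="0 0 * * *"),
        ScheduleDefinition(job=prediction_job, cron_schedule="0 * * * *"),
    ],
)

When prediction_job runs, the model and vectorizer are loaded from their most recent materializations rather than re-trained, which is what lets inference run on a faster cadence than training.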