OpenAI & Dagster (Experimental)#

This feature is considered experimental.

The dagster-openai library allows you to build OpenAI pipelines with Dagster and log OpenAI API usage metadata in Dagster Insights.

Using this library's OpenAIResource, you can easily interact with the OpenAI REST API via the OpenAI Python API.

When used with Dagster's Software-defined Assets, the resource automatically logs OpenAI usage metadata in asset metadata. See the Relevant APIs section for more information.


Getting started#

Before you get started with the dagster-openai library, we recommend familiarizing yourself with the OpenAI Python API library, which this integration uses to interact with the OpenAI REST API.


Prerequisites#

To get started, install the dagster and dagster-openai Python packages:

pip install dagster dagster-openai

Note that you will need an OpenAI API key to use the resource, which can be generated in your OpenAI account.


Connecting to OpenAI#

The first step in using OpenAI with Dagster is to tell Dagster how to connect to an OpenAI client using an OpenAI resource. This resource contains the credentials needed to interact with the OpenAI API.

We will supply our credentials as environment variables by adding them to a .env file. For more information on setting environment variables in a production setting, see Using environment variables and secrets.

# .env

OPENAI_API_KEY=...

Then, we can instruct Dagster to authorize the OpenAI resource using the environment variables:

from dagster_openai import OpenAIResource

from dagster import EnvVar

# Pull API key from environment variables
openai = OpenAIResource(
    api_key=EnvVar("OPENAI_API_KEY"),
)
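
A small pre-flight check can make a missing key easier to diagnose before any pipeline runs. The following is a minimal sketch using only the Python standard library; the helper name `require_env` is hypothetical and is not part of dagster or dagster-openai.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or blank.

    Hypothetical helper for illustration; not part of dagster or dagster-openai.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

A check like `require_env("OPENAI_API_KEY")` could be run, for example, from a deployment or CI script in the same environment that will launch Dagster.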

Using the OpenAI resource with assets#

The OpenAI resource can be used in assets in order to interact with the OpenAI API. Note that in this example, we supply our credentials as environment variables directly when instantiating the Definitions object.

from dagster_openai import OpenAIResource

from dagster import (
    AssetExecutionContext,
    Definitions,
    EnvVar,
    asset,
    define_asset_job,
)


@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test."}],
        )


openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")

defs = Definitions(
    assets=[openai_asset],
    jobs=[openai_asset_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)

After materializing your asset, your OpenAI API usage metadata will be available in the Events and Plots tabs of your asset in the Dagster UI. If you are using Dagster+, your usage metadata will also be available in Dagster Insights. Refer to the Viewing and materializing assets in the UI guide for more information.


Using the OpenAI resource with ops#

The OpenAI resource can also be used in ops. Note that the resource does not currently log OpenAI usage metadata out of the box when used in ops.

from dagster_openai import OpenAIResource

from dagster import (
    Definitions,
    EnvVar,
    GraphDefinition,
    OpExecutionContext,
    op,
)


@op
def openai_op(context: OpExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test"}],
        )


openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()

defs = Definitions(
    jobs=[openai_op_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
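
Because usage is not logged automatically in ops, one option is to read the token counts from the response and write them to the op's log yourself. The sketch below assumes the response exposes `usage.prompt_tokens`, `usage.completion_tokens`, and `usage.total_tokens`, as chat completion responses in the OpenAI Python library do, and that the logger passed in behaves like a standard `logging.Logger`. The helper name `log_usage` is hypothetical, and a stand-in response is used so the sketch runs without calling the API.

```python
import logging
from types import SimpleNamespace
from typing import Any, Dict


def log_usage(log: logging.Logger, response: Any) -> Dict[str, int]:
    """Log token counts from an OpenAI-style response and return them as a dict.

    Hypothetical helper; missing usage data is treated as zero tokens.
    """
    usage = getattr(response, "usage", None)
    counts = {
        "prompt_tokens": getattr(usage, "prompt_tokens", 0) or 0,
        "completion_tokens": getattr(usage, "completion_tokens", 0) or 0,
        "total_tokens": getattr(usage, "total_tokens", 0) or 0,
    }
    log.info(f"OpenAI usage: {counts}")
    return counts


# Stand-in for a real response, so the sketch runs without calling the API.
fake_response = SimpleNamespace(
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=5, total_tokens=17)
)
counts = log_usage(logging.getLogger("openai_op"), fake_response)
```

Inside `openai_op`, this could be used by assigning the result of `client.chat.completions.create(...)` to a variable and passing it, together with `context.log`, to the helper.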

Relevant APIs#

Name                 Description
OpenAIResource       The OpenAI resource used for handling the client
with_usage_metadata  The function wrapper used on OpenAI API endpoint methods to log OpenAI usage metadata
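
To illustrate the general idea of wrapping an endpoint method so that usage is recorded on every call, here is a minimal standard-library sketch. It is not the implementation of `with_usage_metadata`, and the names `track_usage`, `fake_create`, and `totals` are hypothetical. It only assumes that the wrapped function returns an object with a `usage.total_tokens` attribute.

```python
import functools
from types import SimpleNamespace
from typing import Any, Callable, Dict


def track_usage(func: Callable[..., Any], totals: Dict[str, int]) -> Callable[..., Any]:
    """Wrap func so each call adds its token usage to totals (hypothetical helper)."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        response = func(*args, **kwargs)
        usage = getattr(response, "usage", None)
        totals["calls"] = totals.get("calls", 0) + 1
        totals["total_tokens"] = totals.get("total_tokens", 0) + (
            getattr(usage, "total_tokens", 0) or 0
        )
        return response  # the caller still receives the original response

    return wrapper


def fake_create(**kwargs: Any) -> SimpleNamespace:
    """Stand-in for an endpoint method such as client.chat.completions.create."""
    return SimpleNamespace(usage=SimpleNamespace(total_tokens=10))


totals: Dict[str, int] = {}
create = track_usage(fake_create, totals)
create(model="gpt-3.5-turbo", messages=[])
create(model="gpt-3.5-turbo", messages=[])
# totals is now {"calls": 2, "total_tokens": 20}
```

The wrapper leaves the return value untouched, so calling code does not need to change; only the side effect of recording usage is added.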