Ask AI

OpenAI (dagster-openai)

The dagster_openai library provides utilities for using OpenAI with Dagster. A good place to start with dagster_openai is the guide.

dagster_openai.with_usage_metadata(context, output_name, func)[source]

Experimental: This API may break in future versions, even between dot releases.

This wrapper can be used on any endpoint of the openai library (https://github.com/openai/openai-python) to log the OpenAI API usage metadata in the asset metadata.

Examples

from dagster import (
    AssetExecutionContext,
    AssetKey,
    AssetSelection,
    AssetSpec,
    Definitions,
    EnvVar,
    MaterializeResult,
    asset,
    define_asset_job,
    multi_asset,
)
from dagster_openai import OpenAIResource, with_usage_metadata


@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # Build the wrapped fine-tuning endpoint first, install it on the client,
    # then call it through the same attribute path so the wrapper is what runs.
    with openai.get_client(context) as openai_client:
        wrapped_create = with_usage_metadata(
            context=context,
            output_name="some_output_name",
            func=openai_client.fine_tuning.jobs.create,
        )
        openai_client.fine_tuning.jobs.create = wrapped_create
        openai_client.fine_tuning.jobs.create(
            model="gpt-3.5-turbo",
            training_file="some_training_file",
        )


# Job named "openai_asset_job" whose selection is the string "openai_asset".
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")


@multi_asset(
    specs=[
        AssetSpec("my_asset1"),
        AssetSpec("my_asset2"),
    ]
)
def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # `get_client` takes only the context. To attach the usage metadata to one
    # specific asset of a multi-asset, use `get_client_for_asset`, which accepts
    # the target `asset_key`.
    with openai.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
        )

    # The materialization of `my_asset1` will include both OpenAI usage metadata
    # and the metadata added when calling `MaterializeResult`.
    return (
        MaterializeResult(asset_key="my_asset1", metadata={"foo": "bar"}),
        MaterializeResult(asset_key="my_asset2", metadata={"baz": "qux"}),
    )


# Job whose selection is built from the multi-asset definition object itself
# (via AssetSelection.assets) rather than from a name string.
openai_multi_asset_job = define_asset_job(
    name="openai_multi_asset_job", selection=AssetSelection.assets(openai_multi_asset)
)


# Bundle both assets and both jobs. The resource is registered under the key
# "openai", the same name the asset functions use for their OpenAIResource
# parameter; its API key is supplied through EnvVar("OPENAI_API_KEY").
defs = Definitions(
    assets=[openai_asset, openai_multi_asset],
    jobs=[openai_asset_job, openai_multi_asset_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
class dagster_openai.OpenAIResource(*, api_key)[source]

Experimental: This API may break in future versions, even between dot releases.

This resource is a wrapper over the openai library.

By configuring this OpenAI resource, you can interact with the OpenAI API and log its usage metadata in the asset metadata.

Examples

import os

from dagster import AssetExecutionContext, Definitions, EnvVar, asset, define_asset_job
from dagster_openai import OpenAIResource


@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # Send a single user message to the chat completions endpoint using the
    # client obtained from the resource for this asset's context.
    with openai.get_client(context) as openai_client:
        test_prompt = {"role": "user", "content": "Say this is a test"}
        openai_client.chat.completions.create(model="gpt-3.5-turbo", messages=[test_prompt])

# Job named "openai_asset_job" whose selection is the string "openai_asset".
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")

# Register the asset and its job, and bind the "openai" resource key to an
# OpenAIResource whose API key is supplied through EnvVar("OPENAI_API_KEY").
defs = Definitions(
    assets=[openai_asset],
    jobs=[openai_asset_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
get_client(context)[source]

Yields an openai.Client for interacting with the OpenAI API.

By default, in an asset context, the client comes with wrapped endpoints for three API resources (Completions, Embeddings and Chat), so that the API usage metadata is logged in the asset metadata.

Note that the endpoints are not and cannot be wrapped to automatically capture the API usage metadata in an op context.

Parameters:

context – The context object for computing the op or asset in which get_client is called.

Examples

from dagster import (
    AssetExecutionContext,
    Definitions,
    EnvVar,
    GraphDefinition,
    OpExecutionContext,
    asset,
    define_asset_job,
    op,
)
from dagster_openai import OpenAIResource


@op
def openai_op(context: OpExecutionContext, openai: OpenAIResource):
    # Same chat request as in the asset example, issued from an op context.
    with openai.get_client(context) as openai_client:
        request = {
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": "Say this is a test"}],
        }
        openai_client.chat.completions.create(**request)


# Build a job from a graph named "openai_op_job" whose only node is `openai_op`.
openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()


@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # Asset counterpart of `openai_op`: the same chat request, issued with an
    # AssetExecutionContext.
    with openai.get_client(context) as openai_client:
        chat_messages = [{"role": "user", "content": "Say this is a test"}]
        openai_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=chat_messages,
        )


# Job named "openai_asset_job" whose selection is the string "openai_asset".
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")

# Register the asset together with both the asset job and the op job, and bind
# the "openai" resource key to an OpenAIResource whose API key is supplied
# through EnvVar("OPENAI_API_KEY").
defs = Definitions(
    assets=[openai_asset],
    jobs=[openai_asset_job, openai_op_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
get_client_for_asset(context, asset_key)[source]

Yields an openai.Client for interacting with the OpenAI API.

When using this method, the OpenAI API usage metadata is automatically logged in the asset materializations associated with the provided asset_key.

By default, the client comes with wrapped endpoints for three API resources (Completions, Embeddings and Chat), so that the API usage metadata is logged in the asset metadata.

This method can only be called when working with assets, i.e. the provided context must be of type AssetExecutionContext.

Parameters:
  • context – The context object for computing the asset in which get_client_for_asset is called.

  • asset_key – The key of the asset whose materialization should include the metadata.

Examples

from dagster import (
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    EnvVar,
    MaterializeResult,
    asset,
    define_asset_job,
    multi_asset,
)
from dagster_openai import OpenAIResource


@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # Pass this asset's own key explicitly so the usage metadata is attached to
    # the materialization associated with that key.
    own_key = context.asset_key
    with openai.get_client_for_asset(context, own_key) as openai_client:
        chat_messages = [{"role": "user", "content": "Say this is a test"}]
        openai_client.chat.completions.create(model="gpt-3.5-turbo", messages=chat_messages)


# Job named "openai_asset_job" whose selection is the string "openai_asset".
openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")


@multi_asset(specs=[AssetSpec("my_asset1"), AssetSpec("my_asset2")], compute_kind="OpenAI")
def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # The resource parameter is named `openai` to match the "openai" key under
    # which the OpenAIResource is registered in `Definitions`.
    # Usage metadata from this client is attached to `my_asset1` only.
    with openai.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
        )
    # One MaterializeResult per asset declared in `specs`.
    return (
        MaterializeResult(asset_key="my_asset1", metadata={"some_key": "some_value1"}),
        MaterializeResult(asset_key="my_asset2", metadata={"some_key": "some_value2"}),
    )


# Job named "openai_multi_asset_job" whose selection is the string
# "openai_multi_asset".
openai_multi_asset_job = define_asset_job(
    name="openai_multi_asset_job", selection="openai_multi_asset"
)

# Register both assets and both jobs, and bind the "openai" resource key to an
# OpenAIResource whose API key is supplied through EnvVar("OPENAI_API_KEY").
defs = Definitions(
    assets=[openai_asset, openai_multi_asset],
    jobs=[openai_asset_job, openai_multi_asset_job],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)