This guide explains how to use Dagster with Airbyte Cloud through the dagster-airbyte library. Your Airbyte Cloud connection tables can be represented as assets in the Dagster asset graph, which lets you track lineage and dependencies between Airbyte Cloud assets and the data assets you already model in Dagster. You can also use Dagster to orchestrate Airbyte Cloud connections, triggering syncs on a schedule or in response to upstream data changes.
To get started, you'll need to install the dagster and dagster-airbyte Python packages:
```shell
pip install dagster dagster-airbyte
```
Represent Airbyte Cloud assets in the asset graph
To load Airbyte Cloud assets into the Dagster asset graph, you must first construct an AirbyteCloudWorkspace resource, which allows Dagster to communicate with your Airbyte Cloud workspace. You'll need to supply your workspace ID, client ID, and client secret. See Configuring API Access in the Airbyte Cloud REST API documentation for more information on how to create your client ID and client secret.
Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the load_airbyte_cloud_asset_specs function, which returns a list of AssetSpec objects representing your Airbyte Cloud assets. You can then include these asset specs in your Definitions object.
Dagster can also sync Airbyte Cloud connections and materialize their connection tables. The build_airbyte_assets_definitions factory creates asset definitions for all of the connections in your Airbyte Cloud workspace.
Customize the materialization of Airbyte Cloud assets
To customize how your connections are synced, use the airbyte_assets decorator. This lets you run custom code before and after the Airbyte Cloud sync.
```python
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    name="airbyte_connection_name",
    group_name="airbyte_connection_name",
)
def airbyte_connection_assets(
    context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace
):
    # Do something before the materialization...
    yield from airbyte.sync_and_poll(context=context)
    # Do something after the materialization...


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```
Customize asset definition metadata for Airbyte Cloud assets
By default, Dagster generates an asset spec for each Airbyte Cloud asset and populates default metadata. To customize asset properties, subclass DagsterAirbyteTranslator and pass an instance of your subclass to the load_airbyte_cloud_asset_specs function.
```python
from dagster_airbyte import (
    AirbyteCloudWorkspace,
    AirbyteConnectionTableProps,
    DagsterAirbyteTranslator,
    load_airbyte_cloud_asset_specs,
)

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# A translator class lets us customize properties of the built
# Airbyte Cloud assets, such as the owners or asset key
class MyCustomAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    airbyte_workspace, dagster_airbyte_translator=MyCustomAirbyteTranslator()
)

defs = dg.Definitions(assets=airbyte_cloud_specs)
```
Note that super() is called in the overridden get_asset_spec method to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.
Load Airbyte Cloud assets from multiple workspaces
Definitions from multiple Airbyte Cloud workspaces can be combined by instantiating multiple AirbyteCloudWorkspace resources and merging their specs. This lets you view all your Airbyte Cloud assets in a single asset graph:
```python
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

sales_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_SECRET"),
)

marketing_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_SECRET"),
)

sales_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=sales_airbyte_workspace
)
marketing_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=marketing_airbyte_workspace
)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_airbyte_cloud_specs, *marketing_airbyte_cloud_specs],
)
```