An Airbyte connection defines a series of data streams which are synced between a source and a destination. During a sync, a replica of the data from each data stream is written to the destination, typically as one or more tables. Dagster represents each of the replicas generated in the destination as an asset. This enables you to easily:
Visualize the streams involved in an Airbyte connection and execute a sync from Dagster
Define downstream computations which depend on replicas produced by Airbyte
Track historical metadata and logs for each data stream
Track data lineage through Airbyte and other tools
Step 1: Connecting to Airbyte#
The first step in using Airbyte with Dagster is to tell Dagster how to connect to your Airbyte instance using an Airbyte resource. This resource contains information on where the Airbyte instance is located and any credentials needed to access it.
from dagster import EnvVar
from dagster_airbyte import AirbyteResource

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
    # If using basic auth, include username and password:
    username="airbyte",
    password=EnvVar("AIRBYTE_PASSWORD"),
)
If you're running Airbyte locally using docker-compose, the host and port parameters should be set to localhost and 8000, respectively. The default basic auth credentials are a username of airbyte and a password of password.
If you're hosting Airbyte externally, you'll need to provide a hostname where the Airbyte webapp and API are accessible, typically on port 80. For more information on the configuration options available for the Airbyte resource, see the API reference.
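For example, a resource pointing at an externally hosted Airbyte deployment might look like the following sketch. The hostname and environment variable used here are hypothetical placeholders; substitute your own values.

from dagster import EnvVar
from dagster_airbyte import AirbyteResource

# Hypothetical externally hosted deployment; substitute your own hostname and credentials
airbyte_instance = AirbyteResource(
    host="airbyte.example.com",  # where the Airbyte webapp and API are reachable
    port="80",
    username="airbyte",
    password=EnvVar("AIRBYTE_PASSWORD"),
)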
Step 2: Loading Airbyte asset definitions into Dagster#
The easiest way to get started using Airbyte with Dagster is to have Dagster automatically generate asset definitions from your Airbyte project. Dagster can load asset definitions from an Airbyte instance via API at initialization time.
Loading Airbyte asset definitions from an Airbyte instance#
To load Airbyte assets into Dagster from a live Airbyte instance, you will need to supply the Airbyte resource that we defined above in step 1. Here, the Airbyte instance is treated as the source of truth.
from dagster_airbyte import load_assets_from_airbyte_instance
# Use the airbyte_instance resource we defined in Step 1
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
The load_assets_from_airbyte_instance function retrieves all of the connections you have defined in the Airbyte interface, creating asset definitions for each data stream. Each connection has an associated op which triggers a sync of that connection.
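If you only want to create assets for a subset of your connections, or want to namespace the generated asset keys, load_assets_from_airbyte_instance accepts optional arguments such as connection_filter and key_prefix (see the API reference for the full signature). A minimal sketch, assuming we only want connections whose name contains "github":

from dagster_airbyte import load_assets_from_airbyte_instance

# Hypothetical filter: only build assets for connections whose name contains "github",
# and prefix every generated asset key with "airbyte"
airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    connection_filter=lambda meta: "github" in meta.name,
    key_prefix="airbyte",
)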
Loading Airbyte asset definitions from YAML config#
To load Airbyte assets into Dagster from a set of YAML configuration files, specify the Octavia project directory, which contains the sources, destinations, and connections subfolders. This is the directory where you first ran octavia init. Here, the YAML files are treated as the source of truth for building Dagster assets.
from dagster_airbyte import load_assets_from_airbyte_project

airbyte_assets = load_assets_from_airbyte_project(
    project_dir="path/to/airbyte/project",
)
The load_assets_from_airbyte_project function parses the YAML metadata, generating a set of asset definitions which reflect each of the data streams synced by your connections. Each connection has an associated op which triggers a sync of that connection.
Assets loaded from Airbyte require an AirbyteResource, which defines how to connect and interact with your Airbyte instance.
We can add the Airbyte resource we configured above to our Airbyte assets by doing the following:
from dagster import with_resources
from dagster_airbyte import load_assets_from_airbyte_project

# Use the airbyte_instance resource we defined in Step 1
airbyte_assets = with_resources(
    [load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
    {"airbyte": airbyte_instance},
)
Manually building asset definitions for an Airbyte connection#
Instead of having Dagster automatically create the asset definitions for your Airbyte instance, you can opt to individually build them. First, determine the connection IDs for each of the connections you would like to build assets for. The connection ID can be seen in the URL of the connection page when viewing the Airbyte UI.
Then, supply the connection ID and the list of tables which the connection creates in the destination to build_airbyte_assets:
from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams", "stargazers"],
)
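build_airbyte_assets also accepts optional parameters for customizing the generated assets (see the API reference for the full signature); for example, an asset_key_prefix can namespace the asset keys so they don't collide with other tables of the same name. A minimal sketch:

from dagster_airbyte import build_airbyte_assets

# Prefix each generated asset key, e.g. ["github", "stargazers"]
airbyte_assets = build_airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams", "stargazers"],
    asset_key_prefix=["github"],
)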
Manually built Airbyte assets require an AirbyteResource, which defines how to connect and interact with your Airbyte instance.
We can add the Airbyte resource we configured above to our Airbyte assets by doing the following:
from dagster import with_resources
from dagster_airbyte import AirbyteResource, build_airbyte_assets

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

airbyte_assets = with_resources(
    build_airbyte_assets(
        connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
        destination_tables=["releases", "tags", "teams", "stargazers"],
    ),
    # Use the airbyte_instance resource we defined in Step 1
    {"airbyte": airbyte_instance},
)
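To make these manually built assets visible and materializable in the Dagster UI, include them in a Definitions object for your code location. A minimal sketch, continuing the example above:

from dagster import Definitions

# airbyte_assets already has the Airbyte resource bound via with_resources
defs = Definitions(assets=airbyte_assets)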
Step 3: Adding downstream assets#
Once you have loaded your Airbyte assets into Dagster, you can create assets which depend on them. These can be other assets pulled in from external sources such as dbt or assets defined in Python code.
In this case, we have an Airbyte connection that stores data in the stargazers table in our Snowflake warehouse. We specify the output I/O manager to tell downstream assets how to retrieve the data.
import json

import pandas as pd
from dagster import AssetSelection, Definitions, asset, define_asset_job
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_snowflake_pandas import SnowflakePandasIOManager

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
    io_manager_key="snowflake_io_manager",
)

@asset
def stargazers_file(stargazers: pd.DataFrame):
    with open("stargazers.json", "w", encoding="utf8") as f:
        f.write(json.dumps(stargazers.to_json(), indent=2))

# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
    "my_upstream_job",
    AssetSelection.assets(stargazers_file)
    .upstream()  # all upstream assets (in this case, just the stargazers Airbyte asset)
    .required_multi_asset_neighbors(),  # all Airbyte assets linked to the same connection
)

defs = Definitions(
    jobs=[my_upstream_job],
    assets=[airbyte_assets, stargazers_file],
    resources={"snowflake_io_manager": SnowflakePandasIOManager(...)},
)
In this case, we again have an Airbyte connection that stores data in the stargazers table in our Snowflake warehouse, but we are not using an I/O manager to fetch the data in downstream assets. Instead, we use deps to define the dependency. Then, within the downstream asset, we can fetch the data if necessary or launch other commands that work with the data in external processes.
import json

from dagster import AssetKey, AssetSelection, Definitions, asset, define_asset_job
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
from dagster_snowflake import SnowflakeResource

airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
)

airbyte_assets = load_assets_from_airbyte_instance(
    airbyte_instance,
)

@asset(deps=[AssetKey("stargazers")])
def stargazers_file(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        stargazers = conn.cursor().execute("SELECT * FROM STARGAZERS").fetch_pandas_all()
    with open("stargazers.json", "w", encoding="utf8") as f:
        f.write(json.dumps(stargazers.to_json(), indent=2))

# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
    "my_upstream_job",
    AssetSelection.assets(stargazers_file)
    .upstream()  # all upstream assets (in this case, just the stargazers Airbyte asset)
    .required_multi_asset_neighbors(),  # all Airbyte assets linked to the same connection
)

defs = Definitions(
    jobs=[my_upstream_job],
    assets=[airbyte_assets, stargazers_file],
    resources={"snowflake": SnowflakeResource(...)},
)
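For local testing, you could also launch the job above directly from Python. This is a hypothetical sketch that assumes your Airbyte and Snowflake instances are reachable; in practice, you would typically materialize these assets from the Dagster UI or on a schedule.

if __name__ == "__main__":
    # Trigger the Airbyte sync and the downstream stargazers_file asset in a single in-process run
    result = defs.get_job_def("my_upstream_job").execute_in_process()
    assert result.success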