# Integrating Snowflake & dbt with Dagster+ Insights
External metrics, such as Snowflake credits, can be integrated into the Dagster Insights UI. The dagster-cloud package contains utilities for capturing and submitting external metrics about data operations to Dagster+ via an API.
If you use dbt to materialize tables in Snowflake, use this guide to integrate Snowflake metrics into the Insights UI. For instructions on integrating direct Snowflake queries, see Integrating Direct Snowflake Usage with Dagster+ Insights.
To complete this integration, you'll need the dagster-cloud, dagster-dbt, and dagster-snowflake packages installed, along with Snowflake credentials that have access to the snowflake.account_usage.query_history table (access is typically granted via IMPORTED PRIVILEGES on the shared SNOWFLAKE database). For more information on granting access to this table, see the Snowflake documentation.
## Step 1: Instrument your Dagster code

If you materialize your dbt models as Dagster assets, append with_insights() to the dbt CLI call in your @dbt_assets function:

```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(...)
def my_asset(context: AssetExecutionContext, dbt: DbtCliResource):
    # Chain `with_insights` after any other metadata fetch, e.g. `fetch_row_count`
    yield from dbt.cli(["build"], context=context).stream().with_insights()
```
This passes through all underlying events and emits an AssetObservation for each asset materialization. The observation contains the dbt invocation ID and unique ID recorded in the Dagster event log.
If you use ops and jobs instead of assets, append with_insights() to the dbt CLI call in your Dagster op function:
```python
from dagster import OpExecutionContext, job, op
from dagster_dbt import DbtCliResource


@op(out={})
def my_dbt_op(context: OpExecutionContext, dbt: DbtCliResource):
    # Chain `with_insights` after any other metadata fetch, e.g. `fetch_row_count`
    yield from dbt.cli(
        ["build"], context=context, manifest=dbt_manifest_path
    ).stream().with_insights()


@job
def my_dbt_job():
    ...
    my_dbt_op()
    ...
```
This passes through all underlying events and emits an AssetObservation for each asset materialization. The observation contains the dbt invocation ID and unique ID that are recorded in the Dagster event log.
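## Step 2: Update dbt_project.yml

Next, update dbt_project.yml so dbt tags each query with this metadata. The query-comment configuration below is a sketch based on the Dagster+ Insights documentation; confirm the exact comment template against the current dagster-cloud docs before using it:

```yaml
# dbt_project.yml -- sketch; verify the exact opaque-ID template in the Dagster+ docs
query-comment:
  comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
  append: true
```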
This adds a comment containing the dbt invocation ID and unique ID to every query recorded in Snowflake's query_history table. Insights uses this data to attribute Snowflake cost metrics to the corresponding Dagster jobs and assets.
Note: Make sure to include append: true, as Snowflake strips leading comments.
## Step 3: Create a metrics ingestion pipeline in Dagster
The last step is to create a Dagster pipeline that joins asset observation events with the Snowflake query history and calls the Dagster+ ingestion API. Because Snowflake usage information becomes available only after a delay, this pipeline runs on a schedule and ingests the usage recorded during the previous hour.
Note that you only need to create this pipeline in a single code location per deployment, even if you have instrumented dbt assets in multiple code locations.
To do this, you'll need a Snowflake resource (SnowflakeResource) that can query the snowflake.account_usage.query_history table. You can set up the ingestion pipeline like the following:
```python
from dagster import Definitions, EnvVar
from dagster_snowflake import SnowflakeResource

from dagster_cloud.dagster_insights import (
    create_snowflake_insights_asset_and_schedule,
)

snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2023-10-5-00:00",
    snowflake_resource_key="snowflake_insights",
)

defs = Definitions(
    assets=[..., *snowflake_insights_definitions.assets],
    schedules=[..., snowflake_insights_definitions.schedule],
    resources={
        # ...your other resources...
        "snowflake_insights": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_PURINA_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_PURINA_USER"),
            password=EnvVar("SNOWFLAKE_PURINA_PASSWORD"),
        ),
    },
)
```
In this example, snowflake_resource_key names a SnowflakeResource that has access to the query_history table.
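Before relying on the schedule, you may want to confirm that the configured credentials can actually read query_history. The standalone check below is a hypothetical sketch (not part of the official guide) that reuses the same environment variables as the snowflake_insights resource above:

```python
import os

from dagster_snowflake import SnowflakeResource

# Hypothetical smoke test: verify the Insights credentials can read
# snowflake.account_usage.query_history before the scheduled job depends on it.
snowflake = SnowflakeResource(
    account=os.environ["SNOWFLAKE_PURINA_ACCOUNT"],
    user=os.environ["SNOWFLAKE_PURINA_USER"],
    password=os.environ["SNOWFLAKE_PURINA_PASSWORD"],
)

with snowflake.get_connection() as conn:
    row = conn.cursor().execute(
        "select count(*) from snowflake.account_usage.query_history"
        " where start_time > dateadd('hour', -1, current_timestamp())"
    ).fetchone()
    print(f"Queries recorded in the last hour: {row[0]}")
```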
Snowflake credit metrics should be available on the Insights tab in the Dagster UI within a short period of time (at most 24 hours) of the ingestion job running.