Ask AI

Integrating Snowflake usage with Dagster+ Insights#

External metrics, such as Snowflake credits, can be integrated into the Dagster Insights UI. The dagster-cloud package contains utilities for capturing and submitting external metrics about data operations to Dagster+ via an API.

If you use the Snowflake Resource to query Snowflake, use this guide to integrate Snowflake metrics into the Insights UI. For instructions on integrating usage of dbt models which run in Snowflake, see Integrating Snowflake & dbt with Dagster+ Insights.


Prerequisites#

To complete the steps in this guide, you'll need:

  • A Dagster+ account on the Pro plan

  • Access to the Dagster+ Insights feature

  • Snowflake credentials which have access to the snowflake.account_usage.query_history table. For more information on granting access to this table, see the Snowflake documentation.

  • To install the following libraries:

    pip install dagster dagster-cloud dagster-snowflake
    

    Note: If you already have dagster-cloud installed, make sure you're using version 1.5.8 or newer.


Step 1: Replace your Snowflake resources#

The first step is to replace any existing Snowflake resources with InsightsSnowflakeResource. This resource is a drop-in replacement for the SnowflakeResource resource, but it also emits Snowflake usage metrics to the Dagster+ Insights API.

from dagster_cloud.dagster_insights import InsightsSnowflakeResource

defs = Definitions(
  resources={
    "snowflake": InsightsSnowflakeResource(
      account=EnvVar("SNOWFLAKE_PURINA_ACCOUNT"),
      user=EnvVar("SNOWFLAKE_PURINA_USER"),
      password=EnvVar("SNOWFLAKE_PURINA_PASSWORD"),
    ),
 }
)

Step 2: Create a metrics ingestion pipeline in Dagster#

The second step is to create a Dagster pipeline that joins asset observation events with the Snowflake query history and calls the Dagster+ ingestion API. Snowflake usage information is available at a delay, so this pipeline will run on a schedule to ingest Snowflake usage information from the previous hour.

Note that you only need to create this pipeline in a single code location per deployment, even if you have assets in multiple code locations.

To do this, you'll need a Snowflake resource (InsightsSnowflakeResource) that can query the snowflake.account_usage.query_history table. You can set up the ingestion pipeline like the following:

from dagster import Definition, EnvVar

from dagster_cloud.dagster_insights import (
    InsightsSnowflakeResource,
    create_snowflake_insights_asset_and_schedule,
)

snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
    start_date="2023-10-5-00:00",
    snowflake_resource_key="snowflake_insights",
)

defs = Definitions(
  assets=[..., *snowflake_insights_definitions.assets],
  schedules=[..., snowflake_insights_definitions.schedule],
  resources={
    ...,
    "snowflake_insights": InsightsSnowflakeResource(
      account=EnvVar("SNOWFLAKE_PURINA_ACCOUNT"),
      user=EnvVar("SNOWFLAKE_PURINA_USER"),
      password=EnvVar("SNOWFLAKE_PURINA_PASSWORD"),
    ),
 }
)

In this example, the snowflake_resource_key is a SnowflakeResource that has access to the query_history table.

Once the pipeline runs, Snowflake credits will be visible in the Insights tab in the Dagster UI:

Snowflake credits in the Insights tab of the Dagster UI