BigQuery integration reference
This reference page provides information for working with features that are not covered as part of the Using Dagster with BigQuery tutorial.
- Providing credentials as configuration
- Selecting specific columns in a downstream asset
- Storing partitioned assets
- Storing tables in multiple datasets
- Using the BigQuery I/O manager with other I/O managers
- Storing and loading PySpark DataFrames in BigQuery
- Using Pandas and PySpark DataFrames with BigQuery
- Executing custom SQL commands with the BigQuery resource
Providing credentials as configuration
In most cases, you will authenticate with Google Cloud Platform (GCP) using one of the methods outlined in the GCP documentation. However, in some cases you may need to provide authentication credentials directly to the BigQuery I/O manager. For example, if you are using Dagster+ Serverless, you cannot upload a credential file, so you must provide your credentials as an environment variable.
You can provide credentials directly to the BigQuery I/O manager by using the gcp_credentials configuration value. The BigQuery I/O manager will create a temporary file to store the credential and will set GOOGLE_APPLICATION_CREDENTIALS to point to this file. When the Dagster run is completed, the temporary file is deleted and GOOGLE_APPLICATION_CREDENTIALS is unset.
To avoid issues with newline characters in the GCP credential key, you must base64 encode the key. For example, if your GCP key is stored at ~/.gcp/key.json you can base64 encode the key by using the following shell command:
cat ~/.gcp/key.json | base64
Then you can set an environment variable in your Dagster deployment (for example GCP_CREDS) to the encoded key and provide it to the BigQuery I/O manager:
from dagster_gcp_pandas import BigQueryPandasIOManager
from dagster import Definitions, EnvVar
defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            location="us-east5",
            dataset="IRIS",
            timeout=15.0,
            gcp_credentials=EnvVar("GCP_CREDS"),
        )
    },
)
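The credential handling described above (decode the key, write it to a temporary file, point GOOGLE_APPLICATION_CREDENTIALS at that file, then clean up) can be sketched in plain Python. This is an illustration under assumptions, not the I/O manager's actual implementation: the helper names encode_key and temporary_gcp_credentials and the made-up key contents are hypothetical.

```python
import base64
import json
import os
import tempfile
from contextlib import contextmanager

ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"


def encode_key(raw_key: str) -> str:
    """Base64 encode the text of a key file (hypothetical helper)."""
    return base64.b64encode(raw_key.encode("utf-8")).decode("ascii")


@contextmanager
def temporary_gcp_credentials(encoded_key: str):
    """Hypothetical helper: expose a base64-encoded key as a credentials file."""
    decoded = base64.b64decode(encoded_key)
    # Write the decoded key to a temporary file.
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "wb") as f:
        f.write(decoded)
    os.environ[ENV_VAR] = path
    try:
        yield path
    finally:
        # Unset the variable and delete the file once the work is done.
        os.environ.pop(ENV_VAR, None)
        os.remove(path)


# A made-up, pretty-printed key: the text contains newline characters.
fake_key = json.dumps(
    {"type": "service_account", "private_key": "line1\nline2\n"}, indent=2
)
encoded = encode_key(fake_key)

with temporary_gcp_credentials(encoded) as path:
    with open(path, encoding="utf-8") as f:
        restored = json.load(f)
```

The encoded string is a single line, so it can be stored in an environment variable such as GCP_CREDS, and decoding it restores the original key unchanged.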
Selecting specific columns in a downstream asset
Sometimes you may not want to fetch an entire table as the input to a downstream asset. With the BigQuery I/O manager, you can select specific columns to load by supplying metadata on the downstream asset.
import pandas as pd
from dagster import AssetIn, asset
# this example uses the iris_data asset from Step 2 of the Using Dagster with BigQuery tutorial
@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_data",
            metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    iris_sepal["sepal_area_cm2"] = (
        iris_sepal["sepal_length_cm"] * iris_sepal["sepal_width_cm"]
    )
    return iris_sepal
In this example, we only use the columns containing sepal data from the IRIS_DATA table created in Step 2: Create tables in BigQuery of the Using Dagster with BigQuery tutorial. Fetching the entire table would be unnecessarily costly, so to select specific columns, we can add metadata to the input asset. We do this in the metadata parameter of the AssetIn that loads the iris_data asset in the ins parameter. We supply the key columns with a list of names of the columns we want to fetch.
When Dagster materializes sepal_data and loads the iris_data asset using the BigQuery I/O manager, it will only fetch the sepal_length_cm and sepal_width_cm columns of the IRIS.IRIS_DATA table and pass them to sepal_data as a Pandas DataFrame.
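To make the effect of the columns metadata concrete, here is a minimal sketch of how a column list could narrow the query. The helper build_select is hypothetical, and the exact SQL the I/O manager issues may differ; the point is only that a column list replaces the `*` in the SELECT.

```python
from typing import Optional, Sequence


def build_select(table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: select only the requested columns, or all of them."""
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {table}"


narrow_query = build_select("IRIS.IRIS_DATA", ["sepal_length_cm", "sepal_width_cm"])
full_query = build_select("IRIS.IRIS_DATA")
print(narrow_query)
print(full_query)
```

Because BigQuery stores data by column, a narrower column list generally means fewer bytes are read for the same table.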
Storing partitioned assets
The BigQuery I/O manager supports storing and loading partitioned data. In order to correctly store and load data from the BigQuery table, the BigQuery I/O manager needs to know which column contains the data defining the partition bounds. The BigQuery I/O manager uses this information to construct the correct queries to select or replace the data. In the following sections, we describe how the I/O manager constructs these queries for different types of partitions.
- Static partitioned assets
- Time-partitioned assets
- Multi-partitioned assets
Storing static partitioned assets
In order to store static partitioned assets in BigQuery, you must specify partition_expr metadata on the asset to tell the BigQuery I/O manager which column contains the partition data:
import pandas as pd
from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset
@asset(
    partitions_def=StaticPartitionsDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    metadata={"partition_expr": "SPECIES"},
)
def iris_data_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
    species = context.partition_key
    full_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
    return full_df[full_df["species"] == species]
@asset
def iris_cleaned(iris_data_partitioned: pd.DataFrame):
    return iris_data_partitioned.dropna().drop_duplicates()
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the partition in the downstream asset. When loading a static partition, the following statement is used:
SELECT *
 WHERE [partition_expr] in ([selected partitions])
When the partition_expr value is injected into this statement, the resulting SQL query must follow BigQuery's SQL syntax. Refer to the BigQuery documentation for more information.
When materializing the above assets, a partition must be selected, as described in the Partitioning assets documentation. In this example, the query used when materializing the Iris-setosa partition of the above assets would be:
SELECT *
 WHERE SPECIES in ('Iris-setosa')
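As a rough sketch of the template above, the condition for one or more static partitions could be assembled like this. The helper static_partition_clause is hypothetical and does no escaping of the partition keys; it only mirrors the documented `[partition_expr] in ([selected partitions])` shape.

```python
from typing import Sequence


def static_partition_clause(partition_expr: str, partitions: Sequence[str]) -> str:
    """Hypothetical helper mirroring `[partition_expr] in ([selected partitions])`."""
    # Each partition key becomes a single-quoted SQL string literal.
    quoted = ", ".join(f"'{key}'" for key in partitions)
    return f"{partition_expr} in ({quoted})"


single = static_partition_clause("SPECIES", ["Iris-setosa"])
several = static_partition_clause("SPECIES", ["Iris-setosa", "Iris-virginica"])
print(single)
print(several)
```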
Storing time partitioned assets
As with static partitioned assets, you specify partition_expr metadata on the asset to tell the BigQuery I/O manager which column contains the partition data:
import pandas as pd
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "TIMESTAMP_SECONDS(TIME)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
    partition = context.partition_key
    # get_iris_data_for_date fetches all of the iris data for a given date,
    # the returned dataframe contains a column named 'TIME' that stores
    # the time of the row as an integer of seconds since epoch
    return get_iris_data_for_date(partition)
@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
    return iris_data_per_day.dropna().drop_duplicates()
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in the downstream asset. When loading a time-window partition, the following statement is used:
SELECT *
 WHERE [partition_expr] >= [partition_start]
   AND [partition_expr] < [partition_end]
When the partition_expr value is injected into this statement, the resulting SQL query must follow BigQuery's SQL syntax. Refer to the BigQuery documentation for more information.
When materializing the above assets, a partition must be selected, as described in Materializing partitioned assets. The [partition_start] and [partition_end] bounds are of the form YYYY-MM-DD HH:MM:SS. In this example, the query when materializing the 2023-01-02 partition of the above assets would be:
SELECT *
 WHERE TIMESTAMP_SECONDS(TIME) >= '2023-01-02 00:00:00'
   AND TIMESTAMP_SECONDS(TIME) < '2023-01-03 00:00:00'
In this example, the data in the TIME column are integers, so the partition_expr metadata includes a SQL function call, TIMESTAMP_SECONDS, that converts integer seconds since the epoch to timestamps. The BigQuery documentation has a full list of functions.
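The bounds for a daily partition can be derived from the partition key alone. The following sketch, using a hypothetical helper daily_partition_clause, shows one way to compute the half-open interval (start inclusive, end exclusive) in the YYYY-MM-DD HH:MM:SS form described above. It illustrates the arithmetic only and is not Dagster's implementation.

```python
from datetime import datetime, timedelta

BOUND_FORMAT = "%Y-%m-%d %H:%M:%S"


def daily_partition_clause(partition_expr: str, partition_key: str) -> str:
    """Hypothetical helper: build the time-window condition for one daily partition."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    # The end bound is exclusive, so it is the start of the following day.
    end = start + timedelta(days=1)
    return (
        f"{partition_expr} >= '{start.strftime(BOUND_FORMAT)}'"
        f" AND {partition_expr} < '{end.strftime(BOUND_FORMAT)}'"
    )


clause = daily_partition_clause("TIMESTAMP_SECONDS(TIME)", "2023-01-02")
print(clause)
```

Using an exclusive upper bound means adjacent partitions never overlap: a row stamped exactly at midnight belongs to the later day only.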
Storing multi-partitioned assets
The BigQuery I/O manager can also store data partitioned on multiple dimensions. To do this, you must specify the column for each partition as a dictionary of partition_expr metadata:
import pandas as pd
import dagster as dg
@dg.asset(
    partitions_def=dg.MultiPartitionsDefinition(
        {
            "date": dg.DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": dg.StaticPartitionsDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={
        "partition_expr": {"date": "TIMESTAMP_SECONDS(TIME)", "species": "SPECIES"}
    },
)
def iris_data_partitioned(context: dg.AssetExecutionContext) -> pd.DataFrame:
    partition = context.partition_key.keys_by_dimension
    species = partition["species"]
    date = partition["date"]
    # get_iris_data_for_date fetches all of the iris data for a given date,
    # the returned dataframe contains a column named 'TIME' that stores
    # the time of the row as an integer of seconds since epoch
    full_df = get_iris_data_for_date(date)
    return full_df[full_df["species"] == species]
@dg.asset
def iris_cleaned(iris_data_partitioned: pd.DataFrame):
    return iris_data_partitioned.dropna().drop_duplicates()
Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in a downstream asset. For multi-partitions, Dagster concatenates the WHERE statements described in the static partition and time-window partition sections to craft the correct SELECT statement.
When materializing the above assets, a partition must be selected, as described in Materializing partitioned assets. For example, when materializing the 2023-01-02|Iris-setosa partition of the above assets, the following query will be used:
SELECT *
 WHERE SPECIES in ('Iris-setosa')
   AND TIMESTAMP_SECONDS(TIME) >= '2023-01-02 00:00:00'
   AND TIMESTAMP_SECONDS(TIME) < '2023-01-03 00:00:00'
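The concatenation described above can be sketched by building one condition per dimension and joining them with AND. The helper multi_partition_where is hypothetical, and splitting the key string on `|` is done here only for illustration; inside an asset, context.partition_key.keys_by_dimension provides the per-dimension keys.

```python
from datetime import datetime, timedelta
from typing import Mapping

BOUND_FORMAT = "%Y-%m-%d %H:%M:%S"


def multi_partition_where(
    partition_expr: Mapping[str, str], keys_by_dimension: Mapping[str, str]
) -> str:
    """Hypothetical helper: join the static and time-window conditions."""
    start = datetime.strptime(keys_by_dimension["date"], "%Y-%m-%d")
    end = start + timedelta(days=1)
    date_expr = partition_expr["date"]
    species_expr = partition_expr["species"]
    species = keys_by_dimension["species"]
    clauses = [
        f"{species_expr} in ('{species}')",
        f"{date_expr} >= '{start.strftime(BOUND_FORMAT)}'",
        f"{date_expr} < '{end.strftime(BOUND_FORMAT)}'",
    ]
    return "WHERE " + "\n   AND ".join(clauses)


partition_expr = {"date": "TIMESTAMP_SECONDS(TIME)", "species": "SPECIES"}
# Illustrative only: map the two parts of the key to their dimension names.
keys = dict(zip(("date", "species"), "2023-01-02|Iris-setosa".split("|")))
where = multi_partition_where(partition_expr, keys)
print(where)
```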
Storing tables in multiple datasets
You may want to have different assets stored in different BigQuery datasets. The BigQuery I/O manager allows you to specify the dataset in several ways.
You can specify the default dataset where data will be stored as configuration to the I/O manager, like we did in Step 1: Configure the BigQuery I/O manager of the Using Dagster with BigQuery tutorial.
If you want to store assets in different datasets, you can specify the dataset as metadata:
import pandas as pd
from dagster import AssetSpec, asset
daffodil_data = AssetSpec(key=["daffodil_data"], metadata={"schema": "daffodil"})
@asset(metadata={"schema": "iris"})
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
You can also specify the dataset as part of the asset's asset key:
import pandas as pd
from dagster import AssetSpec, asset
daffodil_data = AssetSpec(key=["gcp", "bigquery", "daffodil", "daffodil_data"])
@asset(key_prefix=["gcp", "bigquery", "iris"])
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
The dataset will be the last prefix before the asset's name. In this example, the iris_data asset will be stored in the IRIS dataset, and the daffodil_data asset will be found in the DAFFODIL dataset.
The dataset is determined in this order:
- If the dataset is set via metadata, that dataset will be used
- Otherwise, the dataset set as configuration on the I/O manager will be used
- Otherwise, if there is a key_prefix, that dataset will be used
- If none of the above are provided, the default dataset will be PUBLIC
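The precedence listed above can be written out as a small function. This is a sketch of the documented order only, with a hypothetical helper name resolve_dataset; it does not model any case normalization or validation the I/O manager may perform.

```python
from typing import Mapping, Optional, Sequence

DEFAULT_DATASET = "PUBLIC"


def resolve_dataset(
    metadata: Mapping[str, str],
    configured_dataset: Optional[str],
    asset_key_path: Sequence[str],
) -> str:
    """Hypothetical helper: apply the documented dataset precedence."""
    # 1. A dataset set via "schema" metadata wins.
    if metadata.get("schema"):
        return metadata["schema"]
    # 2. Otherwise, use the dataset configured on the I/O manager.
    if configured_dataset:
        return configured_dataset
    # 3. Otherwise, use the last prefix before the asset's name.
    if len(asset_key_path) > 1:
        return asset_key_path[-2]
    # 4. Fall back to the default dataset.
    return DEFAULT_DATASET


prefixed_key = ["gcp", "bigquery", "iris", "iris_data"]
from_metadata = resolve_dataset({"schema": "daffodil"}, "IRIS", ["daffodil_data"])
from_config = resolve_dataset({}, "IRIS", prefixed_key)
from_prefix = resolve_dataset({}, None, prefixed_key)
from_default = resolve_dataset({}, None, ["iris_data"])
```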
Using the BigQuery I/O manager with other I/O managers
You may have assets that you don't want to store in BigQuery. You can provide an I/O manager to each asset using the io_manager_key parameter in the asset decorator:
import pandas as pd
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_gcp_pandas import BigQueryPandasIOManager
from dagster import Definitions, asset
@asset(io_manager_key="warehouse_io_manager")
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_data):
    # plot_data is a function we've defined somewhere else
    # that plots the data in a DataFrame
    return plot_data(iris_data)
defs = Definitions(
    assets=[iris_data, iris_plots],
    resources={
        "warehouse_io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            dataset="IRIS",
        ),
        "blob_io_manager": s3_pickle_io_manager,
    },
)
In this example, the iris_data asset uses the I/O manager bound to the key warehouse_io_manager, and iris_plots uses the I/O manager bound to the key blob_io_manager. In the Definitions object, we supply the I/O managers for those keys. When the assets are materialized, iris_data will be stored in BigQuery, and iris_plots will be saved in Amazon S3.
Storing and loading PySpark DataFrames in BigQuery
The BigQuery I/O manager also supports storing and loading PySpark DataFrames. To use the BigQueryPySparkIOManager, first install the package with either uv or pip:
uv add dagster-gcp-pyspark
pip install dagster-gcp-pyspark
Then you can use the BigQueryPySparkIOManager in your Definitions, as in Step 1: Configure the BigQuery I/O manager of the Using Dagster with BigQuery tutorial.
from dagster_gcp_pyspark import BigQueryPySparkIOManager
from dagster import Definitions
defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": BigQueryPySparkIOManager(
            project="my-gcp-project",  # required
            location="us-east5",  # optional, defaults to the default location for the project - see https://cloud.google.com/bigquery/docs/locations for a list of locations
            dataset="IRIS",  # optional, defaults to PUBLIC
            temporary_gcs_bucket="my-gcs-bucket",  # optional, defaults to None, which will result in a direct write to BigQuery
        )
    },
)
When using the BigQueryPySparkIOManager you may provide the temporary_gcs_bucket configuration. This will store the data in a temporary GCS bucket, then load all of the data into BigQuery in one operation. If not provided, data will be written directly to BigQuery. If you choose to use a temporary GCS bucket, you must include the GCS Hadoop connector in your Spark Session, in addition to the BigQuery connector (described below).
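One way to include both connectors is to list them together in the spark.jars.packages setting, which accepts a comma-separated list of Maven coordinates. The sketch below is a configuration fragment under assumptions: the BigQuery coordinate is the one used in the examples on this page, while GCS_CONNECTOR_JARS is a placeholder, not a real coordinate, that you would replace with the GCS Hadoop connector artifact matching your Spark and Hadoop versions.

```python
# BigQuery connector coordinate, as used in the examples on this page.
BIGQUERY_JARS = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0"

# Placeholder only: substitute the GCS Hadoop connector coordinate
# that matches your Spark and Hadoop versions.
GCS_CONNECTOR_JARS = "<group>:<gcs-connector-artifact>:<version>"

# spark.jars.packages takes a comma-separated list of Maven coordinates.
spark_conf = {"spark.jars.packages": ",".join([BIGQUERY_JARS, GCS_CONNECTOR_JARS])}
print(spark_conf["spark.jars.packages"])
```

A dictionary like this could be supplied as the spark_conf of the pyspark_resource, or the same key and value could be set on SparkSession.builder.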
The BigQueryPySparkIOManager requires that a SparkSession be active and configured with the BigQuery connector for Spark. You can either use the pyspark_resource or create your own SparkSession. The first example below uses the pyspark_resource; the second creates its own SparkSession.
from dagster_gcp_pyspark import BigQueryPySparkIOManager
from dagster_pyspark import pyspark_resource
from pyspark import SparkFiles
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from dagster import AssetExecutionContext, Definitions, asset
BIGQUERY_JARS = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0"
@asset(required_resource_keys={"pyspark"})
def iris_data(context: AssetExecutionContext) -> DataFrame:
    spark = context.resources.pyspark.spark_session
    schema = StructType(
        [
            StructField("sepal_length_cm", DoubleType()),
            StructField("sepal_width_cm", DoubleType()),
            StructField("petal_length_cm", DoubleType()),
            StructField("petal_width_cm", DoubleType()),
            StructField("species", StringType()),
        ]
    )
    url = "https://docs.dagster.io/assets/iris.csv"
    spark.sparkContext.addFile(url)
    return spark.read.schema(schema).csv("file://" + SparkFiles.get("iris.csv"))
defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": BigQueryPySparkIOManager(
            project="my-gcp-project",
            location="us-east5",
        ),
        "pyspark": pyspark_resource.configured(
            {"spark_conf": {"spark.jars.packages": BIGQUERY_JARS}}
        ),
    },
)
from dagster_gcp_pyspark import BigQueryPySparkIOManager
from pyspark import SparkFiles
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from dagster import Definitions, asset
BIGQUERY_JARS = "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0"
@asset
def iris_data() -> DataFrame:
    spark = SparkSession.builder.config(
        key="spark.jars.packages",
        value=BIGQUERY_JARS,
    ).getOrCreate()
    schema = StructType(
        [
            StructField("sepal_length_cm", DoubleType()),
            StructField("sepal_width_cm", DoubleType()),
            StructField("petal_length_cm", DoubleType()),
            StructField("petal_width_cm", DoubleType()),
            StructField("species", StringType()),
        ]
    )
    url = "https://docs.dagster.io/assets/iris.csv"
    spark.sparkContext.addFile(url)
    return spark.read.schema(schema).csv("file://" + SparkFiles.get("iris.csv"))
defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": BigQueryPySparkIOManager(
            project="my-gcp-project",
            location="us-east5",
        ),
    },
)
In order to load data from BigQuery as a PySpark DataFrame, the BigQuery PySpark connector will create a view containing the data. This will result in the creation of a temporary table in your BigQuery dataset. For more details, see the BigQuery PySpark connector documentation.
Using Pandas and PySpark DataFrames with BigQuery
If you work with both Pandas and PySpark DataFrames and want a single I/O manager to handle storing and loading these DataFrames in BigQuery, you can write a new I/O manager that handles both types. To do this, inherit from the BigQueryIOManager base class and implement the type_handlers and default_load_type methods. The resulting I/O manager will inherit the configuration fields of the base BigQueryIOManager.
from collections.abc import Sequence
from typing import Optional
import pandas as pd
from dagster_gcp import BigQueryIOManager
from dagster_gcp_pandas import BigQueryPandasTypeHandler
from dagster_gcp_pyspark import BigQueryPySparkTypeHandler
from dagster import Definitions
from dagster._core.storage.db_io_manager import DbTypeHandler
class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        """type_handlers should return a list of the TypeHandlers that the I/O manager can use.
        Here we return the BigQueryPandasTypeHandler and BigQueryPySparkTypeHandler so that the I/O
        manager can store Pandas DataFrames and PySpark DataFrames.
        """
        return [BigQueryPandasTypeHandler(), BigQueryPySparkTypeHandler()]
    @staticmethod
    def default_load_type() -> Optional[type]:
        """If an asset is not annotated with a return type, default_load_type will be used to
        determine which TypeHandler to use to store and load the output.
        In this case, unannotated assets will be stored and loaded as Pandas DataFrames.
        """
        return pd.DataFrame
defs = Definitions(
    assets=[iris_data, rose_data],
    resources={
        "io_manager": MyBigQueryIOManager(project="my-gcp-project", dataset="FLOWERS")
    },
)
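The role of default_load_type can be illustrated with a dependency-free toy model of handler selection. Everything in this sketch is hypothetical (the stand-in frame classes, ToyHandler, and pick_handler) and it is not how Dagster implements dispatch internally; it only shows the idea that the type annotation selects the handler and the default type is the fallback.

```python
from typing import Optional, Sequence


class FakePandasFrame:
    """Stand-in for pandas.DataFrame so the sketch has no dependencies."""


class FakeSparkFrame:
    """Stand-in for pyspark.sql.DataFrame."""


class ToyHandler:
    """Toy type handler: records which Python types it can store and load."""

    def __init__(self, name: str, supported_types: Sequence[type]):
        self.name = name
        self.supported_types = tuple(supported_types)


def pick_handler(
    handlers: Sequence[ToyHandler],
    annotation: Optional[type],
    default_load_type: Optional[type],
) -> ToyHandler:
    """Choose a handler from the annotation, falling back to the default type."""
    wanted = annotation if annotation is not None else default_load_type
    for handler in handlers:
        if wanted in handler.supported_types:
            return handler
    raise TypeError(f"no handler registered for {wanted!r}")


handlers = [
    ToyHandler("pandas", [FakePandasFrame]),
    ToyHandler("pyspark", [FakeSparkFrame]),
]

# An asset annotated with the Spark type uses the PySpark handler.
annotated = pick_handler(handlers, FakeSparkFrame, FakePandasFrame)
# An unannotated asset falls back to the default load type.
unannotated = pick_handler(handlers, None, FakePandasFrame)
print(annotated.name, unannotated.name)
```

In practice this means annotating each asset's return type (and each input) is the way to control whether a given table is handled as a Pandas or a PySpark DataFrame.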
Executing custom SQL commands with the BigQuery resource
In addition to the BigQuery I/O manager, Dagster also provides a BigQuery resource for executing custom SQL queries.
from dagster_gcp import BigQueryResource
from dagster import Definitions, asset
# this example executes a query against the IRIS.IRIS_DATA table created in Step 2 of the
# Using Dagster with BigQuery tutorial
@asset
def small_petals(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        return client.query(
            # column names are unquoted: in BigQuery SQL, double quotes
            # delimit string literals, not identifiers
            "SELECT * FROM IRIS.IRIS_DATA WHERE petal_length_cm < 1 AND"
            " petal_width_cm < 1",
        ).result()
defs = Definitions(
    assets=[small_petals],
    resources={
        "bigquery": BigQueryResource(
            project="my-gcp-project",
            location="us-east5",
        )
    },
)
In this example, we attach the BigQuery resource to the small_petals asset. In the body of the asset function, we use the get_client context manager method of the resource to get a bigquery.client.Client. We can use the client to execute a custom SQL query against the IRIS_DATA table created in Step 2: Create tables in BigQuery of the Using Dagster with BigQuery tutorial.