
Componentizing asset factories

Data engineers often need to implement multiple similar workflows in data pipelines. To keep this code maintainable, many engineers use asset factories to generate Dagster objects based on configuration instead of defining each one manually.

While factories are powerful and flexible, many patterns that use them can also be expressed using components. In this guide, we will convert the asset factory from the asset factory creation guide into a custom component.

Prerequisites

Before scaffolding a custom component, you must either create a components-ready Dagster project or migrate an existing project to dg.

1. Scaffold the custom component

When creating a new custom component in Dagster, the first step is to scaffold the component using dg. This generates the necessary boilerplate code and file structure for you to implement and register the component:

dg scaffold component AssetFactory

2. Define the component

Next, define the component. We recommend starting a new component by designing its interface. Our asset factory has one shared resource (the S3 connection) and one or more ETL assets to configure. Because there can be any number of ETL assets, we define a Model that describes a single ETL job and use it within the component.

Looking at the parameters of the build_etl_job factory, we can see what needs to be in the model:

def build_etl_job(
    bucket: str,
    source_object: str,
    target_object: str,
    sql: str,
) -> dg.Definitions:
    ...  # body omitted; see the asset factory creation guide

Within the component code in asset_factory.py, create a class that inherits from dg.Model and has one attribute for each parameter of the factory:

src/<project_name>/components/asset_factory.py
class EtlJob(dg.Model):
    # one ETL job; the fields mirror build_etl_job's parameters and,
    # with no default declared, each one is required
    bucket: str
    source_object: str
    target_object: str
    sql: str
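To see what declaring these fields buys you, here is a minimal stdlib sketch. It uses a frozen dataclass named EtlJobSketch (a hypothetical stand-in, not dg.Model). dg.Model is pydantic-based and also validates types, which a dataclass does not. What both share is that fields declared without a default are required, so an incomplete job entry fails when it is constructed rather than producing a half-configured asset.

```python
from dataclasses import dataclass, fields


# Hypothetical stand-in for the EtlJob model; NOT dg.Model, which also
# validates field types.
@dataclass(frozen=True)
class EtlJobSketch:
    bucket: str
    source_object: str
    target_object: str
    sql: str


# One job entry as it might look once the YAML configuration is parsed.
raw = {
    "bucket": "my_bucket",
    "source_object": "raw_transactions.csv",
    "target_object": "cleaned_transactions.csv",
    "sql": "SELECT * FROM source WHERE amount IS NOT NULL;",
}

job = EtlJobSketch(**raw)
print([f.name for f in fields(EtlJobSketch)])
# ['bucket', 'source_object', 'target_object', 'sql']

# Dropping a field raises TypeError because no default is declared for it.
incomplete = {k: v for k, v in raw.items() if k != "sql"}
try:
    EtlJobSketch(**incomplete)
except TypeError as exc:
    print("rejected:", exc)
```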

Next, use that Model within the AssetFactory class. At the top of the class, create attributes for access_key_id and secret_access_key. These are shared across all generated assets, so they only need to be set once. The etl_job attribute is a list because the component can generate any number of assets:

src/<project_name>/components/asset_factory.py
# imports at the top of asset_factory.py
import tempfile

import dagster as dg
import duckdb
from dagster_aws.s3 import S3Resource


class AssetFactory(dg.Component, dg.Model, dg.Resolvable):
    # shared credentials, set once for every generated asset
    access_key_id: str
    secret_access_key: str

    # one entry per ETL asset to generate
    etl_job: list[EtlJob]

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        def _build_etl_asset(etl_config: EtlJob) -> dg.AssetsDefinition:
            # asset keys cannot contain "."
            asset_key = f"etl_{etl_config.bucket}_{etl_config.target_object}".replace(".", "_")

            # the S3Resource is injected by parameter name ("s3")
            @dg.asset(name=asset_key)
            def _etl_asset(s3: S3Resource) -> None:
                with tempfile.TemporaryDirectory() as root:
                    source_path = f"{root}/{etl_config.source_object}"
                    target_path = f"{root}/{etl_config.target_object}"

                    # these steps could be split into separate assets, but
                    # for brevity we will keep them together.
                    # 1. extract (boto3: Bucket, Key, Filename)
                    s3.get_client().download_file(
                        etl_config.bucket, etl_config.source_object, source_path
                    )

                    # 2. transform
                    db = duckdb.connect(":memory:")
                    db.execute(
                        f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');"
                    )
                    db.query(etl_config.sql).to_csv(target_path)

                    # 3. load (boto3: Filename, Bucket, Key)
                    s3.get_client().upload_file(
                        target_path, etl_config.bucket, etl_config.target_object
                    )

            return _etl_asset

        # building each asset in a helper gives every asset its own
        # etl_config instead of sharing the loop variable
        _assets = [_build_etl_asset(etl) for etl in self.etl_job]

        _resources = {
            "s3": S3Resource(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
            )
        }

        return dg.Definitions(assets=_assets, resources=_resources)

Most of the AssetFactory code mirrors the original asset factory. The main difference is that build_defs returns a single Definitions object containing every generated asset along with the shared S3 resource, whereas build_etl_job returned one Definitions object per job.
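The component generates its assets inside a loop over etl_job, which brings up a Python subtlety. A nested function looks up variables from its enclosing scope when it runs, not when it is defined. The stdlib-only sketch below uses plain functions as stand-ins for Dagster assets and a hypothetical helper named build_runner to show both the problem and one common fix. The fix is to build each function in a helper that receives the value as a parameter.

```python
# Plain functions stand in for generated assets; no Dagster required.
jobs = ["cleaned_transactions.csv", "risky_customers.csv"]

# Pitfall: every closure reads `target` when called, i.e. after the loop ends.
late_bound = []
for target in jobs:
    def run():
        return target
    late_bound.append(run)

print([f() for f in late_bound])
# ['risky_customers.csv', 'risky_customers.csv']


# Fix: a helper receives the value as a parameter, so each closure
# gets its own binding.
def build_runner(target_object):
    def run():
        return target_object
    return run


bound = [build_runner(t) for t in jobs]
print([f() for f in bound])
# ['cleaned_transactions.csv', 'risky_customers.csv']
```

Binding through a default argument (for example def run(target=target)) is another common fix in plain Python. With @dg.asset, however, it adds a parameter to the decorated function, and Dagster inspects those parameters to infer inputs and resources. A builder function that takes the job as an argument is therefore the less surprising choice.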

3. Use the component

With the component created and registered, we can now use it in the project. The first step is to scaffold an instance of the component:

dg scaffold defs 'my_project.components.asset_factory.AssetFactory' asset_factory

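Next, set the attributes of the component. The access_key_id and secret_access_key values below are placeholders for your own AWS credentials: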

src/<project_name>/defs/asset_factory/defs.yaml
type: my_project.components.asset_factory.AssetFactory

attributes:
  access_key_id: key
  secret_access_key: access

  etl_job:
    - bucket: my_bucket
      source_object: raw_transactions.csv
      target_object: cleaned_transactions.csv
      sql: SELECT * FROM source WHERE amount IS NOT NULL;
    - bucket: my_bucket
      source_object: all_customers.csv
      target_object: risky_customers.csv
      sql: SELECT * FROM source WHERE risk_score > 0.8;
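Before deploying, it can help to sanity-check each job's sql against a few sample rows. The sketch below substitutes Python's built-in sqlite3 for DuckDB, an assumption made so it runs with no extra dependencies. The two SQL dialects differ, so this only suits simple queries like the ones above. The sketch mirrors the component's transform step: it loads CSV rows into a table named source, runs the configured query, and returns the result as CSV. Numeric-looking cells are converted to numbers, loosely imitating the type detection of DuckDB's read_csv. The helper run_transform and the sample data are hypothetical.

```python
import csv
import io
import sqlite3


def _parse(cell: str):
    # Empty cells become NULL; numeric-looking cells become numbers so that
    # comparisons such as `risk_score > 0.8` are numeric, not textual.
    if cell == "":
        return None
    for cast in (int, float):
        try:
            return cast(cell)
        except ValueError:
            pass
    return cell


def run_transform(csv_text: str, sql: str) -> str:
    """Load CSV text into a `source` table, run `sql`, return the result as CSV."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    rows = [[_parse(cell) for cell in row] for row in reader]

    db = sqlite3.connect(":memory:")
    columns = ", ".join(f'"{name}"' for name in header)
    placeholders = ", ".join("?" for _ in header)
    db.execute(f"CREATE TABLE source ({columns})")
    db.executemany(f"INSERT INTO source VALUES ({placeholders})", rows)

    cursor = db.execute(sql)
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow([d[0] for d in cursor.description])
    writer.writerows(cursor.fetchall())
    db.close()
    return out.getvalue()


sample = "id,amount\n1,10.5\n2,\n3,7\n"
print(run_transform(sample, "SELECT * FROM source WHERE amount IS NOT NULL;"))
```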

4. Viewing component assets

The assets generated by the initialized component behave the same as those created by the factory. You can view them on the command line:

dg list defs

or interact with them in the Dagster UI by running dg dev:

Asset factory DAG
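The names shown by dg list defs come from the naming rule in build_defs: an etl_ prefix, then the bucket and target object joined by underscores, with every "." replaced by "_". A small pure-Python sketch of that rule lets you predict the names for the defs.yaml configuration above before launching anything. The function name expected_asset_key is hypothetical.

```python
def expected_asset_key(bucket: str, target_object: str) -> str:
    # Same expression the component uses; "." is replaced because
    # asset keys cannot contain it.
    return f"etl_{bucket}_{target_object}".replace(".", "_")


# (bucket, target_object) pairs taken from the etl_job list in defs.yaml
jobs = [
    ("my_bucket", "cleaned_transactions.csv"),
    ("my_bucket", "risky_customers.csv"),
]

for bucket, target in jobs:
    print(expected_asset_key(bucket, target))
# etl_my_bucket_cleaned_transactions_csv
# etl_my_bucket_risky_customers_csv
```

Because "." and "_" map to the same character, two jobs in the same bucket whose target objects differ only in that way (for example a.csv and a_csv) would produce the same key.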