Componentizing asset factories
Data engineers often need to implement multiple similar workflows in data pipelines. To keep this code maintainable, many engineers use asset factories to generate Dagster objects based on configuration instead of defining each one manually.
While factories are powerful and flexible, many patterns that use them can also be expressed using components. In this guide, we will convert the asset factory from the asset factory creation guide into a custom component.
Before scaffolding a custom component, you must either create a components-ready Dagster project or migrate an existing project to dg.
1. Scaffold the custom component
When creating a new custom component in Dagster, the first step is to scaffold the component using dg. This generates the necessary boilerplate code and file structure for you to implement and register the component:
dg scaffold component AssetFactory
2. Define the component
Next, we need to define the component. We recommend beginning new components by designing the interface. Our asset factory configures one shared resource and one or more ETL assets. Because any number of ETL assets can be configured, we need to define a Model that describes a single ETL asset and can be reused in the component.
Looking at the parameters of the build_etl_job factory, we can see what needs to be in the model:
def build_etl_job(
    bucket: str,
    source_object: str,
    target_object: str,
    sql: str,
) -> dg.Definitions:
Within the component code in asset_factory.py, we will create a class that inherits from dg.Model with the attributes of the asset factory:
class EtlJob(dg.Model):
    # Each attribute is a required string, mirroring the factory parameters.
    bucket: str
    source_object: str
    target_object: str
    sql: str
Next, we can use that Model within the AssetFactory class. At the top of the class, create attributes for access_key_id and secret_access_key. These will be shared across the assets and only need to be set once. The etl_job attribute will be a list, since there can be any number of assets:
# Imports needed at the top of asset_factory.py
import tempfile

import dagster as dg
import dagster_aws.s3 as s3
import duckdb


def _build_etl_asset(etl: EtlJob) -> dg.AssetsDefinition:
    # Asset names cannot contain '.', so replace it with '_'.
    asset_key = f"etl_{etl.bucket}_{etl.target_object}".replace(".", "_")

    # Building each asset in its own function call binds `etl` per asset.
    # The "s3" resource key must be declared to use context.resources.s3.
    @dg.asset(name=asset_key, required_resource_keys={"s3"})
    def _etl_asset(context: dg.AssetExecutionContext):
        with tempfile.TemporaryDirectory() as root:
            source_path = f"{root}/{etl.source_object}"
            target_path = f"{root}/{etl.target_object}"

            # these steps could be split into separate assets, but
            # for brevity we will keep them together.
            # 1. extract
            context.resources.s3.download_file(
                etl.bucket, etl.source_object, source_path
            )

            # 2. transform
            db = duckdb.connect(":memory:")
            db.execute(
                f"CREATE TABLE source AS SELECT * FROM read_csv('{source_path}');"
            )
            db.query(etl.sql).to_csv(target_path)

            # 3. load
            context.resources.s3.upload_file(
                etl.bucket, etl.target_object, target_path
            )

    return _etl_asset


class AssetFactory(dg.Component, dg.Model, dg.Resolvable):
    # Credentials are set once and shared by every generated asset.
    access_key_id: str
    secret_access_key: str
    etl_job: list[EtlJob]

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        _assets = [_build_etl_asset(etl) for etl in self.etl_job]

        _resources = {
            "s3": s3.S3Resource(
                aws_access_key_id=self.access_key_id,
                aws_secret_access_key=self.secret_access_key,
            )
        }

        return dg.Definitions(assets=_assets, resources=_resources)
Most of the new AssetFactory code will look similar to the code in the old asset factory. The main difference is that the returned Definitions object contains all of the generated assets, as well as the shared resource.
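Because the assets are generated in a loop, each generated function needs to capture its own job configuration rather than the loop variable itself. The following is a minimal sketch of that pitfall in plain Python. It assumes nothing about Dagster: the dicts stand in for EtlJob models, the plain functions stand in for assets, and the names make_runner, run_late, and run are hypothetical.

```python
# Hypothetical stand-ins: plain dicts instead of EtlJob models,
# plain functions instead of Dagster assets.
jobs = [
    {"bucket": "my_bucket", "target_object": "cleaned_transactions.csv"},
    {"bucket": "my_bucket", "target_object": "risky_customers.csv"},
]

# Late binding: each function looks up `job` when it is called,
# which happens after the loop has finished.
late_bound = []
for job in jobs:
    def run_late():
        return job["target_object"]

    late_bound.append(run_late)


def make_runner(job_config):
    """Give each generated function its own `job_config` (hypothetical helper)."""

    def run():
        return job_config["target_object"]

    return run


bound = [make_runner(j) for j in jobs]

late_results = [f() for f in late_bound]
bound_results = [f() for f in bound]

print(late_results)   # ['risky_customers.csv', 'risky_customers.csv']
print(bound_results)  # ['cleaned_transactions.csv', 'risky_customers.csv']
```

Without explicit binding, every generated function would process the last configured job, so it is worth checking this whenever definitions are built in a loop.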
3. Use the component
With the component created and registered, we can now use it in the project. The first step is to initialize the component:
dg scaffold defs 'my_project.components.asset_factory.AssetFactory' asset_factory
Next, set the attributes of the component:
type: my_project.components.asset_factory.AssetFactory

attributes:
  access_key_id: key
  secret_access_key: access
  etl_job:
    - bucket: my_bucket
      source_object: raw_transactions.csv
      target_object: cleaned_transactions.csv
      sql: SELECT * FROM source WHERE amount IS NOT NULL;
    - bucket: my_bucket
      source_object: all_customers.csv
      target_object: risky_customers.csv
      sql: SELECT * FROM source WHERE risk_score > 0.8;
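To anticipate which asset names this configuration should produce, the naming rule from build_defs can be applied by hand. The sketch below is plain Python and does not load the component: asset_key_for is a hypothetical helper that mirrors the f-string used in the component, and the etl_job entries are copied into dicts as an assumption about how the attributes map to the model.

```python
def asset_key_for(bucket: str, target_object: str) -> str:
    """Mirror the naming rule in build_defs, replacing '.' with '_'."""
    return f"etl_{bucket}_{target_object}".replace(".", "_")


# The etl_job entries from the component configuration, as plain dicts.
etl_jobs = [
    {
        "bucket": "my_bucket",
        "source_object": "raw_transactions.csv",
        "target_object": "cleaned_transactions.csv",
    },
    {
        "bucket": "my_bucket",
        "source_object": "all_customers.csv",
        "target_object": "risky_customers.csv",
    },
]

expected_keys = [asset_key_for(j["bucket"], j["target_object"]) for j in etl_jobs]

for key in expected_keys:
    print(key)
# etl_my_bucket_cleaned_transactions_csv
# etl_my_bucket_risky_customers_csv
```

Two jobs that share a bucket and target_object would map to the same name, so each etl_job entry should use a distinct target.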
4. View the component assets
The assets generated by the initialized component behave the same as those created by the factory. You can view them on the command line:
dg list defs
or interact with them in the Dagster UI by running dg dev.