Skip to main content

Define assets

IBM DataStage runs replication jobs that move tables from a source system to a target. In this example, a single job replicates five tables from a DB2 mainframe to Snowflake. Rather than treating this as one opaque asset, we model each table as its own Dagster asset so that the asset catalog reflects the actual data being produced.

The code is organized into four building blocks:

  1. A translator that maps DataStage tables to Dagster concepts
  2. A resource that drives the cpdctl CLI
  3. A multi-asset factory that turns the table list into specs
  4. A DataStageProject component that assembles everything from YAML

The translator

DataStageTranslator converts each table definition into the Dagster asset attributes (key, group, description, and tags):

src/project_datastage/components/data_stage_job_component.py
@dataclass
class DataStageTranslator:
    """Translates DataStage table definitions into Dagster asset attributes.

    Each ``get_*`` method derives one attribute (key, group, description,
    tags) from a table-definition mapping. Subclass and override any single
    method to customize that attribute without touching the others.
    """

    def get_asset_key(self, table_def: Mapping[str, str]) -> dg.AssetKey:
        """Default key: ``["datastage_raw", "<table_name>"]``."""
        return dg.AssetKey(["datastage_raw", table_def["name"]])

    def get_group_name(self, table_def: Mapping[str, str]) -> str:
        """All replicated tables share one asset group by default."""
        return "datastage_replication"

    def get_description(self, table_def: Mapping[str, str]) -> str:
        """Human-readable description shown in the asset catalog."""
        # "UNKNOWN" keeps the description well-formed when the YAML omits
        # the source schema.
        source_schema = table_def.get("source_schema", "UNKNOWN")
        return f"Replicated from IBM DataStage - source: {source_schema}.{table_def['name']}"

    def get_tags(self, table_def: Mapping[str, str]) -> dict[str, str]:
        """Static tags applied to every table; override for per-table tags."""
        return {
            "source_system": "ibm_datastage",
            "schedule": "daily",
        }

The translator is a @dataclass, so subclassing and overriding individual methods is straightforward. For example, you can change get_asset_key to use a different key prefix for your organization's naming conventions without touching any other logic.

The resource

DataStageResource is a ConfigurableResource that exposes a single run method dispatching to a demo or production path based on the demo_mode flag. In both paths it yields a MaterializeResult and two AssetCheckResult objects per table — all from the same step.

The class definition, run dispatcher, and demo path are shown below. Demo mode simulates a successful replication with random row counts so the pipeline can be run locally without a cpdctl installation:

src/project_datastage/components/data_stage_job_component.py
class DataStageResource(dg.ConfigurableResource):
    """Resource that drives IBM DataStage via the ``cpdctl dsjob`` CLI.

    In **demo mode** the resource simulates a successful replication and
    yields ``MaterializeResult`` + ``AssetCheckResult`` per table without
    calling any external system.
    """

    # cpdctl configuration profile used to reach the Cloud Pak for Data host.
    cpdctl_profile: str = "cp4d-profile"
    # DataStage project that owns the replication job.
    project_name: str = "PROD_PROJECT"
    # True: simulate locally; False: shell out to the real cpdctl CLI.
    demo_mode: bool = True

    def run(
        self,
        context: dg.AssetExecutionContext,
        project: DataStageProject,
        translator: DataStageTranslator,
    ) -> Iterator[dg.MaterializeResult | dg.AssetCheckResult]:
        """Execute the DataStage job and yield a ``MaterializeResult`` and
        ``AssetCheckResult`` objects per replicated table, all in one step.
        """
        if self.demo_mode:
            yield from self._run_demo(context, project, translator)
        else:
            yield from self._run_production(context, project, translator)

    def _run_demo(
        self,
        context: dg.AssetExecutionContext,
        project: DataStageProject,
        translator: DataStageTranslator,
    ) -> Iterator[dg.MaterializeResult | dg.AssetCheckResult]:
        """Simulate a successful replication with random row counts."""
        import random
        import time

        context.log.info(
            f"[DEMO MODE] Simulating DataStage job '{project.job_name}' "
            f"for {len(project.tables)} tables"
        )

        for table_def in project.tables:
            # Brief pause so the run visibly progresses table-by-table.
            time.sleep(0.3)
            table_name = table_def["name"]
            asset_key = translator.get_asset_key(table_def)
            row_count = random.randint(5_000, 500_000)

            context.log.info(f"[DEMO] Replicated {table_name}: {row_count:,} rows")

            yield dg.MaterializeResult(
                asset_key=asset_key,
                metadata={
                    "row_count": dg.MetadataValue.int(row_count),
                    "source_schema": dg.MetadataValue.text(
                        table_def.get("source_schema", "UNKNOWN")
                    ),
                    "target_schema": dg.MetadataValue.text(
                        table_def.get("target_schema", "UNKNOWN")
                    ),
                    "job_name": dg.MetadataValue.text(project.job_name),
                    "status": dg.MetadataValue.text("SUCCESS"),
                },
            )

            # Checks always pass in demo mode: simulated counts are > 0 and
            # the data was "replicated" just now.
            yield dg.AssetCheckResult(
                check_name=f"{table_name}_row_count_positive",
                passed=True,
                asset_key=asset_key,
                metadata={
                    "row_count": dg.MetadataValue.int(row_count),
                },
            )
            yield dg.AssetCheckResult(
                check_name=f"{table_name}_freshness",
                passed=True,
                asset_key=asset_key,
                metadata={
                    "hours_since_materialization": dg.MetadataValue.float(0.0),
                },
            )

    def _run_production(
        self,
        context: dg.AssetExecutionContext,
        project: DataStageProject,
        translator: DataStageTranslator,
    ) -> Iterator[dg.MaterializeResult | dg.AssetCheckResult]:
        """Delegate to the module-level production runner, passing this
        resource's connection settings."""
        yield from _run_datastage_production(
            context=context,
            project=project,
            translator=translator,
            cpdctl_profile=self.cpdctl_profile,
            project_name=self.project_name,
        )

The production path is extracted into a standalone function that calls three cpdctl dsjob commands in sequence: run --wait 1800 to trigger the job and block until it finishes, jobinfo --full to read per-table row counts from the output, and logdetail to retrieve the full job log for auditing:

src/project_datastage/components/data_stage_job_component.py
def _cpdctl_dsjob(
    context: dg.AssetExecutionContext,
    subcommand: str,
    project_name: str,
    job_name: str,
    *extra_args: str,
) -> str:
    """Run one ``cpdctl dsjob <subcommand>`` invocation and return stdout.

    Logs stdout on success. On a non-zero exit, logs the captured stderr
    before re-raising ``subprocess.CalledProcessError`` so the failure
    reason is visible in the Dagster event log.
    """
    import subprocess

    cmd = [
        "cpdctl",
        "dsjob",
        subcommand,
        "--project",
        project_name,
        "--job",
        job_name,
        *extra_args,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as err:
        context.log.error(
            f"cpdctl dsjob {subcommand} failed (exit {err.returncode}):\n{err.stderr}"
        )
        raise
    context.log.info(f"cpdctl dsjob {subcommand} output:\n{result.stdout}")
    return result.stdout


def _run_datastage_production(
    *,
    context: dg.AssetExecutionContext,
    project: DataStageProject,
    translator: DataStageTranslator,
    cpdctl_profile: str,
    project_name: str,
) -> Iterator[dg.MaterializeResult | dg.AssetCheckResult]:
    """Run the DataStage job via ``cpdctl dsjob`` and yield one
    ``MaterializeResult`` plus two ``AssetCheckResult`` per table.

    Sequence of CLI calls: ``run --wait 1800`` (trigger and block until
    the job finishes), ``jobinfo --full`` (read per-table row counts),
    ``logdetail`` (retrieve the full job log for auditing).
    """
    context.log.info(
        f"Running DataStage job '{project.job_name}' "
        f"via cpdctl dsjob CLI (profile: {cpdctl_profile})"
    )

    # Trigger the job and block for up to 1800 s while it runs.
    _cpdctl_dsjob(context, "run", project_name, project.job_name, "--wait", "1800")

    # Full job info includes per-table row-count lines.
    info_stdout = _cpdctl_dsjob(
        context, "jobinfo", project_name, project.job_name, "--full"
    )

    # Best-effort parse: a line that mentions both a table name and "rows"
    # is assumed to carry that table's count after the last ":".
    # NOTE(review): substring matching means a table whose name is a prefix
    # of another (e.g. ORDERS vs ORDERS_HIST) can match the wrong line —
    # confirm against real jobinfo output.
    rows_by_table: dict[str, int] = {}
    for line in info_stdout.splitlines():
        for table_def in project.tables:
            tname = table_def["name"]
            if tname in line and "rows" in line.lower():
                digit_str = "".join(c for c in line.split(":")[-1] if c.isdigit())
                rows_by_table[tname] = int(digit_str) if digit_str else 0

    # Fetch the full job log for auditing; output is only logged.
    _cpdctl_dsjob(context, "logdetail", project_name, project.job_name)

    for table_def in project.tables:
        table_name = table_def["name"]
        asset_key = translator.get_asset_key(table_def)
        # Tables missing from the parsed output report 0 rows, which fails
        # the row-count check below instead of erroring the whole step.
        row_count = rows_by_table.get(table_name, 0)

        yield dg.MaterializeResult(
            asset_key=asset_key,
            metadata={
                "row_count": dg.MetadataValue.int(row_count),
                "source_schema": dg.MetadataValue.text(table_def.get("source_schema", "UNKNOWN")),
                "target_schema": dg.MetadataValue.text(table_def.get("target_schema", "UNKNOWN")),
                "job_name": dg.MetadataValue.text(project.job_name),
                "status": dg.MetadataValue.text("SUCCESS"),
            },
        )

        yield dg.AssetCheckResult(
            check_name=f"{table_name}_row_count_positive",
            passed=row_count > 0,
            asset_key=asset_key,
            metadata={
                "row_count": dg.MetadataValue.int(row_count),
            },
        )
        yield dg.AssetCheckResult(
            check_name=f"{table_name}_freshness",
            passed=True,
            asset_key=asset_key,
            metadata={
                "hours_since_materialization": dg.MetadataValue.float(0.0),
            },
        )

Running checks inline with materialization means the checks always reflect the data that was just loaded. There is no window between materialization and checking during which the data could change.

The multi-asset factory

datastage_assets is a decorator factory that builds a multi_asset from the table list in the project configuration. It creates one AssetSpec per table and two AssetCheckSpecs per table — a row count check and a freshness check:

src/project_datastage/components/data_stage_job_component.py
def datastage_assets(
    *,
    project: DataStageProject,
    name: str | None = None,
    group_name: str | None = None,
    translator: DataStageTranslator | None = None,
) -> Callable[[Callable[..., Any]], dg.AssetsDefinition]:
    """Return a ``@multi_asset`` decorator whose decorated function yields
    ``MaterializeResult`` and ``AssetCheckResult`` for every table in
    the DataStage project, all in a single materialisation step.

    Args:
        project: Parsed DataStage project configuration (supplies tables).
        name: Optional name for the multi-asset op.
        group_name: Overrides the translator's group for every asset.
        translator: Custom translator; defaults to ``DataStageTranslator``.

    Usage::

        @datastage_assets(project=my_project, name="ds_replication")
        def my_replication(context, datastage: DataStageResource):
            yield from datastage.run(context, my_project, my_translator)
    """
    translator = translator or DataStageTranslator()

    # One AssetSpec per replicated table; attributes come from the translator.
    specs = [
        dg.AssetSpec(
            key=translator.get_asset_key(table_def),
            group_name=group_name or translator.get_group_name(table_def),
            description=translator.get_description(table_def),
            tags=translator.get_tags(table_def),
            kinds={"ibm_datastage", "python"},
        )
        for table_def in project.tables
    ]

    # Two checks per table: a positive-row-count check and a freshness check.
    check_specs: list[dg.AssetCheckSpec] = []
    for table_def in project.tables:
        asset_key = translator.get_asset_key(table_def)
        table_name = table_def["name"]
        check_specs.extend(
            [
                dg.AssetCheckSpec(
                    name=f"{table_name}_row_count_positive",
                    asset=asset_key,
                    description=f"Verify {table_name} has > 0 rows after replication",
                ),
                dg.AssetCheckSpec(
                    name=f"{table_name}_freshness",
                    asset=asset_key,
                    description=f"Verify {table_name} was replicated recently",
                ),
            ]
        )

    return dg.multi_asset(
        name=name,
        specs=specs,
        check_specs=check_specs,
    )

Using a factory function keeps the asset definition tightly coupled to the project configuration. Adding a new table to the YAML automatically produces a new asset and its associated checks without any changes to the Python code.

The component

DataStageJobComponent ties the translator, resource, and multi-asset factory together as a reusable Dagster component. Its build_defs method constructs all Dagster definitions from the component's YAML attributes:

src/project_datastage/components/data_stage_job_component.py
class DataStageJobComponent(dg.Component, dg.Model, dg.Resolvable):
    """Dagster Component wrapping an IBM DataStage replication job.

    Produces one asset and two data-quality checks per replicated table
    in a single ``@multi_asset`` step. Set ``demo_mode: true`` to simulate
    the replication locally without a ``cpdctl`` installation.
    """

    # True: simulate the replication; False: drive the real cpdctl CLI.
    demo_mode: bool = True
    # DataStage job to run.
    job_name: str = "datastage_job"
    # cpdctl configuration profile used for authentication.
    cpdctl_profile: str = "cp4d-profile"
    # DataStage project that owns the job.
    datastage_project_name: str = "PROD_PROJECT"
    # Source/target connection details and the table list, supplied via YAML.
    # NOTE(review): mutable class-level defaults are safe only if dg.Model
    # copies them per instance (Pydantic-style semantics) — confirm.
    source_connection: dict[str, Any] = {}
    target_connection: dict[str, Any] = {}
    tables: list[dict[str, str]] = []

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        """Assemble project config, translator, multi-asset, and resource
        into a single ``Definitions`` object from the YAML attributes."""
        project = DataStageProject(
            {
                "job_name": self.job_name,
                "source_connection": self.source_connection,
                "target_connection": self.target_connection,
                "tables": self.tables,
            }
        )
        translator = DataStageTranslator()

        @datastage_assets(
            project=project,
            name=f"{self.job_name}_replication",
            translator=translator,
        )
        def replication_assets(
            context: dg.AssetExecutionContext,
            datastage: DataStageResource,
        ):
            # Every table materializes from this single step via the resource.
            yield from datastage.run(context, project, translator)

        resource = DataStageResource(
            cpdctl_profile=self.cpdctl_profile,
            project_name=self.datastage_project_name,
            demo_mode=self.demo_mode,
        )

        return dg.Definitions(
            assets=[replication_assets],
            resources={"datastage": resource},
        )

Next steps