
Schedule IBM DataStage replication jobs

With the replication assets defined, the next step is to schedule them. This example uses two schedules: a daily run for data replication and a weekly run for data quality checks. Rather than hardcoding asset keys into each schedule, we use Dagster's asset selection syntax to target assets by tag or group so schedules remain correct as the table list grows or changes.

The scheduling component

ScheduledJobComponent takes a cron expression and an asset selection string and creates a Dagster job with a schedule:

src/project_datastage/components/scheduled_job_component.py
import dagster as dg


class ScheduledJobComponent(dg.Component, dg.Model, dg.Resolvable):
    """Schedule assets using flexible selection syntax."""

    job_name: str
    cron_schedule: str
    asset_selection: str

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        job = dg.define_asset_job(
            name=self.job_name,
            selection=self.asset_selection,
        )

        schedule = dg.ScheduleDefinition(
            job=job,
            cron_schedule=self.cron_schedule,
        )

        return dg.Definitions(
            schedules=[schedule],
            jobs=[job],
        )

The asset_selection field accepts the same syntax as Dagster's asset selection UI: tag:key=value, group:name, kind:name, and boolean operators like and, or, not. This makes it easy to target a subset of assets without modifying the component.
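To make the clause semantics concrete, here is a toy illustration of how clauses like tag:key=value and group:name map onto asset metadata. This is not Dagster's actual parser (it only handles single clauses joined by and); it exists purely to show what each clause is matched against:

```python
# Toy sketch (NOT Dagster's real selection parser): shows what the
# "tag:key=value" and "group:name" clauses are compared against.
from dataclasses import dataclass, field


@dataclass
class Asset:
    name: str
    group: str
    tags: dict = field(default_factory=dict)


def clause_matches(clause: str, asset: Asset) -> bool:
    prefix, _, value = clause.partition(":")
    if prefix == "tag":
        key, _, expected = value.partition("=")
        return asset.tags.get(key) == expected
    if prefix == "group":
        return asset.group == value
    raise ValueError(f"unsupported clause: {clause}")


def selection_matches(selection: str, asset: Asset) -> bool:
    # This sketch supports only "and"-joined clauses; Dagster's real
    # syntax also supports "or", "not", and parentheses.
    return all(clause_matches(c, asset) for c in selection.split(" and "))


customers = Asset("customers", "datastage_replication", {"schedule": "daily"})
print(selection_matches("tag:schedule=daily", customers))  # True
print(selection_matches("group:datastage_replication and tag:schedule=daily", customers))  # True
```

Because the selection is resolved at definition-load time, a new table that carries the schedule=daily tag is picked up by the daily job automatically, with no change to the schedule itself.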

Putting it together with YAML

The full pipeline copies five tables from a DB2 mainframe to Snowflake with inline data-quality checks and runs on two schedules, all declared in a single defs.yaml file as three composed components:

src/project_datastage/defs/datastage_pipeline/defs.yaml
type: project_datastage.components.data_stage_job_component.DataStageJobComponent

attributes:
  demo_mode: true
  job_name: customer_data_replication
  cpdctl_profile: cp4d-profile
  datastage_project_name: PROD_PROJECT
  source_connection:
    type: db2
    host: mainframe.corp.example.com
    port: "50000"
    database: PROD_DB
  target_connection:
    type: snowflake
    account: acme.snowflakecomputing.com
    database: RAW_ZONE
    warehouse: ETL_WH
  tables:
    - name: customers
      source_schema: LEGACY
      target_schema: RAW
    - name: orders
      source_schema: LEGACY
      target_schema: RAW
    - name: order_line_items
      source_schema: LEGACY
      target_schema: RAW
    - name: products
      source_schema: LEGACY
      target_schema: RAW
    - name: shipments
      source_schema: LEGACY
      target_schema: RAW

---
type: project_datastage.components.scheduled_job_component.ScheduledJobComponent

attributes:
  job_name: daily_datastage_replication
  cron_schedule: "0 6 * * *"
  asset_selection: "tag:schedule=daily"

---
type: project_datastage.components.scheduled_job_component.ScheduledJobComponent

attributes:
  job_name: weekly_datastage_data_quality
  cron_schedule: "0 8 * * 1"
  asset_selection: "group:datastage_replication"

The first document declares the DataStageJobComponent with demo_mode: true so it runs locally without a cpdctl installation. The second runs the replication daily at 6 AM UTC, targeting tag:schedule=daily, which matches the tag set by DataStageTranslator.get_tags. The third reruns all assets in the datastage_replication group every Monday at 8 AM UTC; because the checks run inline with materialization, this also re-executes the row-count and freshness checks for every table. Changing a schedule's frequency or adding a third schedule requires only editing or appending a YAML document; no Python changes are needed.
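As a sketch of what adding a third schedule might look like, the hypothetical document below could be appended to the same defs.yaml (the job name and cron expression are illustrative, not part of the example project):

```yaml
# Hypothetical fourth document in defs.yaml: an hourly rerun of the
# whole replication group. Names and cadence are illustrative only.
---
type: project_datastage.components.scheduled_job_component.ScheduledJobComponent

attributes:
  job_name: hourly_datastage_replication
  cron_schedule: "0 * * * *"
  asset_selection: "group:datastage_replication"
```

The cron fields follow the standard minute/hour/day-of-month/month/day-of-week order, so "0 * * * *" fires at the top of every hour, while the "0 8 * * 1" expression above fires at 8 AM only when the day-of-week field matches Monday.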