
Advanced component type customization


dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

Prerequisites

Before following the steps below, you will need to create and register a component type.

Customizing scaffolding behavior

By default, when you instantiate a component type, dg scaffold creates a new directory containing an unpopulated component.yaml file. You can customize this behavior by decorating your component type with @scaffold_with.

In the example below, the custom ShellCommandScaffolder class scaffolds a template shell script alongside a populated component.yaml file:

my_component_library/lib/shell_command.py
import os
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
    Scaffolder,
    scaffold_component,
)
from dagster.components.scaffold.scaffold import ScaffoldRequest, scaffold_with


class ShellCommandScaffolder(Scaffolder):
    """Scaffolds a template shell script alongside a filled-out component YAML file."""

    def scaffold(self, request: ScaffoldRequest) -> None:
        # Write component.yaml, pre-populated with these attributes
        scaffold_component(
            request,
            {
                "script_path": "script.sh",
                "asset_specs": [
                    {"key": "my_asset", "description": "Output of running a script"}
                ],
            },
        )
        # Write the template script next to component.yaml and make it executable
        script_path = Path(request.target_path) / "script.sh"
        script_path.write_text("#!/bin/bash\n\necho 'Hello, world!'")
        os.chmod(script_path, 0o755)


@scaffold_with(ShellCommandScaffolder)
@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        # script_path in component.yaml is relative to the component's directory
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        # check=True makes a non-zero exit code fail the asset run
        return subprocess.run(["sh", str(resolved_script_path)], check=True)
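If you want to check the scaffolded script's contents and permissions without invoking dg scaffold, the file-writing half of ShellCommandScaffolder.scaffold can be exercised on its own. The following is a minimal stdlib-only sketch: write_template_script is a hypothetical helper (not part of Dagster), a temporary directory stands in for request.target_path, and a POSIX filesystem is assumed for the permission check.

```python
import os
import stat
import tempfile
from pathlib import Path


def write_template_script(target_path: str) -> Path:
    """Hypothetical helper mirroring the script-writing step of ShellCommandScaffolder."""
    script_path = Path(target_path) / "script.sh"
    script_path.write_text("#!/bin/bash\n\necho 'Hello, world!'")
    # 0o755: owner can read/write/execute; group and others can read/execute
    os.chmod(script_path, 0o755)
    return script_path


# A temporary directory stands in for the component directory dg scaffold would create
with tempfile.TemporaryDirectory() as tmp:
    path = write_template_script(tmp)
    print(path.read_text().splitlines()[0])  # prints: #!/bin/bash
    print(oct(stat.S_IMODE(path.stat().st_mode)))  # prints: 0o755 (on POSIX systems)
```

Keeping the file-writing logic in a plain function like this makes it easy to unit-test separately from the component.yaml generation done by scaffold_component.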

Now, when you run dg scaffold, you should see a template shell script created along with a populated component.yaml file:

my_component_library/components/my_shell_command/component.yaml
type: my_component_library.lib.ShellCommand

attributes:
  script_path: script.sh
  asset_specs:
    - key: my_asset
      description: Output of running a script
my_component_library/components/my_shell_command/script.sh
#!/bin/bash

echo 'Hello, world!'
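Because script_path in component.yaml is relative, build_defs resolves it against the component directory (context.path) before execute runs it. A stdlib-only sketch of that resolve-then-run step follows; run_component_script is a hypothetical helper, a temporary directory stands in for the component directory, and an `sh` executable is assumed to be on the PATH.

```python
import subprocess
import tempfile
from pathlib import Path


def run_component_script(component_dir: str, script_path: str) -> str:
    """Hypothetical helper: resolve script_path against the component dir, then run it."""
    # Mirrors Path(context.path, self.script_path).absolute() in build_defs
    resolved = Path(component_dir, script_path).absolute()
    # check=True raises CalledProcessError if the script exits non-zero
    result = subprocess.run(
        ["sh", str(resolved)], check=True, capture_output=True, text=True
    )
    return result.stdout


with tempfile.TemporaryDirectory() as component_dir:
    Path(component_dir, "script.sh").write_text("#!/bin/bash\n\necho 'Hello, world!'")
    print(run_component_script(component_dir, "script.sh"), end="")  # prints: Hello, world!
```

Since the command is `sh <path>`, neither the `#!/bin/bash` shebang nor the executable bit is consulted: the script runs under whichever `sh` is found on the PATH.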

Providing resolution logic for non-standard types

In most cases, the types you use in your component schema and in the component class will be the same, or will have out-of-the-box resolution logic, as in the case of ResolvedAssetSpec.

However, in some cases you may want to use a type that doesn't have an existing schema equivalent. In that case, you can supply a function that resolves the value to the desired type by annotating the field with Annotated[<type>, Resolver(...)].

For example, to provide an API client to a component that can be configured with an API key in YAML, or replaced with a mock client in tests, you would do the following:

from dataclasses import dataclass
from typing import Annotated

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    ResolutionContext,
    Resolvable,
    Resolver,
)


class MyApiClient:
    def __init__(self, api_key: str): ...


def resolve_api_key(
    context: ResolutionContext,
    api_key: str,
) -> MyApiClient:
    return MyApiClient(api_key=api_key)


@dataclass
class MyComponent(Component, Resolvable):
    # Resolver specifies a function used to map input from the model
    # to a value for this field
    api_client: Annotated[
        MyApiClient,
        Resolver(
            resolve_api_key,
            model_field_name="api_key",
            model_field_type=str,
        ),
    ]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
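To make the mechanism concrete, here is a toy, stdlib-only model of how an Annotated[..., Resolver(...)] field can be filled from YAML attributes. ToyResolver and resolve_from_attributes are hypothetical stand-ins written for illustration; they are not Dagster's implementation, and the real Resolver function also receives a ResolutionContext, which the toy omits.

```python
from collections.abc import Callable, Mapping
from dataclasses import dataclass, fields
from typing import Annotated, Any, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class ToyResolver:
    """Hypothetical stand-in for Resolver: a function plus the YAML field it reads."""

    fn: Callable[[Any], Any]
    model_field_name: str


class MyApiClient:
    def __init__(self, api_key: str):
        self.api_key = api_key


def resolve_api_key(api_key: str) -> MyApiClient:
    return MyApiClient(api_key=api_key)


@dataclass
class MyComponent:
    api_client: Annotated[
        MyApiClient, ToyResolver(resolve_api_key, model_field_name="api_key")
    ]


def resolve_from_attributes(cls: type, attributes: Mapping[str, Any]) -> Any:
    """Build cls from YAML-style attributes, applying any ToyResolver annotations."""
    hints = get_type_hints(cls, include_extras=True)  # keep the Annotated metadata
    kwargs = {}
    for f in fields(cls):
        hint = hints[f.name]
        resolver = None
        if get_origin(hint) is Annotated:
            resolver = next(
                (m for m in get_args(hint)[1:] if isinstance(m, ToyResolver)), None
            )
        if resolver is None:
            kwargs[f.name] = attributes[f.name]  # no resolver: pass the value through
        else:
            # Read the YAML-side field name and convert it to the Python-side type
            kwargs[f.name] = resolver.fn(attributes[resolver.model_field_name])
    return cls(**kwargs)


component = resolve_from_attributes(MyComponent, {"api_key": "secret"})
print(type(component.api_client).__name__, component.api_client.api_key)  # MyApiClient secret
```

Because the dataclass field holds the already-resolved client, tests can skip YAML resolution entirely and pass a fake object to the constructor, for example MyComponent(api_client=FakeClient()) with a hypothetical FakeClient. YAML users supply a string; Python callers supply an object.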

Customizing rendering of YAML values

The components system supports a rich templating syntax that allows you to load arbitrary Python values based on your component.yaml file. All string values in a Resolvable can be templated using the Jinja2 templating engine, and may be resolved into arbitrary Python types. This allows you to expose complex object types, such as PartitionsDefinition or AutomationCondition, to users of your component, even if they're working in pure YAML.

You can define custom values that will be made available to the templating engine by defining a get_additional_scope classmethod on your component. In the example below, get_additional_scope exposes a daily_partitions value: a DailyPartitionsDefinition object with a pre-defined start date.

import subprocess
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        # Names returned here can be referenced in {{ ... }} templates in component.yaml
        return {
            "daily_partitions": dg.DailyPartitionsDefinition(start_date="2024-01-01")
        }

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        # Resolve script_path against the component directory, not the process cwd
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", str(resolved_script_path)], check=True)

When a user instantiates this component, they will be able to use this custom scope in their component.yaml file:

type: my_component

attributes:
  script_path: script.sh
  asset_specs:
    - key: a
      partitions_def: '{{ daily_partitions }}'
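The key idea is that a YAML string consisting of a single template expression can resolve to a Python object rather than to text. The toy, stdlib-only renderer below models that idea. It is not Dagster's Jinja2-based implementation: DailyPartitions and ShellCommandStandIn are hypothetical stand-ins so the sketch runs without Dagster installed, and the toy only handles strings that are exactly one `{{ name }}` expression, leaving every other string untouched.

```python
import re
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

# Matches a string that is exactly one {{ name }} expression (toy assumption)
_WHOLE_TEMPLATE = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


@dataclass(frozen=True)
class DailyPartitions:
    """Hypothetical stand-in for dg.DailyPartitionsDefinition."""

    start_date: str


class ShellCommandStandIn:
    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        return {"daily_partitions": DailyPartitions(start_date="2024-01-01")}


def render_value(value: Any, scope: Mapping[str, Any]) -> Any:
    """Toy renderer: a whole-string template becomes the scope object itself."""
    if isinstance(value, str):
        match = _WHOLE_TEMPLATE.match(value)
        # Partial templates are NOT rendered by this toy; they stay as plain strings
        return scope[match.group(1)] if match else value
    if isinstance(value, Mapping):
        return {k: render_value(v, scope) for k, v in value.items()}
    if isinstance(value, list):
        return [render_value(v, scope) for v in value]
    return value


# Mirrors the attributes block of the component.yaml above
attributes = {
    "script_path": "script.sh",
    "asset_specs": [{"key": "a", "partitions_def": "{{ daily_partitions }}"}],
}
rendered = render_value(attributes, ShellCommandStandIn.get_additional_scope())
print(rendered["asset_specs"][0]["partitions_def"])
# DailyPartitions(start_date='2024-01-01')
```

Note the quotes around '{{ daily_partitions }}' in the YAML: an unquoted value beginning with `{` would be parsed by YAML as a flow mapping rather than as a string.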