Advanced component type customization
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
Prerequisites
Before following the steps below, you will need to create and register a component type.
Customizing scaffolding behavior
By default, when you instantiate a component type with dg scaffold, it creates a new directory containing an unpopulated component.yaml file. However, you can customize this behavior by decorating your component type with @scaffold_with.
In the example below, the custom ShellCommandScaffolder class scaffolds a template shell script alongside a populated component.yaml file:
```python
import os
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
    Scaffolder,
    scaffold_component,
)
from dagster.components.scaffold.scaffold import ScaffoldRequest, scaffold_with


class ShellCommandScaffolder(Scaffolder):
    """Scaffolds a template shell script alongside a filled-out component YAML file."""

    def scaffold(self, request: ScaffoldRequest) -> None:
        # Write a populated component.yaml instead of an empty one
        scaffold_component(
            request,
            {
                "script_path": "script.sh",
                "asset_specs": [
                    {"key": "my_asset", "description": "Output of running a script"}
                ],
            },
        )
        # Add an executable template script next to component.yaml
        script_path = Path(request.target_path) / "script.sh"
        script_path.write_text("#!/bin/bash\n\necho 'Hello, world!'")
        os.chmod(script_path, 0o755)


@scaffold_with(ShellCommandScaffolder)
@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        # Resolve the script relative to the component's directory
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", str(resolved_script_path)], check=True)
```
Now, when you run dg scaffold, you should see a template shell script created along with a populated component.yaml file:
```yaml
type: my_component_library.lib.ShellCommand
attributes:
  script_path: script.sh
  asset_specs:
    - key: my_asset
      description: Output of running a script
```
```bash
#!/bin/bash

echo 'Hello, world!'
```
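To sanity-check what the scaffolder writes without invoking dg, you can mirror its file-writing steps with only the standard library. The sketch below assumes the directory layout shown above. write_scaffold is a hypothetical helper, not part of Dagster. It hand-writes the YAML that scaffold_component generates in the real scaffolder, then runs the template the same way ShellCommand.execute does.

```python
import os
import stat
import subprocess
import tempfile
from pathlib import Path

# Hand-written approximation of the component.yaml that the real
# ShellCommandScaffolder produces via dagster's scaffold_component.
COMPONENT_YAML = """\
type: my_component_library.lib.ShellCommand
attributes:
  script_path: script.sh
  asset_specs:
    - key: my_asset
      description: Output of running a script
"""

SCRIPT = "#!/bin/bash\n\necho 'Hello, world!'"


def write_scaffold(target_dir: Path) -> Path:
    """Hypothetical helper: write component.yaml and an executable script.sh."""
    target_dir.mkdir(parents=True, exist_ok=True)
    (target_dir / "component.yaml").write_text(COMPONENT_YAML)
    script_path = target_dir / "script.sh"
    script_path.write_text(SCRIPT)
    os.chmod(script_path, 0o755)  # same permissions the scaffolder sets
    return script_path


with tempfile.TemporaryDirectory() as tmp:
    script = write_scaffold(Path(tmp) / "my_shell_component")
    # Run the template the way ShellCommand.execute does: `sh <path>`.
    result = subprocess.run(
        ["sh", str(script)], check=True, capture_output=True, text=True
    )
    is_executable = bool(script.stat().st_mode & stat.S_IXUSR)
    output = result.stdout

print(output, end="")  # Hello, world!
```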
Providing resolution logic for non-standard types
In most cases, the types you use in your component schema and in the component class will be the same, or will have out-of-the-box resolution logic, as in the case of ResolvedAssetSpec.
However, in some cases you may want to use a type that doesn't have an existing schema equivalent. In that case, you can provide a function that resolves the schema value to the desired type by annotating the field with Annotated[<type>, Resolver(...)].
For example, to give a component an API client that can be configured with an API key in YAML, or replaced with a mock client in tests, you would do the following:
```python
from dataclasses import dataclass
from typing import Annotated

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    ResolutionContext,
    Resolvable,
    Resolver,
)


class MyApiClient:
    def __init__(self, api_key: str): ...


def resolve_api_key(
    context: ResolutionContext,
    api_key: str,
) -> MyApiClient:
    return MyApiClient(api_key=api_key)


@dataclass
class MyComponent(Component, Resolvable):
    # Resolver specifies a function used to map input from the model
    # to a value for this field
    api_client: Annotated[
        MyApiClient,
        Resolver(
            resolve_api_key,
            model_field_name="api_key",
            model_field_type=str,
        ),
    ]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
```
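Conceptually, the Resolver annotation reads a differently named, differently typed model field (api_key: str) and passes it through a function to produce the dataclass field (api_client: MyApiClient). The sketch below models that idea with only the standard library, so it runs without Dagster. ToyResolver and resolve_from_model are hypothetical stand-ins, not Dagster's implementation, and the ResolutionContext argument is omitted. The sketch also shows why the pattern suits tests: the component is a plain dataclass, so a mock client can be passed in directly, bypassing resolution.

```python
from dataclasses import dataclass, fields
from typing import Annotated, Any, Callable, get_type_hints


@dataclass(frozen=True)
class ToyResolver:
    """Hypothetical stand-in for dagster's Resolver annotation."""

    fn: Callable[[Any], Any]
    model_field_name: str


class MyApiClient:
    def __init__(self, api_key: str):
        self.api_key = api_key


class FakeApiClient(MyApiClient):
    """A mock client for tests."""

    def __init__(self):
        super().__init__(api_key="fake")


def resolve_api_key(api_key: str) -> MyApiClient:
    return MyApiClient(api_key=api_key)


@dataclass
class ToyComponent:
    api_client: Annotated[
        MyApiClient, ToyResolver(resolve_api_key, model_field_name="api_key")
    ]


def resolve_from_model(cls, model: dict[str, Any]):
    """Build cls from a YAML-like dict, applying any ToyResolver annotations."""
    hints = get_type_hints(cls, include_extras=True)
    kwargs = {}
    for f in fields(cls):
        metadata = getattr(hints[f.name], "__metadata__", ())
        resolver = next((m for m in metadata if isinstance(m, ToyResolver)), None)
        if resolver is None:
            kwargs[f.name] = model[f.name]  # no resolver: copy the value as-is
        else:
            # Read the model's field name and convert it with the resolver function
            kwargs[f.name] = resolver.fn(model[resolver.model_field_name])
    return cls(**kwargs)


# From YAML-style input: the api_key string becomes a client object.
from_yaml = resolve_from_model(ToyComponent, {"api_key": "secret-123"})
# In tests: construct the dataclass directly with a mock client.
in_test = ToyComponent(api_client=FakeApiClient())
```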
Customizing rendering of YAML values
The components system supports a rich templating syntax that allows you to load arbitrary Python values based on your component.yaml file. All string values in a Resolvable can be templated using the Jinja2 templating engine, and may be resolved into arbitrary Python types. This allows you to expose complex object types, such as PartitionsDefinition or AutomationCondition, to users of your component, even if they're working in pure YAML.
You can define custom values that will be made available to the templating engine by defining a get_additional_scope classmethod on your component. In the example below, get_additional_scope exposes a "daily_partitions" value, a DailyPartitionsDefinition object with a predefined start date:
```python
import subprocess
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        # Names returned here become available in component.yaml templates
        return {
            "daily_partitions": dg.DailyPartitionsDefinition(start_date="2024-01-01")
        }

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(context)

        return dg.Definitions(assets=[_asset])

    def execute(self, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", self.script_path], check=True)
```
When a user instantiates this component, they will be able to use this custom scope in their component.yaml file:
```yaml
type: my_component
attributes:
  script_path: script.sh
  asset_specs:
    - key: a
      partitions_def: '{{ daily_partitions }}'
```
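Here, '{{ daily_partitions }}' resolves to the DailyPartitionsDefinition object itself, not to its string form. The sketch below imitates that behavior with only the standard library, so it runs without Dagster or Jinja2. render_value, render_attributes, ToyShellCommand, and ToyDailyPartitions are hypothetical stand-ins. The regex handles only bare names, whereas the real system evaluates Jinja2 expressions. The plain string substitution used for mixed text is an assumption of this sketch.

```python
import re
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any

# Matches a string that is exactly one "{{ name }}" expression.
_FULL_EXPR = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")
# Matches "{{ name }}" expressions embedded in a larger string.
_ANY_EXPR = re.compile(r"\{\{\s*(\w+)\s*\}\}")


@dataclass(frozen=True)
class ToyDailyPartitions:
    """Hypothetical stand-in for dg.DailyPartitionsDefinition."""

    start_date: str


class ToyShellCommand:
    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        return {"daily_partitions": ToyDailyPartitions(start_date="2024-01-01")}


def render_value(value: str, scope: Mapping[str, Any]) -> Any:
    full = _FULL_EXPR.match(value)
    if full:
        # A lone expression yields the Python object itself, not a string.
        return scope[full.group(1)]
    # Mixed text falls back to plain string substitution.
    return _ANY_EXPR.sub(lambda m: str(scope[m.group(1)]), value)


def render_attributes(node: Any, scope: Mapping[str, Any]) -> Any:
    """Recursively render every string in a YAML-like structure."""
    if isinstance(node, str):
        return render_value(node, scope)
    if isinstance(node, list):
        return [render_attributes(item, scope) for item in node]
    if isinstance(node, dict):
        return {key: render_attributes(val, scope) for key, val in node.items()}
    return node


attributes = {
    "script_path": "script.sh",
    "asset_specs": [{"key": "a", "partitions_def": "{{ daily_partitions }}"}],
}
rendered = render_attributes(attributes, ToyShellCommand.get_additional_scope())
partitions_def = rendered["asset_specs"][0]["partitions_def"]
```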