Creating and registering a component type
This feature is considered in a preview stage and is under active development. There may be API changes and feature gaps. Please go to the #dg-components channel in our Slack to report issues or give feedback.
The components system makes it easy to create new component types that can be reused across your project.
In most cases, component types map to a specific technology. For example, you might have a DockerScriptComponent that executes a script in a Docker container, or a SnowflakeQueryComponent that runs a query on Snowflake.
Refer to the project structuring guide to learn how to create a components-compatible project.
Scaffolding component type files
For this example, we'll write a lightweight component that executes a shell command.
First, we use the dg command-line utility to scaffold a new component type:
dg scaffold component-type ShellCommand
Creating a Dagster component type at /.../my-component-library/src/my_component_library/lib/shell_command.py.
Scaffolded files for Dagster component type at /.../my-component-library/src/my_component_library/lib/shell_command.py.
This will add a new file to your project in the lib directory:
import dagster as dg
from dagster.components import Component, ComponentLoadContext, Model, Resolvable


class ShellCommand(Component, Model, Resolvable):
    """COMPONENT SUMMARY HERE.

    COMPONENT DESCRIPTION HERE.
    """

    # added fields here will define yaml schema via Model

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        # Add definition construction logic here.
        return dg.Definitions()
This file contains the basic structure for the new component type. Our goal is to implement the build_defs method so that it returns a Definitions object. Doing so requires some input, which we define as the fields our component class is instantiated with.
The use of Model is optional if you only want a Pythonic interface to the component. If you wish to implement an __init__ method for your class (manually or using @dataclasses.dataclass), you can provide the --no-model flag to the dg scaffold command.
Defining the Python class
The first step is to define what information this component needs. This means determining what aspects of the component should be customizable.
In this case, we'll want to define a few things:
- The path to the shell script that we'll want to run.
- The assets that we expect this script to produce.
Our class inherits from Resolvable in addition to Component. This will handle deriving a YAML schema for our class based on what the class is annotated with. To simplify common use cases, Dagster provides annotations for common bits of configuration, such as ResolvedAssetSpec, which will handle exposing a schema for defining AssetSpecs from YAML and resolving them before instantiating our component.
We can define the schema for our component and add it to our class as follows:
from collections.abc import Sequence
from dataclasses import dataclass

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
Additionally, it's possible to include metadata for your component by overriding the get_spec classmethod. This allows you to set fields like owners and tags that will be visible in the generated documentation.
from collections.abc import Sequence

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    ComponentTypeSpec,
    Resolvable,
    ResolvedAssetSpec,
)


class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    @classmethod
    def get_spec(cls):
        return ComponentTypeSpec(
            owners=["John Dagster"],
            tags=["shell", "script"],
        )

    def __init__(
        self,
        script_path: str,
        asset_specs: Sequence[ResolvedAssetSpec],
    ):
        self.script_path = script_path
        self.asset_specs = asset_specs

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
When defining a field on a component that isn't on the schema, or is of a different type, the components system allows you to provide custom resolution logic for that field. See the Providing resolution logic for non-standard types section for more information.
Building definitions
Now that we've defined how the component is parameterized, we need to define how to turn those parameters into a Definitions object.
To do so, we'll want to override the build_defs method, which is responsible for returning a Definitions object containing all definitions related to the component.
Our build_defs method will create a single @multi_asset that executes the provided shell script. By convention, we'll put the code that actually executes this asset inside a method called execute. This makes it easier for future developers to create subclasses of this component.
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", str(resolved_script_path)], check=True)
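To make the path handling and failure behavior concrete, here is a minimal standard-library sketch of what build_defs and execute do with script_path. It does not use Dagster: component_dir stands in for context.path, and resolve_script and run_script are hypothetical helper names introduced only for this illustration. It also assumes an sh binary is available on the machine.

```python
import subprocess
import tempfile
from pathlib import Path


def resolve_script(component_dir: Path, script_path: str) -> Path:
    """Mirror Path(context.path, self.script_path).absolute()."""
    # Note: if script_path is itself absolute, pathlib discards component_dir.
    return Path(component_dir, script_path).absolute()


def run_script(resolved_script_path: Path) -> subprocess.CompletedProcess:
    """Mirror execute(): check=True raises CalledProcessError on non-zero exit."""
    return subprocess.run(
        ["sh", str(resolved_script_path)],
        check=True,
        capture_output=True,  # added only so this sketch can inspect the output
        text=True,
    )


with tempfile.TemporaryDirectory() as tmp:
    component_dir = Path(tmp)  # stand-in for context.path
    (component_dir / "script.sh").write_text("echo 'Hello, world!'\n")
    (component_dir / "fail.sh").write_text("exit 3\n")

    resolved = resolve_script(component_dir, "script.sh")
    asset_op_name = Path("script.sh").stem  # the value passed as name= above
    stdout = run_script(resolved).stdout

    try:
        run_script(resolve_script(component_dir, "fail.sh"))
        failed_with = None
    except subprocess.CalledProcessError as err:
        failed_with = err.returncode
```

Because check=True turns a non-zero exit code into an exception, a failing script in the component above would raise inside _asset rather than pass silently.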
Component registration
Following the steps above will automatically register your component type in your environment. You can now run:
dg list plugins
Using /.../my-component-library/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-t… │ │
│ │ │ │ definition │ │ │
│ │ │ │ for how to │ │ │
│ │ │ │ compute an │ │ │
│ │ │ │ asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.asset_check │ Create a │ [scaffold-t… │ │
│ │ │ │ definition │ │ │
│ │ │ │ for how to │ │ │
│ │ │ │ execute an │ │ │
│ │ │ │ asset check. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary │ [component, │ │
│ │ │ │ set of │ scaffold-ta… │ │
│ │ │ │ dagster │ │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder │ [component, │ │
│ │ │ │ which may │ scaffold-ta… │ │
│ │ │ │ contain │ │ │
│ │ │ │ multiple │ │ │
│ │ │ │ submodules, │ │ │
│ │ │ │ each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that │ [component, │ │
│ │ │ │ wrap Python │ scaffold-ta… │ │
│ │ │ │ scripts │ │ │
│ │ │ │ executed │ │ │
│ │ │ │ with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubpro… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.multi_asset │ Create a │ [scaffold-t… │ │
│ │ │ │ combined │ │ │
│ │ │ │ definition │ │ │
│ │ │ │ of multiple │ │ │
│ │ │ │ assets that │ │ │
│ │ │ │ are computed │ │ │
│ │ │ │ using the │ │ │
│ │ │ │ same op and │ │ │
│ │ │ │ same │ │ │
│ │ │ │ upstream │ │ │
│ │ │ │ assets. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.schedule │ Creates a │ [scaffold-t… │ │
│ │ │ │ schedule │ │ │
│ │ │ │ following │ │ │
│ │ │ │ the provided │ │ │
│ │ │ │ cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests │ │ │
│ │ │ │ runs for the │ │ │
│ │ │ │ provided │ │ │
│ │ │ │ job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.sensor │ Creates a │ [scaffold-t… │ │
│ │ │ │ sensor where │ │ │
│ │ │ │ the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ function is │ │ │
│ │ │ │ used as the │ │ │
│ │ │ │ sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴──────────────┴──────────────┘ │
│ my_component_library │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ my_component_library.lib.ShellCommand │ Models a shell script │ [component, │ │
│ │ │ │ as a Dagster asset. │ scaffold-target] │ │
│ │ └───────────────────────────────────────┴─────────────────────────┴─────────────────────────┘ │
└──────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────┘
and see your new component type in the list of available component types.
You can also view automatically generated documentation describing your new component type by running:
dg docs serve
Now, you can use this component type to create new component instances.
Configuring custom scaffolding
Once your component type is registered, instances of the component type can be scaffolded using the dg scaffold command:
dg scaffold 'my_component_library.lib.ShellCommand' my_shell_command
Using /.../my-component-library/.venv/bin/dagster-components
By default, this will create a new directory with an unpopulated component.yaml file. However, you can customize this behavior by decorating your component type with scaffold_with.
In this case, we might want to scaffold a template shell script alongside a filled-out component.yaml file, which we accomplish with a custom scaffolder:
import os
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
    Scaffolder,
    ScaffoldRequest,
    scaffold_component,
)
from dagster.components.scaffold.scaffold import scaffold_with


class ShellCommandScaffolder(Scaffolder):
    """Scaffolds a template shell script alongside a filled-out component YAML file."""

    def scaffold(self, request: ScaffoldRequest, params: Any) -> None:
        scaffold_component(
            request,
            {
                "script_path": "script.sh",
                "asset_specs": [
                    {"key": "my_asset", "description": "Output of running a script"}
                ],
            },
        )
        script_path = Path(request.target_path) / "script.sh"
        script_path.write_text("#!/bin/bash\n\necho 'Hello, world!'")
        os.chmod(script_path, 0o755)


@scaffold_with(ShellCommandScaffolder)
@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", str(resolved_script_path)], check=True)
Now, when we run dg scaffold, we'll see that a template shell script is created alongside a filled-out component.yaml file:
type: my_component_library.lib.ShellCommand

attributes:
  script_path: script.sh
  asset_specs:
    - key: my_asset
      description: Output of running a script
#!/bin/bash
echo 'Hello, world!'
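The file-writing half of the scaffolder is plain Python, so it can be exercised without Dagster. The sketch below assumes a hypothetical helper, write_template_script, and uses a temporary directory as a stand-in for request.target_path. It only checks that the script is written and marked executable, and it assumes a POSIX filesystem.

```python
import os
import stat
import tempfile
from pathlib import Path

# Same template text the scaffolder writes.
SCRIPT_TEMPLATE = "#!/bin/bash\n\necho 'Hello, world!'"


def write_template_script(target_dir: Path) -> Path:
    """Write the template script into target_dir and mark it executable."""
    script_path = target_dir / "script.sh"
    script_path.write_text(SCRIPT_TEMPLATE)
    os.chmod(script_path, 0o755)  # rwxr-xr-x
    return script_path


with tempfile.TemporaryDirectory() as tmp:
    script = write_template_script(Path(tmp))  # tmp stands in for target_path
    contents = script.read_text()
    mode = stat.S_IMODE(script.stat().st_mode)
    owner_can_execute = bool(mode & stat.S_IXUSR)
```

Keeping this logic in a small function of its own is one way to unit-test a scaffolder's output without invoking the dg CLI.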
[Advanced] Providing resolution logic for non-standard types
In most cases, the types you use in your component schema and in the component class will be the same, or will have out-of-the-box resolution logic, as in the case of ResolvedAssetSpec.
However, in some cases you may want to use a type that doesn't have an existing schema equivalent. In this case, you can provide a function that will resolve the value to the desired type by annotating the field with Annotated[<type>, Resolver(...)].
For example, we might want to provide an API client to our component, which can be configured with an API key in YAML, or a mock client in tests:
from dataclasses import dataclass
from typing import Annotated

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    ResolutionContext,
    Resolvable,
    Resolver,
)


class MyApiClient:
    def __init__(self, api_key: str): ...


def resolve_api_key(
    context: ResolutionContext,
    api_key: str,
) -> MyApiClient:
    return MyApiClient(api_key=api_key)


@dataclass
class MyComponent(Component, Resolvable):
    # Resolver specifies a function used to map input from the model
    # to a value for this field
    api_client: Annotated[
        MyApiClient,
        Resolver(
            resolve_api_key,
            model_field_name="api_key",
            model_field_type=str,
        ),
    ]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
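The two construction paths mentioned above (an API key from YAML, or a mock client in tests) can be sketched without Dagster. Everything here is an illustrative assumption: MyComponentSketch is a plain dataclass standing in for MyComponent, MockApiClient is a hypothetical test double, MyApiClient is given a body that stores the key, and the resolver is called with None in place of a real ResolutionContext because its body never uses the context.

```python
from dataclasses import dataclass
from typing import Any


class MyApiClient:
    def __init__(self, api_key: str):
        # Assumption: the real client stores the key; the original elides this.
        self.api_key = api_key


class MockApiClient(MyApiClient):
    """Hypothetical test double that needs no real credentials."""

    def __init__(self):
        super().__init__(api_key="unused")


def resolve_api_key(context: Any, api_key: str) -> MyApiClient:
    # Same body as the resolver above; `context` is unused, so this sketch
    # passes None instead of a real ResolutionContext.
    return MyApiClient(api_key=api_key)


@dataclass
class MyComponentSketch:
    """Stand-in for MyComponent without the Dagster base classes."""

    api_client: MyApiClient


# YAML path: the string from the `api_key` field is resolved into a client.
from_yaml = MyComponentSketch(api_client=resolve_api_key(None, "secret-key"))

# Test path: construct the component directly with a mock client.
in_test = MyComponentSketch(api_client=MockApiClient())
```

The point of the pattern is that the class field holds the rich type (the client), while the YAML-facing schema only ever sees the simple one (a string).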
[Advanced] Customize rendering of YAML values
The components system supports a rich templating syntax that allows you to load arbitrary Python values based on your component.yaml file. All string values in a Resolvable can be templated using the Jinja2 templating engine, and may be resolved into arbitrary Python types. This allows you to expose complex object types, such as PartitionsDefinition or AutomationCondition, to users of your component, even if they're working in pure YAML.
You can define custom values that will be made available to the templating engine by defining a get_additional_scope classmethod on your component. In our case, we can define a "daily_partitions" value that holds a DailyPartitionsDefinition object with a pre-defined start date:
import subprocess
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        return {
            "daily_partitions": dg.DailyPartitionsDefinition(start_date="2024-01-01")
        }

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(context)

        return dg.Definitions(assets=[_asset])

    def execute(self, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", self.script_path], check=True)
When a user instantiates this component, they will be able to use this custom scope in their component.yaml file:
type: my_component_library.lib.ShellCommand

attributes:
  script_path: script.sh
  asset_specs:
    - key: a
      partitions_def: '{{ daily_partitions }}'
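To illustrate why a templated string can become an arbitrary Python object rather than another string, here is a deliberately simplified, standard-library-only sketch of a scope lookup. It is not Dagster's implementation, which uses Jinja2 and supports full expressions. This version only handles a string that is exactly one bare {{ name }}, and resolve_value and FakePartitionsDefinition are hypothetical names used for illustration.

```python
import re
from collections.abc import Mapping
from typing import Any

# Matches a string that consists solely of one `{{ name }}` expression.
_WHOLE_EXPRESSION = re.compile(r"^\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}$")


def resolve_value(value: Any, scope: Mapping[str, Any]) -> Any:
    """Return the scope object for a bare `{{ name }}` string, else the value."""
    if isinstance(value, str):
        match = _WHOLE_EXPRESSION.match(value)
        if match:
            return scope[match.group(1)]  # KeyError if the name is not in scope
    return value


class FakePartitionsDefinition:
    """Hypothetical placeholder for dg.DailyPartitionsDefinition."""

    def __init__(self, start_date: str):
        self.start_date = start_date


# Plays the role of the mapping returned by get_additional_scope.
scope = {"daily_partitions": FakePartitionsDefinition(start_date="2024-01-01")}

# Plays the role of one asset_specs entry parsed from component.yaml.
spec = {"key": "a", "partitions_def": "{{ daily_partitions }}"}
resolved_spec = {name: resolve_value(raw, scope) for name, raw in spec.items()}
```

After resolution, partitions_def holds the object from the scope itself, while the plain string under key passes through unchanged.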