Creating and registering a component type
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
The components system makes it easy to create new component types that you and your teammates can reuse across your Dagster project.
In most cases, component types map to a specific technology. For example, you might create a DockerScriptComponent that executes a script in a Docker container, or a SnowflakeQueryComponent that runs a query on Snowflake.
Prerequisites
Before creating and registering custom component types, you will need to create a components-ready project.
Creating a new component type
For this example, we'll create a ShellCommand component type that executes a shell command.
1. Create the new component type file
First, use the dg scaffold component-type command to scaffold the ShellCommand component type:
dg scaffold component-type ShellCommand
Creating a Dagster component type at /.../my-component-library/src/my_component_library/lib/shell_command.py.
Scaffolded files for Dagster component type at /.../my-component-library/src/my_component_library/lib/shell_command.py.
This will add a new file to the lib directory of your Dagster project that contains the basic structure for the new component type:
import dagster as dg
from dagster.components import Component, ComponentLoadContext, Model, Resolvable


class ShellCommand(Component, Model, Resolvable):
    """COMPONENT SUMMARY HERE.

    COMPONENT DESCRIPTION HERE.
    """

    # added fields here will define yaml schema via Model

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        # Add definition construction logic here.
        return dg.Definitions()
Model is used to implement a YAML interface for a component type. If your component type only needs a Pythonic interface, you can use the --no-model flag when creating it:
dg scaffold component-type ShellCommand --no-model
This will allow you to implement an __init__ method for your class, either manually or by using @dataclasses.dataclass.
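As a rough illustration of those two options, the sketch below declares the same fields once with a handwritten __init__ and once with @dataclasses.dataclass. It uses plain Python classes rather than real Dagster components; the class names and the timeout_seconds field are hypothetical and exist only for this example.

```python
from dataclasses import dataclass, fields


class ManualShellCommand:
    """Hypothetical stand-in with a handwritten __init__ (not a Dagster class)."""

    def __init__(self, script_path: str, timeout_seconds: int = 60):
        # timeout_seconds is an invented field used only for illustration.
        self.script_path = script_path
        self.timeout_seconds = timeout_seconds


@dataclass
class DataclassShellCommand:
    """Hypothetical stand-in; @dataclass generates an equivalent __init__."""

    script_path: str
    timeout_seconds: int = 60


manual = ManualShellCommand("script.sh")
generated = DataclassShellCommand("script.sh")

# Both instances end up with the same attribute names and values.
print(vars(manual) == vars(generated))

# The dataclass additionally records its declared fields for introspection.
print([f.name for f in fields(DataclassShellCommand)])
```

The dataclass form is shorter and keeps the field declarations in one place, while a handwritten __init__ leaves room for extra validation or derived attributes.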
2. Update the component type Python class
The next step is to define the information the component type needs when it is instantiated.
The ShellCommand component type will need the following to be defined:
- The path to the shell script to be run
- The assets the shell script is expected to produce
The ShellCommand class inherits from Resolvable, in addition to Component. Resolvable handles deriving a YAML schema for the ShellCommand class based on what the class is annotated with. To simplify common use cases, Dagster provides annotations for common bits of configuration, such as ResolvedAssetSpec, which will handle exposing a schema for defining AssetSpecs from YAML and resolving them before instantiating the component.
You can define the schema for the ShellCommand component and add it to the ShellCommand class as follows:
from collections.abc import Sequence
from dataclasses import dataclass

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
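To build some intuition for how class annotations can drive a schema, here is a simplified, standard-library-only sketch. It is not how Resolvable is implemented; it only shows the general idea of treating declared fields as the set of accepted keys. The names ShellCommandSketch, asset_keys, and load_from_mapping are hypothetical, and the dictionary stands in for already-parsed YAML.

```python
from collections.abc import Mapping, Sequence
from dataclasses import MISSING, dataclass, fields
from typing import Any


@dataclass
class ShellCommandSketch:
    """Hypothetical stand-in; a plain dataclass, not a Dagster component."""

    script_path: str
    asset_keys: Sequence[str]  # simplified placeholder for asset_specs


def load_from_mapping(cls: type, raw: Mapping[str, Any]) -> Any:
    """Instantiate cls from a mapping, rejecting unknown or missing keys."""
    declared = {f.name for f in fields(cls)}
    required = {
        f.name
        for f in fields(cls)
        if f.default is MISSING and f.default_factory is MISSING
    }
    unknown = sorted(set(raw) - declared)
    missing = sorted(required - set(raw))
    if unknown or missing:
        raise ValueError(f"unknown keys: {unknown}, missing keys: {missing}")
    return cls(**raw)


# Stand-in for the attributes block of a parsed YAML file.
parsed_yaml = {"script_path": "script.sh", "asset_keys": ["my_asset"]}
component = load_from_mapping(ShellCommandSketch, parsed_yaml)
print(component)
```

Adding or renaming a field on the class changes which keys are accepted without touching the loading code, which is the property the annotation-driven approach is after.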
Additionally, you can include metadata for your component by overriding the get_spec method. This allows you to set fields like owners and tags that will be visible in the generated documentation:
from collections.abc import Sequence

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    ComponentTypeSpec,
    Resolvable,
    ResolvedAssetSpec,
)


class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    @classmethod
    def get_spec(cls):
        return ComponentTypeSpec(
            owners=["John Dagster"],
            tags=["shell", "script"],
        )

    def __init__(
        self,
        script_path: str,
        asset_specs: Sequence[ResolvedAssetSpec],
    ):
        self.script_path = script_path
        self.asset_specs = asset_specs

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions: ...
When defining a field on a component that isn't on the schema, or is of a different type, the components system allows you to provide custom resolution logic for that field. For more information, see "Providing resolution logic for non-standard types".
3. Update the build_defs method
Next, you'll need to define how to turn the component parameters into a Definitions object.
To do so, you will need to update the build_defs method, which is responsible for returning a Definitions object containing all definitions related to the component.
In this example, the build_defs method creates a single @multi_asset that executes the provided shell script. By convention, the code to execute this asset is placed inside a method called execute, which makes it easier for future developers to create subclasses of this component:
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path

import dagster as dg
from dagster.components import (
    Component,
    ComponentLoadContext,
    Resolvable,
    ResolvedAssetSpec,
)


@dataclass
class ShellCommand(Component, Resolvable):
    """Models a shell script as a Dagster asset."""

    script_path: str
    asset_specs: Sequence[ResolvedAssetSpec]

    def build_defs(self, context: ComponentLoadContext) -> dg.Definitions:
        resolved_script_path = Path(context.path, self.script_path).absolute()

        @dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(resolved_script_path, context)

        return dg.Definitions(assets=[_asset])

    def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
        return subprocess.run(["sh", str(resolved_script_path)], check=True)
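To show why keeping the run logic in a separate execute method helps with subclassing, here is a minimal sketch that uses plain Python stand-ins instead of Dagster classes. ShellCommandSketch, DryRunShellCommand, the resolve helper, and the /tmp/my_component directory are all hypothetical. Unlike the real method above, this execute takes no Dagster context argument.

```python
import subprocess
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ShellCommandSketch:
    """Hypothetical stand-in for the component above; no Dagster imports."""

    script_path: str

    def resolve(self, component_dir: Path) -> Path:
        # Mirrors Path(context.path, self.script_path).absolute() in build_defs:
        # a relative script path is resolved against the component's directory.
        return Path(component_dir, self.script_path).absolute()

    def execute(self, resolved_script_path: Path):
        # Same call as the real component: run the script with sh.
        return subprocess.run(["sh", str(resolved_script_path)], check=True)


@dataclass
class DryRunShellCommand(ShellCommandSketch):
    """Hypothetical subclass that changes only how the script is executed."""

    def execute(self, resolved_script_path: Path):
        # Return the command that would run instead of running it.
        return ["sh", str(resolved_script_path)]


component = DryRunShellCommand(script_path="scripts/build.sh")
resolved = component.resolve(Path("/tmp/my_component"))
print(component.execute(resolved))
```

Because the subclass overrides only execute, the field declarations and path resolution are inherited unchanged, which is the benefit the convention aims for.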
Registering a new component type
Following the steps above will automatically register your component type in your environment. To see your new component type in the list of available component types, run dg list plugins:
dg list plugins
Using /.../my-component-library/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-t… │ │
│ │ │ │ definition │ │ │
│ │ │ │ for how to │ │ │
│ │ │ │ compute an │ │ │
│ │ │ │ asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.asset_check │ Create a │ [scaffold-t… │ │
│ │ │ │ definition │ │ │
│ │ │ │ for how to │ │ │
│ │ │ │ execute an │ │ │
│ │ │ │ asset check. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary │ [component, │ │
│ │ │ │ set of │ scaffold-ta… │ │
│ │ │ │ dagster │ │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder │ [component, │ │
│ │ │ │ which may │ scaffold-ta… │ │
│ │ │ │ contain │ │ │
│ │ │ │ multiple │ │ │
│ │ │ │ submodules, │ │ │
│ │ │ │ each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that │ [component, │ │
│ │ │ │ wrap Python │ scaffold-ta… │ │
│ │ │ │ scripts │ │ │
│ │ │ │ executed │ │ │
│ │ │ │ with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubpro… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.job │ Creates a │ [scaffold-t… │ │
│ │ │ │ job with the │ │ │
│ │ │ │ specified │ │ │
│ │ │ │ parameters │ │ │
│ │ │ │ from the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ graph/op │ │ │
│ │ │ │ invocation │ │ │
│ │ │ │ function. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.multi_asset │ Create a │ [scaffold-t… │ │
│ │ │ │ combined │ │ │
│ │ │ │ definition │ │ │
│ │ │ │ of multiple │ │ │
│ │ │ │ assets that │ │ │
│ │ │ │ are computed │ │ │
│ │ │ │ using the │ │ │
│ │ │ │ same op and │ │ │
│ │ │ │ same │ │ │
│ │ │ │ upstream │ │ │
│ │ │ │ assets. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.resources │ Symbol for │ [scaffold-t… │ │
│ │ │ │ dg scaffold │ │ │
│ │ │ │ to target. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.schedule │ Creates a │ [scaffold-t… │ │
│ │ │ │ schedule │ │ │
│ │ │ │ following │ │ │
│ │ │ │ the provided │ │ │
│ │ │ │ cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests │ │ │
│ │ │ │ runs for the │ │ │
│ │ │ │ provided │ │ │
│ │ │ │ job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────┼──────────────┤ │
│ │ │ dagster.sensor │ Creates a │ [scaffold-t… │ │
│ │ │ │ sensor where │ │ │
│ │ │ │ the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ function is │ │ │
│ │ │ │ used as the │ │ │
│ │ │ │ sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴──────────────┴──────────────┘ │
│ my_component_library │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ my_component_library.lib.ShellCommand │ Models a shell script │ [component, │ │
│ │ │ │ as a Dagster asset. │ scaffold-target] │ │
│ │ └───────────────────────────────────────┴─────────────────────────┴─────────────────────────┘ │
└──────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────┘
You can also view automatically generated documentation describing your new component type by running dg docs serve:
dg docs serve
Instantiating the new component type in your project
After you register your new component type, you can instantiate and use the component in your Dagster project with the dg scaffold command:
dg scaffold 'my_component_library.lib.ShellCommand' my_shell_command
Using /.../my-component-library/.venv/bin/dagster-components
Using /.../my-component-library/.venv/bin/dagster-components