Creating a New Component Type
Refer to the project structuring guide to learn how to create a components-compatible project.
The dagster-components system makes it easy to create new component types that can be reused across your project.
In most cases, component types map to a specific technology. For example, you might have a DockerScriptComponent that executes a script in a Docker container, or a SnowflakeQueryComponent that runs a query on Snowflake.
Making a component library
To let the dg CLI know that your Python package contains component types, update your pyproject.toml file with the following configuration:
[tool.dg]
is_component_lib = true
By default, it is assumed that all component types will be defined in your_package.lib. If you'd like to define your components in a different module, you can specify this in your pyproject.toml file:
[tool.dg]
is_component_lib = true
component_lib_package = "your_package.other_module"
Once this is done, as long as this package is installed in your environment, you'll be able to use the dg command-line utility to interact with your component types.
Scaffolding a new component type
For this example, we'll write a lightweight component that executes a shell command.
First, we use the dg command-line utility to scaffold a new component type:
dg component-type generate shell_command
This will add a new file to your project in the lib directory:
from dagster_components import (
    Component,
    ComponentLoadContext,
    registered_component_type,
)
from pydantic import BaseModel

from dagster import Definitions


@registered_component_type(name="shell_command")
class ShellCommand(Component):
    @classmethod
    def get_schema(cls) -> type[BaseModel]: ...

    def build_defs(self, load_context: ComponentLoadContext) -> Definitions: ...
This file contains the basic structure for the new component type. The following three methods define how the component behaves:
- get_schema: This method should return a Pydantic model that defines the schema for the component. This is the schema for the data that goes into component.yaml.
- load: This method takes the loading context and returns an instance of the component class. This is where you'll load the parameters from the component.yaml file.
- build_defs: This method should return a Definitions object for this component.
Defining a schema
The first step is to define a schema for the component. This means determining what aspects of the component should be customizable.
In this case, we'll want to define a few things:
- The path to the shell script that we'll want to run.
- The attributes of the asset that we expect this script to produce.
- Any tags or configuration related to the underlying compute.
To simplify common use cases, dagster-components provides schemas for common bits of configuration:
- AssetSpecSchema: This contains attributes that are common to all assets, such as the key, description, tags, and dependencies.
- OpSpecSchema: This contains attributes specific to an underlying operation, such as the name and tags.
We can define the schema for our component and add it to our class as follows:
from collections.abc import Sequence
from typing import Optional

from dagster_components import (
    AssetSpecSchema,
    Component,
    ComponentLoadContext,
    ComponentSchema,
    OpSpecSchema,
    registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ComponentSchema):
    script_path: str
    asset_specs: Sequence[AssetSpecSchema]
    op: Optional[OpSpecSchema] = None


@registered_component_type(name="shell_command")
class ShellCommand(Component):
    def __init__(
        self,
        script_path: str,
        asset_specs: Sequence[dg.AssetSpec],
        op: Optional[OpSpecSchema] = None,
    ):
        self.script_path = script_path
        self.specs = asset_specs
        self.op = op or OpSpecSchema()

    @classmethod
    def get_schema(cls) -> type[ShellScriptSchema]:
        return ShellScriptSchema

    def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...
Because the field names in the schema match the argument names of the ShellCommand constructor, the load method will automatically populate the class with the values from the schema, and will automatically resolve the AssetSpecSchema objects into AssetSpec objects.
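The name-matching behavior can be pictured with a small, library-free sketch. Everything here (ToySchema, ToyComponent, and this load classmethod) is a hypothetical stand-in built on dataclasses, not the dagster-components implementation, and it skips the step that resolves AssetSpecSchema values into AssetSpec objects.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class ToySchema:
    """Stand-in for a component schema: the validated contents of component.yaml."""

    script_path: str
    op_name: Optional[str] = None


class ToyComponent:
    """Stand-in for a component whose __init__ arguments mirror the schema fields."""

    def __init__(self, script_path: str, op_name: Optional[str] = None):
        self.script_path = script_path
        self.op_name = op_name

    @classmethod
    def load(cls, schema: ToySchema) -> "ToyComponent":
        # Because field names and argument names match, the schema can be
        # unpacked straight into the constructor as keyword arguments.
        kwargs = {f.name: getattr(schema, f.name) for f in fields(schema)}
        return cls(**kwargs)


component = ToyComponent.load(ToySchema(script_path="script.sh"))
print(component.script_path)
```

In this sketch, renaming a schema field without renaming the matching constructor argument would make the unpacking step raise a TypeError, which is why the names need to stay in sync.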
Building definitions
Now that we've defined how the component is parameterized, we need to define how to turn those parameters into a Definitions object.
To do so, there are two methods that need to be overridden:
- load: This method is responsible for loading the configuration from the component.yaml file into the schema, from which it creates an instance of the component class.
- build_defs: This method is responsible for returning a Definitions object containing all definitions related to the component.
In our case, our load method will check the loaded parameters against our schema and then instantiate our class from those parameters.
Our build_defs method will create a single @asset that executes the provided shell script. By convention, we'll put the code to actually execute this asset inside of a function called execute. This makes it easier for future developers to create subclasses of this component.
import subprocess
from collections.abc import Sequence
from typing import Optional

from dagster_components import (
    AssetSpecSchema,
    Component,
    ComponentLoadContext,
    ComponentSchema,
    OpSpecSchema,
    registered_component_type,
)

import dagster as dg


class ShellScriptSchema(ComponentSchema):
    script_path: str
    asset_specs: Sequence[AssetSpecSchema]
    op: Optional[OpSpecSchema] = None


@registered_component_type(name="shell_command")
class ShellCommand(Component):
    def __init__(
        self,
        script_path: str,
        asset_specs: Sequence[dg.AssetSpec],
        op: Optional[OpSpecSchema] = None,
    ):
        self.script_path = script_path
        self.specs = asset_specs
        self.op = op or OpSpecSchema()

    @classmethod
    def get_schema(cls) -> type[ShellScriptSchema]:
        return ShellScriptSchema

    def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
        @dg.multi_asset(specs=self.specs, op_tags=self.op.tags, name=self.op.name)
        def _asset(context: dg.AssetExecutionContext):
            self.execute(context)

        return dg.Definitions(assets=[_asset])

    def execute(self, context: dg.AssetExecutionContext):
        subprocess.run(["sh", self.script_path], check=True)
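The execute method above discards the script's output. As a standalone sketch of the same subprocess.run call, the hypothetical helper below (run_shell_script is not part of dagster-components) captures stdout so it could be logged, and relies on check=True to raise CalledProcessError when the script exits with a non-zero status. It assumes sh is available on the PATH.

```python
import subprocess
import tempfile
from pathlib import Path


def run_shell_script(script_path: str) -> str:
    """Run a script with sh and return its stdout (hypothetical helper)."""
    result = subprocess.run(
        ["sh", script_path],
        check=True,  # raise CalledProcessError on a non-zero exit code
        capture_output=True,
        text=True,
    )
    return result.stdout


# Write a throwaway script so the example runs without any project files.
with tempfile.TemporaryDirectory() as tmp_dir:
    script = Path(tmp_dir) / "script.sh"
    script.write_text("echo hello\n")
    output = run_shell_script(str(script))

print(output)
```

A subclass that overrides execute could call a helper like this and pass the returned text to context.log.info, leaving build_defs untouched.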
Component registration
Following the steps above will automatically register your component type in your environment. You can now run:
dg component-type list
and see your new component type in the list of available component types.
You can also view automatically generated documentation describing your new component type by running:
dg component-type docs your_library.shell_command
[Advanced] Custom templating
The components system supports a rich templating syntax that allows you to load arbitrary Python values based off of your component.yaml file.
When creating the schema for your component, you can specify custom output types that should be resolved at runtime. This allows you to expose complex object types, such as PartitionsDefinition or AutomationCondition, to users of your component, even if they're working in pure YAML.
Defining a resolvable field
When creating a schema for your component, if you have a field that should have some custom resolution logic, you can annotate that field with the ResolvableFieldInfo class. This allows you to specify:
- The output type of the field
- Any post-processing that should be done on the resolved value of that field
- Any additional scope that will be available to use when resolving that field
from typing import Annotated, Optional

from dagster_components import ResolvableFieldInfo
from dagster_components.core.schema.objects import AssetAttributesSchema, OpSpecSchema
from pydantic import BaseModel


class ShellScriptSchema(BaseModel):
    script_path: str
    asset_attributes: AssetAttributesSchema
    script_runner: Annotated[
        str, ResolvableFieldInfo(required_scope={"get_script_runner"})
    ]
    op: Optional[OpSpecSchema] = None
Resolving fields
Once you've defined a resolvable field, you'll need to implement the logic to actually resolve it into the desired Python value.
The ComponentSchemaBaseModel class supports a resolve_properties method, which returns a dictionary of resolved properties for your component. This method accepts a templated_value_resolver, which holds the scope that is available for use in the template.
If your resolvable field requires additional scope, you can provide it by using the with_scope method on the templated_value_resolver. This scope can be anything, such as a dictionary of properties related to an asset, or a function that returns a complex object type.
from collections.abc import Sequence

from dagster_components import (
    AssetSpecSchema,
    Component,
    ComponentSchema,
    registered_component_type,
)


class ShellCommandParams(ComponentSchema):
    path: str
    asset_specs: Sequence[AssetSpecSchema]


@registered_component_type(name="shell_command")
class ShellCommand(Component): ...
The ComponentSchemaBaseModel class will ensure that the output type of the resolved field matches the type specified in the ResolvableFieldInfo annotation.
When a user instantiates a component, they will be able to use your custom scope in their component.yaml file:
component_type: my_component
params:
  script_path: script.sh
  script_runner: "{{ get_script_runner('arg') }}"
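To illustrate what "additional scope" means, the toy resolver below evaluates a {{ ... }} expression against a dictionary of names. It is only a sketch: resolve_template and this get_script_runner are hypothetical, eval is used for brevity and is not suitable for untrusted input, and none of this reflects how dagster-components resolves templates internally.

```python
import re
from typing import Any, Mapping

# Matches a string that consists entirely of one "{{ expression }}" template.
TEMPLATE_PATTERN = re.compile(r"^\{\{\s*(.+?)\s*\}\}$")


def resolve_template(value: str, scope: Mapping[str, Any]) -> Any:
    """Resolve a whole-string '{{ expr }}' template using names from scope."""
    match = TEMPLATE_PATTERN.match(value.strip())
    if match is None:
        return value  # plain strings pass through unchanged
    # Toy evaluation: builtins are removed so only names in scope are visible.
    return eval(match.group(1), {"__builtins__": {}}, dict(scope))


def get_script_runner(name: str) -> str:
    """Hypothetical scope function exposed to YAML authors."""
    return f"runner-for-{name}"


scope = {"get_script_runner": get_script_runner}
resolved = resolve_template("{{ get_script_runner('arg') }}", scope)
print(resolved)
```

The point of the sketch is that the YAML author only writes the expression, while the component author decides which names (here, get_script_runner) are available when it is evaluated.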
Next steps
- Add a new component to your project