Creating a new component type
This feature is still in development and might change in patch releases. It’s not production-ready, and the documentation may also evolve. Stay tuned for updates.
The dagster-components
system makes it easy to create new component types that can be reused across your project.
In most cases, component types map to a specific technology. For example, you might have a DockerScriptComponent
that executes a script in a Docker container, or a SnowflakeQueryComponent
that runs a query on Snowflake.
Refer to the project structuring guide to learn how to create a components-compatible project.
Making a component library
To let the dg
cli know that your Python package contains component types, you'll want to update your pyproject.toml
file with the following configuration:
[tool.dg]
is_component_lib = true
By default, it is assumed that all components types will be defined in your_package.lib
. If you'd like to define your components in a different directory, you can specify this in your pyproject.toml
file:
[tool.dg]
is_component_lib = true
component_lib_package="your_package.other_module"
Once this is done, as long as this package is installed in your environment, you'll be able to use the dg
command-line utility to interact with your component types.
Scaffolding a new component type
For this example, we'll write a lightweight component that executes a shell command.
First, we use the dg
command-line utility to scaffold a new component type:
dg component-type scaffold shell_command
Creating a Dagster component type at /.../my-component-library/my_component_library/lib/shell_command.py.
Scaffolded files for Dagster project in /.../my-component-library/my_component_library/lib.
This will add a new file to your project in the lib
directory:
from dagster import Definitions
from dagster_components import (
Component,
ComponentLoadContext,
DefaultComponentScaffolder,
ResolvableSchema,
registered_component_type,
)
class ShellCommandSchema(ResolvableSchema):
...
@registered_component_type(name="shell_command")
class ShellCommand(Component):
"""COMPONENT SUMMARY HERE.
COMPONENT DESCRIPTION HERE.
"""
@classmethod
def get_schema(cls):
return ShellCommandSchema
@classmethod
def get_scaffolder(cls) -> DefaultComponentScaffolder:
return DefaultComponentScaffolder()
def build_defs(self, load_context: ComponentLoadContext) -> Definitions:
# Add definition construction logic here.
return Definitions()
This file contains the basic structure for the new component type. There are two methods that you'll need to implement:
get_schema
: This method should return a Pydantic model that defines the schema for the component. This is the schema for the data that goes intocomponent.yaml
.build_defs
: This method should return aDefinitions
object for this component.
Defining a schema
The first step is to define a schema for the component. This means determining what aspects of the component should be customizable.
In this case, we'll want to define a few things:
- The path to the shell script that we'll want to run.
- The assets that we expect this script to produce.
To simplify common use cases, dagster-components
provides schemas for common bits of configuration, such as AssetSpecSchema
, which contains attributes that are common to all assets, such as the key, description, tags, and dependencies.
We can the schema for our component and add it to our class as follows:
from collections.abc import Sequence
from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
ResolvableSchema,
registered_component_type,
)
import dagster as dg
class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]
@registered_component_type(name="shell_command")
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""
@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...
Defining the python class
Next, we'll want to translate this schema into fully-resolved python objects. For example, our schema defines asset_specs
as Sequence[AssetSpecSchema]
, but at runtime we'll want to work with Sequence[AssetSpec]
.
By convention, we'll use the @dataclass
decorator to simplify our class definition. We can define attributes for our class that line up with the properties in our schema, but this time we'll use the fully-resolved types where appropriate.
Our path will still just be a string, but our asset_specs
will be a list of AssetSpec
objects. Whenever we define a field on the component that isn't on the schema, or is a different type, we can add an annotation to that field with Annotated[<type>, FieldResolver(...)]
to tell the system how to resolve that particular field.
In our case, we'll just define a single field resolver for the asset_specs
field on our component. Because AssetSpecSchema
is a ResolvableModel
, this can be directly resolved into an AssetSpec
object using context.resolve_value()
.
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Annotated
from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
FieldResolver,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)
import dagster as dg
class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]
def resolve_asset_specs(
context: ResolutionContext, schema: ShellScriptSchema
) -> Sequence[dg.AssetSpec]:
return context.resolve_value(schema.asset_specs)
@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
asset_specs: Annotated[Sequence[dg.AssetSpec], FieldResolver(resolve_asset_specs)]
@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...
Building definitions
Now that we've defined how the component is parameterized, we need to define how to turn those parameters into a Definitions
object.
To do so, we'll want to override the build_defs
method, which is responsible for returning a Definitions
object containing all definitions related to the component.
Our build_defs
method will create a single @asset
that executes the provided shell script. By convention, we'll put the code to actually execute this asset inside of a function called execute
. This makes it easier for future developers to create subclasses of this component.
import subprocess
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated
from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
FieldResolver,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)
import dagster as dg
class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]
def resolve_asset_specs(
context: ResolutionContext, schema: ShellScriptSchema
) -> Sequence[dg.AssetSpec]:
return context.resolve_value(schema.asset_specs)
@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
"""Models a shell script as a Dagster asset."""
script_path: str
asset_specs: Annotated[Sequence[dg.AssetSpec], FieldResolver(resolve_asset_specs)]
@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
resolved_script_path = Path(load_context.path, self.script_path).absolute()
@dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
def _asset(context: dg.AssetExecutionContext):
self.execute(resolved_script_path, context)
return dg.Definitions(assets=[_asset])
def execute(self, resolved_script_path: Path, context: dg.AssetExecutionContext):
subprocess.run(["sh", str(resolved_script_path)], check=True)
Component registration
Following the steps above will automatically register your component type in your environment. You can now run:
dg component-type list
Using /.../my-component-library/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Component Type ┃ Summary ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ definitions@dagster_components │ Wraps an arbitrary set of │
│ │ Dagster definitions. │
│ pipes_subprocess_script_collection@dagster_components │ Assets that wrap Python │
│ │ scripts executed with │
│ │ Dagster's │
│ │ PipesSubprocessClient. │
│ shell_command@my_component_library │ Models a shell script as a │
│ │ Dagster asset. │
└───────────────────────────────────────────────────────┴────────────────────────────────┘
and see your new component type in the list of available component types.
You can also view automatically generated documentation describing your new component type by running:
dg component-type docs shell_command@my_component_library
[Advanced] Custom templating
The components system supports a rich templating syntax that allows you to load arbitrary Python values based off of your component.yaml
file. All string values in a ResolvableModel
can be templated using the Jinja2 templating engine, and may be resolved into arbitrary Python types. This allows you to expose complex object types, such as PartitionsDefinition
or AutomationCondition
to users of your component, even if they're working in pure YAML.
You can define custom values that will be made available to the templating engine by defining a get_additional_scope
classmethod on your component. In our case, we can define a "daily_partitions"
function which returns a DailyPartitionsDefinition
object with a pre-defined start date:
import subprocess
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Any
from dagster_components import (
AssetSpecSchema,
Component,
ComponentLoadContext,
FieldResolver,
ResolutionContext,
ResolvableSchema,
registered_component_type,
)
import dagster as dg
class ShellScriptSchema(ResolvableSchema):
script_path: str
asset_specs: Sequence[AssetSpecSchema]
def resolve_asset_specs(
context: ResolutionContext, schema: ShellScriptSchema
) -> Sequence[dg.AssetSpec]:
return context.resolve_value(schema.asset_specs)
@registered_component_type(name="shell_command")
@dataclass
class ShellCommand(Component):
script_path: str
asset_specs: Annotated[Sequence[dg.AssetSpec], FieldResolver(resolve_asset_specs)]
@classmethod
def get_additional_scope(cls) -> Mapping[str, Any]:
return {
"daily_partitions": dg.DailyPartitionsDefinition(start_date="2024-01-01")
}
@classmethod
def get_schema(cls) -> type[ShellScriptSchema]:
return ShellScriptSchema
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
@dg.multi_asset(name=Path(self.script_path).stem, specs=self.asset_specs)
def _asset(context: dg.AssetExecutionContext):
self.execute(context)
return dg.Definitions(assets=[_asset])
def execute(self, context: dg.AssetExecutionContext):
subprocess.run(["sh", self.script_path], check=True)
When a user instantiates this component, they will be able to use this custom scope in their component.yaml
file:
component_type: my_component
params:
script_path: script.sh
asset_specs:
- key: a
partitions_def: "{{ daily_partitions }}"