Repositories
@dagster.repository → RepositoryDefinition
Create a repository from the decorated function.
The decorated function should take no arguments, and its return value should be one of the following:
- A list of type List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition]]. Use this form when you have no need to lazy-load pipelines or other definitions. This is the typical use case.
- A dict of the form:
  {
      'pipelines': Dict[str, Callable[[], PipelineDefinition]],
      'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]],
      'schedules': Dict[str, Callable[[], ScheduleDefinition]]
  }
  This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly.
- An object of type RepositoryData. Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory.
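To make the difference between the first two forms concrete, here is a minimal plain-Python sketch, not Dagster's implementation, of how a list of ready-made definitions and a dict of zero-argument constructors could both be normalized into one name-indexed table of loaders. Definition, its kind attribute, and normalize_repository_return are hypothetical names introduced only for this illustration; the RepositoryData form is left out.

```python
from typing import Callable, Dict, List, Union


class Definition:
    """Hypothetical stand-in for PipelineDefinition, PartitionSetDefinition, etc."""

    def __init__(self, kind: str, name: str):
        self.kind = kind  # hypothetical: "pipelines", "partition_sets", or "schedules"
        self.name = name


# A loader is a zero-argument callable that returns a definition.
Loader = Callable[[], Definition]


def normalize_repository_return(
    value: Union[List[Definition], Dict[str, Dict[str, Loader]]]
) -> Dict[str, Dict[str, Loader]]:
    """Hypothetical sketch: turn either accepted form into {kind: {name: loader}}."""
    if isinstance(value, list):
        # List form: the definitions already exist, so each loader just returns one.
        index: Dict[str, Dict[str, Loader]] = {}
        for definition in value:
            bucket = index.setdefault(definition.kind, {})
            # Bind the current definition through a default argument.
            bucket[definition.name] = lambda d=definition: d
        return index
    if isinstance(value, dict):
        # Dict form: values are constructors; copying the mapping calls none of them.
        return {kind: dict(loaders) for kind, loaders in value.items()}
    raise TypeError("expected a list of definitions or a dict of loaders")
```

With the dict form, nothing is constructed until a loader is actually called, which is the property the lazy form relies on.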
Example:

```python
from dagster import (
    Field,
    Int,
    PartitionSetDefinition,
    RepositoryData,
    pipeline,
    repository,
    solid,
)

######################################################################
# A simple repository using the first form of the decorated function
######################################################################

@solid(config_schema={'n': Field(Int)})
def return_n(context):
    return context.solid_config['n']

@pipeline(name='simple_pipeline')
def simple_pipeline():
    return_n()

simple_partition_set = PartitionSetDefinition(
    name='simple_partition_set',
    pipeline_name='simple_pipeline',
    partition_fn=lambda: range(10),
    run_config_fn_for_partition=(
        lambda partition: {
            'solids': {'return_n': {'config': {'n': partition}}}
        }
    ),
)

simple_schedule = simple_partition_set.create_schedule_definition(
    schedule_name='simple_daily_10_pm_schedule',
    cron_schedule='0 22 * * *',
)

@repository
def simple_repository():
    return [simple_pipeline, simple_partition_set, simple_schedule]

######################################################################
# A lazy-loaded repository
######################################################################

def make_expensive_pipeline():
    @pipeline(name='expensive_pipeline')
    def expensive_pipeline():
        for i in range(10000):
            return_n.alias('return_n_{i}'.format(i=i))()

    return expensive_pipeline

expensive_partition_set = PartitionSetDefinition(
    name='expensive_partition_set',
    pipeline_name='expensive_pipeline',
    partition_fn=lambda: range(10),
    run_config_fn_for_partition=(
        lambda partition: {
            'solids': {
                'return_n_{i}'.format(i=i): {'config': {'n': partition}}
                for i in range(10000)
            }
        }
    ),
)

def make_expensive_schedule():
    # Return the schedule so the lazy loader yields a ScheduleDefinition.
    return expensive_partition_set.create_schedule_definition(
        schedule_name='expensive_schedule',
        cron_schedule='0 22 * * *',
    )

@repository
def lazy_loaded_repository():
    return {
        'pipelines': {'expensive_pipeline': make_expensive_pipeline},
        'partition_sets': {
            'expensive_partition_set': expensive_partition_set
        },
        'schedules': {'expensive_schedule': make_expensive_schedule},
    }

######################################################################
# A complex repository that lazily constructs pipelines from a directory
# of files in a bespoke YAML format
######################################################################

class ComplexRepositoryData(RepositoryData):
    def __init__(self, yaml_directory):
        self._yaml_directory = yaml_directory

    def get_pipeline(self, pipeline_name):
        return self._construct_pipeline_def_from_yaml_file(
            self._yaml_file_for_pipeline_name(pipeline_name)
        )

    # The remaining RepositoryData methods and helpers are elided.
    ...

@repository
def complex_repository():
    return ComplexRepositoryData('some_directory')
```
class dagster.RepositoryDefinition(name, repository_data, description=None)
Define a repository that contains a collection of definitions.
Users should typically not create objects of this class directly. Instead, use the @repository() decorator.
get_all_pipelines()
Return all pipelines in the repository as a list.
Note that this will construct any pipeline in the lazily evaluated pipeline_dict that has not yet been constructed.
Returns: All pipelines in the repository.
Return type: List[PipelineDefinition]
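The note about lazily evaluated pipelines can be illustrated with a small caching index. This is a hypothetical sketch (LazyPipelineIndex and its get/get_all methods are invented for the example and are not Dagster classes): get builds a single entry on demand, while get_all walks every registered name and therefore builds whatever has not been built yet.

```python
from typing import Callable, Dict, List


class LazyPipelineIndex:
    """Hypothetical name -> constructor index with per-name caching (not Dagster code)."""

    def __init__(self, constructors: Dict[str, Callable[[], object]]):
        self._constructors = dict(constructors)
        self._cache: Dict[str, object] = {}

    def get(self, name: str) -> object:
        # Build only the requested entry, then reuse the cached result on later calls.
        if name not in self._cache:
            self._cache[name] = self._constructors[name]()
        return self._cache[name]

    def get_all(self) -> List[object]:
        # Every entry that has not been built yet is built here; cached ones are reused.
        return [self.get(name) for name in self._constructors]
```

Calling get_all on a repository with many expensive entries therefore pays the full construction cost once, after which every entry is served from the cache.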
get_all_solid_defs()
Get all the solid definitions in a repository.
Returns: All solid definitions in the repository.
Return type: List[SolidDefinition]
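Several pipelines in a repository can reuse the same solid definition; the aliased return_n invocations in the lazy-loaded example above all share one definition. A minimal sketch of collecting definitions once per name might look like the following. SolidDef, Pipeline, and all_solid_defs are hypothetical stand-ins, and treating two distinct definitions that share a name as an error is an assumption of this sketch rather than documented Dagster behavior.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SolidDef:
    """Hypothetical stand-in for SolidDefinition."""
    name: str


@dataclass
class Pipeline:
    """Hypothetical stand-in for PipelineDefinition."""
    name: str
    solid_defs: List[SolidDef] = field(default_factory=list)


def all_solid_defs(pipelines: List[Pipeline]) -> List[SolidDef]:
    """Collect each solid definition once, keyed by name, across all pipelines."""
    by_name: Dict[str, SolidDef] = {}
    for pipeline in pipelines:
        for solid_def in pipeline.solid_defs:
            existing = by_name.get(solid_def.name)
            if existing is not None and existing is not solid_def:
                # Assumption: two different definitions with one name are a conflict.
                raise ValueError(f"conflicting solid definitions named {solid_def.name!r}")
            by_name[solid_def.name] = solid_def
    # Dicts keep insertion order, so definitions come back in first-seen order.
    return list(by_name.values())
```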
get_pipeline(name)
Get a pipeline by name.
If this pipeline is present in the lazily evaluated pipeline_dict but has not yet been constructed, only this pipeline is constructed, and it will be cached for future calls.
Parameters: name (str) – Name of the pipeline to retrieve.
Returns: The pipeline definition corresponding to the given name.
Return type: PipelineDefinition
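The "construct only this pipeline and cache it" behavior described for get_pipeline can be approximated by memoizing each zero-argument constructor. The sketch below uses functools.lru_cache for that; make_cached_lookup is a hypothetical helper, not part of Dagster, and an unknown name simply raises KeyError here.

```python
import functools
from typing import Callable, Dict


def make_cached_lookup(constructors: Dict[str, Callable[[], object]]) -> Callable[[str], object]:
    """Hypothetical helper: return a get(name) function that builds each entry at most once."""
    # Wrap each zero-argument constructor so its first result is memoized.
    memoized = {
        name: functools.lru_cache(maxsize=None)(fn) for name, fn in constructors.items()
    }

    def get(name: str) -> object:
        # Only the requested constructor runs; the others are never called.
        return memoized[name]()

    return get
```

Memoizing per constructor keeps the cache keyed by name implicitly, so no separate cache dictionary is needed.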
has_pipeline(name)
Check if a pipeline with a given name is present in the repository.
Parameters: name (str) – The name of the pipeline.
Returns: bool