
Repositories

@dagster.repository → RepositoryDefinition

Create a repository from the decorated function.

The decorated function should take no arguments, and its return value should be one of:

  1. List[Union[JobDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition]].

    Use this form when you have no need to lazy load pipelines or other definitions. This is the typical use case.

  2. A dict of the form:

{
    'jobs': Dict[str, Callable[[], JobDefinition]],
    'pipelines': Dict[str, Callable[[], PipelineDefinition]],
    'partition_sets': Dict[str, Callable[[], PartitionSetDefinition]],
    'schedules': Dict[str, Callable[[], ScheduleDefinition]],
    'sensors': Dict[str, Callable[[], SensorDefinition]],
}

This form is intended to allow definitions to be created lazily when accessed by name, which can be helpful for performance when there are many definitions in a repository, or when constructing the definitions is costly.

  3. RepositoryData.

    Return this object if you need fine-grained control over the construction and indexing of definitions within the repository, e.g., to create definitions dynamically from .yaml files in a directory.
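To make the trade-off between the first two forms concrete, here is a pure-Python sketch that does not use Dagster at all. `build_expensive` and `build_cheap` are hypothetical stand-ins for definition constructors, and the `built` list records which ones actually ran. With a plain list, every constructor runs up front; with a dict of zero-argument callables, only the entry looked up by name is built.

```python
from typing import Callable, Dict, List

# Records which (hypothetical) definitions have actually been constructed.
built: List[str] = []


def build_expensive() -> str:
    # Stand-in for a costly definition constructor.
    built.append("expensive")
    return "expensive-definition"


def build_cheap() -> str:
    # Stand-in for a cheap definition constructor.
    built.append("cheap")
    return "cheap-definition"


# Form 1: a list of ready-made definitions; every constructor runs immediately.
eager = [build_expensive(), build_cheap()]
eager_count = len(built)

# Form 2: a dict of name -> zero-argument callable; nothing is constructed yet.
built.clear()
lazy: Dict[str, Callable[[], str]] = {
    "expensive": build_expensive,
    "cheap": build_cheap,
}
before_lookup = len(built)

# Only the entry requested by name is constructed.
cheap_definition = lazy["cheap"]()
```

In this sketch the expensive constructor never runs in the lazy case, which is the benefit the dict form is meant to provide when a repository holds many or costly definitions.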

Parameters
  • name (Optional[str]) – The name of the repository. Defaults to the name of the decorated function.

  • description (Optional[str]) – A string description of the repository.
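The `name` default can be illustrated with a generic Python pattern for a decorator that works both bare and with keyword arguments. This is a sketch of the pattern only, not Dagster's implementation: `repository_sketch` and `RepoSketch` are hypothetical names, and the sketch simply calls the decorated function once at decoration time.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RepoSketch:
    # Hypothetical stand-in for a repository object; not Dagster's class.
    name: str
    description: Optional[str]
    contents: Any


def repository_sketch(
    fn: Optional[Callable[[], Any]] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
):
    """Support both @repository_sketch and @repository_sketch(name=...)."""

    def wrap(f: Callable[[], Any]) -> RepoSketch:
        # Fall back to the decorated function's own name when none is given.
        return RepoSketch(name=name or f.__name__, description=description, contents=f())

    if fn is not None:
        # Bare form: the decorated function was passed in directly.
        return wrap(fn)
    # Called form: return the real decorator.
    return wrap


@repository_sketch
def my_repo():
    return ["job_a"]


@repository_sketch(name="renamed", description="Example repository")
def another_repo():
    return ["job_b"]
```

Here `my_repo.name` is `"my_repo"` because no name was passed, while `another_repo.name` is the explicit `"renamed"`.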

Example:

######################################################################
# A simple repository using the first form of the decorated function
######################################################################

from dagster import Field, Int, RunRequest, ScheduleDefinition, job, op, repository, sensor

@op(config_schema={'n': Field(Int)})
def return_n(context):
    return context.op_config['n']

@job
def simple_job():
    return_n()

@job
def some_job():
    ...

@sensor(job=some_job)
def some_sensor():
    # foo() and bar() are placeholders for user-defined logic, and the
    # run_key value is elided; replace them before running this sensor.
    if foo():
        yield RunRequest(
            run_key=...,
            run_config={
                'ops': {'return_n': {'config': {'n': bar()}}}
            }
        )

@job
def my_job():
    ...

# Runs my_job every day at 00:00.
my_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", job=my_job)

@repository
def simple_repository():
    return [simple_job, some_sensor, my_schedule]


######################################################################
# A lazy-loaded repository
######################################################################

def make_expensive_job():
    @job
    def expensive_job():
        for i in range(10000):
            return_n.alias(f'return_n_{i}')()

    return expensive_job

def make_expensive_schedule():
    @job
    def other_expensive_job():
        for i in range(11000):
            return_n.alias(f'my_return_n_{i}')()

    return ScheduleDefinition(cron_schedule="0 0 * * *", job=other_expensive_job)

@repository
def lazy_loaded_repository():
    return {
        'jobs': {'expensive_job': make_expensive_job},
        'schedules': {'expensive_schedule': make_expensive_schedule},
    }


######################################################################
# A complex repository that lazily constructs jobs from a directory
# of files in a bespoke YAML format
######################################################################

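import os

from dagster import RepositoryData

class ComplexRepositoryData(RepositoryData):
    def __init__(self, yaml_directory):
        self._yaml_directory = yaml_directory

    def get_all_jobs(self):
        # _construct_job_def_from_yaml_file and _yaml_file_for_job_name are
        # user-defined helpers that are elided from this example.
        return [
            self._construct_job_def_from_yaml_file(
                self._yaml_file_for_job_name(file_name)
            )
            for file_name in os.listdir(self._yaml_directory)
        ]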

    ...

@repository
def complex_repository():
    return ComplexRepositoryData('some_directory')

class dagster.RepositoryDefinition(name, repository_data, description=None)

Define a repository that contains a collection of definitions.

Users should typically not create objects of this class directly. Instead, use the @repository() decorator.

Parameters
  • name (str) – The name of the repository.

  • repository_data (RepositoryData) – Contains the definitions making up the repository.

  • description (Optional[str]) – A string description of the repository.

get_all_jobs()

Return all pipelines/jobs in the repository as a list.

Note that this will construct any pipeline/job in the lazily evaluated dictionary that has not yet been constructed.

Returns

All pipelines/jobs in the repository.

Return type

List[Union[PipelineDefinition, JobDefinition]]

get_job(name)

Get a pipeline/job by name.

If this pipeline/job is present in the lazily evaluated dictionary passed to the constructor but has not yet been constructed, only this pipeline/job is constructed, and it is cached for future calls.

Parameters

name (str) – Name of the pipeline/job to retrieve.

Returns

The pipeline/job definition corresponding to the given name.

Return type

Union[PipelineDefinition, JobDefinition]

has_job(name)

Check if a pipeline/job with a given name is present in the repository.

Parameters

name (str) – The name of the pipeline/job.

Returns

bool

property job_names

Names of all pipelines/jobs in the repository.

Type

List[str]
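The caching behaviour described for `get_job` and `get_all_jobs` can be modelled in plain Python. `LazyJobIndex` is a hypothetical class written for illustration and is not part of Dagster; in particular, the choice to answer `job_names` and `has_job` from the dictionary keys without constructing anything is an assumption of this sketch, not something stated above.

```python
from typing import Callable, Dict, List


class LazyJobIndex:
    """Hypothetical sketch of a name -> lazily constructed definition index."""

    def __init__(self, factories: Dict[str, Callable[[], object]]):
        self._factories = dict(factories)
        self._cache: Dict[str, object] = {}

    @property
    def job_names(self) -> List[str]:
        # Names come from the dict keys; no factory is called.
        return list(self._factories)

    def has_job(self, name: str) -> bool:
        # Membership check only; nothing is constructed.
        return name in self._factories

    def get_job(self, name: str) -> object:
        # Construct only the requested entry, then reuse the cached result.
        if name not in self._cache:
            self._cache[name] = self._factories[name]()
        return self._cache[name]

    def get_all_jobs(self) -> List[object]:
        # Constructs every entry that is not already cached.
        return [self.get_job(name) for name in self._factories]


# Usage: record which factories actually run.
calls: List[str] = []


def make(name: str) -> Callable[[], str]:
    def factory() -> str:
        calls.append(name)
        return f"{name}-definition"

    return factory


index = LazyJobIndex({"a": make("a"), "b": make("b")})
```

Calling `index.get_job("a")` twice runs the `"a"` factory once; a later `index.get_all_jobs()` then builds only `"b"`.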

class dagster.RepositoryData

Users should usually rely on the @repository decorator to create new repositories, which will in turn call the static constructors on this class. However, users may subclass RepositoryData for fine-grained control over access to and lazy creation of repository members.
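As an illustration of the kind of fine-grained control mentioned above, the sketch below discovers definitions from a directory of files, in the spirit of the `ComplexRepositoryData` example. It deliberately avoids Dagster: `DirectoryBackedData` and `JobSpec` are hypothetical, the class does not subclass the real `RepositoryData` (whose abstract methods are not listed in this section), and it parses a trivial `key: value` format instead of real YAML.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class JobSpec:
    # Hypothetical stand-in for a job definition built from one file.
    name: str
    ops: List[str]


class DirectoryBackedData:
    """Sketch of a directory-driven index; one job per *.yaml file.

    Assumes each file holds simple 'key: value' lines; a real implementation
    would use a proper YAML parser.
    """

    def __init__(self, directory: str):
        self._directory = directory

    def _job_names(self) -> List[str]:
        # Job names are the file stems of every .yaml file, sorted.
        return sorted(
            os.path.splitext(f)[0]
            for f in os.listdir(self._directory)
            if f.endswith(".yaml")
        )

    def get_job(self, name: str) -> JobSpec:
        # Read and parse only the file for the requested job.
        fields: Dict[str, str] = {}
        with open(os.path.join(self._directory, f"{name}.yaml")) as fh:
            for line in fh:
                if ":" in line:
                    key, value = line.split(":", 1)
                    fields[key.strip()] = value.strip()
        ops = [op.strip() for op in fields.get("ops", "").split(",") if op.strip()]
        return JobSpec(name=name, ops=ops)

    def get_all_jobs(self) -> List[JobSpec]:
        return [self.get_job(name) for name in self._job_names()]


# Usage: write two spec files into a temporary directory and load them.
tmp_dir = tempfile.mkdtemp()
for stem, ops in [("ingest", "fetch, clean"), ("report", "render")]:
    with open(os.path.join(tmp_dir, f"{stem}.yaml"), "w") as fh:
        fh.write(f"ops: {ops}\n")

data = DirectoryBackedData(tmp_dir)
jobs = data.get_all_jobs()
```

Because `get_job` reads a single file, a caller that needs one job never parses the rest of the directory; caching, as in the lazily evaluated dictionary form, could be layered on top.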