from functools import lru_cache, update_wrapper
from inspect import Parameter
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Union,
cast,
overload,
)
import dagster._check as check
from dagster._annotations import deprecated_param
from dagster._config import UserConfigSchema
from dagster._core.decorator_utils import (
format_docstring_for_description,
get_function_params,
get_valid_name_permutations,
param_is_var_keyword,
positional_arg_name_list,
)
from dagster._core.definitions.inference import infer_input_props
from dagster._core.definitions.resource_annotation import (
get_resource_args,
)
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.types.dagster_type import DagsterTypeKind
from dagster._utils.warnings import config_argument_warning, normalize_renamed_param
from ..input import In, InputDefinition
from ..output import Out
from ..policy import RetryPolicy
from ..utils import DEFAULT_OUTPUT
if TYPE_CHECKING:
    # Imported for annotations only; a runtime import is done lazily inside _Op.__call__.
    from ..op_definition import OpDefinition
class _Op:
    """The configured decorator returned by ``@op(...)``.

    Stores the keyword arguments given to ``@op(...)``. Calling the instance with a function
    builds the corresponding ``OpDefinition``.
    """

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        config_schema: Optional[Union[Any, Mapping[str, Any]]] = None,
        tags: Optional[Mapping[str, Any]] = None,
        code_version: Optional[str] = None,
        decorator_takes_context: Optional[bool] = True,
        retry_policy: Optional[RetryPolicy] = None,
        ins: Optional[Mapping[str, In]] = None,
        out: Optional[Union[Out, Mapping[str, Out]]] = None,
    ):
        """Record the decorator arguments.

        Args:
            name: Op name; defaults to the decorated function's ``__name__`` when falsy.
            description: Op description; defaults to the function docstring when falsy.
            required_resource_keys: Resource keys required by the op. Must not be combined with
                resource-annotated function arguments.
            config_schema: Explicit config schema. Must be empty/None if the function takes a
                ``config`` argument, whose annotation is used instead.
            tags: Arbitrary op tags.
            code_version: Version of the op's logic.
            decorator_takes_context: If False, the first parameter is never treated as a
                context argument.
            retry_policy: Retry policy for the op.
            ins: Explicit input definitions keyed by input name.
            out: A single ``Out`` (stored under ``DEFAULT_OUTPUT``) or a mapping of name to ``Out``.
        """
        self.name = check.opt_str_param(name, "name")
        self.decorator_takes_context = check.bool_param(
            decorator_takes_context, "decorator_takes_context"
        )
        self.description = check.opt_str_param(description, "description")

        # these will be checked within OpDefinition
        self.required_resource_keys = required_resource_keys
        self.tags = tags
        self.code_version = code_version
        self.retry_policy = retry_policy

        # config will be checked within OpDefinition
        self.config_schema = config_schema

        self.ins = check.opt_nullable_mapping_param(ins, "ins", key_type=str, value_type=In)
        self.out = out

    def __call__(self, fn: Callable[..., Any]) -> "OpDefinition":
        """Build an ``OpDefinition`` from ``fn`` using the stored decorator arguments.

        The decorator's own attributes are left untouched. One ``_Op`` instance can therefore
        decorate several functions, and each one gets its own default name and inferred config
        schema.

        Raises:
            CheckError: (via ``check.param_invariant``) if ``fn`` has a ``config`` argument and a
                non-empty ``config_schema`` was also given, or if resources are specified both in
                the decorator and as function arguments.
        """
        from dagster._config.pythonic_config import validate_resource_annotated_function

        from ..op_definition import OpDefinition

        validate_resource_annotated_function(fn)

        # Resolve per-call values into locals instead of assigning back onto ``self``.
        # Otherwise a reused decorator leaks the first function's name/config into later ones.
        name = self.name or fn.__name__

        compute_fn = (
            DecoratedOpFunction(decorated_fn=fn)
            if self.decorator_takes_context
            else NoContextDecoratedOpFunction(decorated_fn=fn)
        )

        compute_fn.validate_malformed_config()

        config_schema = self.config_schema
        if compute_fn.has_config_arg():
            check.param_invariant(
                config_schema is None or config_schema == {},
                "If the @op has a config arg, you cannot specify a config schema",
            )

            from dagster._config.pythonic_config import infer_schema_from_config_annotation

            # Parse schema from the type annotation of the config arg
            config_arg = compute_fn.get_config_arg()
            config_schema = infer_schema_from_config_annotation(
                config_arg.annotation, config_arg.default
            )

        outs: Optional[Mapping[str, Out]] = None
        if isinstance(self.out, Out):
            # A single Out is shorthand for the default output.
            outs = {DEFAULT_OUTPUT: self.out}
        elif self.out is not None:
            outs = check.mapping_param(self.out, "out", key_type=str, value_type=Out)

        arg_resource_keys = {arg.name for arg in compute_fn.get_resource_args()}
        decorator_resource_keys = set(self.required_resource_keys or [])
        check.param_invariant(
            len(decorator_resource_keys) == 0 or len(arg_resource_keys) == 0,
            "Cannot specify resource requirements in both @op decorator and as arguments to the"
            " decorated function",
        )
        resolved_resource_keys = decorator_resource_keys.union(arg_resource_keys)

        op_def = OpDefinition.dagster_internal_init(
            name=name,
            ins=self.ins,
            outs=outs,
            compute_fn=compute_fn,
            config_schema=config_schema,
            description=self.description or format_docstring_for_description(fn),
            required_resource_keys=resolved_resource_keys,
            tags=self.tags,
            code_version=self.code_version,
            retry_policy=self.retry_policy,
            version=None,  # code_version has replaced version
        )
        # Copy __name__, __doc__, etc. from the user function onto the definition.
        update_wrapper(op_def, compute_fn.decorated_fn)
        return op_def
@overload
def op(compute_fn: Callable[..., Any]) -> "OpDefinition":
    """Bare ``@op`` form: the decorated function becomes an ``OpDefinition`` directly."""
@overload
def op(
    *,
    name: Optional[str] = ...,
    description: Optional[str] = ...,
    ins: Optional[Mapping[str, In]] = ...,
    out: Optional[Union[Out, Mapping[str, Out]]] = ...,
    config_schema: Optional[UserConfigSchema] = ...,
    required_resource_keys: Optional[AbstractSet[str]] = ...,
    tags: Optional[Mapping[str, Any]] = ...,
    version: Optional[str] = ...,
    retry_policy: Optional[RetryPolicy] = ...,
    code_version: Optional[str] = ...,
) -> _Op:
    """Parameterized ``@op(...)`` form: returns an ``_Op`` decorator to apply to the function."""
@deprecated_param(
    param="version", breaking_version="2.0", additional_warn_text="Use `code_version` instead"
)
def op(
    compute_fn: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    ins: Optional[Mapping[str, In]] = None,
    out: Optional[Union[Out, Mapping[str, Out]]] = None,
    config_schema: Optional[UserConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = None,
    tags: Optional[Mapping[str, Any]] = None,
    version: Optional[str] = None,
    retry_policy: Optional[RetryPolicy] = None,
    code_version: Optional[str] = None,
) -> Union["OpDefinition", _Op]:
    """Create an op with the specified parameters from the decorated function.

    Ins and outs will be inferred from the type signature of the decorated function
    if not explicitly provided.

    The decorated function will be used as the op's compute function. The signature of the
    decorated function is more flexible than that of the ``compute_fn`` in the core API; it may:

    1. Return a value. This value will be wrapped in an :py:class:`Output` and yielded by the compute function.
    2. Return an :py:class:`Output`. This output will be yielded by the compute function.
    3. Yield :py:class:`Output` or other :ref:`event objects <events>`. Same as default compute behavior.

    Note that options 1) and 2) are incompatible with yielding other events -- if you would like
    to decorate a function that yields events, it must also wrap its eventual output in an
    :py:class:`Output` and yield it.

    @op supports ``async def`` functions as well, including async generators when yielding multiple
    events or outputs. Note that async ops will generally be run on their own unless using a custom
    :py:class:`Executor` implementation that supports running them together.

    Args:
        name (Optional[str]): Name of op. Must be unique within any :py:class:`GraphDefinition`
            using the op.
        description (Optional[str]): Human-readable description of this op. If not provided, and
            the decorated function has docstring, that docstring will be used as the description.
        ins (Optional[Dict[str, In]]):
            Information about the inputs to the op. Information provided here will be combined
            with what can be inferred from the function signature.
        out (Optional[Union[Out, Dict[str, Out]]]):
            Information about the op outputs. Information provided here will be combined with
            what can be inferred from the return type signature if the function does not use yield.
        config_schema (Optional[ConfigSchema]): The schema for the config. If set, Dagster will check
            that config provided for the op matches this schema and fail if it does not. If not
            set, Dagster will accept any config provided for the op.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by this op.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the op. Frameworks may
            expect and require certain metadata to be attached to an op. Values that are not strings
            will be json encoded and must meet the criteria that `json.loads(json.dumps(value)) == value`.
        version (Optional[str]): Deprecated; use ``code_version`` instead.
        code_version (Optional[str]): (Experimental) Version of the logic encapsulated by the op. If set,
            this is used as a default version for all outputs.
        retry_policy (Optional[RetryPolicy]): The retry policy for this op.

    Examples:
        .. code-block:: python

            @op
            def hello_world():
                print('hello')

            @op
            def echo(msg: str) -> str:
                return msg

            @op(
                ins={'msg': In(str)},
                out=Out(str)
            )
            def echo_2(msg): # same as above
                return msg

            @op(
                out={'word': Out(), 'num': Out()}
            )
            def multi_out() -> Tuple[str, int]:
                return 'cool', 4
    """
    # Fold the deprecated ``version`` argument into ``code_version``.
    code_version = normalize_renamed_param(
        code_version,
        "code_version",
        version,
        "version",
    )

    if compute_fn is not None:
        # Bare ``@op`` usage: no decorator arguments may be supplied alongside the function.
        check.invariant(description is None)
        check.invariant(config_schema is None)
        check.invariant(required_resource_keys is None)
        check.invariant(tags is None)
        check.invariant(version is None)

        return _Op()(compute_fn)

    return _Op(
        name=name,
        description=description,
        config_schema=config_schema,
        required_resource_keys=required_resource_keys,
        tags=tags,
        code_version=code_version,
        retry_policy=retry_policy,
        ins=ins,
        out=out,
    )
class DecoratedOpFunction(NamedTuple):
    """Wrapper around the decorated op function to provide commonly used util methods."""

    decorated_fn: Callable[..., Any]

    @property
    def name(self):
        """The ``__name__`` of the wrapped function."""
        return self.decorated_fn.__name__

    # maxsize=1: only the most recently queried wrapper is memoized.
    @lru_cache(maxsize=1)
    def has_context_arg(self) -> bool:
        """Whether the first parameter is named like a context argument (see is_context_provided)."""
        return is_context_provided(self._get_function_params())

    def get_context_arg(self) -> Parameter:
        """Return the context parameter (the first parameter).

        Raises via ``check.failed`` if the function has no context argument.
        """
        if self.has_context_arg():
            return self._get_function_params()[0]

        check.failed("Requested context arg on function that does not have one")

    @lru_cache(maxsize=1)
    def _get_function_params(self) -> Sequence[Parameter]:
        """Parameters of the wrapped function, memoized so the signature is inspected once."""
        return get_function_params(self.decorated_fn)

    def has_config_arg(self) -> bool:
        """Whether any parameter is named ``config``."""
        return any(param.name == "config" for param in self._get_function_params())

    def validate_malformed_config(self) -> None:
        """Warn when a positional input is annotated with a pythonic ``Config`` subclass.

        Such a parameter would be treated as an input rather than as op config. The likely
        intent is a parameter named ``config``.
        """
        from dagster._config.pythonic_config.config import Config
        from dagster._config.pythonic_config.type_check_utils import safe_is_subclass

        positional_inputs = self.positional_inputs()
        for param in self._get_function_params():
            if safe_is_subclass(param.annotation, Config) and param.name in positional_inputs:
                config_argument_warning(param.name, self.name)

    def get_config_arg(self) -> Parameter:
        """Return the parameter named ``config``; raises via ``check.failed`` if absent."""
        for param in self._get_function_params():
            if param.name == "config":
                return param

        check.failed("Requested config arg on function that does not have one")

    def get_resource_args(self) -> Sequence[Parameter]:
        """Parameters that are resource-annotated, as determined by ``get_resource_args``."""
        return get_resource_args(self.decorated_fn)

    def positional_inputs(self) -> Sequence[str]:
        """Names of input parameters, excluding the context, ``config`` and resource parameters.

        The final list is produced by ``positional_arg_name_list``.
        """
        params = self._get_function_params()
        input_args = params[1:] if self.has_context_arg() else params
        resource_arg_names = [arg.name for arg in self.get_resource_args()]
        input_args_filtered = [
            input_arg
            for input_arg in input_args
            if input_arg.name != "config" and input_arg.name not in resource_arg_names
        ]
        return positional_arg_name_list(input_args_filtered)

    def has_var_kwargs(self) -> bool:
        """Whether the function accepts ``**kwargs``."""
        params = self._get_function_params()
        # var keyword arg has to be the last argument
        return len(params) > 0 and param_is_var_keyword(params[-1])

    def get_output_annotation(self) -> Any:
        """Return annotation for the function's output, as inferred by ``infer_output_props``."""
        from ..inference import infer_output_props

        return infer_output_props(self.decorated_fn).annotation
class NoContextDecoratedOpFunction(DecoratedOpFunction):
    """Wrapper around a decorated op function, when the decorator does not permit a context
    parameter.

    The first parameter is never stripped as a context argument, even if it is named ``context``.
    """

    def has_context_arg(self) -> bool:
        # Constant result, so it is not memoized; an lru_cache here would only keep the most
        # recent instance alive without saving any work.
        return False
def is_context_provided(params: Sequence[Parameter]) -> bool:
    """Report whether a parameter list starts with a context argument.

    Args:
        params: The function's parameters, in declaration order.

    Returns:
        True if there is at least one parameter and the first one's name is among
        ``get_valid_name_permutations("context")``; False otherwise.
    """
    if params:
        first_param = params[0]
        return first_param.name in get_valid_name_permutations("context")
    return False
def resolve_checked_op_fn_inputs(
    decorator_name: str,
    fn_name: str,
    compute_fn: DecoratedOpFunction,
    explicit_input_defs: Sequence[InputDefinition],
    exclude_nothing: bool,
) -> Sequence[InputDefinition]:
    """Validate provided input definitions and infer the remaining from the type signature of the compute_fn.

    Returns the resolved set of InputDefinitions.

    Args:
        decorator_name (str): Name of the decorator that is wrapping the op function.
        fn_name (str): Name of the decorated function.
        compute_fn (DecoratedOpFunction): The decorated function, wrapped in the
            DecoratedOpFunction wrapper.
        explicit_input_defs (List[InputDefinition]): The input definitions that were explicitly
            provided in the decorator.
        exclude_nothing (bool): True if Nothing type inputs should be excluded from compute_fn
            arguments.

    Raises:
        DagsterInvalidDefinitionError: If any of the following holds:
            - the function has a ``*args`` parameter;
            - with ``exclude_nothing``, the function has a parameter for an explicit
              Nothing-typed input;
            - an explicit input has no matching parameter and the function takes no
              ``**kwargs``;
            - with ``exclude_nothing``, an inferred input's type is a Nothing type.
    """
    if exclude_nothing:
        explicit_names = {
            inp.name
            for inp in explicit_input_defs
            if inp.dagster_type.kind != DagsterTypeKind.NOTHING
        }
        nothing_names = {
            inp.name
            for inp in explicit_input_defs
            if inp.dagster_type.kind == DagsterTypeKind.NOTHING
        }
    else:
        explicit_names = {inp.name for inp in explicit_input_defs}
        nothing_names = set()

    params = get_function_params(compute_fn.decorated_fn)
    input_args = params[1:] if compute_fn.has_context_arg() else params

    # The config arg and resource args are not inputs; filter them out.
    resource_arg_names = {arg.name for arg in compute_fn.get_resource_args()}
    explicit_names = explicit_names - resource_arg_names

    if compute_fn.has_config_arg() or resource_arg_names:
        input_args = [
            input_arg
            for input_arg in input_args
            if input_arg.name != "config" and input_arg.name not in resource_arg_names
        ]

    # Validate input arguments
    used_inputs = set()
    inputs_to_infer = set()
    has_kwargs = False

    for param in input_args:
        if param.kind == Parameter.VAR_KEYWORD:
            has_kwargs = True
        elif param.kind == Parameter.VAR_POSITIONAL:
            raise DagsterInvalidDefinitionError(
                f"{decorator_name} '{fn_name}' decorated function has positional vararg parameter "
                f"'{param}'. {decorator_name} decorated functions should only have keyword "
                "arguments that match input names and, if system information is required, a first "
                "positional parameter named 'context'."
            )
        elif param.name in explicit_names:
            used_inputs.add(param.name)
        elif param.name in nothing_names:
            raise DagsterInvalidDefinitionError(
                f"{decorator_name} '{fn_name}' decorated function has parameter"
                f" '{param.name}' that is one of the input_defs of type 'Nothing' which"
                " should not be included since no data will be passed for it. "
            )
        else:
            inputs_to_infer.add(param.name)

    undeclared_inputs = explicit_names - used_inputs
    if not has_kwargs and undeclared_inputs:
        # Quote each name individually ('a', 'b'); the surrounding f-string supplies the outer
        # quotes. Sorted so the message is deterministic despite set ordering.
        undeclared_inputs_printed = "', '".join(sorted(undeclared_inputs))
        nothing_exemption = (
            ", except for Ins that have the Nothing dagster_type"
            if decorator_name not in {"@graph", "@graph_asset"}
            else ""
        )
        raise DagsterInvalidDefinitionError(
            f"{decorator_name} '{fn_name}' decorated function does not have argument(s)"
            f" '{undeclared_inputs_printed}'. {decorator_name}-decorated functions should have a"
            f" keyword argument for each of their Ins{nothing_exemption}. Alternatively, they can"
            " accept **kwargs."
        )

    inferred_props = {
        inferred.name: inferred
        for inferred in infer_input_props(compute_fn.decorated_fn, compute_fn.has_context_arg())
    }

    input_defs = []
    for input_def in explicit_input_defs:
        if input_def.name in inferred_props:
            # combine any information missing on the explicit def that can be inferred
            input_defs.append(input_def.combine_with_inferred(inferred_props[input_def.name]))
        else:
            # pass through those that don't have any inference info, such as Nothing type inputs
            input_defs.append(input_def)

    # build defs from the inferred props for those without explicit entries
    inferred_input_defs = [
        InputDefinition.create_from_inferred(inferred)
        for inferred in inferred_props.values()
        if inferred.name in inputs_to_infer
    ]

    if exclude_nothing:
        for in_def in inferred_input_defs:
            if in_def.dagster_type.is_nothing:
                raise DagsterInvalidDefinitionError(
                    f"Input parameter {in_def.name} is annotated with"
                    f" {in_def.dagster_type.display_name} which is a type that represents passing"
                    " no data. This type must be used via In() and no parameter should be included"
                    f" in the {decorator_name} decorated function."
                )

    input_defs.extend(inferred_input_defs)
    return input_defs