Hooks
This example demonstrates how to define success and failure handling policies for pipelines.
A function decorated with @success_hook or @failure_hook is called a hook.
from dagster import failure_hook, success_hook


@success_hook(required_resource_keys={"slack"})
def slack_on_success(context):
    message = "Solid {} finished successfully".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@failure_hook(required_resource_keys={"slack"})
def slack_on_failure(context):
    message = "Solid {} failed".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)
Apply a hook on a pipeline
There are a few ways to apply hooks. For example, suppose you want to send a Slack message to a channel when any solid in a pipeline fails. In that case, you can apply a hook to the pipeline, which applies it to every solid instance within that pipeline.
You can use the "slack_on_failure" hook defined above to decorate the pipeline "notif_all". Slack messages will then be sent whenever a solid in the pipeline fails.
@slack_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
    # the hook "slack_on_failure" is applied on every solid instance within this pipeline
    a()
    b()
Apply a hook on a solid
However, sometimes a pipeline is a shared responsibility, or you only want to be alerted on high-priority solid executions. For these cases, hooks can also be set on individual solid instances, which lets you apply policies on a per-solid basis.
@pipeline(mode_defs=mode_defs)
def selective_notif():
    # only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
    a.with_hooks({slack_on_failure, slack_on_success})()
    # solid "b" won't trigger any hooks
    b()
In this case, solid "b" won't trigger any hooks, while solid "a" sends a Slack message whenever it fails or succeeds.
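The difference between the two ways of applying hooks can be pictured with a toy model in plain Python. This is only a sketch of the behavior described above, not how Dagster implements hooks: `run_toy_pipeline`, the `Hook` tuple, and the `ok`/`boom` functions are all hypothetical names invented for illustration.

```python
from typing import Callable, Dict, List, Tuple

# In this toy model a hook is (event, callback); event is "success" or "failure".
Hook = Tuple[str, Callable[[str], None]]


def run_toy_pipeline(
    solids: Dict[str, Callable[[], None]],
    pipeline_hooks: List[Hook],
    solid_hooks: Dict[str, List[Hook]],
) -> None:
    """Run each toy solid, then fire the hooks that apply to it."""
    for name, fn in solids.items():
        try:
            fn()
            event = "success"
        except Exception:
            event = "failure"
        # Pipeline-level hooks apply to every solid; per-solid hooks only to their own.
        for hook_event, callback in pipeline_hooks + solid_hooks.get(name, []):
            if hook_event == event:
                callback(name)


sent: List[str] = []  # stands in for messages posted to Slack


def ok() -> None:
    pass


def boom() -> None:
    raise RuntimeError("simulated failure")


on_failure: Hook = ("failure", lambda name: sent.append("Solid {} failed".format(name)))
on_success: Hook = (
    "success",
    lambda name: sent.append("Solid {} finished successfully".format(name)),
)

# Like notif_all: the failure hook covers both solids.
run_toy_pipeline({"a": ok, "b": boom}, [on_failure], {})
# Like selective_notif: only "a" has hooks, so the failure of "b" is silent.
run_toy_pipeline({"a": ok, "b": boom}, [], {"a": [on_failure, on_success]})
```

In the first run only the failure of "b" produces a message; in the second run only the success of "a" does, because "b" carries no hooks.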
Hook context and resources
As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:
context.log: loggers.
context.solid: the solid associated with the hook.
context.solid_config: the config specific to the associated solid.
context.step: the execution step that triggers the hook.
context.resources: the resources the hook can use.
Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.
@failure_hook(required_resource_keys={"slack"})
def slack_on_failure(context):
    message = "Solid {} failed".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)
This also lets you switch resource values between modes so that, for example, you send Slack messages only in "prod" mode and mock the Slack resource in "dev" mode.
In this case, we can mock the slack_resource in "dev" mode using the helper function ResourceDefinition.hardcoded_resource(), so no Slack messages are sent while you are developing the pipeline.
mode_defs = [
    ModeDefinition(
        "dev",
        resource_defs={
            "slack": ResourceDefinition.hardcoded_resource(
                slack_resource_mock, "do not send messages in dev"
            )
        },
    ),
    ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]
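The excerpt does not show how slack_resource_mock is defined. One plausible choice, assumed here rather than taken from the source, is a `unittest.mock.MagicMock`, which accepts any attribute chain and records calls instead of contacting Slack. The sketch below exercises the hook body against such a mock using a hand-built stand-in context; `slack_on_failure_body` and `fake_context` are hypothetical names, and the stand-in is not a real HookContext.

```python
from types import SimpleNamespace
from unittest import mock

# Assumed definition: the source references slack_resource_mock but does not show it.
slack_resource_mock = mock.MagicMock()


def slack_on_failure_body(context):
    # Same body as the slack_on_failure hook, without the decorator.
    message = "Solid {} failed".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)


# Hand-built object exposing only the attributes the body reads.
fake_context = SimpleNamespace(
    solid=SimpleNamespace(name="a"),
    resources=SimpleNamespace(slack=slack_resource_mock),
)

slack_on_failure_body(fake_context)

# The mock recorded the call; nothing was sent to Slack.
slack_resource_mock.chat.post_message.assert_called_once_with(
    channel="#foo", text="Solid a failed"
)
```

Because the mock records its calls, the same object can be used both to silence messages in "dev" mode and to assert on what a hook would have sent.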
When we switch to "prod" mode, we can provide the real Slack token in the run_config and thereby enable sending messages to a Slack channel when a hook is triggered.
resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token
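Since the run config is passed to the pipeline as a plain dictionary, the YAML above can also be built in Python, which avoids keeping a real token in a file. The following is a minimal sketch: `build_prod_run_config` and the `SLACK_TOKEN` environment variable are hypothetical names chosen for illustration, and the fallback value is the placeholder from the YAML.

```python
import os


def build_prod_run_config(token: str) -> dict:
    """Return the same nested structure as the YAML run config."""
    return {"resources": {"slack": {"config": {"token": token}}}}


# SLACK_TOKEN is an assumed variable name; fall back to the placeholder token.
run_config = build_prod_run_config(
    os.environ.get("SLACK_TOKEN", "xoxp-1234123412341234-12341234-1234")
)
```

The resulting dictionary has the same shape as the one produced by loading the YAML file.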
Then, we can execute the pipeline with this config through the Python API, the CLI, or the Dagit UI.
with open(file_relative_path(__file__, "prod.yaml"), "r") as fd:
    run_config = yaml.safe_load(fd.read())
result = execute_pipeline(notif_all, run_config=run_config, mode="prod")
Download
curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/hooks
cd hooks