Hooks

You can find the code for this example on Github.

This pipeline demonstrates how to create success and failure handling policies on pipelines.

A @success_hook or @failure_hook decorated function is called a hook.

repo.py
21
22
23
24
25
26
27
28
29
30
@success_hook(required_resource_keys={"slack"})
def slack_on_success(context):
    message = "Solid {} finished successfully".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@failure_hook(required_resource_keys={"slack"})
def slack_on_failure(context):
    message = "Solid {} failed".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)

Apply a hook on a pipeline

There are a few ways to apply hooks. For example, you want to send a slack message to a channel when any solid fails in a pipeline. In this case, we will be applying a hook on a pipeline, which will apply the hook on every solid instance within in that pipeline.

You can simply use the hook created "slack_on_failure" above to decorate the pipeline "notif_all". Then, slack messages will be sent when any solid in the pipeline fails.

repo.py
61
62
63
64
65
66
@slack_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
    # the hook "slack_on_failure" is applied on every solid instance within this pipeline
    a()
    b()

Apply a hook on a solid

However, sometimes a pipeline is a shared responsibility or you only want to be alerted on high-priority solid executions. So we also provide a way to set up hooks on solid instances which enables you to apply policies on a per-solid basis.

repo.py
72
73
74
75
76
77
@pipeline(mode_defs=mode_defs)
def selective_notif():
    # only solid "a" triggers hooks: a slack message will be sent when it fails or succeeds
    a.with_hooks({slack_on_failure, slack_on_success})()
    # solid "b" won't trigger any hooks
    b()

In this case, solid "b" won't trigger any hooks, while when solid "a" fails or succeeds it will send a slack message.

Hook context and resources

As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:

  • context.log: loggers
  • context.solid: the solid associated with the hook.
  • context.solid_config: The config specific to the associated solid.
  • context.step: the execution step that triggers the hook.
  • context.resources: the resources the hook can use.

Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.

repo.py
27
28
29
30
@failure_hook(required_resource_keys={"slack"})
def slack_on_failure(context):
    message = "Solid {} failed".format(context.solid.name)
    context.resources.slack.chat.post_message(channel="#foo", text=message)

It also enables you to switch resource values in different modes so that, for example, you can send slack messages only when you are in "prod" mode and mock the slack resource when you are in "dev" mode.

In this case, we can mock the slack_resource in the "dev" mode using a helper function ResourceDefinition.hardcoded_resource(), so it won't send slack messages when you are developing the pipeline.

repo.py
42
43
44
45
46
47
48
49
50
51
52
mode_defs = [
    ModeDefinition(
        "dev",
        resource_defs={
            "slack": ResourceDefinition.hardcoded_resource(
                slack_resource_mock, "do not send messages in dev"
            )
        },
    ),
    ModeDefinition("prod", resource_defs={"slack": slack_resource}),
]

When we switch to "prod" mode, we can provide the real slack token in the run_config and therefore enable sending messages to a certain slack channel when a hook is triggered.

prod.yaml
resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

Then, we can execute a pipeline with the config through Python API, CLI, or the Dagit UI.

repo.py
90
91
92
    with open(file_relative_path(__file__, "prod.yaml"), "r",) as fd:
        run_config = yaml.safe_load(fd.read())
    result = execute_pipeline(notif_all, run_config=run_config, mode="prod")

Open in a playground

Open in Gitpod

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/hooks
cd hooks