Ask AI

Op Hooks#

Op hooks let you define success and failure handling policies on ops.

Relevant APIs#

NameDescription
@failure_hookThe decorator to define a callback on op failure.
@success_hookThe decorator to define a callback on op success.
HookContextThe context object available to a hook function.
build_hook_contextA function for building a HookContext outside of execution, intended to be used when testing a hook.

Overview#

A @success_hook or @failure_hook decorated function is called an op hook. Op hooks are designed for generic purposes — it can be anything you would like to do at a per op level.


Defining a Op Hook#

from dagster import (
    HookContext,
    ResourceDefinition,
    failure_hook,
    file_relative_path,
    graph,
    job,
    op,
    repository,
    success_hook,
)


@success_hook(required_resource_keys={"slack"})
def slack_message_on_success(context: HookContext):
    message = f"Op {context.op.name} finished successfully"
    context.resources.slack.chat_postMessage(channel="#foo", text=message)


@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
    message = f"Op {context.op.name} failed"
    context.resources.slack.chat_postMessage(channel="#foo", text=message)

Hook context#

As you may have noticed, the hook function takes one argument, which is an instance of HookContext. The available properties on this context are:

  • context.job_name: the name of the job where the hook is triggered.
  • context.log: loggers
  • context.hook_def: the hook that the context object belongs to.
  • context.op: the op associated with the hook.
  • context.op_config: The config specific to the associated op.
  • context.op_exception: The thrown exception in the associated failed op.
  • context.op_output_values: The computed output values of the associated op.
  • context.step_key: the key for the step where the hook is triggered.
  • context.resources: the resources the hook can use.
  • context.required_resource_keys: the resources required by this hook.

Using Hooks#

Dagster provides different ways to trigger op hooks.

Applying a hook on every op in a job#

For example, you want to send a slack message to a channel when any op fails in a job. In this case, we will be applying a hook on a job, which will apply the hook on every op instance within in that job.

The @job decorator accepts hooks as a parameter. Likewise, when creating a job from a graph, hooks are also accepted as a parameter in the GraphDefinition.to_job function. In the below example, we can pass the slack_message_on_failure hook above in a set as a parameter to @job. Then, slack messages will be sent when any op in the job fails.

@job(resource_defs={"slack": slack_resource}, hooks={slack_message_on_failure})
def notif_all():
    # the hook "slack_message_on_failure" is applied on every op instance within this graph
    a()
    b()

When you run this job, you can provide configuration to the slack resource in the run config:

resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

or by using the configured API:

@job(
    resource_defs={
        "slack": slack_resource.configured(
            {"token": "xoxp-1234123412341234-12341234-1234"}
        )
    },
    hooks={slack_message_on_failure},
)
def notif_all_configured():
    # the hook "slack_message_on_failure" is applied on every op instance within this graph
    a()
    b()

Applying a hook on an op#

Sometimes a job is a shared responsibility or you only want to be alerted on high-priority op executions. So we also provide a way to set up hooks on op instances which enables you to apply policies on a per-op basis.

@job(resource_defs={"slack": slack_resource})
def selective_notif():
    # only op "a" triggers hooks: a slack message will be sent when it fails or succeeds
    a.with_hooks({slack_message_on_failure, slack_message_on_success})()
    # op "b" won't trigger any hooks
    b()

In this case, op "b" won't trigger any hooks, while when op "a" fails or succeeds it will send a slack message.

Testing Hooks#

You can test the functionality of a hook by invoking the hook definition. This will run the underlying decorated function. You can construct a context to provide to the invocation using the build_hook_context function.

from dagster import build_hook_context


@success_hook(required_resource_keys={"my_conn"})
def my_success_hook(context):
    context.resources.my_conn.send("foo")


def test_my_success_hook():
    my_conn = mock.MagicMock()
    # construct HookContext with mocked ``my_conn`` resource.
    context = build_hook_context(resources={"my_conn": my_conn})

    my_success_hook(context)

    assert my_conn.send.call_count == 1

Examples#

Accessing failure information in a failure hook#

In many cases, you might want to know details about an op failure. You can get the exception object thrown in the failed op via the op_exception property on HookContext:

from dagster import HookContext, failure_hook
import traceback


@failure_hook
def my_failure_hook(context: HookContext):
    op_exception: BaseException = context.op_exception
    # print stack trace of exception
    traceback.print_tb(op_exception.__traceback__)

Patterns#

Environment-specific hooks using jobs#

Hooks use resource keys to access resources. After including the resource key in its set of required_resource_keys, the body of the hook can access the corresponding resource via the resources attribute of its context object.

It also enables you to switch resource values in different jobs so that, for example, you can send slack messages only while executing a production job and mock the slack resource while testing.

Because executing a production job and a testing job share the same core of business logic, we can build these jobs from a shared graph. In the GraphDefinition.to_job method, which builds a job from a graph, you can specify environment-specific hooks and resources.

In this case, we can mock the slack_resource using a helper function ResourceDefinition.hardcoded_resource(), so it won't send slack messages during development.

@graph
def slack_notif_all():
    a()
    b()


notif_all_dev = slack_notif_all.to_job(
    name="notif_all_dev",
    resource_defs={
        "slack": ResourceDefinition.hardcoded_resource(
            slack_resource_mock, "do not send messages in dev"
        )
    },
    hooks={slack_message_on_failure},
)

notif_all_prod = slack_notif_all.to_job(
    name="notif_all_prod",
    resource_defs={"slack": slack_resource},
    hooks={slack_message_on_failure},
)

When we switch to production, we can provide the real slack token in the run_config and therefore enable sending messages to a certain slack channel when a hook is triggered.

resources:
  slack:
    config:
      token: "xoxp-1234123412341234-12341234-1234" # replace with your slack token

Then, we can execute a job with the config through Python API, CLI, or the Dagster UI. Here's an example of using the Python API.

if __name__ == "__main__":
    prod_op_hooks_run_config_yaml = file_relative_path(__file__, "prod_op_hooks.yaml")
    with open(prod_op_hooks_run_config_yaml, "r", encoding="utf8") as fd:
        run_config = yaml.safe_load(fd.read())

    notif_all_prod.execute_in_process(run_config=run_config, raise_on_error=False)

Job-level hooks#

When you add a hook to a job, the hook will be added to every op in the job individually. The hook does not track job-scoped events and only tracks op-level success or failure events.

You may find the need to set up job-level policies. For example, you may want to run some code for every job failure.

Dagster provides a way to create a sensor that reacts to job failure events. You can find details at Run failure sensor on the Sensors page.