Slack (dagster-slack)


This library provides an integration with Slack, to support posting messages in your company’s Slack workspace.


Currently, it provides a thin wrapper around the Slack client API method chat.postMessage.


To use this integration, you’ll first need to create a Slack App for it.

  1. Create App: Go to https://api.slack.com/apps and click “Create New App”.

  2. Install App: After creating an app, on the left-hand side of the app configuration, click “Bot Users”, and then create a bot user. Then, click “Install App” on the left-hand side, and finally “Install App to Workspace”.

  3. Bot Token: Once finished, this will create a new bot token for your bot/workspace.

Copy this bot token and put it somewhere safe; see Safely Storing Credentials for more on this topic.
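The examples on this page read the token from an environment variable such as MY_SLACK_TOKEN. As a minimal sketch of that approach, the helper below fails fast when the variable is unset instead of silently passing None along. The name load_slack_token is hypothetical and is not part of dagster-slack.

```python
import os


def load_slack_token(var_name: str = "MY_SLACK_TOKEN") -> str:
    """Return the Slack token stored in the given environment variable.

    Hypothetical helper for illustration; not part of dagster-slack.
    """
    token = os.environ.get(var_name, "").strip()
    if not token:
        # Raise early so a missing token is caught before any Slack call is made.
        raise RuntimeError(f"Environment variable {var_name} is not set or is empty")
    return token
```

Wherever an example calls os.getenv("MY_SLACK_TOKEN"), a helper like this could be used instead to get a clearer error message.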

dagster_slack.SlackResource ResourceDefinition[source]

This resource is for connecting to Slack.

By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.

Examples

import os

from dagster import Definitions, EnvVar, job, op
from dagster_slack import SlackResource


@op
def slack_op(slack: SlackResource):
    slack.get_client().chat_postMessage(channel='#noise', text=':wave: hey there!')

@job
def slack_job():
    slack_op()

defs = Definitions(
    jobs=[slack_job],
    resources={
        "slack": SlackResource(token=EnvVar("MY_SLACK_TOKEN")),
    },
)
dagster_slack.make_slack_on_run_failure_sensor(channel, slack_token, text_fn=<function _default_failure_message_text_fn>, blocks_fn=None, name=None, dagit_base_url=None, minimum_interval_seconds=None, monitored_jobs=None, job_selection=None, monitor_all_code_locations=False, default_status=DefaultSensorStatus.STOPPED, webserver_base_url=None, monitor_all_repositories=False)[source]

Create a sensor on job failures that will message the given Slack channel.

Parameters:
  • channel (str) – The channel to send the message to (e.g. “#my_channel”)

  • slack_token (str) – The slack token. Tokens are typically either user tokens or bot tokens. More in the Slack API documentation here: https://api.slack.com/docs/token-types

  • text_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function which takes in the RunFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains error message, job name, and run ID. The usage of the text_fn changes depending on whether you’re using blocks_fn. If you are using blocks_fn, this is used as a fallback string to display in notifications. If you aren’t, this is the main body text of the message. It can be formatted as plain text, or with markdown. See more details in https://api.slack.com/methods/chat.postMessage#text_usage

  • blocks_fn (Callable[[RunFailureSensorContext], List[Dict]]) – Function which takes in the RunFailureSensorContext and outputs the message blocks you want to send. See information about Blocks in https://api.slack.com/reference/block-kit/blocks

  • name (Optional[str]) – The name of the sensor. Defaults to “slack_on_run_failure”.

  • dagit_base_url (Optional[str]) – deprecated (This parameter will be removed in version 2.0. Use webserver_base_url instead.) The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed job run.

  • minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.

  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – The jobs in the current repository that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails. To monitor jobs in external repositories, use RepositorySelector and JobSelector.

  • job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSensor]]]) – deprecated (This parameter will be removed in version 2.0. Use monitored_jobs instead.) The jobs in the current repository that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.

  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the Dagster instance, and an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.

  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.

  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the failed job run.

  • monitor_all_repositories (bool) – deprecated (This parameter will be removed in version 2.0. Use monitor_all_code_locations instead.) If set to True, the sensor will monitor all runs in the Dagster instance, and an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.

Examples

slack_on_run_failure = make_slack_on_run_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN")
)

@repository
def my_repo():
    return [my_job, slack_on_run_failure]

def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.dagster_run.job_name} failed! "
        f"Error: {context.failure_event.message}"
    )

slack_on_run_failure = make_slack_on_run_failure_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    text_fn=my_message_fn,
    webserver_base_url="http://mycoolsite.com",
)
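The blocks_fn parameter described above receives the same RunFailureSensorContext and returns a list of Block Kit dictionaries. The following is a sketch only: my_blocks_fn is a hypothetical name, and the SimpleNamespace object is a stand-in for the real context so that the function can be tried without a running Dagster instance. It assumes the context exposes dagster_run (with job_name and run_id) and failure_event.message.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def my_blocks_fn(context) -> List[Dict[str, Any]]:
    """Build Slack Block Kit blocks describing a failed run."""
    run = context.dagster_run
    return [
        {
            # Header blocks only accept plain_text.
            "type": "header",
            "text": {"type": "plain_text", "text": f"Job {run.job_name} failed"},
        },
        {
            # Section blocks accept Slack's mrkdwn formatting.
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Run ID:* `{run.run_id}`\n*Error:* {context.failure_event.message}",
            },
        },
    ]


# Stand-in for RunFailureSensorContext (an assumption for illustration only).
fake_context = SimpleNamespace(
    dagster_run=SimpleNamespace(job_name="my_job", run_id="abc123"),
    failure_event=SimpleNamespace(message="Something broke"),
)
blocks = my_blocks_fn(fake_context)
```

To use it, pass blocks_fn=my_blocks_fn to make_slack_on_run_failure_sensor. The text_fn output then serves as the fallback string shown in notifications.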
dagster_slack.make_slack_on_freshness_policy_status_change_sensor(channel, slack_token, asset_selection, warn_after_minutes_overdue=0, notify_when_back_on_time=False, text_fn=<function _default_freshness_message_text_fn>, blocks_fn=None, name=None, dagit_base_url=None, default_status=DefaultSensorStatus.STOPPED, webserver_base_url=None)[source]

experimental This API may break in future versions, even between dot releases.

Create a sensor that will message the given Slack channel whenever an asset in the provided AssetSelection becomes out of date. Messages are only fired when the state changes, meaning only a single slack message will be sent (when the asset begins to be out of date). If notify_when_back_on_time is set to True, a second slack message will be sent once the asset is on time again.

Parameters:
  • channel (str) – The channel to send the message to (e.g. “#my_channel”)

  • slack_token (str) – The slack token. Tokens are typically either user tokens or bot tokens. More in the Slack API documentation here: https://api.slack.com/docs/token-types

  • asset_selection (AssetSelection) – The selection of assets which this sensor will monitor. Alerts will only be fired for assets that have a FreshnessPolicy defined.

  • warn_after_minutes_overdue (float) – How many minutes past the specified FreshnessPolicy this sensor will wait before firing an alert (by default, an alert will be fired as soon as the policy is violated).

  • notify_when_back_on_time (bool) – If a success message should be sent when the asset becomes on time again.

  • text_fn (Optional(Callable[[FreshnessPolicySensorContext], str])) – Function which takes in the FreshnessPolicySensorContext and outputs the message you want to send. Defaults to a text message that contains the relevant asset key, and the number of minutes past its defined freshness policy it currently is. The usage of the text_fn changes depending on whether you’re using blocks_fn. If you are using blocks_fn, this is used as a fallback string to display in notifications. If you aren’t, this is the main body text of the message. It can be formatted as plain text, or with markdown. See more details in https://api.slack.com/methods/chat.postMessage#text_usage

  • blocks_fn (Callable[[FreshnessPolicySensorContext], List[Dict]]) – Function which takes in the FreshnessPolicySensorContext and outputs the message blocks you want to send. See information about Blocks in https://api.slack.com/reference/block-kit/blocks

  • name (Optional[str]) – The name of the sensor. Defaults to “slack_on_freshness_policy”.

  • dagit_base_url (Optional[str]) – deprecated (This parameter will be removed in version 2.0. Use webserver_base_url instead.) The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the relevant asset page.

  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.

  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the relevant asset page.

Examples

slack_on_freshness_policy = make_slack_on_freshness_policy_status_change_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN"),
    AssetSelection.all(),  # asset_selection is required; here, every asset is monitored
)

def my_message_fn(context: FreshnessPolicySensorContext) -> str:
    if context.minutes_overdue == 0:
        return f"Asset {context.asset_key} is currently on time :)"
    return (
        f"Asset {context.asset_key} is currently {context.minutes_overdue} minutes late!!"
    )

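slack_on_freshness_policy = make_slack_on_freshness_policy_status_change_sensor(
    channel="#my_channel",
    slack_token=os.getenv("MY_SLACK_TOKEN"),
    asset_selection=AssetSelection.all(),  # required; here, every asset is monitored
    text_fn=my_message_fn,
    webserver_base_url="http://mycoolsite.com",
)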
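The description above says that messages fire only on a state change: once when the asset becomes overdue, and optionally once more when it is back on time. The function below is a simplified model of that rule, not the library's implementation. The name should_notify and the idea of comparing a previous and a current minutes-overdue value are assumptions made for illustration.

```python
from typing import Optional


def should_notify(
    previous_minutes_overdue: Optional[float],
    minutes_overdue: Optional[float],
    warn_after_minutes_overdue: float = 0,
    notify_when_back_on_time: bool = False,
) -> bool:
    """Decide whether a status change warrants a message (illustrative model only)."""
    if previous_minutes_overdue is None or minutes_overdue is None:
        # Without two observations there is no state change to report.
        return False
    # The asset crossed the warning threshold since the previous observation.
    became_overdue = (
        minutes_overdue > warn_after_minutes_overdue
        and previous_minutes_overdue <= warn_after_minutes_overdue
    )
    # The asset was late before and is on time now.
    back_on_time = (
        notify_when_back_on_time
        and minutes_overdue == 0
        and previous_minutes_overdue > 0
    )
    return became_overdue or back_on_time
```

Under this model, an asset that stays overdue across consecutive observations produces no further messages, which matches the single-message behavior described above.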
dagster_slack.slack_on_failure HookDefinition[source]

Create a hook on step failure events that will message the given Slack channel.

Parameters:
  • channel (str) – The channel to send the message to (e.g. “#my_channel”)

  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.

  • dagit_base_url (Optional[str]) – deprecated (This parameter will be removed in version 2.0. Use webserver_base_url instead.) The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

Examples

@slack_on_failure("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} failed!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    an_op.with_hooks(hook_defs={slack_on_failure("#foo", my_message_fn)})
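A message_fn can include more than the op itself. The sketch below assumes the HookContext exposes op (with a name), run_id, and op_exception. The function name failure_message_fn is hypothetical, and the SimpleNamespace object is a stand-in for the real context so that the function can be tried without Dagster.

```python
from types import SimpleNamespace


def failure_message_fn(context) -> str:
    """Format a failure message from a HookContext-like object."""
    lines = [f"Op {context.op.name} failed in run {context.run_id}."]
    if context.op_exception is not None:
        # Include the exception type and message when one is available.
        lines.append(f"Exception: {context.op_exception!r}")
    return "\n".join(lines)


# Stand-in for HookContext (an assumption for illustration only).
fake_context = SimpleNamespace(
    op=SimpleNamespace(name="an_op"),
    run_id="abc123",
    op_exception=ValueError("bad input"),
)
message = failure_message_fn(fake_context)
```

The function would be passed as the second argument to slack_on_failure, in the same way as my_message_fn above.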
dagster_slack.slack_on_success HookDefinition[source]

Create a hook on step success events that will message the given Slack channel.

Parameters:
  • channel (str) – The channel to send the message to (e.g. “#my_channel”)

  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.

  • dagit_base_url (Optional[str]) – deprecated (This parameter will be removed in version 2.0. Use webserver_base_url instead.) The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

  • webserver_base_url (Optional[str]) – The base url of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

Examples

@slack_on_success("#foo", webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op} worked!"

@op
def an_op(context):
    pass

@job(...)
def my_job():
    an_op.with_hooks(hook_defs={slack_on_success("#foo", my_message_fn)})

Legacy

dagster_slack.slack_resource ResourceDefinition[source]

This resource is for connecting to Slack.

The resource object is a slack_sdk.WebClient.

By configuring this Slack resource, you can post messages to Slack from any Dagster op, asset, schedule or sensor.

Examples

import os

from dagster import job, op
from dagster_slack import slack_resource


@op(required_resource_keys={'slack'})
def slack_op(context):
    context.resources.slack.chat_postMessage(channel='#noise', text=':wave: hey there!')

@job(resource_defs={'slack': slack_resource})
def slack_job():
    slack_op()

slack_job.execute_in_process(
    run_config={'resources': {'slack': {'config': {'token': os.getenv('SLACK_TOKEN')}}}}
)