Microsoft Teams (dagster-msteams)

Resource

dagster_msteams.MSTeamsResource ResourceDefinition

Config Schema:
hook_url (dagster.StringSource):

To send messages to an MS Teams channel, you must first create an incoming webhook. Pass the incoming webhook URL to MSTeamsResource as part of its resource config. For more information on creating an incoming webhook, see https://docs.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/add-incoming-webhook

http_proxy (Union[dagster.StringSource, None], optional):

HTTP proxy URL

https_proxy (Union[dagster.StringSource, None], optional):

HTTPS proxy URL

timeout (Union[Float, None], optional):

Timeout for requests to MS Teams

Default Value: 60

verify (Union[dagster.BoolSource, None], optional):

Whether to verify SSL certificates, defaults to True

Default Value: True

This resource is for connecting to Microsoft Teams.

Provides a dagster_msteams.TeamsClient which can be used to interface with the MS Teams API.

By configuring this resource, you can post messages to MS Teams from any Dagster op, asset, schedule, or sensor:

Examples

import os

from dagster import op, job, Definitions, EnvVar
from dagster_msteams import Card, MSTeamsResource


@op
def teams_op(msteams: MSTeamsResource):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    msteams.get_client().post_message(payload=card.payload)


@job
def teams_job():
    teams_op()

defs = Definitions(
    jobs=[teams_job],
    resources={
        "msteams": MSTeamsResource(
            hook_url=EnvVar("TEAMS_WEBHOOK_URL")
        )
    }
)
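
The config fields listed for this resource can also be collected and validated before the resource is constructed. The following is a minimal sketch using only the standard library; TeamsSettings, load_teams_settings, and every environment variable name other than TEAMS_WEBHOOK_URL are hypothetical and are not part of dagster-msteams.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import urlparse


@dataclass(frozen=True)
class TeamsSettings:
    """Hypothetical container mirroring the config fields of the resource."""

    hook_url: str
    http_proxy: Optional[str] = None
    https_proxy: Optional[str] = None
    timeout: float = 60.0  # same default as the config schema
    verify: bool = True  # same default as the config schema


def load_teams_settings(env: Mapping[str, str] = os.environ) -> TeamsSettings:
    """Read the settings from environment-style variables and check the URL."""
    hook_url = env.get("TEAMS_WEBHOOK_URL", "")
    parsed = urlparse(hook_url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError("TEAMS_WEBHOOK_URL must be set to a full webhook URL")
    return TeamsSettings(
        hook_url=hook_url,
        http_proxy=env.get("TEAMS_HTTP_PROXY"),  # hypothetical variable name
        https_proxy=env.get("TEAMS_HTTPS_PROXY"),  # hypothetical variable name
        timeout=float(env.get("TEAMS_TIMEOUT", "60")),  # hypothetical variable name
        verify=env.get("TEAMS_VERIFY", "true").lower() != "false",  # hypothetical variable name
    )
```

The resulting values map one-to-one onto the config schema, so they could be passed along as MSTeamsResource(hook_url=settings.hook_url, timeout=settings.timeout, verify=settings.verify).
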

Sensors

dagster_msteams.teams_on_failure HookDefinition

Create a hook on step failure events that will message the given MS Teams webhook URL.

Parameters:
  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.

  • dagit_base_url – (Optional[str]): Deprecated; this parameter will be removed in version 2.0. Use webserver_base_url instead. The base URL of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

  • webserver_base_url – (Optional[str]): The base URL of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

Examples

@teams_on_failure(webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} failed!"

@op
def a_op(context):
    pass

@job(...)
def my_job():
    # teams_on_failure takes no channel argument; pass the message function by keyword
    a_op.with_hooks(hook_defs={teams_on_failure(message_fn=my_message_fn)})

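
Because message_fn is an ordinary callable that returns a string, it can be exercised without running a job. The sketch below assumes the function only reads context.op.name, as in the example above; fake_context is a hypothetical stand-in built with the standard library, not a real HookContext.

```python
from types import SimpleNamespace


def my_message_fn(context) -> str:
    # Only reads context.op.name, the same attribute used in the hook example.
    return f"Op {context.op.name} failed!"


# Hypothetical stand-in for a HookContext; it is not a Dagster object.
fake_context = SimpleNamespace(op=SimpleNamespace(name="a_op"))

message = my_message_fn(fake_context)
print(message)
```

A check like this only covers the formatting logic; it does not verify that the hook fires or that the webhook accepts the message.
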
dagster_msteams.teams_on_success HookDefinition

Create a hook on step success events that will message the given MS Teams webhook URL.

Parameters:
  • message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext and outputs the message you want to send.

  • dagit_base_url – (Optional[str]): Deprecated; this parameter will be removed in version 2.0. Use webserver_base_url instead. The base URL of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

  • webserver_base_url – (Optional[str]): The base URL of your webserver instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.

Examples

@teams_on_success(webserver_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass

def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} succeeded!"

@op
def a_op(context):
    pass

@job(...)
def my_job():
    # teams_on_success takes no channel argument; pass the message function by keyword
    a_op.with_hooks(hook_defs={teams_on_success(message_fn=my_message_fn)})

dagster_msteams.make_teams_on_run_failure_sensor(hook_url, message_fn=<function _default_failure_message>, http_proxy=None, https_proxy=None, timeout=60, verify=None, name=None, dagit_base_url=None, default_status=DefaultSensorStatus.STOPPED, monitored_jobs=None, monitor_all_code_locations=False, webserver_base_url=None, monitor_all_repositories=False)

Create a sensor on run failures that will message the given MS Teams webhook URL.

Parameters:
  • hook_url (str) – MS Teams incoming webhook URL.

  • message_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function which takes in the RunFailureSensorContext and outputs the message you want to send. Defaults to a text message that contains error message, job name, and run ID.

  • http_proxy – (Optional[str]): Proxy for requests using http protocol.

  • https_proxy – (Optional[str]): Proxy for requests using https protocol.

  • timeout – (Optional[float]): Connection timeout in seconds. Defaults to 60.

  • verify – (Optional[bool]): Whether to verify the server's TLS certificate.

  • name – (Optional[str]): The name of the sensor. Defaults to “teams_on_run_failure”.

  • dagit_base_url – (Optional[str]): Deprecated; this parameter will be removed in version 2.0. Use webserver_base_url instead. The base URL of your webserver instance. Specify this to allow messages to include deeplinks to the failed run.

  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.

  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector]]]) – Jobs in the current repository that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository matches the requested run_status. To monitor jobs in external repositories, use RepositorySelector and JobSelector.

  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the Dagster instance, and an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.

  • webserver_base_url – (Optional[str]): The base url of your webserver instance. Specify this to allow messages to include deeplinks to the failed run.

  • monitor_all_repositories (bool) – Deprecated; this parameter will be removed in version 2.0. Use monitor_all_code_locations instead. If set to True, the sensor will monitor all runs in the Dagster instance, and an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.

Examples

teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)

@repository
def my_repo():
    return [my_job, teams_on_run_failure]

def my_message_fn(context: RunFailureSensorContext) -> str:
    return "Job {job_name} failed! Error: {error}".format(
        job_name=context.dagster_run.job_name,
        error=context.failure_event.message,
    )

teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    webserver_base_url="http://localhost:3000",
)
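
The custom message function and the role of webserver_base_url can be tried out in isolation. This is a minimal sketch under stated assumptions: fake_context is a hypothetical stand-in rather than a real RunFailureSensorContext, run_url is a hypothetical helper, and the "/runs/<run_id>" path layout is an assumption about how the webserver addresses runs, not something taken from the sensor's implementation.

```python
from types import SimpleNamespace


def run_url(webserver_base_url: str, run_id: str) -> str:
    # Assumption: runs are served under "/runs/<run_id>" on the webserver.
    return f"{webserver_base_url.rstrip('/')}/runs/{run_id}"


def my_message_fn(context) -> str:
    # Same attributes as the example above: dagster_run.job_name and failure_event.message.
    return "Job {job_name} failed! Error: {error}".format(
        job_name=context.dagster_run.job_name,
        error=context.failure_event.message,
    )


# Hypothetical stand-in for a RunFailureSensorContext (not a Dagster object).
fake_context = SimpleNamespace(
    dagster_run=SimpleNamespace(job_name="my_job", run_id="1234"),
    failure_event=SimpleNamespace(message="boom"),
)

text = my_message_fn(fake_context)
link = run_url("http://localhost:3000/", fake_context.dagster_run.run_id)
```

Stripping the trailing slash before joining keeps the link stable whether or not the base URL is configured with one.
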

Legacy

dagster_msteams.msteams_resource ResourceDefinition

Config Schema:
hook_url (dagster.StringSource):

To send messages to an MS Teams channel, you must first create an incoming webhook. Pass the incoming webhook URL to msteams_resource as part of its resource config. For more information on creating an incoming webhook, see https://docs.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/add-incoming-webhook

http_proxy (Union[dagster.StringSource, None], optional):

HTTP proxy URL

https_proxy (Union[dagster.StringSource, None], optional):

HTTPS proxy URL

timeout (Union[Float, None], optional):

Timeout for requests to MS Teams

Default Value: 60

verify (Union[dagster.BoolSource, None], optional):

Whether to verify SSL certificates, defaults to True

Default Value: True

This resource is for connecting to Microsoft Teams.

The resource object is a dagster_msteams.TeamsClient.

By configuring this resource, you can post messages to MS Teams from any Dagster op:

Examples

import os

from dagster import op, job
from dagster_msteams import Card, msteams_resource


@op(required_resource_keys={"msteams"})
def teams_op(context):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    context.resources.msteams.post_message(payload=card.payload)


@job(resource_defs={"msteams": msteams_resource})
def teams_job():
    teams_op()


teams_job.execute_in_process(
    {"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}}
)
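
The nested run config passed to execute_in_process above can be assembled by a small helper when optional fields vary between environments. This is a sketch only; msteams_run_config is a hypothetical function, and it assumes the resource is registered under the key "msteams" as in the example.

```python
from typing import Any, Dict, Optional


def msteams_run_config(
    hook_url: str,
    timeout: Optional[float] = None,
    verify: Optional[bool] = None,
) -> Dict[str, Any]:
    """Build the nested run config for a resource keyed "msteams"."""
    config: Dict[str, Any] = {"hook_url": hook_url}
    # Leave optional fields out so the schema defaults (60 and True) apply.
    if timeout is not None:
        config["timeout"] = timeout
    if verify is not None:
        config["verify"] = verify
    return {"resources": {"msteams": {"config": config}}}
```

The returned dictionary has the same shape as the literal in the example, so it could be passed as teams_job.execute_in_process(msteams_run_config(os.getenv("TEAMS_WEBHOOK_URL"))).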