Source code for dagster_msteams.resources
from typing import Optional
from dagster import ConfigurableResource, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field
from dagster_msteams.client import TeamsClient
[docs]
class MSTeamsResource(ConfigurableResource):
"""This resource is for connecting to Microsoft Teams.
Provides a `dagster_msteams.TeamsClient` which can be used to
interface with the MS Teams API.
By configuring this resource, you can post messages to MS Teams from any Dagster op,
asset, schedule, or sensor:
Examples:
.. code-block:: python
import os
from dagster import op, job, Definitions, EnvVar
from dagster_msteams import Card, MSTeamsResource
@op
def teams_op(msteams: MSTeamsResource):
card = Card()
card.add_attachment(text_message="Hello There !!")
msteams.get_client().post_message(payload=card.payload)
@job
def teams_job():
teams_op()
defs = Definitions(
jobs=[teams_job],
resources={
"msteams": MSTeamsResource(
hook_url=EnvVar("TEAMS_WEBHOOK_URL")
)
}
)
"""
hook_url: str = Field(
description=(
"To send messages to MS Teams channel, an incoming webhook has to be created. The"
" incoming webhook url must be given as a part of the resource config to the"
" MSTeamsResource in Dagster. For more information on how to create an incoming"
" webhook, see"
" https://docs.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/add-incoming-webhook"
),
)
http_proxy: Optional[str] = Field(default=None, description="HTTP proxy URL")
https_proxy: Optional[str] = Field(default=None, description="HTTPS proxy URL")
timeout: float = Field(default=60, description="Timeout for requests to MS Teams")
verify: bool = Field(
default=True, description="Whether to verify SSL certificates, defaults to True"
)
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
def get_client(self) -> TeamsClient:
return TeamsClient(
hook_url=self.hook_url,
http_proxy=self.http_proxy,
https_proxy=self.https_proxy,
timeout=self.timeout,
verify=self.verify,
)
[docs]
@dagster_maintained_resource
@resource(
config_schema=MSTeamsResource.to_config_schema(),
description="This resource is for connecting to MS Teams",
)
def msteams_resource(context) -> TeamsClient:
"""This resource is for connecting to Microsoft Teams.
The resource object is a `dagster_msteams.TeamsClient`.
By configuring this resource, you can post messages to MS Teams from any Dagster solid:
Examples:
.. code-block:: python
import os
from dagster import op, job
from dagster_msteams import Card, msteams_resource
@op(required_resource_keys={"msteams"})
def teams_op(context):
card = Card()
card.add_attachment(text_message="Hello There !!")
context.resources.msteams.post_message(payload=card.payload)
@job(resource_defs={"msteams": msteams_resource})
def teams_job():
teams_op()
teams_job.execute_in_process(
{"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}}
)
"""
return MSTeamsResource.from_resource_context(context).get_client()