Ask AI

Source code for dagster_twilio.resources

from dagster import ConfigurableResource, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.execution.context.init import InitResourceContext
from pydantic import Field
from twilio.rest import Client


[docs]class TwilioResource(ConfigurableResource): """This resource is for connecting to Twilio.""" account_sid: str = Field( description=( "Twilio Account SID, created with yout Twilio account. This can be found on your Twilio" " dashboard, see" " https://www.twilio.com/blog/twilio-access-tokens-python" ), ) auth_token: str = Field( description=( "Twilio Authentication Token, created with yout Twilio account. This can be found on" " your Twilio dashboard, see https://www.twilio.com/blog/twilio-access-tokens-python" ), ) @classmethod def _is_dagster_maintained(cls) -> bool: return True def create_client(self) -> Client: return Client(self.account_sid, self.auth_token)
[docs]@dagster_maintained_resource @resource( config_schema=TwilioResource.to_config_schema(), description="This resource is for connecting to Twilio", ) def twilio_resource(context: InitResourceContext) -> Client: return TwilioResource.from_resource_context(context).create_client()