Ask AI

Source code for dagster_pagerduty.resources

from typing import Dict, Optional, cast

import pypd
from dagster import ConfigurableResource, resource
from dagster._config.pythonic_config import infer_schema_from_config_class
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.warnings import suppress_dagster_warnings
from pydantic import Field as PyField


[docs]class PagerDutyService(ConfigurableResource): """This resource is for posting events to PagerDuty.""" """Integrates with PagerDuty via the pypd library. See: https://v2.developer.pagerduty.com/docs/events-api-v2 https://v2.developer.pagerduty.com/docs/send-an-event-events-api-v2 https://support.pagerduty.com/docs/services-and-integrations#section-events-api-v2 https://github.com/PagerDuty/pagerduty-api-python-client for documentation and more information. """ routing_key: str = PyField( ..., description=( "The routing key provisions access to your PagerDuty service. You" "will need to include the integration key for your new integration, as a" "routing_key in the event payload." ), ) @classmethod def _is_dagster_maintained(cls) -> bool: return True def EventV2_create( self, summary: str, source: str, severity: str, event_action: str = "trigger", dedup_key: Optional[str] = None, timestamp: Optional[str] = None, component: Optional[str] = None, group: Optional[str] = None, event_class: Optional[str] = None, custom_details: Optional[object] = None, ) -> object: """Events API v2 enables you to add PagerDuty's advanced event and incident management functionality to any system that can make an outbound HTTP connection. Args: summary (str): A high-level, text summary message of the event. Will be used to construct an alert's description. Example: "PING OK - Packet loss = 0%, RTA = 1.41 ms" "Host 'acme-andromeda-sv1-c40 :: 179.21.24.50' is DOWN" source (str): Specific human-readable unique identifier, such as a hostname, for the system having the problem. Examples: "prod05.theseus.acme-widgets.com" "171.26.23.22" "aws:elasticache:us-east-1:852511987:cluster/api-stats-prod-003" "9c09acd49a25" severity (str): How impacted the affected system is. Displayed to users in lists and influences the priority of any created incidents. Must be one of {info, warning, error, critical} Keyword Args: event_action (str): There are three types of events that PagerDuty recognizes, and are used to represent different types of activity in your monitored systems. (default: 'trigger') * trigger: When PagerDuty receives a trigger event, it will either open a new alert, or add a new trigger log entry to an existing alert, depending on the provided dedup_key. Your monitoring tools should send PagerDuty a trigger when a new problem has been detected. You may send additional triggers when a previously detected problem has occurred again. * acknowledge: acknowledge events cause the referenced incident to enter the acknowledged state. While an incident is acknowledged, it won't generate any additional notifications, even if it receives new trigger events. Your monitoring tools should send PagerDuty an acknowledge event when they know someone is presently working on the problem. * resolve: resolve events cause the referenced incident to enter the resolved state. Once an incident is resolved, it won't generate any additional notifications. New trigger events with the same dedup_key as a resolved incident won't re-open the incident. Instead, a new incident will be created. Your monitoring tools should send PagerDuty a resolve event when the problem that caused the initial trigger event has been fixed. dedup_key (str): Deduplication key for correlating triggers and resolves. The maximum permitted length of this property is 255 characters. timestamp (str): Timestamp (ISO 8601). When the upstream system detected / created the event. This is useful if a system batches or holds events before sending them to PagerDuty. This will be auto-generated by PagerDuty if not provided. Example: 2015-07-17T08:42:58.315+0000 component (str): The part or component of the affected system that is broken. Examples: "keepalive" "webping" "mysql" "wqueue" group (str): A cluster or grouping of sources. For example, sources "prod-datapipe-02" and "prod-datapipe-03" might both be part of "prod-datapipe". Examples: "prod-datapipe" "www" "web_stack" event_class (str): The class/type of the event. Examples: "High CPU" "Latency" "500 Error" custom_details (Dict[str, str]): Additional details about the event and affected system. Example: {"ping time": "1500ms", "load avg": 0.75 } """ data = { "routing_key": self.routing_key, "event_action": event_action, "payload": {"summary": summary, "source": source, "severity": severity}, } if dedup_key is not None: data["dedup_key"] = dedup_key payload: Dict[str, object] = cast(Dict[str, object], data["payload"]) if timestamp is not None: payload["timestamp"] = timestamp if component is not None: payload["component"] = component if group is not None: payload["group"] = group if event_class is not None: payload["class"] = event_class if custom_details is not None: payload["custom_details"] = custom_details return pypd.EventV2.create(data=data)
[docs]@dagster_maintained_resource @resource( config_schema=infer_schema_from_config_class(PagerDutyService), description="""This resource is for posting events to PagerDuty.""", ) @suppress_dagster_warnings def pagerduty_resource(context) -> PagerDutyService: """A resource for posting events (alerts) to PagerDuty. Example: .. code-block:: python @op def pagerduty_op(pagerduty: PagerDutyService): pagerduty.EventV2_create( summary='alert from dagster' source='localhost', severity='error', event_action='trigger', ) @job(resource_defs={ 'pagerduty': pagerduty_resource }) def pagerduty_test(): pagerduty_op() pagerduty_test.execute_in_process( run_config={ "resources": { 'pagerduty': {'config': {'routing_key': '0123456789abcdef0123456789abcdef'}} } } ) """ return PagerDutyService(**context.resource_config)