Ask AI

Source code for dagster_census.resources

import datetime
import json
import logging
import time
from typing import Any, Mapping, Optional

import requests
from dagster import Failure, Field, StringSource, __version__, get_dagster_logger, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_census.types import CensusOutput

CENSUS_API_BASE = "app.getcensus.com/api"
CENSUS_VERSION = "v1"

DEFAULT_POLL_INTERVAL = 10

SYNC_RUN_STATUSES = {"completed", "failed", "queued", "skipped", "working"}


[docs] class CensusResource: """This class exposes methods on top of the Census REST API.""" def __init__( self, api_key: str, request_max_retries: int = 3, request_retry_delay: float = 0.25, log: logging.Logger = get_dagster_logger(), ): self.api_key = api_key self._request_max_retries = request_max_retries self._request_retry_delay = request_retry_delay self._log = log @property def _api_key(self): if self.api_key.startswith("secret-token:"): return self.api_key return "secret-token:" + self.api_key @property def api_base_url(self) -> str: return f"https://{CENSUS_API_BASE}/{CENSUS_VERSION}" def make_request( self, method: str, endpoint: str, data: Optional[str] = None ) -> Mapping[str, Any]: """Creates and sends a request to the desired Census API endpoint. Args: method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH"). endpoint (str): The Census API endpoint to send this request to. data (Optional[str]): JSON-formatted data string to be included in the request. Returns: Dict[str, Any]: JSON data from the response to this request """ url = f"{self.api_base_url}/{endpoint}" headers = { "User-Agent": f"dagster-census/{__version__}", "Content-Type": "application/json;version=2", } num_retries = 0 while True: try: response = requests.request( method=method, url=url, headers=headers, auth=HTTPBasicAuth("bearer", self._api_key), data=data, ) response.raise_for_status() return response.json() except RequestException as e: self._log.error("Request to Census API failed: %s", e) if num_retries == self._request_max_retries: break num_retries += 1 time.sleep(self._request_retry_delay) raise Failure(f"Max retries ({self._request_max_retries}) exceeded with url: {url}.") def get_sync(self, sync_id: int) -> Mapping[str, Any]: """Gets details about a given sync from the Census API. Args: sync_id (int): The Census Sync ID. Returns: Dict[str, Any]: JSON data from the response to this request """ return self.make_request(method="GET", endpoint=f"syncs/{sync_id}") def get_source(self, source_id: int) -> Mapping[str, Any]: """Gets details about a given source from the Census API. Args: source_id (int): The Census Source ID. Returns: Dict[str, Any]: JSON data from the response to this request """ return self.make_request(method="GET", endpoint=f"sources/{source_id}") def get_destination(self, destination_id: int) -> Mapping[str, Any]: """Gets details about a given destination from the Census API. Args: destination_id (int): The Census Destination ID. Returns: Dict[str, Any]: JSON data from the response to this request """ return self.make_request(method="GET", endpoint=f"destinations/{destination_id}") def get_sync_run(self, sync_run_id: int) -> Mapping[str, Any]: """Gets details about a specific sync run from the Census API. Args: sync_run_id (int): The Census Sync Run ID. Returns: Dict[str, Any]: JSON data from the response to this request """ return self.make_request(method="GET", endpoint=f"sync_runs/{sync_run_id}") def poll_sync_run( self, sync_run_id: int, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: """Given a Census sync run, poll until the run is complete. Args: sync_id (int): The Census Sync Run ID. poll_interval (float): The time (in seconds) that will be waited between successive polls. poll_timeout (float): The maximum time that will waited before this operation is timed out. By default, this will never time out. Returns: Dict[str, Any]: JSON data from the response to this request """ log_url = f"https://app.getcensus.com/syncs_runs/{sync_run_id}" poll_start = datetime.datetime.now() while True: time.sleep(poll_interval) response_dict = self.get_sync_run(sync_run_id) if "data" not in response_dict.keys(): raise ValueError( f"Getting status of sync failed, please visit Census Logs at {log_url} to see" " more." ) sync_status = response_dict["data"]["status"] sync_id = response_dict["data"]["sync_id"] if sync_status not in SYNC_RUN_STATUSES: raise ValueError( f"Unexpected response status '{sync_status}'; " f"must be one of {','.join(sorted(SYNC_RUN_STATUSES))}. " "See Management API docs for more information: " "https://docs.getcensus.com/basics/developers/api/sync-runs" ) if sync_status in {"queued", "working"}: self._log.debug( f"Sync {sync_id} still running after {datetime.datetime.now() - poll_start}." ) continue if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( seconds=poll_timeout ): raise Failure( f"Sync for sync '{sync_id}' timed out after" f" {datetime.datetime.now() - poll_start}." ) break self._log.debug( f"Sync {sync_id} has finished running after {datetime.datetime.now() - poll_start}." ) self._log.info(f"View sync details here: {log_url}.") return response_dict def trigger_sync(self, sync_id: int, force_full_sync: bool = False) -> Mapping[str, Any]: """Trigger an asynchronous run for a specific sync. Args: sync_id (int): The Census Sync Run ID. force_full_sync (bool): If the Sync should perform a full sync Returns: Dict[str, Any]: JSON data from the response to this request """ data = {"force_full_sync": force_full_sync} return self.make_request( method="POST", endpoint=f"syncs/{sync_id}/trigger", data=json.dumps(data) ) def trigger_sync_and_poll( self, sync_id: int, force_full_sync: bool = False, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> CensusOutput: """Trigger a run for a specific sync and poll until it has completed. Args: sync_id (int): The Census Sync Run ID. force_full_sync (bool): If the Sync should perform a full sync poll_interval (float): The time (in seconds) that will be waited between successive polls. poll_timeout (float): The maximum time that will waited before this operation is timed out. By default, this will never time out. Returns: :py:class:`~CensusOutput`: Object containing details about the sync run and the sync details """ sync_details = self.get_sync(sync_id=sync_id) source_details = self.get_source( source_id=sync_details["data"]["source_attributes"]["connection_id"] )["data"] destination_details = self.get_destination( destination_id=sync_details["data"]["destination_attributes"]["connection_id"] )["data"] trigger_sync_resp = self.trigger_sync(sync_id=sync_id, force_full_sync=force_full_sync) sync_run_details = self.poll_sync_run( sync_run_id=trigger_sync_resp["data"]["sync_run_id"], poll_interval=poll_interval, poll_timeout=poll_timeout, )["data"] return CensusOutput( sync_run=sync_run_details, source=source_details, destination=destination_details, )
[docs] @dagster_maintained_resource @resource( config_schema={ "api_key": Field( StringSource, is_required=True, description="Census API Key.", ), "request_max_retries": Field( int, default_value=3, description=( "The maximum number of times requests to the Census API should be retried " "before failing." ), ), "request_retry_delay": Field( float, default_value=0.25, description="Time (in seconds) to wait between each request retry.", ), }, description="This resource helps manage Census connectors", ) def census_resource(context) -> CensusResource: """This resource allows users to programatically interface with the Census REST API to launch syncs and monitor their progress. This currently implements only a subset of the functionality exposed by the API. **Examples:** .. code-block:: python from dagster import job from dagster_census import census_resource my_census_resource = census_resource.configured( { "api_key": {"env": "CENSUS_API_KEY"}, } ) @job(resource_defs={"census":my_census_resource}) def my_census_job(): ... """ return CensusResource( api_key=context.resource_config["api_key"], request_max_retries=context.resource_config["request_max_retries"], request_retry_delay=context.resource_config["request_retry_delay"], log=context.log, )