import datetime
import json
import logging
import time
from typing import Any, Mapping, Optional
import requests
from dagster import Failure, Field, StringSource, __version__, get_dagster_logger, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
from dagster_census.types import CensusOutput
# Host and path prefix of the Census REST API; the "https://" scheme and the version segment
# are added when the base URL is assembled.
CENSUS_API_BASE = "app.getcensus.com/api"

# API version segment appended after the base when building request URLs.
CENSUS_VERSION = "v1"

# Default number of seconds to wait between successive sync-run polls.
DEFAULT_POLL_INTERVAL = 10

# Every sync-run status the poller accepts; any other value is treated as an error.
SYNC_RUN_STATUSES = {
    "completed",
    "failed",
    "queued",
    "skipped",
    "working",
}
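# NOTE: a stray "[docs]" token stood here; it appears to be a link artifact from a rendered
# source listing rather than code, so it is kept only as this comment.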
class CensusResource:
    """Thin client exposing methods on top of the Census REST API.

    Sends authenticated HTTP requests (retrying failed ones after a fixed delay) and offers
    helpers to look up syncs, sources, destinations and sync runs, to trigger a sync, and to
    poll a sync run until it leaves the in-progress statuses.

    Args:
        api_key (str): Census API key. The ``secret-token:`` prefix is added automatically when
            the key does not already start with it.
        request_max_retries (int): Number of times a failed request is retried before a
            ``Failure`` is raised.
        request_retry_delay (float): Time (in seconds) to sleep between retries.
        log (logging.Logger): Logger used for request errors and polling progress.
    """

    def __init__(
        self,
        api_key: str,
        request_max_retries: int = 3,
        request_retry_delay: float = 0.25,
        log: logging.Logger = get_dagster_logger(),
    ):
        self.api_key = api_key
        self._request_max_retries = request_max_retries
        self._request_retry_delay = request_retry_delay
        self._log = log

    @property
    def _api_key(self):
        """The API key in the ``secret-token:<key>`` form used as the basic-auth password."""
        if self.api_key.startswith("secret-token:"):
            return self.api_key
        return "secret-token:" + self.api_key

    @property
    def api_base_url(self) -> str:
        """Base URL (scheme, host, API path and version) that every endpoint is appended to."""
        return f"https://{CENSUS_API_BASE}/{CENSUS_VERSION}"

    def make_request(
        self, method: str, endpoint: str, data: Optional[str] = None
    ) -> Mapping[str, Any]:
        """Creates and sends a request to the desired Census API endpoint.

        The request is attempted once and then retried up to ``request_max_retries`` times,
        sleeping ``request_retry_delay`` seconds between attempts. Every failed attempt is
        logged at error level.

        Args:
            method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
            endpoint (str): The Census API endpoint to send this request to.
            data (Optional[str]): JSON-formatted data string to be included in the request.

        Returns:
            Dict[str, Any]: JSON data from the response to this request

        Raises:
            Failure: If the request still fails after all retries; the last
                ``RequestException`` is attached as the cause.
        """
        url = f"{self.api_base_url}/{endpoint}"
        headers = {
            "User-Agent": f"dagster-census/{__version__}",
            "Content-Type": "application/json;version=2",
        }

        num_retries = 0
        last_error: Optional[RequestException] = None
        while True:
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=headers,
                    # Basic auth with the fixed user name "bearer" and the key as password.
                    auth=HTTPBasicAuth("bearer", self._api_key),
                    data=data,
                )
                response.raise_for_status()
                return response.json()
            except RequestException as e:
                self._log.error("Request to Census API failed: %s", e)
                last_error = e
                # ">=" rather than "==" so that a negative or non-integral retry setting
                # still terminates instead of retrying forever.
                if num_retries >= self._request_max_retries:
                    break
                num_retries += 1
                time.sleep(self._request_retry_delay)

        # Chain the last error so the root cause is visible in the traceback.
        raise Failure(
            f"Max retries ({self._request_max_retries}) exceeded with url: {url}."
        ) from last_error

    def get_sync(self, sync_id: int) -> Mapping[str, Any]:
        """Gets details about a given sync from the Census API.

        Args:
            sync_id (int): The Census Sync ID.

        Returns:
            Dict[str, Any]: JSON data from the response to this request
        """
        return self.make_request(method="GET", endpoint=f"syncs/{sync_id}")

    def get_source(self, source_id: int) -> Mapping[str, Any]:
        """Gets details about a given source from the Census API.

        Args:
            source_id (int): The Census Source ID.

        Returns:
            Dict[str, Any]: JSON data from the response to this request
        """
        return self.make_request(method="GET", endpoint=f"sources/{source_id}")

    def get_destination(self, destination_id: int) -> Mapping[str, Any]:
        """Gets details about a given destination from the Census API.

        Args:
            destination_id (int): The Census Destination ID.

        Returns:
            Dict[str, Any]: JSON data from the response to this request
        """
        return self.make_request(method="GET", endpoint=f"destinations/{destination_id}")

    def get_sync_run(self, sync_run_id: int) -> Mapping[str, Any]:
        """Gets details about a specific sync run from the Census API.

        Args:
            sync_run_id (int): The Census Sync Run ID.

        Returns:
            Dict[str, Any]: JSON data from the response to this request
        """
        return self.make_request(method="GET", endpoint=f"sync_runs/{sync_run_id}")

    def poll_sync_run(
        self,
        sync_run_id: int,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> Mapping[str, Any]:
        """Given a Census sync run, poll until the run is complete.

        The method sleeps for ``poll_interval`` before every status check (including the first
        one) and returns as soon as the run reports a status other than "queued" or "working".

        Args:
            sync_run_id (int): The Census Sync Run ID.
            poll_interval (float): The time (in seconds) that will be waited between successive
                polls.
            poll_timeout (float): The maximum time (in seconds) that will be waited before this
                operation is timed out. By default (or when falsy), this will never time out.

        Returns:
            Dict[str, Any]: JSON data from the response to this request

        Raises:
            ValueError: If the response has no "data" entry or reports an unknown status.
            Failure: If the run is still in progress once ``poll_timeout`` has elapsed.
        """
        log_url = f"https://app.getcensus.com/syncs_runs/{sync_run_id}"
        poll_start = datetime.datetime.now()

        while True:
            time.sleep(poll_interval)
            response_dict = self.get_sync_run(sync_run_id)
            if "data" not in response_dict:
                raise ValueError(
                    f"Getting status of sync failed, please visit Census Logs at {log_url} to see"
                    " more."
                )

            sync_status = response_dict["data"]["status"]
            sync_id = response_dict["data"]["sync_id"]

            if sync_status not in SYNC_RUN_STATUSES:
                raise ValueError(
                    f"Unexpected response status '{sync_status}'; "
                    f"must be one of {','.join(sorted(SYNC_RUN_STATUSES))}. "
                    "See Management API docs for more information: "
                    "https://docs.getcensus.com/basics/developers/api/sync-runs"
                )

            if sync_status not in {"queued", "working"}:
                # Terminal status: the run is done, so the timeout no longer matters.
                break

            # The timeout has to be checked while the run is still in progress; checking it
            # only after the run finished would let a stuck run poll forever.
            elapsed = datetime.datetime.now() - poll_start
            if poll_timeout and elapsed > datetime.timedelta(seconds=poll_timeout):
                raise Failure(f"Sync for sync '{sync_id}' timed out after {elapsed}.")

            self._log.debug(f"Sync {sync_id} still running after {elapsed}.")

        self._log.debug(
            f"Sync {sync_id} has finished running after {datetime.datetime.now() - poll_start}."
        )
        self._log.info(f"View sync details here: {log_url}.")
        return response_dict

    def trigger_sync(self, sync_id: int, force_full_sync: bool = False) -> Mapping[str, Any]:
        """Trigger an asynchronous run for a specific sync.

        Args:
            sync_id (int): The Census Sync ID.
            force_full_sync (bool): If the Sync should perform a full sync

        Returns:
            Dict[str, Any]: JSON data from the response to this request
        """
        data = {"force_full_sync": force_full_sync}
        return self.make_request(
            method="POST", endpoint=f"syncs/{sync_id}/trigger", data=json.dumps(data)
        )

    def trigger_sync_and_poll(
        self,
        sync_id: int,
        force_full_sync: bool = False,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
        poll_timeout: Optional[float] = None,
    ) -> CensusOutput:
        """Trigger a run for a specific sync and poll until it has completed.

        Before triggering, the sync is fetched and its source and destination are looked up
        through the ``connection_id`` of the sync's source and destination attributes.

        Args:
            sync_id (int): The Census Sync ID.
            force_full_sync (bool): If the Sync should perform a full sync
            poll_interval (float): The time (in seconds) that will be waited between successive
                polls.
            poll_timeout (float): The maximum time (in seconds) that will be waited before this
                operation is timed out. By default, this will never time out.

        Returns:
            :py:class:`~CensusOutput`:
                Object containing details about the sync run, the source and the destination
        """
        sync_details = self.get_sync(sync_id=sync_id)
        source_details = self.get_source(
            source_id=sync_details["data"]["source_attributes"]["connection_id"]
        )["data"]
        destination_details = self.get_destination(
            destination_id=sync_details["data"]["destination_attributes"]["connection_id"]
        )["data"]

        trigger_sync_resp = self.trigger_sync(sync_id=sync_id, force_full_sync=force_full_sync)
        sync_run_details = self.poll_sync_run(
            sync_run_id=trigger_sync_resp["data"]["sync_run_id"],
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )["data"]

        return CensusOutput(
            sync_run=sync_run_details,
            source=source_details,
            destination=destination_details,
        )
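# NOTE: a stray "[docs]" token stood here; it appears to be a link artifact from a rendered
# source listing rather than code, so it is kept only as this comment.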
@dagster_maintained_resource
@resource(
    config_schema={
        "api_key": Field(
            StringSource,
            is_required=True,
            description="Census API Key.",
        ),
        "request_max_retries": Field(
            int,
            default_value=3,
            description=(
                "The maximum number of times requests to the Census API should be retried "
                "before failing."
            ),
        ),
        "request_retry_delay": Field(
            float,
            default_value=0.25,
            description="Time (in seconds) to wait between each request retry.",
        ),
    },
    description="This resource helps manage Census connectors",
)
def census_resource(context) -> CensusResource:
    """Build a :py:class:`CensusResource` from the resource configuration.

    The resource lets users programmatically interface with the Census REST API to launch
    syncs and monitor their progress. Only a subset of the functionality exposed by the API is
    currently implemented. The API key and retry settings are read from the resource config,
    and the resource context's logger is handed to the client.

    **Examples:**

    .. code-block:: python

        from dagster import job
        from dagster_census import census_resource

        my_census_resource = census_resource.configured(
            {
                "api_key": {"env": "CENSUS_API_KEY"},
            }
        )

        @job(resource_defs={"census":my_census_resource})
        def my_census_job():
            ...
    """
    settings = context.resource_config
    return CensusResource(
        api_key=settings["api_key"],
        request_max_retries=settings["request_max_retries"],
        request_retry_delay=settings["request_retry_delay"],
        log=context.log,
    )