Ask AI

Source code for dagster_fivetran.ops

from typing import Any, Dict, List, Optional

from dagster import AssetKey, Config, In, Nothing, Out, Output, op
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from pydantic import Field

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import generate_materializations


class SyncConfig(Config):
    connector_id: str = Field(
        description=(
            "The Fivetran Connector ID that this op will sync. You can retrieve this "
            'value from the "Setup" tab of a given connector in the Fivetran UI.'
        ),
    )
    poll_interval: float = Field(
        default=DEFAULT_POLL_INTERVAL,
        description="The time (in seconds) that will be waited between successive polls.",
    )
    poll_timeout: Optional[float] = Field(
        default=None,
        description=(
            "The maximum time that will waited before this operation is timed out. By "
            "default, this will never time out."
        ),
    )
    yield_materializations: bool = Field(
        default=True,
        description=(
            "If True, materializations corresponding to the results of the Fivetran sync will "
            "be yielded when the op executes."
        ),
    )
    asset_key_prefix: List[str] = Field(
        default=["fivetran"],
        description=(
            "If provided and yield_materializations is True, these components will be used to "
            "prefix the generated asset keys."
        ),
    )


[docs] @op( ins={"start_after": In(Nothing)}, out=Out( FivetranOutput, description=( "Parsed json dictionary representing the details of the Fivetran connector after the" " sync successfully completes. See the [Fivetran API" " Docs](https://fivetran.com/docs/rest-api/connectors#retrieveconnectordetails) to see" " detailed information on this response." ), ), tags={COMPUTE_KIND_TAG: "fivetran"}, ) def fivetran_sync_op(config: SyncConfig, fivetran: FivetranResource) -> Any: """Executes a Fivetran sync for a given ``connector_id``, and polls until that sync completes, raising an error if it is unsuccessful. It outputs a FivetranOutput which contains the details of the Fivetran connector after the sync successfully completes, as well as details about which tables the sync updates. It requires the use of the :py:class:`~dagster_fivetran.fivetran_resource`, which allows it to communicate with the Fivetran API. Examples: .. code-block:: python from dagster import job from dagster_fivetran import fivetran_resource, fivetran_sync_op my_fivetran_resource = fivetran_resource.configured( { "api_key": {"env": "FIVETRAN_API_KEY"}, "api_secret": {"env": "FIVETRAN_API_SECRET"}, } ) sync_foobar = fivetran_sync_op.configured({"connector_id": "foobar"}, name="sync_foobar") @job(resource_defs={"fivetran": my_fivetran_resource}) def my_simple_fivetran_job(): sync_foobar() @job(resource_defs={"fivetran": my_fivetran_resource}) def my_composed_fivetran_job(): final_foobar_state = sync_foobar(start_after=some_op()) other_op(final_foobar_state) """ fivetran_output = fivetran.sync_and_poll( connector_id=config.connector_id, poll_interval=config.poll_interval, poll_timeout=config.poll_timeout, ) if config.yield_materializations: yield from generate_materializations( fivetran_output, asset_key_prefix=config.asset_key_prefix ) yield Output(fivetran_output)
class FivetranResyncConfig(SyncConfig): resync_parameters: Optional[Dict[str, Any]] = Field( None, description=( "Optional resync parameters to send in the payload to the Fivetran API. You can" " find an example resync payload here:" " https://fivetran.com/docs/rest-api/connectors#request_7" ), ) @op( ins={"start_after": In(Nothing)}, out=Out( FivetranOutput, description=( "Parsed json dictionary representing the details of the Fivetran connector after the" " resync successfully completes. See the [Fivetran API" " Docs](https://fivetran.com/docs/rest-api/connectors#retrieveconnectordetails) to see" " detailed information on this response." ), ), tags={COMPUTE_KIND_TAG: "fivetran"}, ) def fivetran_resync_op( config: FivetranResyncConfig, fivetran: FivetranResource, ) -> Any: """Executes a Fivetran historical resync for a given ``connector_id``, and polls until that resync completes, raising an error if it is unsuccessful. It outputs a FivetranOutput which contains the details of the Fivetran connector after the resync successfully completes, as well as details about which tables the resync updates. It requires the use of the :py:class:`~dagster_fivetran.fivetran_resource`, which allows it to communicate with the Fivetran API. Examples: .. code-block:: python from dagster import job from dagster_fivetran import fivetran_resource, fivetran_resync_op my_fivetran_resource = fivetran_resource.configured( { "api_key": {"env": "FIVETRAN_API_KEY"}, "api_secret": {"env": "FIVETRAN_API_SECRET"}, } ) sync_foobar = fivetran_resync_op.configured( { "connector_id": "foobar", "resync_parameters": { "schema_a": ["table_a", "table_b"], "schema_b": ["table_c"] } }, name="sync_foobar" ) @job(resource_defs={"fivetran": my_fivetran_resource}) def my_simple_fivetran_job(): sync_foobar() @job(resource_defs={"fivetran": my_fivetran_resource}) def my_composed_fivetran_job(): final_foobar_state = sync_foobar(start_after=some_op()) other_op(final_foobar_state) """ fivetran_output = fivetran.resync_and_poll( connector_id=config.connector_id, resync_parameters=config.resync_parameters, poll_interval=config.poll_interval, poll_timeout=config.poll_timeout, ) if config.yield_materializations: asset_key_filter = ( [ AssetKey(config.asset_key_prefix + [schema, table]) for schema, tables in config.resync_parameters.items() for table in tables ] if config.resync_parameters is not None else None ) for mat in generate_materializations( fivetran_output, asset_key_prefix=config.asset_key_prefix ): if asset_key_filter is None or mat.asset_key in asset_key_filter: yield mat yield Output(fivetran_output)