Ask AI

Source code for dagster_airbyte.ops

from typing import Any, Iterable, List, Optional

from dagster import Config, In, Nothing, Out, Output, op
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from pydantic import Field

from dagster_airbyte.resources import DEFAULT_POLL_INTERVAL_SECONDS, BaseAirbyteResource
from dagster_airbyte.types import AirbyteOutput
from dagster_airbyte.utils import _get_attempt, generate_materializations


class AirbyteSyncConfig(Config):
    """Run configuration for ``airbyte_sync_op``.

    Each field is forwarded by the op: ``connection_id``, ``poll_interval`` and
    ``poll_timeout`` go to the Airbyte resource's ``sync_and_poll`` call, while
    ``yield_materializations`` and ``asset_key_prefix`` control whether (and under
    which key prefix) materializations are generated from the sync result.
    """

    connection_id: str = Field(
        ...,
        description=(
            "The Airbyte Connection ID that this op will sync. You can retrieve this value from"
            " the 'Connections' tab of a given connector in the Airbyte UI."
        ),
    )
    poll_interval: float = Field(
        DEFAULT_POLL_INTERVAL_SECONDS,
        description="The time (in seconds) that will be waited between successive polls.",
    )
    poll_timeout: Optional[float] = Field(
        None,
        description=(
            "The maximum time that will be waited before this operation is timed out. By "
            "default, this will never time out."
        ),
    )
    yield_materializations: bool = Field(
        True,
        description=(
            "If True, materializations corresponding to the results of the Airbyte sync will "
            "be yielded when the op executes."
        ),
    )
    # NOTE: a list literal default is safe here because pydantic copies field
    # defaults per instance rather than sharing the same list object.
    asset_key_prefix: List[str] = Field(
        ["airbyte"],
        description=(
            "If provided and yield_materializations is True, these components will be used to "
            "prefix the generated asset keys."
        ),
    )


@op(
    ins={"start_after": In(Nothing)},
    out=Out(
        AirbyteOutput,
        description=(
            "Parsed json dictionary representing the details of the Airbyte connector after the"
            " sync successfully completes. See the [Airbyte API"
            " Docs](https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#overview)"
            " to see detailed information on this response."
        ),
    ),
    tags={COMPUTE_KIND_TAG: "airbyte"},
)
def airbyte_sync_op(
    context, config: AirbyteSyncConfig, airbyte: BaseAirbyteResource
) -> Iterable[Any]:
    """Executes an Airbyte job sync for a given ``connection_id``, and polls until that sync
    completes, raising an error if it is unsuccessful. It outputs an AirbyteOutput which contains
    the job details for a given ``connection_id``.

    If ``yield_materializations`` is set in the op config, materializations generated from the
    sync result (keys prefixed with ``asset_key_prefix``) are yielded before the output. The
    output's metadata is populated from the ``totalStats`` of the job's most recent attempt,
    when present.

    It requires the use of the :py:class:`~dagster_airbyte.airbyte_resource`, which allows it to
    communicate with the Airbyte API.

    Examples:
        .. code-block:: python

            from dagster import job
            from dagster_airbyte import airbyte_resource, airbyte_sync_op

            my_airbyte_resource = airbyte_resource.configured(
                {
                    "host": {"env": "AIRBYTE_HOST"},
                    "port": {"env": "AIRBYTE_PORT"},
                }
            )

            sync_foobar = airbyte_sync_op.configured({"connection_id": "foobar"}, name="sync_foobar")

            @job(resource_defs={"airbyte": my_airbyte_resource})
            def my_simple_airbyte_job():
                sync_foobar()

            @job(resource_defs={"airbyte": my_airbyte_resource})
            def my_composed_airbyte_job():
                final_foobar_state = sync_foobar(start_after=some_op())
                other_op(final_foobar_state)
    """
    airbyte_output = airbyte.sync_and_poll(
        connection_id=config.connection_id,
        poll_interval=config.poll_interval,
        poll_timeout=config.poll_timeout,
    )
    if config.yield_materializations:
        yield from generate_materializations(
            airbyte_output, asset_key_prefix=config.asset_key_prefix
        )

    # Use the last reported attempt for the output metadata. The `or [{}]` fallback
    # covers a missing key as before, and also an empty list or None, which would
    # otherwise raise IndexError/TypeError on the `[-1]` lookup.
    attempts = airbyte_output.job_details.get("attempts") or [{}]
    last_attempt = _get_attempt(attempts[-1])
    yield Output(
        airbyte_output,
        metadata={**last_attempt.get("totalStats", {})},
    )