# Source code for dagster_airbyte.translator

from enum import Enum
from typing import Any, List, Mapping, Optional, Sequence

from dagster._annotations import deprecated, experimental
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method

from dagster_airbyte.utils import generate_table_schema, get_airbyte_connection_table_name


class AirbyteJobStatusType(str, Enum):
    """String-valued enumeration of Airbyte job statuses.

    Members also subclass ``str``, so each member compares equal to its
    lowercase string value (e.g. ``AirbyteJobStatusType.RUNNING == "running"``).
    """

    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
    PENDING = "pending"
    FAILED = "failed"
    ERROR = "error"
    INCOMPLETE = "incomplete"


@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.")
class AirbyteState:
    """Deprecated namespace of job status constants.

    Every attribute is the identically named `AirbyteJobStatusType` member;
    use `AirbyteJobStatusType` directly instead.
    """

    RUNNING = AirbyteJobStatusType.RUNNING
    SUCCEEDED = AirbyteJobStatusType.SUCCEEDED
    CANCELLED = AirbyteJobStatusType.CANCELLED
    PENDING = AirbyteJobStatusType.PENDING
    FAILED = AirbyteJobStatusType.FAILED
    ERROR = AirbyteJobStatusType.ERROR
    INCOMPLETE = AirbyteJobStatusType.INCOMPLETE


@record
class AirbyteConnectionTableProps:
    """Properties of a single table produced by an Airbyte connection.

    Bundles stream-level fields (name, prefix, JSON schema) with the owning
    connection's identifiers and the destination's type, database and schema.
    """

    # Stream-level fields.
    table_name: str
    stream_prefix: Optional[str]
    stream_name: str
    json_schema: Mapping[str, Any]
    # Connection-level fields.
    connection_id: str
    connection_name: str
    # Destination-level fields.
    destination_type: Optional[str]
    database: Optional[str]
    schema: Optional[str]

    @property
    def fully_qualified_table_name(self) -> Optional[str]:
        """Return ``<database>.<schema>.<stream_name>``.

        Returns:
            The dotted name, or ``None`` when the database or the schema is
            missing or empty.
        """
        # NOTE(review): the last component is `stream_name`, not `table_name`,
        # so any stream prefix is not part of the result -- confirm intended.
        if not (self.database and self.schema):
            return None
        return f"{self.database}.{self.schema}.{self.stream_name}"


@whitelist_for_serdes
@record
class AirbyteConnection:
    """An Airbyte connection, built from the details payload returned by the API."""

    id: str
    name: str
    stream_prefix: Optional[str]
    streams: Mapping[str, "AirbyteStream"]
    destination_id: str

    @classmethod
    def from_connection_details(
        cls,
        connection_details: Mapping[str, Any],
    ) -> "AirbyteConnection":
        """Build an `AirbyteConnection` from a connection details mapping.

        Args:
            connection_details: Mapping with the required keys
                ``connectionId``, ``name`` and ``destinationId``; ``prefix``
                and ``syncCatalog.streams`` are optional.

        Returns:
            The connection, with its streams keyed by stream name.

        Raises:
            KeyError: If a required key is missing.
        """
        # Lookups happen in the same order as the constructor arguments so the
        # first missing key is the one reported.
        connection_id = connection_details["connectionId"]
        connection_name = connection_details["name"]
        prefix = connection_details.get("prefix")

        streams_by_name = {}
        for entry in connection_details.get("syncCatalog", {}).get("streams", []):
            entry_name = entry["stream"]["name"]
            streams_by_name[entry_name] = AirbyteStream.from_stream_details(stream_details=entry)

        target_id = connection_details["destinationId"]

        return cls(
            id=connection_id,
            name=connection_name,
            stream_prefix=prefix,
            streams=streams_by_name,
            destination_id=target_id,
        )


@whitelist_for_serdes
@record
class AirbyteDestination:
    """An Airbyte destination, built from the details payload returned by the API."""

    id: str
    type: str
    database: Optional[str]
    schema: Optional[str]

    @classmethod
    def from_destination_details(
        cls,
        destination_details: Mapping[str, Any],
    ) -> "AirbyteDestination":
        """Build an `AirbyteDestination` from a destination details mapping.

        Args:
            destination_details: Mapping with the required keys
                ``destinationId``, ``destinationType`` and ``configuration``.
                ``database`` and ``schema`` are read from ``configuration``
                and default to ``None`` when absent.

        Raises:
            KeyError: If a required key is missing.
        """
        destination_id = destination_details["destinationId"]
        destination_type = destination_details["destinationType"]
        configuration = destination_details["configuration"]
        return cls(
            id=destination_id,
            type=destination_type,
            database=configuration.get("database"),
            schema=configuration.get("schema"),
        )


@whitelist_for_serdes
@record
class AirbyteStream:
    """An Airbyte stream, built from the details payload returned by the API.
    A stream in Airbyte corresponds to a table.
    """

    name: str
    selected: bool
    json_schema: Mapping[str, Any]

    @classmethod
    def from_stream_details(
        cls,
        stream_details: Mapping[str, Any],
    ) -> "AirbyteStream":
        """Build an `AirbyteStream` from a stream details mapping.

        Args:
            stream_details: Mapping with the required keys ``stream`` (which
                must contain ``name``) and ``config``. ``config.selected``
                defaults to ``False`` and ``stream.jsonSchema`` defaults to
                an empty dict when absent.

        Raises:
            KeyError: If a required key is missing.
        """
        stream_info = stream_details["stream"]
        stream_name = stream_info["name"]
        is_selected = stream_details["config"].get("selected", False)
        return cls(
            name=stream_name,
            selected=is_selected,
            json_schema=stream_info.get("jsonSchema", {}),
        )


@whitelist_for_serdes
@record
class AirbyteJob:
    """An Airbyte job, built from the details payload returned by the API."""

    id: int
    status: str

    @classmethod
    def from_job_details(
        cls,
        job_details: Mapping[str, Any],
    ) -> "AirbyteJob":
        """Build an `AirbyteJob` from a job details mapping.

        Args:
            job_details: Mapping with the required keys ``jobId`` and
                ``status``.

        Raises:
            KeyError: If either key is missing.
        """
        job_id = job_details["jobId"]
        job_status = job_details["status"]
        return cls(id=job_id, status=job_status)


@whitelist_for_serdes
@record
class AirbyteWorkspaceData:
    """Record holding all content of an Airbyte workspace.
    This applies to both Airbyte OSS and Cloud.
    """

    connections_by_id: Mapping[str, AirbyteConnection]
    destinations_by_id: Mapping[str, AirbyteDestination]

    @cached_method
    def to_airbyte_connection_table_props_data(self) -> Sequence[AirbyteConnectionTableProps]:
        """Flatten the workspace into one `AirbyteConnectionTableProps` per selected stream.

        Streams whose ``selected`` flag is falsy are skipped.

        Raises:
            KeyError: If a connection's ``destination_id`` is not present in
                ``destinations_by_id``.
        """
        table_props: List[AirbyteConnectionTableProps] = []

        for conn in self.connections_by_id.values():
            # Resolved once per connection, even when it has no selected streams.
            dest = self.destinations_by_id[conn.destination_id]

            table_props.extend(
                AirbyteConnectionTableProps(
                    table_name=get_airbyte_connection_table_name(
                        stream_prefix=conn.stream_prefix,
                        stream_name=synced_stream.name,
                    ),
                    stream_prefix=conn.stream_prefix,
                    stream_name=synced_stream.name,
                    json_schema=synced_stream.json_schema,
                    connection_id=conn.id,
                    connection_name=conn.name,
                    destination_type=dest.type,
                    database=dest.database,
                    schema=dest.schema,
                )
                for synced_stream in conn.streams.values()
                if synced_stream.selected
            )

        return table_props


class AirbyteMetadataSet(NamespacedMetadataSet):
    """Airbyte-specific metadata entries, namespaced under ``"dagster-airbyte"``."""

    connection_id: str
    connection_name: str
    # Optional; defaults to None when not supplied.
    stream_prefix: Optional[str] = None

    @classmethod
    def namespace(cls) -> str:
        """Return the namespace string used for this metadata set."""
        return "dagster-airbyte"


@experimental
class DagsterAirbyteTranslator:
    """Translator class which converts an `AirbyteConnectionTableProps` object into AssetSpecs.
    Subclass this class to implement custom logic for translating Airbyte content into asset specs.
    """

    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec:
        """Get the AssetSpec for a table synced by an Airbyte connection.

        Args:
            props: Properties of the synced table (stream, connection and
                destination attributes).

        Returns:
            An `AssetSpec` keyed by ``props.table_name``, carrying table and
            Airbyte metadata, with kinds ``"airbyte"`` plus the destination
            type when one is set.
        """
        # Column definitions are read from the top-level "properties" entry,
        # falling back to "items.properties", then to no columns at all.
        table_schema_props = (
            props.json_schema.get("properties")
            or props.json_schema.get("items", {}).get("properties")
            or {}
        )
        column_schema = generate_table_schema(table_schema_props)

        metadata = {
            **TableMetadataSet(
                column_schema=column_schema,
                table_name=props.fully_qualified_table_name,
            ),
            **AirbyteMetadataSet(
                connection_id=props.connection_id,
                connection_name=props.connection_name,
                stream_prefix=props.stream_prefix,
            ),
        }

        return AssetSpec(
            key=AssetKey(props.table_name),
            metadata=metadata,
            # The destination type is only added as a kind when it is non-empty.
            kinds={"airbyte", *({props.destination_type} if props.destination_type else set())},
        )