Ask AI

Source code for dagster_airbyte.managed.types

import json
from abc import ABC
from enum import Enum
from typing import Any, Dict, List, Mapping, Optional, Union

import dagster._check as check
from dagster._annotations import deprecated, public
from typing_extensions import Self

# Warning text attached (via ``additional_warn_text``) to the deprecated
# managed-element classes in this module.
MANAGED_ELEMENTS_DEPRECATION_MSG = " ".join(
    (
        "Dagster is deprecating support for ingestion-as-code.",
        "We suggest using the Airbyte terraform provider:",
        "https://reference.airbyte.com/reference/using-the-terraform-provider.",
    )
)


class AirbyteSyncMode(ABC):
    """Sync mode of a single Airbyte stream.

    The sync mode governs how Airbyte reads from a source and writes to a
    destination. Instances wrap the JSON dict representation of the mode and
    compare equal when those representations are equal.

    For more information, see https://docs.airbyte.com/understanding-airbyte/connections/.
    """

    def __init__(self, json_repr: Dict[str, Any]):
        # The dict is stored as-is (no copy); ``to_json`` hands back this same object.
        self.json_repr = json_repr

    def __eq__(self, other: Any) -> bool:
        # Only another AirbyteSyncMode can be equal; compare by JSON payload.
        if not isinstance(other, AirbyteSyncMode):
            return False
        return self.to_json() == other.to_json()

    def to_json(self) -> Dict[str, Any]:
        """Return the JSON dict representation held by this sync mode."""
        return self.json_repr

    @classmethod
    def from_json(cls, json_repr: Dict[str, Any]) -> "AirbyteSyncMode":
        """Build a sync mode from a JSON dict, keeping only the sync-mode keys.

        Keys other than ``syncMode``, ``destinationSyncMode``, ``cursorField``
        and ``primaryKey`` are dropped. Retained keys keep the order they have
        in ``json_repr``.
        """
        recognized_keys = ("syncMode", "destinationSyncMode", "cursorField", "primaryKey")
        filtered: Dict[str, Any] = {}
        for key, value in json_repr.items():
            if key in recognized_keys:
                filtered[key] = value
        return cls(filtered)

    @public
    @classmethod
    def full_refresh_append(cls) -> "AirbyteSyncMode":
        """Syncs the entire data stream from the source, appending rows to the destination.

        https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-append/
        """
        payload = {"syncMode": "full_refresh", "destinationSyncMode": "append"}
        return cls(payload)

    @public
    @classmethod
    def full_refresh_overwrite(cls) -> "AirbyteSyncMode":
        """Syncs the entire data stream from the source, replacing the data in the
        destination by overwriting it.

        https://docs.airbyte.com/understanding-airbyte/connections/full-refresh-overwrite
        """
        payload = {"syncMode": "full_refresh", "destinationSyncMode": "overwrite"}
        return cls(payload)

    @public
    @classmethod
    def incremental_append(
        cls,
        cursor_field: Optional[str] = None,
    ) -> "AirbyteSyncMode":
        """Syncs only new records from the source, appending rows to the destination.

        May optionally specify the cursor field used to determine which records
        are new. A missing or empty ``cursor_field`` leaves ``cursorField`` out
        of the resulting JSON.

        https://docs.airbyte.com/understanding-airbyte/connections/incremental-append/
        """
        cursor_field = check.opt_str_param(cursor_field, "cursor_field")

        payload: Dict[str, Any] = {
            "syncMode": "incremental",
            "destinationSyncMode": "append",
        }
        if cursor_field:
            # The cursor is emitted as a single-element list.
            payload["cursorField"] = [cursor_field]
        return cls(payload)

    @public
    @classmethod
    def incremental_append_dedup(
        cls,
        cursor_field: Optional[str] = None,
        primary_key: Optional[Union[str, List[str]]] = None,
    ) -> "AirbyteSyncMode":
        """Syncs new records from the source, appending to an append-only history
        table in the destination. Also generates a deduplicated view mirroring the
        source table.

        May optionally specify the cursor field used to determine which records
        are new, and the primary key used to determine which records are
        duplicates. ``primary_key`` may be a single column name or a list of
        column names; missing or empty values are left out of the resulting JSON.

        https://docs.airbyte.com/understanding-airbyte/connections/incremental-append-dedup/
        """
        cursor_field = check.opt_str_param(cursor_field, "cursor_field")

        # Normalize a lone column name to a one-element list before validating.
        key_columns = [primary_key] if isinstance(primary_key, str) else primary_key
        key_columns = check.opt_list_param(key_columns, "primary_key", of_type=str)

        payload: Dict[str, Any] = {
            "syncMode": "incremental",
            "destinationSyncMode": "append_dedup",
        }
        if cursor_field:
            payload["cursorField"] = [cursor_field]
        if key_columns:
            # Each key column is wrapped in its own single-element list.
            payload["primaryKey"] = [[column] for column in key_columns]
        return cls(payload)


class AirbyteSource:
    """Represents a user-defined Airbyte source.

    Args:
        name (str): The display name of the source.
        source_type (str): The type of the source, from Airbyte's list
            of sources https://airbytehq.github.io/category/sources/.
        source_configuration (Mapping[str, Any]): The configuration for the
            source, as defined by Airbyte's API.
    """

    @public
    def __init__(self, name: str, source_type: str, source_configuration: Mapping[str, Any]):
        # Each argument is validated before being stored on the instance.
        self.name = check.str_param(name, "name")
        self.source_type = check.str_param(source_type, "source_type")
        self.source_configuration = check.mapping_param(
            source_configuration,
            "source_configuration",
            key_type=str,
        )

    def must_be_recreated(self, other: "AirbyteSource") -> bool:
        """Return True when ``other`` differs from this source in name or source type.

        The configuration mapping is not part of the comparison.
        """
        if self.name != other.name:
            return True
        return self.source_type != other.source_type


class InitializedAirbyteSource:
    """User-defined Airbyte source bound to actual created Airbyte source."""

    def __init__(self, source: AirbyteSource, source_id: str, source_definition_id: Optional[str]):
        self.source = source
        self.source_id = source_id
        self.source_definition_id = source_definition_id

    @classmethod
    def from_api_json(cls, api_json: Mapping[str, Any]):
        """Build an initialized source from an API JSON payload.

        Reads the ``name``, ``sourceName``, ``connectionConfiguration`` and
        ``sourceId`` keys of ``api_json``. The source definition id is always
        set to None by this constructor.
        """
        declared_source = AirbyteSource(
            name=api_json["name"],
            source_type=api_json["sourceName"],
            source_configuration=api_json["connectionConfiguration"],
        )
        return cls(
            source=declared_source,
            source_id=api_json["sourceId"],
            source_definition_id=None,
        )


class AirbyteDestination:
    """Represents a user-defined Airbyte destination.

    Args:
        name (str): The display name of the destination.
        destination_type (str): The type of the destination, from Airbyte's list
            of destinations https://airbytehq.github.io/category/destinations/.
        destination_configuration (Mapping[str, Any]): The configuration for the
            destination, as defined by Airbyte's API.
    """

    @public
    def __init__(
        self,
        name: str,
        destination_type: str,
        destination_configuration: Mapping[str, Any],
    ):
        # Each argument is validated before being stored on the instance.
        self.name = check.str_param(name, "name")
        self.destination_type = check.str_param(destination_type, "destination_type")
        self.destination_configuration = check.mapping_param(
            destination_configuration,
            "destination_configuration",
            key_type=str,
        )

    def must_be_recreated(self, other: "AirbyteDestination") -> bool:
        """Return True when ``other`` differs from this destination in name or type.

        The configuration mapping is not part of the comparison.
        """
        if self.name != other.name:
            return True
        return self.destination_type != other.destination_type


class InitializedAirbyteDestination:
    """User-defined Airbyte destination bound to actual created Airbyte destination."""

    def __init__(
        self,
        destination: AirbyteDestination,
        destination_id: str,
        destination_definition_id: Optional[str],
    ):
        self.destination = destination
        self.destination_id = destination_id
        self.destination_definition_id = destination_definition_id

    @classmethod
    def from_api_json(cls, api_json: Mapping[str, Any]):
        """Build an initialized destination from an API JSON payload.

        Reads the ``name``, ``destinationName``, ``connectionConfiguration`` and
        ``destinationId`` keys of ``api_json``. The destination definition id is
        always set to None by this constructor.
        """
        declared_destination = AirbyteDestination(
            name=api_json["name"],
            destination_type=api_json["destinationName"],
            destination_configuration=api_json["connectionConfiguration"],
        )
        return cls(
            destination=declared_destination,
            destination_id=api_json["destinationId"],
            destination_definition_id=None,
        )


class AirbyteDestinationNamespace(Enum):
    """Predefined choices for the namespace a connection syncs to in the destination."""

    # Use the same namespace as the source.
    SAME_AS_SOURCE = "source"
    # Use the destination's default namespace.
    DESTINATION_DEFAULT = "destination"


@deprecated(breaking_version="2.0", additional_warn_text=MANAGED_ELEMENTS_DEPRECATION_MSG)
class AirbyteConnection:
    """A user-defined Airbyte connection, pairing an Airbyte source and destination and configuring
    which streams to sync.

    Args:
        name (str): The display name of the connection.
        source (AirbyteSource): The source to sync from.
        destination (AirbyteDestination): The destination to sync to.
        stream_config (Mapping[str, AirbyteSyncMode]): A mapping from stream name to
            the sync mode for that stream, including any additional configuration
            of primary key or cursor field.
        normalize_data (Optional[bool]): Whether to normalize the data in the
            destination.
        destination_namespace (Optional[Union[AirbyteDestinationNamespace, str]]):
            The namespace to sync to in the destination. If set to
            AirbyteDestinationNamespace.SAME_AS_SOURCE, the namespace will be the
            same as the source namespace. If set to
            AirbyteDestinationNamespace.DESTINATION_DEFAULT, the namespace will be the
            default namespace for the destination. If set to a string, the namespace
            will be that string.
        prefix (Optional[str]): A prefix to add to the table names in the destination.

    Example:
        .. code-block:: python

            from dagster_airbyte.managed.generated.sources import FileSource
            from dagster_airbyte.managed.generated.destinations import LocalJsonDestination
            from dagster_airbyte import AirbyteConnection, AirbyteSyncMode

            cereals_csv_source = FileSource(...)
            local_json_destination = LocalJsonDestination(...)

            cereals_connection = AirbyteConnection(
                name="download-cereals",
                source=cereals_csv_source,
                destination=local_json_destination,
                stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
            )
    """

    @public
    def __init__(
        self,
        name: str,
        source: AirbyteSource,
        destination: AirbyteDestination,
        stream_config: Mapping[str, AirbyteSyncMode],
        normalize_data: Optional[bool] = None,
        destination_namespace: Optional[
            Union[AirbyteDestinationNamespace, str]
        ] = AirbyteDestinationNamespace.SAME_AS_SOURCE,
        prefix: Optional[str] = None,
    ):
        # Required arguments.
        self.name = check.str_param(name, "name")
        self.source = check.inst_param(source, "source", AirbyteSource)
        self.destination = check.inst_param(destination, "destination", AirbyteDestination)
        self.stream_config = check.mapping_param(
            stream_config,
            "stream_config",
            key_type=str,
            value_type=AirbyteSyncMode,
        )

        # Optional arguments; the namespace may be a plain string or an enum member.
        self.normalize_data = check.opt_bool_param(normalize_data, "normalize_data")
        self.destination_namespace = check.opt_inst_param(
            destination_namespace,
            "destination_namespace",
            (str, AirbyteDestinationNamespace),
        )
        self.prefix = check.opt_str_param(prefix, "prefix")

    def must_be_recreated(self, other: Optional["AirbyteConnection"]) -> bool:
        """Return whether this connection must be recreated relative to ``other``.

        True when ``other`` is falsy (e.g. None), or when either the source or
        the destination reports that it must be recreated.
        """
        if not other:
            return True
        source_changed = self.source.must_be_recreated(other.source)
        return source_changed or self.destination.must_be_recreated(other.destination)


class InitializedAirbyteConnection:
    """User-defined Airbyte connection bound to actual created Airbyte connection."""

    def __init__(
        self,
        connection: AirbyteConnection,
        connection_id: str,
    ):
        self.connection = connection
        self.connection_id = connection_id

    @classmethod
    def from_api_json(
        cls,
        api_dict: Mapping[str, Any],
        init_sources: Mapping[str, InitializedAirbyteSource],
        init_dests: Mapping[str, InitializedAirbyteDestination],
    ) -> Self:
        """Build an initialized connection from an API JSON payload.

        Args:
            api_dict: The connection payload. The keys read here are
                ``sourceId``, ``destinationId``, ``syncCatalog``, ``name``,
                ``operationIds``, ``namespaceDefinition``, ``namespaceFormat``
                (only for the ``customformat`` namespace definition),
                ``prefix`` (optional) and ``connectionId``.
            init_sources: Initialized sources to search for the one whose
                ``source_id`` matches the payload's ``sourceId``.
            init_dests: Initialized destinations to search for the one whose
                ``destination_id`` matches the payload's ``destinationId``.

        Returns:
            The connection definition paired with its connection id.
        """
        # First initialized source whose id matches, if any.
        matched_source = None
        for candidate_source in init_sources.values():
            if candidate_source.source_id == api_dict["sourceId"]:
                matched_source = candidate_source.source
                break

        # First initialized destination whose id matches, if any.
        matched_dest = None
        for candidate_dest in init_dests.values():
            if candidate_dest.destination_id == api_dict["destinationId"]:
                matched_dest = candidate_dest.destination
                break

        matched_source = check.not_none(
            matched_source, f"Could not find source with id {api_dict['sourceId']}"
        )
        matched_dest = check.not_none(
            matched_dest, f"Could not find destination with id {api_dict['destinationId']}"
        )

        # Map each stream name in the sync catalog to its sync mode.
        streams = {}
        for entry in api_dict["syncCatalog"]["streams"]:
            stream_name = entry["stream"]["name"]
            streams[stream_name] = AirbyteSyncMode.from_json(entry["config"])

        connection_name = api_dict["name"]
        # Normalization is reported as enabled when the connection has any operation ids.
        has_operations = len(api_dict["operationIds"]) > 0

        # A "customformat" definition carries its namespace in ``namespaceFormat``;
        # any other definition is resolved to an enum member.
        if api_dict["namespaceDefinition"] == "customformat":
            namespace = api_dict["namespaceFormat"]
        else:
            namespace = AirbyteDestinationNamespace(api_dict["namespaceDefinition"])

        # A missing or falsy prefix is treated as no prefix.
        table_prefix = api_dict.get("prefix") or None

        connection = AirbyteConnection(
            name=connection_name,
            source=matched_source,
            destination=matched_dest,
            stream_config=streams,
            normalize_data=has_operations,
            destination_namespace=namespace,
            prefix=table_prefix,
        )
        return cls(connection, api_dict["connectionId"])


def _remove_none_values(obj: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict holding the entries of ``obj`` whose value is not None."""
    kept: Dict[str, Any] = {}
    for key, value in obj.items():
        if value is not None:
            kept[key] = value
    return kept


def _dump_class(obj: Any) -> Dict[str, Any]:
    """Convert ``obj`` into plain JSON-compatible data via a JSON round trip.

    Any value the JSON encoder cannot serialize natively is replaced by its
    ``__dict__`` with None-valued attributes removed.
    """

    def _attributes_without_none(value: Any) -> Dict[str, Any]:
        return _remove_none_values(value.__dict__)

    serialized = json.dumps(obj, default=_attributes_without_none)
    return json.loads(serialized)


class GeneratedAirbyteSource(AirbyteSource):
    """Base class used by the codegen Airbyte sources. This class is not intended to be used directly.

    Converts all of its attributes into a source configuration dict which is passed down to the base
    AirbyteSource class.
    """

    def __init__(self, source_type: str, name: str):
        # The attributes are dumped before the base initializer runs, so the
        # dump does not include the attributes the base initializer sets.
        super().__init__(
            name=name,
            source_type=source_type,
            source_configuration=_dump_class(self),
        )


class GeneratedAirbyteDestination(AirbyteDestination):
    """Base class used by the codegen Airbyte destinations. This class is not intended to be used directly.

    Converts all of its attributes into a destination configuration dict which is passed down to the
    base AirbyteDestination class.
    """

    def __init__(self, source_type: str, name: str):
        # NOTE: ``source_type`` is forwarded as the destination type.
        # The attributes are dumped before the base initializer runs, so the
        # dump does not include the attributes the base initializer sets.
        super().__init__(
            name=name,
            destination_type=source_type,
            destination_configuration=_dump_class(self),
        )