Ask AI

Source code for dagster_azure.adls2.resources

from typing import Any, Dict, Union

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeLeaseClient
from dagster import (
    Config,
    ConfigurableResource,
    Field as DagsterField,
    Permissive,
    Selector,
    StringSource,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method
from dagster._utils.merger import merge_dicts
from pydantic import Field
from typing_extensions import Literal

from dagster_azure.blob.utils import BlobServiceClient, create_blob_client

from .file_manager import ADLS2FileManager
from .utils import DataLakeServiceClient, create_adls2_client


# Credential variant for authenticating with a SAS token.
# Used as one member of the discriminated union on `ADLS2BaseResource.credential`.
class ADLS2SASToken(Config):
    # Discriminator tag identifying this variant; fixed to "sas".
    credential_type: Literal["sas"] = "sas"
    # The SAS token; `ADLS2Resource._raw_credential` returns it unchanged.
    token: str


# Credential variant for authenticating with a shared access key.
# Used as one member of the discriminated union on `ADLS2BaseResource.credential`.
class ADLS2Key(Config):
    # Discriminator tag identifying this variant; fixed to "key".
    credential_type: Literal["key"] = "key"
    # The account key; `ADLS2Resource._raw_credential` returns it unchanged.
    key: str


# Credential variant that authenticates through `azure.identity.DefaultAzureCredential`.
# Used as one member of the discriminated union on `ADLS2BaseResource.credential`.
class ADLS2DefaultAzureCredential(Config):
    # Discriminator tag identifying this variant; fixed to "default_azure_credential".
    credential_type: Literal["default_azure_credential"] = "default_azure_credential"
    # Keyword arguments that `ADLS2Resource._raw_credential` unpacks into
    # `DefaultAzureCredential(**kwargs)`. No default is declared, so a value
    # (possibly an empty dict) must be supplied.
    kwargs: Dict[str, Any]


# Shared configuration for ADLS2 resources: the storage account name plus one of the
# supported credential variants. `ADLS2Resource` extends this with the actual clients.
class ADLS2BaseResource(ConfigurableResource):
    storage_account: str = Field(description="The storage account name.")
    # Discriminated union: the `credential_type` field of the supplied value selects
    # which of the three credential classes it is parsed as.
    credential: Union[ADLS2SASToken, ADLS2Key, ADLS2DefaultAzureCredential] = Field(
        discriminator="credential_type", description="The credentials with which to authenticate."
    )


# Legacy (function-style resource) config field for the "DefaultAzureCredential" option.
# Whatever mapping is supplied here is handed to `ADLS2DefaultAzureCredential(kwargs=...)`
# by `_adls2_resource_from_config`, and from there unpacked into `DefaultAzureCredential`.
DEFAULT_AZURE_CREDENTIAL_CONFIG = DagsterField(
    Permissive(
        description="Uses DefaultAzureCredential to authenticate and passed as keyword arguments",
    )
)

# Legacy config schema shared by `adls2_resource` and (merged with extra fields)
# `adls2_file_manager`. `_adls2_resource_from_config` translates a config of this shape
# into an `ADLS2Resource`.
ADLS2_CLIENT_CONFIG = {
    "storage_account": DagsterField(StringSource, description="The storage account name."),
    # The credential is a selector over the three supported authentication options;
    # the key names here ("sas", "key", "DefaultAzureCredential") are the ones
    # `_adls2_resource_from_config` looks up.
    "credential": DagsterField(
        Selector(
            {
                "sas": DagsterField(StringSource, description="SAS token for the account."),
                "key": DagsterField(StringSource, description="Shared Access Key for the account."),
                "DefaultAzureCredential": DEFAULT_AZURE_CREDENTIAL_CONFIG,
            }
        ),
        description="The credentials with which to authenticate.",
    ),
}


class ADLS2Resource(ADLS2BaseResource):
    """Resource containing clients to access Azure Data Lake Storage Gen2.

    Contains a client for both the Data Lake and Blob APIs, to work around the limitations
    of each.
    """

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        # Always reports True for this resource class.
        return True

    @property
    @cached_method
    def _raw_credential(self) -> Any:
        # Reduce the configured credential variant to the value handed to the client
        # factories: the key string, the SAS token string, or a DefaultAzureCredential.
        configured = self.credential
        if isinstance(configured, ADLS2Key):
            return configured.key
        if isinstance(configured, ADLS2SASToken):
            return configured.token
        # Anything else is treated as the DefaultAzureCredential variant; its kwargs
        # are forwarded verbatim to the azure-identity constructor.
        return DefaultAzureCredential(**configured.kwargs)

    @property
    @cached_method
    def adls2_client(self) -> DataLakeServiceClient:
        """Data Lake service client for the configured storage account."""
        account = self.storage_account
        secret = self._raw_credential
        return create_adls2_client(account, secret)

    @property
    @cached_method
    def blob_client(self) -> BlobServiceClient:
        """Blob service client for the configured storage account."""
        account = self.storage_account
        secret = self._raw_credential
        return create_blob_client(account, secret)

    @property
    def lease_client_constructor(self) -> Any:
        """The ``DataLakeLeaseClient`` class itself (not an instance)."""
        lease_cls = DataLakeLeaseClient
        return lease_cls


# Due to a limitation of the discriminated union type, we can't directly mirror these old
# config fields in the new resource config. Instead, we'll just use the old config fields
# to construct the new config and then use that to construct the resource.
@dagster_maintained_resource
@resource(ADLS2_CLIENT_CONFIG)
def adls2_resource(context):
    """Resource that gives ops access to Azure Data Lake Storage Gen2.

    The underlying client is a :py:class:`~azure.storage.filedatalake.DataLakeServiceClient`.

    Attach this resource definition to a :py:class:`~dagster.JobDefinition` in order to make it
    available to your ops.

    Example:
        .. code-block:: python

            from dagster import job, op
            from dagster_azure.adls2 import adls2_resource

            @op(required_resource_keys={'adls2'})
            def example_adls2_op(context):
                return list(context.resources.adls2.adls2_client.list_file_systems())

            @job(resource_defs={"adls2": adls2_resource})
            def my_job():
                example_adls2_op()

    Note that your ops must also declare that they require this resource with
    `required_resource_keys`, or it will not be initialized for the execution of their compute
    functions.

    You may pass credentials to this resource using either a SAS token, a key or by passing the
    `DefaultAzureCredential` object.

    .. code-block:: YAML

        resources:
          adls2:
            config:
              storage_account: my_storage_account
              # str: The storage account name.
              credential:
                sas: my_sas_token
                # str: the SAS token for the account.
                key:
                  env: AZURE_DATA_LAKE_STORAGE_KEY
                # str: The shared access key for the account.
                DefaultAzureCredential: {}
                # dict: The keyword arguments used for DefaultAzureCredential
                # or leave the object empty for no arguments
                DefaultAzureCredential:
                    exclude_environment_credential: true

    """
    # The legacy dict config is converted into an ADLS2Resource, which is what ops
    # receive; its `adls2_client` / `blob_client` properties expose the clients.
    return _adls2_resource_from_config(context.resource_config)


@dagster_maintained_resource
@resource(
    merge_dicts(
        ADLS2_CLIENT_CONFIG,
        {
            "adls2_file_system": DagsterField(
                StringSource, description="ADLS Gen2 file system name"
            ),
            "adls2_prefix": DagsterField(StringSource, is_required=False, default_value="dagster"),
        },
    )
)
def adls2_file_manager(context):
    """FileManager that provides abstract access to ADLS2.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
    """
    resource_config = context.resource_config

    # Build the resource from the shared client fields first, then take its
    # Data Lake client; the file-manager-specific keys are read afterwards.
    adls2 = _adls2_resource_from_config(resource_config)
    data_lake_client = adls2.adls2_client

    return ADLS2FileManager(
        adls2_client=data_lake_client,
        file_system=resource_config["adls2_file_system"],
        prefix=resource_config["adls2_prefix"],
    )


def _adls2_resource_from_config(config) -> ADLS2Resource:
    """Translate a legacy dict-style config into an :py:class:`ADLS2Resource`.

    Args:
        config: A configuration containing the fields in ADLS2_CLIENT_CONFIG, i.e. a
            ``"storage_account"`` entry and a ``"credential"`` mapping.

    Returns:
        An ``ADLS2Resource`` built from the storage account and the matching credential
        config object.
    """
    account_name = config["storage_account"]
    credential_config = config["credential"]

    # Same priority as before: DefaultAzureCredential, then SAS token, and
    # otherwise the mapping is expected to carry a "key" entry.
    if "DefaultAzureCredential" in credential_config:
        resolved = ADLS2DefaultAzureCredential(
            kwargs=credential_config["DefaultAzureCredential"]
        )
    elif "sas" in credential_config:
        resolved = ADLS2SASToken(token=credential_config["sas"])
    else:
        resolved = ADLS2Key(key=credential_config["key"])

    return ADLS2Resource(storage_account=account_name, credential=resolved)