Source code for dagster_azure.adls2.fake_adls2_resource
import io
import random
from typing import Any, Dict, Optional
from unittest import mock
from dagster import resource
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._utils.cached_method import cached_method
from dagster_azure.adls2.utils import ResourceNotFoundError
from dagster_azure.blob import FakeBlobServiceClient
@dagster_maintained_resource
@resource({"account_name": str})
def fake_adls2_resource(context):
return FakeADLS2Resource(account_name=context.resource_config["account_name"])
[docs]
class FakeADLS2Resource(ConfigurableResource):
"""Stateful mock of an ADLS2Resource for testing.
Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
"""
account_name: str
storage_account: Optional[str] = None
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
@property
@cached_method
def adls2_client(self) -> "FakeADLS2ServiceClient":
return FakeADLS2ServiceClient(self.account_name)
@property
@cached_method
def blob_client(self) -> FakeBlobServiceClient:
return FakeBlobServiceClient(self.account_name)
@property
def lease_client_constructor(self) -> Any:
return FakeLeaseClient
class FakeLeaseClient:
def __init__(self, client):
self.client = client
self.id = None
# client needs a ref to self to check if a given lease is valid
self.client._lease = self # noqa: SLF001
def acquire(self, lease_duration=-1):
if self.id is None:
self.id = random.randint(0, 2**9)
else:
raise Exception("Lease already held")
def release(self):
self.id = None
def is_valid(self, lease):
if self.id is None:
# no lease is held so any operation is valid
return True
return lease == self.id
class FakeADLS2ServiceClient:
"""Stateful mock of an ADLS2 service client for testing.
Wraps a ``mock.MagicMock``. Containers are implemented using an in-memory dict.
"""
def __init__(self, account_name, credential="fake-creds"):
self._account_name = account_name
self._credential = mock.MagicMock()
self._credential.account_key = credential
self._file_systems = {}
@property
def account_name(self):
return self._account_name
@property
def credential(self):
return self._credential
@property
def file_systems(self):
return self._file_systems
def get_file_system_client(self, file_system):
return self._file_systems.setdefault(
file_system, FakeADLS2FilesystemClient(self.account_name, file_system)
)
def get_file_client(self, file_system, file_path):
return self.get_file_system_client(file_system).get_file_client(file_path)
class FakeADLS2FilesystemClient:
"""Stateful mock of an ADLS2 filesystem client for testing."""
def __init__(self, account_name, file_system_name):
self._file_system: Dict[str, FakeADLS2FileClient] = {}
self._account_name = account_name
self._file_system_name = file_system_name
@property
def account_name(self):
return self._account_name
@property
def file_system_name(self):
return self._file_system_name
def keys(self):
return self._file_system.keys()
def get_file_system_properties(self):
return {"account_name": self.account_name, "file_system_name": self.file_system_name}
def has_file(self, path):
return bool(self._file_system.get(path))
def get_file_client(self, file_path):
# pass fileclient a ref to self and its name so the file can delete itself
self._file_system.setdefault(file_path, FakeADLS2FileClient(self, file_path))
return self._file_system[file_path]
def create_file(self, file):
# pass fileclient a ref to self and the file's name so the file can delete itself by
# accessing the self._file_system dict
self._file_system.setdefault(file, FakeADLS2FileClient(fs_client=self, name=file))
return self._file_system[file]
def delete_file(self, file):
for k in list(self._file_system.keys()):
if k.startswith(file):
del self._file_system[k]
class FakeADLS2FileClient:
"""Stateful mock of an ADLS2 file client for testing."""
def __init__(self, name, fs_client):
self.name = name
self.contents = None
self._lease = None
self.fs_client = fs_client
@property
def lease(self):
return self._lease if self._lease is None else self._lease.id
def get_file_properties(self):
if self.contents is None:
raise ResourceNotFoundError("File does not exist!")
lease_id = None if self._lease is None else self._lease.id
return {"lease": lease_id}
def upload_data(self, contents, overwrite=False, lease=None):
if self._lease is not None:
if not self._lease.is_valid(lease):
raise Exception("Invalid lease!")
if self.contents is not None or overwrite is True:
if isinstance(contents, str):
self.contents = contents.encode("utf8")
elif isinstance(contents, io.BytesIO):
self.contents = contents.read()
elif isinstance(contents, io.StringIO):
self.contents = contents.read().encode("utf8")
elif isinstance(contents, bytes):
self.contents = contents
else:
self.contents = contents
def download_file(self):
if self.contents is None:
raise ResourceNotFoundError("File does not exist!")
return FakeADLS2FileDownloader(contents=self.contents)
def delete_file(self, lease=None):
if self._lease is not None:
if not self._lease.is_valid(lease):
raise Exception("Invalid lease!")
self.fs_client.delete_file(self.name)
class FakeADLS2FileDownloader:
"""Mock of an ADLS2 file downloader for testing."""
def __init__(self, contents):
self.contents = contents
def readall(self):
return self.contents
def readinto(self, fileobj):
fileobj.write(self.contents)