Ask AI

Source code for dagster_gcp.gcs.resources

from typing import Any, Optional

from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from google.cloud import storage
from pydantic import Field

from dagster_gcp.gcs.file_manager import GCSFileManager


[docs] class GCSResource(ConfigurableResource, IAttachDifferentObjectToOpContext): """Resource for interacting with Google Cloud Storage. Example: .. code-block:: @asset def my_asset(gcs: GCSResource): with gcs.get_client() as client: # client is a google.cloud.storage.Client ... """ project: Optional[str] = Field(default=None, description="Project name") def get_client(self) -> storage.Client: """Creates a GCS Client. Returns: google.cloud.storage.Client """ return _gcs_client_from_config(project=self.project) def get_object_to_set_on_execution_context(self) -> Any: return self.get_client()
[docs] @dagster_maintained_resource @resource( config_schema=GCSResource.to_config_schema(), description="This resource provides a GCS client", ) def gcs_resource(init_context) -> storage.Client: return GCSResource.from_resource_context(init_context).get_client()
[docs] class GCSFileManagerResource(ConfigurableResource, IAttachDifferentObjectToOpContext): """FileManager that provides abstract access to GCS.""" project: Optional[str] = Field(default=None, description="Project name") gcs_bucket: str = Field(description="GCS bucket to store files") gcs_prefix: str = Field(default="dagster", description="Prefix to add to all file paths") def get_client(self) -> GCSFileManager: """Creates a :py:class:`~dagster_gcp.GCSFileManager` object that implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API . Returns: GCSFileManager """ gcs_client = _gcs_client_from_config(project=self.project) return GCSFileManager( client=gcs_client, gcs_bucket=self.gcs_bucket, gcs_base_key=self.gcs_prefix, ) def get_object_to_set_on_execution_context(self) -> Any: return self.get_client()
[docs] @dagster_maintained_resource @resource(config_schema=GCSFileManagerResource.to_config_schema()) def gcs_file_manager(context): """FileManager that provides abstract access to GCS. Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API. """ return GCSFileManagerResource.from_resource_context(context).get_client()
def _gcs_client_from_config(project: Optional[str]) -> storage.Client: """Creates a GCS Client. Args: project: The GCP project Returns: A GCS client. """ return storage.client.Client(project=project)