# Source code for dagster_gcp.gcs.resources
from typing import Any, Optional
from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from google.cloud import storage
from pydantic import Field
from dagster_gcp.gcs.file_manager import GCSFileManager
# [docs]
class GCSResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource for interacting with Google Cloud Storage.

    Example:
        .. code-block::

            @asset
            def my_asset(gcs: GCSResource):
                with gcs.get_client() as client:
                    # client is a google.cloud.storage.Client
                    ...
    """

    # Optional GCP project; forwarded unchanged to the storage client.
    project: Optional[str] = Field(default=None, description="Project name")

    def get_client(self) -> storage.Client:
        """Build a GCS client for the configured project.

        Returns:
            google.cloud.storage.Client: a client created with ``self.project``.
        """
        client = _gcs_client_from_config(project=self.project)
        return client

    def get_object_to_set_on_execution_context(self) -> Any:
        """Return a GCS client (from :py:meth:`get_client`) as the context object."""
        client = self.get_client()
        return client
# [docs]
@dagster_maintained_resource
@resource(
    description="This resource provides a GCS client",
    config_schema=GCSResource.to_config_schema(),
)
def gcs_resource(init_context) -> storage.Client:
    """Function-style resource that yields a GCS client.

    Args:
        init_context: The resource init context; it is handed to
            ``GCSResource.from_resource_context`` to build the configured resource.

    Returns:
        google.cloud.storage.Client: the client produced by ``GCSResource.get_client``.
    """
    configured = GCSResource.from_resource_context(init_context)
    return configured.get_client()
# [docs]
class GCSFileManagerResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """FileManager that provides abstract access to GCS."""

    # Optional GCP project; forwarded unchanged to the storage client.
    project: Optional[str] = Field(default=None, description="Project name")
    # Bucket handed to GCSFileManager as ``gcs_bucket``.
    gcs_bucket: str = Field(description="GCS bucket to store files")
    # Handed to GCSFileManager as ``gcs_base_key``.
    gcs_prefix: str = Field(default="dagster", description="Prefix to add to all file paths")

    def get_client(self) -> GCSFileManager:
        """Build a :py:class:`~dagster_gcp.GCSFileManager` for the configured bucket.

        The manager implements the
        :py:class:`~dagster._core.storage.file_manager.FileManager` API and is
        given a GCS client created for ``self.project``.

        Returns:
            GCSFileManager: a manager using ``gcs_bucket`` and ``gcs_prefix``.
        """
        return GCSFileManager(
            client=_gcs_client_from_config(project=self.project),
            gcs_bucket=self.gcs_bucket,
            gcs_base_key=self.gcs_prefix,
        )

    def get_object_to_set_on_execution_context(self) -> Any:
        """Return the file manager (from :py:meth:`get_client`) as the context object."""
        file_manager = self.get_client()
        return file_manager
# [docs]
@dagster_maintained_resource
@resource(config_schema=GCSFileManagerResource.to_config_schema())
def gcs_file_manager(context):
    """FileManager that provides abstract access to GCS.

    Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
    """
    # NOTE(review): no explicit description is passed to @resource, so the
    # docstring above may be used as the resource description; wording kept as-is.
    configured = GCSFileManagerResource.from_resource_context(context)
    return configured.get_client()
def _gcs_client_from_config(project: Optional[str]) -> storage.Client:
    """Construct a Google Cloud Storage client.

    Args:
        project: The GCP project passed to the client; may be ``None``.

    Returns:
        A GCS client.
    """
    client_cls = storage.client.Client
    return client_cls(project=project)