Ask AI

Source code for dagster_gcp.gcs.sensor

import dagster._check as check
from google.cloud import storage

# Page size passed to ``list_blobs`` by ``get_gcs_keys`` when listing a bucket.
MAX_KEYS = 1000


def get_gcs_keys(bucket, prefix=None, since_key=None, gcs_session=None):
    """Return a list of updated keys in a GCS bucket.

    All blobs matching ``prefix`` are listed, ordered by their ``updated``
    attribute (ascending), and reduced to their names. When ``since_key`` is
    given and present in that ordering, only the names that come after it are
    returned.

    Args:
        bucket (str): The name of the GCS bucket.
        prefix (Optional[str]): The prefix to filter the keys by.
        since_key (Optional[str]): The key to start from. If provided, only keys updated
            after this key will be returned. If the key is empty or is not found in the
            listing, every key is returned.
        gcs_session (Optional[google.cloud.storage.client.Client]): A GCS client session.
            If not provided, a new session will be created.

    Returns:
        List[str]: A list of keys in the bucket, sorted by update time, that are newer
        than the `since_key`.

    Example:
        .. code-block:: python

            @resource
            def google_cloud_storage_client(context):
                return storage.Client.from_service_account_json("my-service-account.json")

            @sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
            def my_gcs_sensor(context):
                since_key = context.cursor or None
                new_gcs_keys = get_gcs_keys(
                    "my-bucket",
                    prefix="data",
                    since_key=since_key,
                    gcs_session=context.resources.google_cloud_storage_client
                )

                if not new_gcs_keys:
                    return SkipReason("No new gcs files found for bucket 'my-bucket'.")

                for gcs_key in new_gcs_keys:
                    yield RunRequest(run_key=gcs_key, run_config={
                        "ops": {
                            "gcs_files": {
                                "config": {
                                    "gcs_key": gcs_key
                                }
                            }
                        }
                    })

                last_key = new_gcs_keys[-1]
                context.update_cursor(last_key)
    """
    # Validate argument types before touching the network.
    check.str_param(bucket, "bucket")
    check.opt_str_param(prefix, "prefix")
    check.opt_str_param(since_key, "since_key")

    if not gcs_session:
        gcs_session = storage.Client()

    # list() drains the iterator, so every matching blob is collected before
    # sorting; MAX_KEYS is only handed over as the listing page size.
    contents = list(
        gcs_session.list_blobs(
            bucket_or_name=bucket,
            delimiter="",
            page_size=MAX_KEYS,
            prefix=prefix,
        )
    )

    sorted_keys = [obj.name for obj in sorted(contents, key=lambda x: x.updated)]

    if not since_key:
        return sorted_keys

    # One scan locates the cursor; an unknown cursor falls back to the full
    # listing, exactly as a missing cursor does.
    try:
        cursor_idx = sorted_keys.index(since_key)
    except ValueError:
        return sorted_keys

    return sorted_keys[cursor_idx + 1 :]