Ingestion
This example showcases a full end-to-end analytics pipeline with Dagster: ingesting data, transforming it, and presenting it in a BI tool. This is a common sequence in the data space, and keeping each phase in the same framework helps build more reliable applications. We will start with ingestion.
The data that serves as the foundation for our project comes from Bluesky, a decentralized social media platform that allows users to post content. For the ingestion layer, we want to extract data from Bluesky and load it into a Cloudflare R2 bucket, a cloud-based object storage service compatible with the S3 API. R2 will serve as our storage layer and is where we will model the data. But first we need to determine the best way to get data out of Bluesky.
Because there is no out-of-the-box integration for Bluesky in Dagster, we will build our own custom ConfigurableResource. Bluesky uses atproto and provides an SDK, which will serve as the backbone for our resource.
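For reference, here is roughly what the raw SDK looks like on its own. This is a minimal sketch; the handle and app password are placeholders, and get_profile is used only as a sanity check.
from atproto import Client

client = Client()
client.login("your-handle.bsky.social", "your-app-password")  # placeholder credentials

# Simple call to confirm the session works
profile = client.get_profile("your-handle.bsky.social")
print(profile.display_name)
Wrapping this client in a ConfigurableResource lets Dagster manage credentials and session reuse for us.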
class ATProtoResource(dg.ConfigurableResource):
    login: str
    password: str
    session_cache_path: str = "atproto-session.txt"

    def _login(self, client):
        """Create a reusable session to share across resource instances; session creation is rate limited to 30 per 5 minutes and 300 per day."""
        if os.path.exists(self.session_cache_path):
            with open(self.session_cache_path) as f:
                session_string = f.read()
            client.login(session_string=session_string)
        else:
            client.login(login=self.login, password=self.password)
            session_string = client.export_session_string()
            with open(self.session_cache_path, "w") as f:
                f.write(session_string)

    def get_client(
        self,
    ) -> Client:
        client = Client()
        self._login(client)
        return client
The most important part of our resource is get_client, which returns the atproto client that our Dagster assets will use. The _login method authenticates the client and caches the session string on disk so it can be reused across resource instances. We do this because Bluesky rate limits session creation, and each fresh login counts against that limit. We intend to run these assets separately, so we want to be as efficient with our API calls as possible.
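When the resource is instantiated elsewhere in the project, the credentials can come from environment variables rather than being hard-coded. A minimal sketch; the variable names BSKY_LOGIN and BSKY_APP_PASSWORD are assumptions:
atproto_resource = ATProtoResource(
    login=dg.EnvVar("BSKY_LOGIN"),
    password=dg.EnvVar("BSKY_APP_PASSWORD"),
)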
With the client defined, we can move on to the strategy for pulling data from Bluesky. There is a lot of data we could potentially pull, but we will focus on posts related to data. Bluesky has the idea of starter packs, which are curated lists of users associated with specific domains. We will ingest the Data People pack. Using the atproto client, we can get all the members of that starter pack.
def get_all_list_members(client: Client, list_uri: str):
    cursor = None
    members = []
    while True:
        response = client.app.bsky.graph.get_list(
            {"list": list_uri, "cursor": cursor, "limit": 100}
        )
        members.extend(response.items)
        if not response.cursor:
            break
        cursor = response.cursor
    return members


def get_all_starter_pack_members(client: Client, starter_pack_uri: str):
    response = client.app.bsky.graph.get_starter_pack({"starter_pack": starter_pack_uri})
    return get_all_list_members(client, response.starter_pack.list.uri)
The get_all_feed_items function similarly uses the atproto client, this time to pull the full feed of an individual member. This retrieves much more data and is where we will be most concerned about rate limiting (which we will cover in the next section).
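The full implementation is not reproduced here, but a minimal sketch, assuming it paginates the SDK's get_author_feed endpoint the same way get_all_list_members paginates the list endpoint, might look like this:
def get_all_feed_items(client: Client, actor_did: str):
    """Collect every post in an actor's feed; a sketch, not the exact implementation."""
    cursor = None
    items = []
    while True:
        response = client.get_author_feed(actor=actor_did, cursor=cursor, limit=100)
        items.extend(response.feed)
        if not response.cursor:
            break
        cursor = response.cursor
    return items
With the client and these helpers in place, we have everything we need to interact with Bluesky and can create our assets.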
Extracting data
Our first asset (starter_pack_snapshot) is responsible for extracting the members of the Data People starter pack and loading the data into R2. Let's look at the asset decorator and parameters before getting into the logic of the function.
@dg.asset(
    partitions_def=dg.StaticPartitionsDefinition(
        partition_keys=[
            "at://did:plc:lc5jzrr425fyah724df3z5ik/app.bsky.graph.starterpack/3l7cddlz5ja24",  # https://bsky.app/starter-pack/christiannolan.bsky.social/3l7cddlz5ja24
        ]
    ),
    automation_condition=dg.AutomationCondition.on_cron("0 0 * * *"),  # Midnight
    kinds={"python"},
    group_name="ingestion",
)
def starter_pack_snapshot(
    context: dg.AssetExecutionContext,
    atproto_resource: ATProtoResource,
    s3_resource: S3Resource,
) -> dg.MaterializeResult:
First, we set a static partition for the specific starter pack. Next, an automation_condition is added; this is a simple way to put the asset on a schedule and ensure it runs at midnight every day. Finally, we add a kinds of python and a group_name of ingestion, which help organize our assets in the Dagster UI. For the parameters, we use the ATProtoResource we created for Bluesky data and the Dagster-maintained S3Resource, which works with R2. Now we can walk through the logic of the function.
atproto_client = atproto_resource.get_client()
starter_pack_uri = context.partition_key

list_items = get_all_starter_pack_members(atproto_client, starter_pack_uri)
_bytes = os.linesep.join([member.model_dump_json() for member in list_items]).encode("utf-8")

datetime_now = datetime.now()
object_key = "/".join(
    (
        "atproto_starter_pack_snapshot",
        datetime_now.strftime("%Y-%m-%d"),
        datetime_now.strftime("%H"),
        datetime_now.strftime("%M"),
        f"{starter_pack_uri}.json",
    )
)

s3_resource.get_client().put_object(Body=_bytes, Bucket=AWS_BUCKET_NAME, Key=object_key)

context.instance.add_dynamic_partitions(
    partitions_def_name="atproto_did_dynamic_partition",
    partition_keys=[list_item_view.subject.did for list_item_view in list_items],
)

return dg.MaterializeResult(
    metadata={
        "len_members": len(list_items),
        "s3_object_key": object_key,
    }
)
Using the ATProtoResource, we initialize the client and extract the members of the starter pack. That information is stored in R2 at a path based on the current date and time. We also add keys to a dynamic partition that is defined outside of this asset; this partition will be used by our next asset.
atproto_did_dynamic_partition = dg.DynamicPartitionsDefinition(name="atproto_did_dynamic_partition")
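If you need to see which member DIDs have been registered so far, the keys of a dynamic partition can be read back from the Dagster instance. An illustrative snippet, not part of the pipeline itself:
registered_dids = context.instance.get_dynamic_partitions("atproto_did_dynamic_partition")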
Finally, we record metadata about the file we saved, which surfaces in the Dagster asset catalog.
Dynamic partitions
The next asset is actor_feed_snapshot, where the feed data is ingested. This asset uses the same resources as the starter_pack_snapshot asset and has a similar flow. The primary difference is that while starter_pack_snapshot had a single static partition, actor_feed_snapshot uses a dynamic partition.
@dg.asset(
    partitions_def=atproto_did_dynamic_partition,
    deps=[dg.AssetDep(starter_pack_snapshot, partition_mapping=dg.AllPartitionMapping())],
    automation_condition=dg.AutomationCondition.eager(),
    kinds={"python"},
    group_name="ingestion",
    # Throttle concurrent materializations of this asset with a shared concurrency key
    op_tags={"dagster/concurrency_key": "ingestion"},
)
def actor_feed_snapshot(
    context: dg.AssetExecutionContext,
    atproto_resource: ATProtoResource,
    s3_resource: S3Resource,
) -> dg.MaterializeResult:
    """Snapshot of full user feed written to S3 storage."""
    client = atproto_resource.get_client()
    actor_did = context.partition_key

    # NOTE: we may need to yield chunks to be more memory efficient
    items = get_all_feed_items(client, actor_did)

    datetime_now = datetime.now()

    object_key = "/".join(
        (
            "atproto_actor_feed_snapshot",
            datetime_now.strftime("%Y-%m-%d"),
            datetime_now.strftime("%H"),
            datetime_now.strftime("%M"),
            f"{actor_did}.json",
        )
    )

    _bytes = os.linesep.join([item.model_dump_json() for item in items]).encode("utf-8")
    s3_resource.get_client().put_object(Body=_bytes, Bucket=AWS_BUCKET_NAME, Key=object_key)

    return dg.MaterializeResult(
        metadata={
            "len_feed_items": len(items),
            "s3_object_key": object_key,
        }
    )
This asset maintains a separate partition, and a separate execution, for every member of the Data People starter pack, and stores a file in R2 at an object path specific to that user.
One other difference you may have noticed is the automation_condition. Because this asset is downstream of an asset on a schedule, we can set the condition to dg.AutomationCondition.eager() so that it executes as soon as its upstream dependency materializes.
Definition
This is everything we need for ingestion. At the bottom of the file we set the Definitions object, which contains all of the assets and initialized resources.
defs = dg.Definitions(
    assets=[starter_pack_snapshot, actor_feed_snapshot],
    resources={
        "atproto_resource": atproto_resource,
        "s3_resource": s3_resource,
    },
)
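The atproto_resource and s3_resource instances referenced here are initialized earlier in the file. For R2, the S3Resource is pointed at the account's R2 endpoint. A minimal sketch, assuming the credentials and endpoint come from environment variables with placeholder names:
from dagster_aws.s3 import S3Resource

s3_resource = S3Resource(
    endpoint_url=dg.EnvVar("R2_ENDPOINT_URL"),  # the account's r2.cloudflarestorage.com URL
    aws_access_key_id=dg.EnvVar("R2_ACCESS_KEY_ID"),
    aws_secret_access_key=dg.EnvVar("R2_SECRET_ACCESS_KEY"),
)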
This definition covers just one part of our overall project. For more complicated projects, it can be helpful to maintain separate definitions devoted to specific domains; you will see the same pattern for the modeling and dashboard layers. All of these definitions will be merged into a single definition at the very end.
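When that time comes, something like Definitions.merge can stitch the per-domain definitions together. A sketch with placeholder module names for the three layers:
import dashboard_definitions  # placeholder module names
import ingestion_definitions
import modeling_definitions

defs = dg.Definitions.merge(
    ingestion_definitions.defs,
    modeling_definitions.defs,
    dashboard_definitions.defs,
)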
Next steps
- Continue this example with rate limiting