Modeling data
After running the ingestion assets, we will have all the data we need in R2 to start modeling. We will use dbt to handle our transformation logic and DuckDB as our query engine, and we will bring both into our Dagster asset graph.
The first thing to do is set up our dbt project. We will configure the connection details for the R2 bucket and the DuckDB database in the profiles.yml file, defining two targets, dev and prod, each with its own schema and path. The dev target writes to a local DuckDB file, while the prod target writes to a MotherDuck database (the md: path).
bluesky:
  target: prod
  outputs:
    dev:
      type: duckdb
      schema: bluesky_dev
      path: "local.duckdb"
      threads: 1
      extensions:
        - httpfs
      settings:
        s3_region: "auto"
        s3_access_key_id: "{{ env_var('AWS_ACCESS_KEY_ID') }}"
        s3_secret_access_key: "{{ env_var('AWS_SECRET_ACCESS_KEY') }}"
        s3_endpoint: "{{ env_var('AWS_ENDPOINT_URL') | replace('https://', '') }}"
    prod:
      type: duckdb
      schema: bluesky
      path: "md:prod_bluesky"
      threads: 1
      extensions:
        - httpfs
      settings:
        s3_region: "auto"
        s3_access_key_id: "{{ env_var('AWS_ACCESS_KEY_ID') }}"
        s3_secret_access_key: "{{ env_var('AWS_SECRET_ACCESS_KEY') }}"
        s3_endpoint: "{{ env_var('AWS_ENDPOINT_URL') | replace('https://', '') }}"
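With both targets defined, we can sanity-check the profile outside of Dagster by running dbt against the dev target. The snippet below is a minimal sketch using dbt's Python entry point; it assumes dbt-core 1.5+, that it is run from the dbt project directory, and that the R2 environment variables referenced above are already exported.

```python
# Minimal sketch: verify the dev target defined in profiles.yml.
# Assumes dbt-core >= 1.5, that this runs from the dbt project directory, and that
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_ENDPOINT_URL are set in the environment.
from dbt.cli.main import dbtRunner

result = dbtRunner().invoke(["debug", "--target", "dev"])
print("connection ok" if result.success else "connection failed")
```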
Next we can define the sources.yml file, which will be the foundation for our dbt models. We use the DuckDB function read_ndjson_objects to read all of the data at the given R2 object paths. Even though all of the data lives in the same R2 bucket, each path can be mapped to its own table in DuckDB.
version: 2

sources:
  - name: r2_bucket
    meta:
      external_location: "read_ndjson_objects('r2://dagster-demo/atproto_{name}/**/*.json', filename=true)"
    tables:
      - name: actor_feed_snapshot
        description: "external r2 bucket with json files of actor feeds"
      - name: starter_pack_snapshot
        description: "external r2 bucket with json files of starter pack snapshots"
| DuckDB Table | R2 Path |
| --- | --- |
| {profiles schema}.actor_feed_snapshot | r2://dagster-demo/atproto_actor_feed_snapshot/ |
| {profiles schema}.starter_pack_snapshot | r2://dagster-demo/atproto_starter_pack_snapshot/ |
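Before wiring these sources into models, it can be useful to spot-check the raw JSON from an ad-hoc DuckDB session. The snippet below is a minimal sketch: it uses DuckDB's R2 secret support rather than the s3_* settings from profiles.yml, and the CLOUDFLARE_ACCOUNT_ID variable name is an assumption.

```python
# Minimal sketch: preview raw actor feed snapshots straight from the R2 bucket.
# Assumes the R2 credentials and Cloudflare account id are set as environment variables.
import os

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# Register an R2 secret so DuckDB can resolve r2:// paths.
con.execute(
    f"""
    CREATE SECRET r2_secret (
        TYPE r2,
        KEY_ID '{os.environ['AWS_ACCESS_KEY_ID']}',
        SECRET '{os.environ['AWS_SECRET_ACCESS_KEY']}',
        ACCOUNT_ID '{os.environ['CLOUDFLARE_ACCOUNT_ID']}'
    )
    """
)

# List a few of the JSON objects that back the actor_feed_snapshot source.
print(
    con.sql(
        "SELECT filename "
        "FROM read_ndjson_objects('r2://dagster-demo/atproto_actor_feed_snapshot/**/*.json', filename=true) "
        "LIMIT 5"
    )
)
```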
Modeling
With dbt configured to read our JSON data, we can start to build the models. Following dbt conventions, we begin with staging models that map to the tables defined in sources.yml. These staging models simply select all of the raw data from each source:
WITH raw AS (
    SELECT * FROM {{ source('r2_bucket', 'actor_feed_snapshot') }}
)
SELECT * FROM raw
Within the dbt project, the analysis directory builds out the rest of the models, where more complex metrics such as top daily posts are calculated. For metrics such as the latest feeds, we can also leverage how we partitioned the data within our R2 bucket during ingestion to ensure we only use the most up-to-date posts. For example, the first CTE of the latest feed model extracts the most recent snapshot timestamp for each profile from the partitioned file paths:
WITH max_update AS (
    SELECT
        -- recover the snapshot timestamp from the partitioned R2 file path
        max(
            strptime(
                regexp_extract(
                    filename,
                    'dagster-demo/atproto_actor_feed_snapshot/(\d{4}-\d{2}-\d{2}/\d{2}/\d{2})',
                    1
                ),
                '%Y-%m-%d/%H/%M'
            )
        ) AS max_extracted_timestamp,
        -- the profile DID is encoded in the file name
        regexp_extract(filename, 'did:(.*?)\.json') AS profile_id
    FROM {{ ref("stg_feed_snapshots") }}
    GROUP BY
        regexp_extract(filename, 'did:(.*?)\.json')
),
dbt assets
Moving back into Dagster, there is not much we need to do to turn the dbt models into assets. Dagster can parse the dbt project and generate all of the assets from the path to the project directory.
import os
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

dbt_project = DbtProject(
    project_dir=Path(__file__).joinpath("..", "..", "..", "dbt_project").resolve(),
    target=os.getenv("DBT_TARGET"),
)
dbt_project.prepare_if_dev()

dbt_resource = DbtCliResource(project_dir=dbt_project)
We will use the DagsterDbtTranslator to map the ingestion assets that bring in the Bluesky data to the tables we defined in sources.yml. This ensures that everything exists as part of the same DAG and lineage within Dagster.
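The CustomizedDagsterDbtTranslator used below is a small subclass of DagsterDbtTranslator that overrides how asset keys are generated for dbt sources. The sketch here is one way it could look; the atproto_ asset key prefix is an assumption based on the R2 paths above, not necessarily the exact keys of the ingestion assets.

```python
from typing import Any, Mapping

import dagster as dg
from dagster_dbt import DagsterDbtTranslator


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        # Point dbt sources at the upstream ingestion assets so the lineage
        # connects in the Dagster asset graph (the key prefix is illustrative).
        if dbt_resource_props["resource_type"] == "source":
            return dg.AssetKey(f"atproto_{dbt_resource_props['name']}")
        return super().get_asset_key(dbt_resource_props)
```

With the translator in place, we combine it with the dbt project to generate our Dagster assets.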
@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
)
def dbt_bluesky(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from (dbt.cli(["build"], context=context).stream().fetch_row_counts())
defs = dg.Definitions(
assets=[dbt_bluesky],
resources={
"dbt": dbt_resource,
},
)
Like the ingestion layer, we create a Definitions object specific to dbt and modeling, which we will combine with the other layers of our project.
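As a sketch of how those layers might come together, each layer's Definitions can be merged at the top level of the project. The module names below are illustrative, not the example's actual layout.

```python
import dagster as dg

# Hypothetical module layout: each layer exposes its own Definitions object.
from .ingestion import defs as ingestion_defs
from .modeling import defs as modeling_defs

defs = dg.Definitions.merge(ingestion_defs, modeling_defs)
```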
There is one final layer to add to make this a full end-to-end analytics pipeline: next we will add the dashboard.
Next steps
- Continue this example with the dashboard