This guide explains how to use Dagster with Power BI through the dagster-powerbi library. Your Power BI assets, such as semantic models, data sources, reports, and dashboards, can be represented in the Dagster asset graph, allowing you to track lineage and dependencies between Power BI assets and the upstream data assets you already model in Dagster. You can also use Dagster to orchestrate Power BI semantic models, triggering refreshes of these models on a cadence or in response to upstream data changes.
To load Power BI assets into the Dagster asset graph, you must first construct a PowerBIWorkspace resource, which allows Dagster to communicate with your Power BI workspace. You'll need to supply your workspace ID and credentials. You may configure a service principal or use an API access token, which can be passed directly or accessed from the environment using EnvVar.
Dagster can automatically load all semantic models, data sources, reports, and dashboards from your Power BI workspace as asset specs. Call the load_powerbi_asset_specs function, which returns a list of AssetSpecs representing your Power BI assets. You can then include these asset specs in your Definitions object:
from dagster_powerbi import (
    PowerBIServicePrincipal,
    PowerBIToken,
    PowerBIWorkspace,
    load_powerbi_asset_specs,
)

import dagster as dg

# Connect using a service principal
power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIServicePrincipal(
        client_id=dg.EnvVar("POWER_BI_CLIENT_ID"),
        client_secret=dg.EnvVar("POWER_BI_CLIENT_SECRET"),
        tenant_id=dg.EnvVar("POWER_BI_TENANT_ID"),
    ),
    workspace_id=dg.EnvVar("POWER_BI_WORKSPACE_ID"),
)

# Alternatively, connect directly using an API access token
power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIToken(api_token=dg.EnvVar("POWER_BI_API_TOKEN")),
    workspace_id=dg.EnvVar("POWER_BI_WORKSPACE_ID"),
)

power_bi_specs = load_powerbi_asset_specs(power_bi_workspace)
defs = dg.Definitions(
    assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
)
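The service principal example above reads four settings from the environment, and a missing one may only surface when Dagster first tries to contact Power BI. As a minimal sketch using only the standard library, a pre-flight check could list which variables are unset. The helper name `missing_env_vars` and the constant `REQUIRED_ENV_VARS` are hypothetical and not part of dagster-powerbi; the variable names are the ones used in the example above.

```python
import os

# Environment variable names taken from the service principal example above.
REQUIRED_ENV_VARS = (
    "POWER_BI_CLIENT_ID",
    "POWER_BI_CLIENT_SECRET",
    "POWER_BI_TENANT_ID",
    "POWER_BI_WORKSPACE_ID",
)


def missing_env_vars(names=REQUIRED_ENV_VARS, environ=None):
    """Return the names that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing Power BI settings: " + ", ".join(missing))
    else:
        print("All Power BI settings are present.")
```

Running this before starting Dagster gives a quick signal that the credentials are wired up. If you use the API token variant instead, the list of names would need to change accordingly.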
Customize asset definition metadata for Power BI assets
By default, Dagster will generate asset keys for each Power BI asset based on its type and name and populate default metadata. You can further customize asset properties by passing a custom DagsterPowerBITranslator subclass to the load_powerbi_asset_specs function. This subclass can implement methods to customize the asset keys or specs for each Power BI asset type.
from dagster_powerbi import (
    DagsterPowerBITranslator,
    PowerBIServicePrincipal,
    PowerBIWorkspace,
    load_powerbi_asset_specs,
)
from dagster_powerbi.translator import PowerBIContentData

import dagster as dg

power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIServicePrincipal(
        client_id=dg.EnvVar("POWER_BI_CLIENT_ID"),
        client_secret=dg.EnvVar("POWER_BI_CLIENT_SECRET"),
        tenant_id=dg.EnvVar("POWER_BI_TENANT_ID"),
    ),
    workspace_id=dg.EnvVar("POWER_BI_WORKSPACE_ID"),
)


# A translator class lets us customize properties of the built
# Power BI assets, such as the owners or asset key
class MyCustomPowerBITranslator(DagsterPowerBITranslator):
    def get_report_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
        # We add a team owner tag to all reports
        return super().get_report_spec(data)._replace(owners=["team:my_team"])

    def get_semantic_model_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
        return super().get_semantic_model_spec(data)._replace(owners=["team:my_team"])

    def get_dashboard_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
        return super().get_dashboard_spec(data)._replace(owners=["team:my_team"])

    def get_dashboard_asset_key(self, data: PowerBIContentData) -> dg.AssetKey:
        # We prefix all dashboard asset keys with "powerbi" for organizational
        # purposes
        return super().get_dashboard_asset_key(data).with_prefix("powerbi")


power_bi_specs = load_powerbi_asset_specs(
    power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator
)
defs = dg.Definitions(
    assets=[*power_bi_specs], resources={"power_bi": power_bi_workspace}
)
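The translator above works by overriding one method and delegating to the parent for the default behavior. The following standalone sketch shows that override-and-delegate pattern in isolation, without Dagster installed. `SketchContent`, `SketchTranslator`, and the `(type, name)` key layout are hypothetical stand-ins chosen for illustration; the actual key layout produced by dagster-powerbi may differ.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SketchContent:
    """Hypothetical stand-in for one piece of Power BI content."""

    content_type: str  # for example "dashboard" or "report"
    name: str


class SketchTranslator:
    """Hypothetical base class: builds a key path from type and name."""

    def get_key(self, content: SketchContent) -> Tuple[str, ...]:
        return (content.content_type, content.name)


class PrefixedDashboardTranslator(SketchTranslator):
    """Overrides one hook and delegates to the parent for the default key."""

    def get_key(self, content: SketchContent) -> Tuple[str, ...]:
        key = super().get_key(content)
        # Only dashboards receive the extra "powerbi" prefix.
        if content.content_type == "dashboard":
            return ("powerbi",) + key
        return key
```

Calling `super()` first keeps the default naming intact, so the subclass only has to describe what it changes.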
Definitions from multiple Power BI workspaces can be combined by instantiating multiple PowerBIWorkspace resources and merging their specs. This lets you view all your Power BI assets in a single asset graph:
from dagster_powerbi import (
    PowerBIServicePrincipal,
    PowerBIWorkspace,
    load_powerbi_asset_specs,
)

import dagster as dg

credentials = PowerBIServicePrincipal(
    client_id=dg.EnvVar("POWER_BI_CLIENT_ID"),
    client_secret=dg.EnvVar("POWER_BI_CLIENT_SECRET"),
    tenant_id=dg.EnvVar("POWER_BI_TENANT_ID"),
)

sales_team_workspace = PowerBIWorkspace(
    credentials=credentials,
    workspace_id="726c94ff-c408-4f43-8edf-61fbfa1753c7",
)
marketing_team_workspace = PowerBIWorkspace(
    credentials=credentials,
    workspace_id="8b7f815d-4e64-40dd-993c-cfa4fb12edee",
)

sales_team_specs = load_powerbi_asset_specs(sales_team_workspace)
marketing_team_specs = load_powerbi_asset_specs(marketing_team_workspace)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_team_specs, *marketing_team_specs],
    resources={
        "marketing_power_bi": marketing_team_workspace,
        "sales_power_bi": sales_team_workspace,
    },
)
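Because default asset keys are derived from each asset's type and name, two workspaces that contain identically named content could end up with the same key, and a single set of definitions cannot hold two assets with the same key. As a rough sketch, the check below looks for such collisions before merging. It operates on plain tuples so that it runs without Dagster; `find_duplicate_keys` is a hypothetical helper, and the sample keys are made up.

```python
from collections import Counter


def find_duplicate_keys(*key_groups):
    """Return the sorted key paths that appear more than once across all groups."""
    counts = Counter(key for group in key_groups for key in group)
    return sorted(key for key, count in counts.items() if count > 1)


# Made-up key paths for illustration. With real specs you could build them
# with something like [tuple(spec.key.path) for spec in sales_team_specs].
sales_keys = [("report", "Revenue"), ("dashboard", "Overview")]
marketing_keys = [("report", "Campaigns"), ("dashboard", "Overview")]

duplicates = find_duplicate_keys(sales_keys, marketing_keys)
```

If collisions turn up, a custom translator per workspace that adds a workspace-specific prefix is one way to keep the keys distinct.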
Materialize Power BI semantic models from Dagster
Dagster's default behavior is to pull in representations of Power BI semantic models as external assets, which appear in the asset graph but can't be materialized. However, you can build executable asset definitions that trigger the refresh of Power BI semantic models. The build_semantic_model_refresh_asset_definition utility will construct an asset definition that triggers a refresh of a semantic model when materialized.
from dagster_powerbi import (
    PowerBIServicePrincipal,
    PowerBIWorkspace,
    build_semantic_model_refresh_asset_definition,
    load_powerbi_asset_specs,
)

import dagster as dg

power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIServicePrincipal(
        client_id=dg.EnvVar("POWER_BI_CLIENT_ID"),
        client_secret=dg.EnvVar("POWER_BI_CLIENT_SECRET"),
        tenant_id=dg.EnvVar("POWER_BI_TENANT_ID"),
    ),
    workspace_id=dg.EnvVar("POWER_BI_WORKSPACE_ID"),
)

# Load Power BI asset specs, and use the asset definition builder to
# construct a semantic model refresh definition for each semantic model
power_bi_assets = [
    build_semantic_model_refresh_asset_definition(resource_key="power_bi", spec=spec)
    if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model"
    else spec
    for spec in load_powerbi_asset_specs(power_bi_workspace)
]
defs = dg.Definitions(
    assets=[*power_bi_assets], resources={"power_bi": power_bi_workspace}
)
You can then add these semantic models to jobs or as targets of Dagster sensors or schedules to trigger refreshes of the models on a cadence or based on other conditions.
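Before attaching schedules or sensors, it can help to see exactly which specs the list comprehension above would turn into refreshable assets. The sketch below reproduces that tag-based selection on a hypothetical stand-in class, `SketchSpec`, so it runs without Dagster. The tag key `dagster-powerbi/asset_type` and the value `semantic_model` come from the example above; everything else is illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Tag key used by the example above to identify the Power BI asset type.
ASSET_TYPE_TAG = "dagster-powerbi/asset_type"


@dataclass
class SketchSpec:
    """Hypothetical stand-in for an asset spec; only carries a name and tags."""

    name: str
    tags: Dict[str, str] = field(default_factory=dict)


def split_by_asset_type(
    specs: List[SketchSpec], asset_type: str = "semantic_model"
) -> Tuple[List[SketchSpec], List[SketchSpec]]:
    """Split specs into (matching, others) according to the asset type tag."""
    matching: List[SketchSpec] = []
    others: List[SketchSpec] = []
    for spec in specs:
        if spec.tags.get(ASSET_TYPE_TAG) == asset_type:
            matching.append(spec)
        else:
            others.append(spec)
    return matching, others
```

Specs without the tag fall into the second list, which mirrors the `else spec` branch of the comprehension.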
Customizing how Power BI semantic models are materialized
Instead of using the out-of-the-box build_semantic_model_refresh_asset_definition utility, you can build your own asset definitions that trigger the refresh of Power BI semantic models. This allows you to customize how the refresh is triggered or to run custom code before or after the refresh.
from dagster_powerbi import (
    PowerBIServicePrincipal,
    PowerBIWorkspace,
    build_semantic_model_refresh_asset_definition,
    load_powerbi_asset_specs,
)

import dagster as dg

power_bi_workspace = PowerBIWorkspace(
    credentials=PowerBIServicePrincipal(
        client_id=dg.EnvVar("POWER_BI_CLIENT_ID"),
        client_secret=dg.EnvVar("POWER_BI_CLIENT_SECRET"),
        tenant_id=dg.EnvVar("POWER_BI_TENANT_ID"),
    ),
    workspace_id=dg.EnvVar("POWER_BI_WORKSPACE_ID"),
)


# Asset definition factory which triggers a semantic model refresh and sends a notification
# once complete
def build_semantic_model_refresh_and_notify_asset_def(
    spec: dg.AssetSpec,
) -> dg.AssetsDefinition:
    dataset_id = spec.metadata["dagster-powerbi/id"]

    @dg.multi_asset(specs=[spec], name=spec.key.to_python_identifier())
    def rebuild_semantic_model(
        context: dg.AssetExecutionContext, power_bi: PowerBIWorkspace
    ) -> None:
        power_bi.trigger_and_poll_refresh(dataset_id)
        # Do some custom work after refreshing here, such as sending an email notification

    return rebuild_semantic_model


# Load Power BI asset specs, and use our custom asset definition builder to
# construct a definition for each semantic model
power_bi_assets = [
    build_semantic_model_refresh_and_notify_asset_def(spec=spec)
    if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model"
    else spec
    for spec in load_powerbi_asset_specs(power_bi_workspace)
]
defs = dg.Definitions(
    assets=[*power_bi_assets], resources={"power_bi": power_bi_workspace}
)
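The custom asset above relies on `trigger_and_poll_refresh`, whose internals this guide does not describe. If you want finer control, for example your own timeout, a generic polling loop is one option. The following is a sketch of that general technique only, not the library's implementation: `poll_until_done`, the injected `get_status` callable, and the status strings `"Completed"` and `"Failed"` are all assumptions made for illustration.

```python
import time


def poll_until_done(
    get_status,
    interval_seconds=5.0,
    timeout_seconds=600.0,
    sleep=time.sleep,
    clock=time.monotonic,
):
    """Call get_status() until it reports success, failure, or the timeout passes.

    The status strings used here are placeholders; adapt them to whatever
    your status source actually returns.
    """
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status == "Completed":
            return status
        if status == "Failed":
            raise RuntimeError("refresh reported failure")
        if clock() >= deadline:
            raise TimeoutError("refresh did not finish before the timeout")
        # Any other status is treated as still in progress.
        sleep(interval_seconds)
```

Passing `sleep` and `clock` as parameters keeps the loop easy to test without real waiting. Inside an asset, notification code would run after the function returns, and the raised exceptions would cause the materialization to fail.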