Ask AI

Source code for dagster_airflow.operators.dagster_operator

import json

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from dagster_airflow.hooks.dagster_hook import DagsterHook
from dagster_airflow.links.dagster_link import LINK_FMT, DagsterLink
from dagster_airflow.utils import is_airflow_2_loaded_in_environment


class DagsterOperator(BaseOperator):
    """Run and monitor a Dagster job through the Dagster GraphQL API.

    A ``DagsterHook`` is created when the operator is constructed. ``execute``
    launches the job through that hook, publishes the run identifiers to XCom
    (for use by ``DagsterLink``), logs a tracking link and then waits on the
    run via ``hook.wait_for_run``.

    Parameters:
        repository_name (str): the name of the repository to use
        repostitory_location_name (str): the name of the repository location
            to use (the keyword is spelled this way in the public signature)
        job_name (str): the name of the job to run
        run_config (Optional[Dict[str, Any]]): the run config to use for the
            job run; templated, and re-rendered in ``pre_execute``
        dagster_conn_id (Optional[str]): the id of the dagster connection,
            airflow 2.0+ only (forced to ``None`` otherwise)
        organization_id (Optional[str]): the id of the dagster cloud organization
        deployment_name (Optional[str]): the name of the dagster cloud deployment
        user_token (Optional[str]): the dagster cloud user token to use
        url (str): base URL to which ``{organization_id}/{deployment_name}/graphql``
            is appended verbatim, so it is expected to end with ``/``
    """

    template_fields = ["run_config"]
    template_ext = (".yaml", ".yml", ".json")
    ui_color = "#663399"
    ui_fgcolor = "#e0e3fc"
    operator_extra_links = (DagsterLink(),)

    @apply_defaults
    def __init__(
        self,
        dagster_conn_id="dagster_default",
        run_config=None,
        repository_name="",
        repostitory_location_name="",
        job_name="",
        # params for airflow < 2.0.0 where custom connections aren't supported
        deployment_name="prod",
        user_token=None,
        organization_id="",
        url="https://dagster.cloud/",
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Populated by _execute once the run has been launched.
        self.run_id = None
        # Connection ids are only honoured when Airflow 2 is loaded.
        self.dagster_conn_id = dagster_conn_id if is_airflow_2_loaded_in_environment() else None
        self.run_config = run_config or {}
        self.repository_name = repository_name
        self.repostitory_location_name = repostitory_location_name
        self.job_name = job_name
        self.user_token = user_token
        self.url = url
        self.organization_id = organization_id
        self.deployment_name = deployment_name
        self.hook = DagsterHook(
            dagster_conn_id=self.dagster_conn_id,
            user_token=self.user_token,
            url=f"{self.url}{self.organization_id}/{self.deployment_name}/graphql",
        )

    def _is_json(self, blob):
        """Return True if ``blob`` can be parsed by ``json.loads``, else False."""
        try:
            json.loads(blob)
        except (TypeError, ValueError):
            # ValueError covers malformed JSON text (JSONDecodeError is a
            # subclass); TypeError covers inputs that are not str/bytes at all.
            return False
        return True

    def pre_execute(self, context):
        """Re-render ``run_config`` against ``context`` before execution."""
        # force re-rendering to ensure run_config renders any templated
        # content from run_config that couldn't be accessed on init
        self.run_config = self.render_template(self.run_config, context)

    def on_kill(self):
        """Terminate the launched run, if there is one, when the task is killed."""
        if self.run_id is None:
            # Killed before launch_run returned: there is no run to terminate.
            self.log.info("No run has been launched; nothing to terminate")
            return
        self.log.info("Terminating Run")
        self.hook.terminate_run(
            run_id=self.run_id,
        )

    def execute(self, context):
        """Launch the job and wait on the run; see ``_execute``."""
        return self._execute(context)

    def _execute(self, context):
        """Launch the run, publish its identifiers to XCom, and wait on it."""
        self.run_id = self.hook.launch_run(
            repository_name=self.repository_name,
            repostitory_location_name=self.repostitory_location_name,
            job_name=self.job_name,
            run_config=self.run_config,
        )
        task_instance = context["task_instance"]
        # save relevant info in xcom for use in links
        task_instance.xcom_push(key="run_id", value=self.run_id)

        # Resolve organization/deployment once so the XCom values and the
        # logged tracking link always agree: the hook's values are used when a
        # connection id is in play, the operator's own arguments otherwise.
        if self.dagster_conn_id:
            organization_id = self.hook.organization_id
            deployment_name = self.hook.deployment_name
        else:
            organization_id = self.organization_id
            deployment_name = self.deployment_name

        task_instance.xcom_push(key="organization_id", value=organization_id)
        task_instance.xcom_push(key="deployment_name", value=deployment_name)

        self.log.info("Run Starting....")
        self.log.info(
            "Run tracking: %s",
            LINK_FMT.format(
                organization_id=organization_id,
                deployment_name=deployment_name,
                run_id=self.run_id,
            ),
        )
        self.hook.wait_for_run(
            run_id=self.run_id,
        )
class DagsterCloudOperator(DagsterOperator):
    """Run and monitor Dagster jobs on Dagster Cloud.

    Talks to the Dagster Cloud GraphQL API; construction arguments are those
    of ``DagsterOperator``.

    Parameters:
        repository_name (str): the name of the repository to use
        repostitory_location_name (str): the name of the repository location
            to use (the keyword is spelled this way in the public signature)
        job_name (str): the name of the job to run
        run_config (Optional[Dict[str, Any]]): the run config to use for the job run
        dagster_conn_id (Optional[str]): the id of the dagster connection,
            airflow 2.0+ only
        organization_id (Optional[str]): the id of the dagster cloud organization
        deployment_name (Optional[str]): the name of the dagster cloud deployment
        user_token (Optional[str]): the dagster cloud user token to use
    """