Execute Pipeline by GraphQL

You can find the code for this example on GitHub.

This example demonstrates how to trigger a Dagster pipeline externally using a GraphQL client.

Create a GraphQL client

First, run pip install --pre gql[requests] to install GQL 3 (the GraphQL client for Python 3.6+) with the requests dependency. After installation, you can start using GQL by importing from the top-level gql package. For more installation details, see the GQL 3 documentation.

This example uses the RequestsHTTPTransport to communicate with the backend. By default, the destination URL is http://localhost:3000/graphql. You can create a GraphQL client with the selected transport and the URL endpoint, as shown below:

trigger.py
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport


def launch_pipeline_over_graphql(
    location, repo_name, pipeline_name, run_config, mode, url="http://localhost:3000/graphql"
):
    transport = RequestsHTTPTransport(url=url)
    client = Client(transport=transport, fetch_schema_from_transport=True)

Trigger a pipeline over GraphQL

To trigger a run over GraphQL, use the launchPipelineExecution mutation.

trigger.py
ERROR_FRAGMENT = """
fragment errorFragment on PythonError {
  message
  className
  stack
  cause {
    message
    className
    stack
    cause {
      message
      className
      stack
    }
  }
}
"""

LAUNCH_PIPELINE_EXECUTION_MUTATION = (
    ERROR_FRAGMENT
    + """
mutation($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename

    ... on InvalidStepError {
      invalidStepKey
    }
    ... on InvalidOutputError {
      stepKey
      invalidOutputName
    }
    ... on LaunchPipelineRunSuccess {
      run {
        runId
        pipeline {
          name
        }
        tags {
          key
          value
        }
        status
        runConfigYaml
        mode
      }
    }
    ... on ConflictingExecutionParamsError {
      message
    }
    ... on PresetNotFoundError {
      preset
      message
    }
    ... on PipelineConfigValidationInvalid {
      pipelineName
      errors {
        __typename
        message
        path
        reason
      }
    }
    ... on PipelineNotFoundError {
      message
      pipelineName
    }
    ... on PythonError {
      ...errorFragment
    }
  }
}
"""
)
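When the mutation returns a PythonError, the fragment above selects up to two nested levels of cause. A small helper like the following (the function is our own sketch, not part of gql or Dagster) can walk that nesting into a flat list of messages:

```python
def collect_error_messages(error):
    """Walk the nested `cause` chain captured by errorFragment,
    returning messages from the outermost error inward."""
    messages = []
    while error is not None:
        messages.append(error["message"])
        error = error.get("cause")
    return messages


# A response shaped like the fragment's selection set:
python_error = {
    "message": "outer failure",
    "className": "DagsterUserCodeExecutionError",
    "stack": [],
    "cause": {"message": "root cause", "className": "ValueError", "stack": [], "cause": None},
}
print(collect_error_messages(python_error))  # ['outer failure', 'root cause']
```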

Next, you provide the file or the module where your repository lives as the repositoryLocationName. Here, the repositoryLocationName is repo.py. You also pass the name of your repository as the repositoryName; in this example, it is called my_repo. If you have never run a collection of pipelines using the concept of a repository before, check our documentation for Repositories.

Then, you supply the pipelineName, runConfigData, and mode, as shown below. The runConfigData is the same as the run_config parameter to execute_pipeline.

trigger.py
REPOSITORY_LOCATION_FROM_FILE = "repo.py"
REPOSITORY_NAME = "my_repo"
PIPELINE_NAME = "do_math"
RUN_CONFIG = {"solids": {"add_one": {"inputs": {"num": 5}}, "add_two": {"inputs": {"num": 6}}}}
MODE = "default"
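Because runConfigData travels as a GraphQL variable, it must be a plain JSON-serializable dict. A quick round-trip check (illustrative only, not part of the example script) confirms the config above qualifies:

```python
import json

RUN_CONFIG = {"solids": {"add_one": {"inputs": {"num": 5}}, "add_two": {"inputs": {"num": 6}}}}

# Encode and decode the run config to verify it survives JSON transport unchanged.
encoded = json.dumps(RUN_CONFIG, indent=2)
assert json.loads(encoded) == RUN_CONFIG
print(encoded)
```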

Lastly, execute the query on the client to get the result.

trigger.py
def launch_pipeline_over_graphql(
    location, repo_name, pipeline_name, run_config, mode, url="http://localhost:3000/graphql"
):
    transport = RequestsHTTPTransport(url=url)
    client = Client(transport=transport, fetch_schema_from_transport=True)

    query = LAUNCH_PIPELINE_EXECUTION_MUTATION
    params = {
        "executionParams": {
            "selector": {
                "repositoryLocationName": location,
                "repositoryName": repo_name,
                "pipelineName": pipeline_name,
            },
            "runConfigData": run_config,
            "mode": mode,
        }
    }
    return client.execute(gql(query), variable_values=params)


if __name__ == "__main__":

    result = launch_pipeline_over_graphql(
        location=REPOSITORY_LOCATION_FROM_FILE,
        repo_name=REPOSITORY_NAME,
        pipeline_name=PIPELINE_NAME,
        run_config=RUN_CONFIG,
        mode=MODE,
    )
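client.execute returns a plain dict keyed by the mutation name, so you can branch on the __typename values selected in the mutation above. The helper below is a sketch of our own, not part of gql or Dagster:

```python
def summarize_launch_result(result):
    """Report the outcome of a launchPipelineExecution mutation result."""
    payload = result["launchPipelineExecution"]
    typename = payload["__typename"]
    if typename == "LaunchPipelineRunSuccess":
        return "Launched run {}".format(payload["run"]["runId"])
    if typename == "PipelineConfigValidationInvalid":
        details = "; ".join(err["message"] for err in payload["errors"])
        return "Invalid config for {}: {}".format(payload["pipelineName"], details)
    return "Launch failed: {}".format(typename)


# A successful response, shaped like the mutation's selection set:
sample = {
    "launchPipelineExecution": {
        "__typename": "LaunchPipelineRunSuccess",
        "run": {"runId": "abc123", "status": "STARTED"},
    }
}
print(summarize_launch_result(sample))  # Launched run abc123
```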

Before you run this example script, remember to launch the Dagit web server first. You can load the repository either from a file or from a module. If you load from a Python file, run dagit -f repo.py. If you load from a module, run dagit -m repo and change the input for the first argument, location, accordingly; in this example, repo.py becomes repo. Then, open another tab in your terminal and run python trigger.py to execute the script.

trigger.py
REPOSITORY_LOCATION_FROM_MODULE = "repo"


Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/trigger_pipeline
cd trigger_pipeline
pip install -e .