Execute a Pipeline via GraphQL
This example demonstrates how to trigger a Dagster pipeline externally using a GraphQL client.
Create a GraphQL client¶
First, run pip install --pre gql[requests] to install GQL 3 (the GraphQL client for Python 3.6+) with the requests dependency. After installation, you can start using GQL by importing from the top-level gql package. For more information about installing GQL 3, read the GQL 3 documentation.
This example uses the RequestsHTTPTransport to communicate with the backend. By default, the destination URL is http://localhost:3000/graphql. You can create a GraphQL client with the selected transport and the URL endpoint like below:
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport


def launch_pipeline_over_graphql(
    location, repo_name, pipeline_name, run_config, mode, url="http://localhost:3000/graphql"
):
    transport = RequestsHTTPTransport(url=url)
    client = Client(transport=transport, fetch_schema_from_transport=True)
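To confirm that the client can reach Dagit before launching anything, you can issue a trivial query first. This optional sketch assumes the Dagit GraphQL schema exposes a top-level version field; treat that field name as an assumption:

# Optional connectivity check; `version` is assumed to exist in the schema.
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

transport = RequestsHTTPTransport(url="http://localhost:3000/graphql")
client = Client(transport=transport, fetch_schema_from_transport=True)
print(client.execute(gql("{ version }")))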
Trigger a pipeline over GraphQL¶
To trigger a pipeline run over GraphQL, you can use the launchPipelineExecution mutation.
ERROR_FRAGMENT = """
fragment errorFragment on PythonError {
message
className
stack
cause {
message
className
stack
cause {
message
className
stack
}
}
}
"""
LAUNCH_PIPELINE_EXECUTION_MUTATION = (
    ERROR_FRAGMENT
    + """
mutation($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename
    ... on InvalidStepError {
      invalidStepKey
    }
    ... on InvalidOutputError {
      stepKey
      invalidOutputName
    }
    ... on LaunchPipelineRunSuccess {
      run {
        runId
        pipeline {
          name
        }
        tags {
          key
          value
        }
        status
        runConfigYaml
        mode
      }
    }
    ... on ConflictingExecutionParamsError {
      message
    }
    ... on PresetNotFoundError {
      preset
      message
    }
    ... on PipelineConfigValidationInvalid {
      pipelineName
      errors {
        __typename
        message
        path
        reason
      }
    }
    ... on PipelineNotFoundError {
      message
      pipelineName
    }
    ... on PythonError {
      ...errorFragment
    }
  }
}
"""
)
Next, you provide the file or the module where your repository lives as the repositoryLocationName. Here, the repositoryLocationName is repo.py. You also pass the name of your repository to repositoryName; in this example, it is called my_repo. If you have never run a collection of pipelines using the concept of a repository before, check our documentation for Repositories.
Then, you supply the inputs to pipelineName, runConfigData, and mode like below. The runConfigData is the same as the run_config parameter to execute_pipeline.
REPOSITORY_LOCATION_FROM_FILE = "repo.py"
REPOSITORY_NAME = "my_repo"
PIPELINE_NAME = "do_math"
RUN_CONFIG = {"solids": {"add_one": {"inputs": {"num": 5}}, "add_two": {"inputs": {"num": 6}}}}
MODE = "default"
Lastly, execute the query on the client to get the result.
def launch_pipeline_over_graphql(
    location, repo_name, pipeline_name, run_config, mode, url="http://localhost:3000/graphql"
):
    transport = RequestsHTTPTransport(url=url)
    client = Client(transport=transport, fetch_schema_from_transport=True)
    # Select the pipeline and supply its run config and mode via ExecutionParams.
    query = LAUNCH_PIPELINE_EXECUTION_MUTATION
    params = {
        "executionParams": {
            "selector": {
                "repositoryLocationName": location,
                "repositoryName": repo_name,
                "pipelineName": pipeline_name,
            },
            "runConfigData": run_config,
            "mode": mode,
        }
    }
    return client.execute(gql(query), variable_values=params)
if __name__ == "__main__":
result = launch_pipeline_over_graphql(
location=REPOSITORY_LOCATION_FROM_FILE,
repo_name=REPOSITORY_NAME,
pipeline_name=PIPELINE_NAME,
run_config=RUN_CONFIG,
mode=MODE,
)
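The dictionary returned by client.execute mirrors the selection set in the mutation above, so you can branch on __typename to see whether the launch succeeded. A minimal sketch, continuing from the result obtained in the __main__ block:

# Inspect the launch result; keys follow the mutation's selection set.
response = result["launchPipelineExecution"]
if response["__typename"] == "LaunchPipelineRunSuccess":
    print("Launched run:", response["run"]["runId"])
else:
    # One of the error types selected above, e.g. PipelineNotFoundError.
    print("Launch failed:", response)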
Before you run this example script, remember to launch the Dagit web server first. You can load the repository either from a file or from a module. If you load from a Python file, run dagit -f repo.py. If you load from a module, run dagit -m repo, and don't forget to change the input for the first argument location as well; in this example, repo.py is changed to repo.
Then, open another tab in your terminal and run python trigger.py to execute the script.
REPOSITORY_LOCATION_FROM_MODULE = "repo"
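With the module-based location, only the first argument to the launch function changes. For example:

result = launch_pipeline_over_graphql(
    location=REPOSITORY_LOCATION_FROM_MODULE,
    repo_name=REPOSITORY_NAME,
    pipeline_name=PIPELINE_NAME,
    run_config=RUN_CONFIG,
    mode=MODE,
)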
Download¶
curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/trigger_pipeline
cd trigger_pipeline
pip install -e .