Ask AI

Source code for dagster_wandb.launch.ops

from dagster import OpExecutionContext, op
from wandb.sdk.launch import launch, launch_add

from .configs import launch_agent_config, launch_config


def raise_on_invalid_config(context: OpExecutionContext):
    entity = context.resources.wandb_config["entity"]
    if entity == "":
        raise RuntimeError(
            "(dagster_wandb) An empty string was provided for the 'entity' property of the"
            " 'wandb_config'."
        )

    project = context.resources.wandb_config["project"]
    if project == "":
        raise RuntimeError(
            "(dagster_wandb) An empty string was provided for the 'project' property of the"
            " 'wandb_config'."
        )


[docs]@op( required_resource_keys={"wandb_resource", "wandb_config"}, config_schema=launch_agent_config(), ) def run_launch_agent(context: OpExecutionContext): """It starts a Launch Agent and runs it as a long running process until stopped manually. Agents are processes that poll launch queues and execute the jobs (or dispatch them to external services to be executed) in order. **Example:** .. code-block:: YAML # config.yaml resources: wandb_config: config: entity: my_entity project: my_project ops: run_launch_agent: config: max_jobs: -1 queues: - my_dagster_queue .. code-block:: python from dagster_wandb.launch.ops import run_launch_agent from dagster_wandb.resources import wandb_resource from dagster import job, make_values_resource @job( resource_defs={ "wandb_config": make_values_resource( entity=str, project=str, ), "wandb_resource": wandb_resource.configured( {"api_key": {"env": "WANDB_API_KEY"}} ), }, ) def run_launch_agent_example(): run_launch_agent() """ raise_on_invalid_config(context) config = { "entity": context.resources.wandb_config["entity"], "project": context.resources.wandb_config["project"], **context.op_config, } context.log.info(f"Launch agent configuration: {config}") context.log.info("Running Launch agent...") launch.create_and_run_agent(api=context.resources.wandb_resource["api"], config=config)
[docs]@op( required_resource_keys={ "wandb_resource", "wandb_config", }, config_schema=launch_config(), ) def run_launch_job(context: OpExecutionContext): """Executes a Launch job. A Launch job is assigned to a queue in order to be executed. You can create a queue or use the default one. Make sure you have an active agent listening to that queue. You can run an agent inside your Dagster instance but can also consider using a deployable agent in Kubernetes. **Example:** .. code-block:: YAML # config.yaml resources: wandb_config: config: entity: my_entity project: my_project ops: my_launched_job: config: entry_point: - python - train.py queue: my_dagster_queue uri: https://github.com/wandb/example-dagster-integration-with-launch .. code-block:: python from dagster_wandb.launch.ops import run_launch_job from dagster_wandb.resources import wandb_resource from dagster import job, make_values_resource @job( resource_defs={ "wandb_config": make_values_resource( entity=str, project=str, ), "wandb_resource": wandb_resource.configured( {"api_key": {"env": "WANDB_API_KEY"}} ), }, ) def run_launch_job_example(): run_launch_job.alias("my_launched_job")() # we rename the job with an alias """ raise_on_invalid_config(context) config = { "entity": context.resources.wandb_config["entity"], "project": context.resources.wandb_config["project"], **context.op_config, } context.log.info(f"Launch job configuration: {config}") queue = context.op_config.get("queue") if queue is None: context.log.info("No queue provided, running Launch job locally") launch.run(api=context.resources.wandb_resource["api"], config=config) else: synchronous = config.get("synchronous", True) config.pop("synchronous", None) queued_run = launch_add(**config) if synchronous is True: context.log.info( f"Synchronous Launch job added to queue with name={queue}. Waiting for" " completion..." ) queued_run.wait_until_finished() else: context.log.info(f"Asynchronous Launch job added to queue with name={queue}")