automation.controller.PipelineController

class clearml.automation.controller.PipelineController

Pipeline controller. A pipeline is a DAG of base tasks; each task is cloned (with arguments changed as required), executed, and monitored. The pipeline process (Task) itself can be executed manually or by the clearml-agent services queue. Notice: the pipeline controller lives only as long as the pipeline itself is being executed.

add_step(name, base_task_id=None, parents=None, parameter_override=None, task_overrides=None, execution_queue=None, time_limit=None, base_task_project=None, base_task_name=None, clone_base_task=True, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step).

Parameters
  • name (str) – Unique name of the step. For example: stage1

  • base_task_id (str) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

  • parents (list) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

  • parameter_override (dict) –

    Optional parameter overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

    • Artifact access

      parameter_override={'Args/input_file': '${stage1.artifacts.mydata.url}'}

    • Model access (last model used)

      parameter_override={'Args/input_file': '${stage1.models.output.-1.url}'}

    • Parameter access

      parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}'}

    • Task ID

      parameter_override={'Args/input_file': '${stage3.id}'}

  • task_overrides (dict) –

    Optional task section overriding dictionary. The dict values can reference a previously executed step using the following form: '${step_name}'. Examples:

    • Clear the git repository commit ID

      task_overrides={'script.version_num': ''}

    • Git repository commit branch

      task_overrides={'script.branch': '${stage1.script.branch}'}

    • Container image

      task_overrides={'container.image': '${stage1.container.image}'}

  • execution_queue (str) – Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class

  • time_limit (float) – Default None, no time limit. Step execution time limit; if exceeded, the Task is aborted, and the pipeline is stopped and marked failed.

  • base_task_project (str) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

  • base_task_name (str) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

  • clone_base_task (bool) – If True (default) the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).

  • pre_execute_callback (Callable) –

    Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

    If the callback returns False, the Node is skipped, as is any node in the DAG that depends on it.

    Note that the parameters are already parsed; for example, ${step1.parameters.Args/param} is replaced with the relevant value.

    def step_created_callback(
        pipeline,             # type: PipelineController,
        node,                 # type: PipelineController.Node,
        parameters,           # type: dict
    ):
        pass
    

  • post_execute_callback (Callable) –

    Callback function, called when a step (Task) is completed and before the next jobs are executed. Allows a user to modify the Task status after completion.

    def step_completed_callback(
        pipeline,             # type: PipelineController,
        node,                 # type: PipelineController.Node,
    ):
        pass
    

  • cache_executed_step – If True, before launching the new step, after updating with the latest configuration, check if an exact Task with the same parameters/code was already executed. If one is found, use it instead of launching a new Task. Default: False, i.e. a new cloned copy of base_task is always used. Notice: if the git repo reference does not have a specific commit ID, the Task will never be reused from the cache. If clone_base_task is False there is no cloning, hence the base_task is used.

Returns

True if successful
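The '${step_name.field}' reference syntax used by parameter_override can be illustrated with a small stand-alone sketch. Note this is plain Python for illustration only, not ClearML's internal implementation; the steps dictionary and its field paths are hypothetical stand-ins for already-executed Tasks:

```python
import re

# Hypothetical data describing already-executed steps (illustration only;
# ClearML resolves these references internally from real Task objects).
steps = {
    "stage1": {"id": "abc123", "artifacts.mydata.url": "s3://bucket/data.csv"},
    "stage3": {"parameters.Args/input_file": "s3://bucket/other.csv"},
}

def resolve(value, steps):
    """Replace '${step_name.field}' references with the step's stored value."""
    def lookup(match):
        step_name, field = match.group(1).split(".", 1)
        return steps[step_name][field]
    return re.sub(r"\$\{([^}]+)\}", lookup, value)

parameter_override = {"Args/input_file": "${stage1.artifacts.mydata.url}"}
resolved = {k: resolve(v, steps) for k, v in parameter_override.items()}
print(resolved)  # {'Args/input_file': 's3://bucket/data.csv'}
```

The same substitution idea applies to task_overrides values such as '${stage1.script.branch}'.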

elapsed()

Return minutes elapsed from the controller start time stamp.

Returns

The minutes from controller start time. A negative value means the process has not started yet.

get_pipeline_dag()

Return the pipeline execution graph. Each node in the DAG is a PipelineController.Node object. The graph itself is a dictionary of Nodes (keyed by Node name); each node holds links to its parent Nodes (identified by their unique names).

Returns

execution tree, as a nested dictionary. Example:

{
    'stage1' : Node() {
        name: 'stage1'
        job: ClearmlJob
        ...
    },
}
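Because each node records its parents, a caller can, for example, recover a valid execution order from the returned graph with a simple topological sort. The sketch below uses plain dicts as stand-ins for PipelineController.Node objects (an assumption for illustration; real nodes expose the parent names on the node object):

```python
# Plain-dict stand-ins for PipelineController.Node objects; each entry
# lists the names of its parent nodes, as in the DAG returned above.
dag = {
    "stage1": {"parents": []},
    "stage2": {"parents": ["stage1"]},
    "stage3": {"parents": ["stage1", "stage2"]},
}

def execution_order(dag):
    """Topologically sort node names so parents always precede children."""
    order, visited = [], set()
    def visit(name):
        if name in visited:
            return
        visited.add(name)
        for parent in dag[name]["parents"]:
            visit(parent)
        order.append(name)
    for name in dag:
        visit(name)
    return order

print(execution_order(dag))  # ['stage1', 'stage2', 'stage3']
```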

get_processed_nodes()

Return a list of the processed pipeline nodes; each entry in the list is a PipelineController.Node object.

Returns

executed (excluding currently executing) nodes list

get_running_nodes()

Return a list of the currently running pipeline nodes; each entry in the list is a PipelineController.Node object.

Returns

Currently running nodes list

is_running()

Return True if the pipeline controller is running.

Returns

A boolean indicating whether the pipeline controller is active (still running) or stopped.

is_successful()

Return True if the pipeline controller is fully executed and none of the steps/Tasks failed.

Returns

A boolean indicating that none of the steps failed.

start(step_task_created_callback=None, step_task_completed_callback=None)

Start the pipeline controller. If the calling process is stopped, then the controller stops as well.

Parameters
  • step_task_created_callback (Callable) –

    Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. parameters are the configuration arguments passed to the ClearmlJob.

    If the callback returns False, the Node is skipped, as is any node in the DAG that depends on it.

    Note that the parameters are already parsed; for example, ${step1.parameters.Args/param} is replaced with the relevant value.

    def step_created_callback(
        pipeline,             # type: PipelineController,
        node,                 # type: PipelineController.Node,
        parameters,           # type: dict
    ):
        pass
    

  • step_task_completed_callback (Callable) –

    Callback function, called when a step (Task) is completed and before the next jobs are executed. Allows a user to modify the Task status after completion.

    def step_completed_callback(
        pipeline,             # type: PipelineController,
        node,                 # type: PipelineController.Node,
    ):
        pass
    

Returns

True, if the controller started. False, if the controller did not start.
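Putting add_step and start together, a minimal controller script might look like the sketch below. The project name, task names, and queue name are placeholders, and running it requires a configured ClearML server, so the construction is wrapped in a function here rather than executed at import time:

```python
def build_and_run_pipeline():
    """Assemble a two-step pipeline and run it locally.

    Requires a configured ClearML server; the project, task, and queue
    names below are placeholders for this sketch.
    """
    from clearml import Task
    from clearml.automation.controller import PipelineController

    # The controller runs inside its own ClearML Task.
    Task.init(project_name="examples", task_name="pipeline controller")
    pipe = PipelineController(default_execution_queue="default")

    pipe.add_step(
        name="stage1",
        base_task_project="examples",
        base_task_name="step 1 process data",
    )
    pipe.add_step(
        name="stage2",
        parents=["stage1"],
        base_task_project="examples",
        base_task_name="step 2 train model",
        # Feed stage1's artifact into stage2 via the reference syntax.
        parameter_override={"Args/input_file": "${stage1.artifacts.mydata.url}"},
    )

    started = pipe.start()  # returns True if the controller started
    pipe.wait()             # block until all steps complete
    pipe.stop()
    return started
```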

start_remotely(queue='services', exit_process=True)

Start the current pipeline remotely (on the selected services queue). The current process will be stopped if exit_process is True.

Parameters
  • queue – The name of the queue to launch the pipeline on

  • exit_process – If True, exit the current process after enqueuing the pipeline on the queue

Returns

The remote Task object

stop(timeout=None)

Stop the pipeline controller and the optimization thread.

Parameters

timeout (float) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait; terminate immediately.

update_execution_plot()

Update the Sankey diagram of the current pipeline.

wait(timeout=None)

Wait for the pipeline to finish.

Note

This method does not stop the pipeline. Call stop() to terminate the pipeline.

Parameters

timeout (float) – The timeout to wait for the pipeline to complete (minutes). If None, wait until the pipeline is completed.

Returns

True, if the pipeline finished. False, if the pipeline timed out.
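A common pattern is to give a started controller a bounded wait and abort it if it overruns; since wait() does not stop the pipeline on timeout, stop() must be called explicitly. A small sketch (the helper name and the 30-minute default are this sketch's own choices, and it expects an already-started PipelineController):

```python
def wait_with_deadline(pipe, max_minutes=30.0):
    """Wait up to max_minutes for 'pipe' (a started PipelineController);
    stop it if the deadline passes. Returns True if it finished in time."""
    finished = pipe.wait(timeout=max_minutes)
    if not finished:
        # Timed out: wait() does not stop the pipeline, so do it explicitly.
        pipe.stop()
    return finished
```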