automation.controller.PipelineController

class clearml.automation.controller.PipelineController

Pipeline controller. A pipeline is a DAG of base tasks; each task is cloned (arguments changed as required), executed, and monitored. The pipeline process (Task) itself can be executed manually or by the clearml-agent services queue. Notice: the pipeline controller lives only as long as the pipeline itself is being executed.

add_step(name, base_task_id=None, parents=None, parameter_override=None, execution_queue=None, time_limit=None, base_task_project=None, base_task_name=None)

Add a step to the pipeline execution DAG. Each step must have a unique name (this name is later used to address the step).

Parameters
  • name (str) – Unique name of the step. For example stage1

  • base_task_id (str) – The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution.

  • parents (list) – Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully.

  • parameter_override (dict) –

    Optional parameter overriding dictionary. The dict values can reference a previously executed step using the form '${step_name}'. Examples:

    Artifact access

    parameter_override={'Args/input_file': '${stage1.artifacts.mydata.url}'}

    Model access (last model used)

    parameter_override={'Args/input_file': '${stage1.models.output.-1.url}'}

    Parameter access

    parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}'}

    Task ID

    parameter_override={'Args/input_file': '${stage3.id}'}

  • execution_queue (str) – Optional, the queue to use for executing this specific step. If not provided, the task is sent to the default execution queue, as defined on the class.

  • time_limit (float) – Default None, no time limit. Step execution time limit; if exceeded, the Task is aborted, and the pipeline is stopped and marked failed.

  • base_task_project (str) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

  • base_task_name (str) – If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.

Returns

True if successful
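To make the '${step_name.field}' reference syntax above concrete, here is a minimal pure-Python sketch of how such overrides could be resolved against a previous step's outputs. The `resolve_overrides` function and the `step_outputs` mapping are hypothetical stand-ins for illustration, not ClearML's actual implementation.

```python
import re

def resolve_overrides(overrides, step_outputs):
    """Replace '${...}' references with values looked up in step_outputs.

    Unresolvable references are left as-is (a hedged simplification;
    the real resolution logic lives inside ClearML).
    """
    pattern = re.compile(r"\$\{([^}]+)\}")
    resolved = {}
    for key, value in overrides.items():
        if isinstance(value, str):
            value = pattern.sub(
                lambda m: str(step_outputs.get(m.group(1), m.group(0))), value
            )
        resolved[key] = value
    return resolved

# Illustrative outputs of a previously executed step named 'stage1'
step_outputs = {
    "stage1.id": "abc123",
    "stage1.artifacts.mydata.url": "s3://bucket/mydata.csv",
}
print(resolve_overrides(
    {"Args/input_file": "${stage1.artifacts.mydata.url}",
     "Args/parent": "${stage1.id}"},
    step_outputs,
))
```

The same dictionary shape is what you would pass as parameter_override to add_step; ClearML performs the actual substitution at execution time, once the referenced step has run.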

elapsed()

Return minutes elapsed from the controller's start time stamp.

Returns

The minutes from controller start time. A negative value means the process has not started yet.

get_pipeline_dag()

Return the pipeline execution graph. Each node in the DAG is a PipelineController.Node object. The graph itself is a dictionary of Nodes (keyed by the Node name); each node holds links to its parent Nodes (identified by their unique names).

Returns

execution tree, as a nested dictionary

Example:

{
    'stage1': Node() {
        name: 'stage1'
        job: TrainsJob
        ...
    },
}
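Since the returned graph is a plain dictionary of name-keyed nodes with parent links, it can be walked with ordinary graph code. The sketch below uses a hypothetical minimal Node stand-in (the real object is PipelineController.Node, with more fields such as job) to show one way to order steps so every step appears after all of its parents.

```python
class Node:
    """Hypothetical stand-in for PipelineController.Node (illustration only)."""
    def __init__(self, name, parents=None):
        self.name = name
        self.parents = parents or []  # parent step names, as in the real DAG

def topological_order(dag):
    """Yield step names so every step appears after all of its parents."""
    done, order = set(), []

    def visit(name):
        if name in done:
            return
        for parent in dag[name].parents:
            visit(parent)
        done.add(name)
        order.append(name)

    for name in dag:
        visit(name)
    return order

# A small DAG shaped like the dictionary get_pipeline_dag() returns
dag = {
    "stage1": Node("stage1"),
    "stage2": Node("stage2", parents=["stage1"]),
    "stage3": Node("stage3", parents=["stage1", "stage2"]),
}
print(topological_order(dag))  # ['stage1', 'stage2', 'stage3']
```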

get_processed_nodes()

Return a list of the processed pipeline nodes; each entry in the list is a PipelineController.Node object.

Returns

A list of executed nodes (excluding currently executing nodes)

get_running_nodes()

Return a list of the currently running pipeline nodes; each entry in the list is a PipelineController.Node object.

Returns

Currently running nodes list

is_running()

Return True if the pipeline controller is running.

Returns

A boolean indicating whether the pipeline controller is active (still running) or stopped.

start(run_remotely=False, step_task_created_callback=None)

Start the pipeline controller. If the calling process is stopped, then the controller stops as well.

Parameters
  • run_remotely (bool) – (default False). If True, stop the current process and continue execution on a remote machine. This is done by calling Task.execute_remotely with the queue name 'services'. If run_remotely is a string, it specifies the execution queue for the pipeline's remote execution.

  • step_task_created_callback (Callable) –

    Callback function, called when a step (Task) is created and before it is sent for execution.

    def step_created_callback(
        node,                 # type: PipelineController.Node,
        parameters,           # type: dict
    ):
        pass
    

Returns

True, if the controller started. False, if the controller did not start.
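A callback matching the documented signature receives the step's Node and its parameter dictionary before the step is enqueued, so mutating the dictionary is a natural place to adjust parameters per step. The sketch below exercises such a callback against a hypothetical dummy node (a stand-in for PipelineController.Node, used here only so the example is self-contained); the parameter name Args/batch_size is illustrative.

```python
class _DummyNode:
    """Stand-in for PipelineController.Node, for illustration only."""
    def __init__(self, name):
        self.name = name

def step_created_callback(node, parameters):
    """Called once per step after its Task is created, before execution.

    Mutating 'parameters' here is assumed to affect the step before it
    is sent for execution (hedged - verify against your ClearML version).
    """
    parameters["Args/batch_size"] = 64

params = {"Args/input_file": "data.csv"}
step_created_callback(_DummyNode("stage1"), params)
print(params)  # {'Args/input_file': 'data.csv', 'Args/batch_size': 64}
```

In real use you would pass the function to the controller, e.g. pipe.start(step_task_created_callback=step_created_callback).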

stop(timeout=None)

Stop the pipeline controller and the optimization thread.

Parameters

timeout (float) – Wait timeout for the optimization thread to exit (minutes). The default is None, indicating do not wait; terminate immediately.

wait(timeout=None)

Wait for the pipeline to finish.

Note

This method does not stop the pipeline. Call stop() to terminate the pipeline.

Parameters

timeout (float) – The timeout to wait for the pipeline to complete (minutes). If None, wait indefinitely until the pipeline completes.

Returns

True, if the pipeline finished. False, if the pipeline timed out.
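The wait(timeout) semantics above (block until completion, or return False when the timeout in minutes elapses) can be sketched as a simple polling loop. Everything here is a hypothetical stand-in: wait_for models the behavior, and is_running is any callable playing the role of the controller's is_running() method.

```python
import time

def wait_for(is_running, timeout=None, poll_seconds=0.01):
    """Return True if the pipeline finished, False if the timeout elapsed.

    timeout is in minutes, mirroring the documented wait() parameter;
    None means wait indefinitely for completion.
    """
    deadline = None if timeout is None else time.time() + timeout * 60.0
    while is_running():
        if deadline is not None and time.time() >= deadline:
            return False
        time.sleep(poll_seconds)
    return True

# Simulate a pipeline that reports "finished" on the third poll
state = {"polls": 0}
def fake_is_running():
    state["polls"] += 1
    return state["polls"] < 3

print(wait_for(fake_is_running, timeout=1))  # True
```

A typical real sequence would be pipe.start(), then pipe.wait(); if wait() returns False on timeout, the pipeline is still running and stop() must be called explicitly to terminate it.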