Workflow management

Whenever you have to deal with multiple jobs on an HPC system, it becomes worthwhile to automate part or all of the job management process by describing and implementing so-called ‘workflows’. Options for managing workflows are numerous and range from basic scheduler features such as job arrays and job dependencies up to complex systems backed by a central, multi-user database.

Important

Most topics presented here are addressed in the videos of the 2021 PRACE/EuroCC Workshop on Workflows, available on the CISM/CECI YouTube channel.

Workflow management systems can typically do some (or all) of the following:

  • Compute dependencies between tasks and organise work;
  • Submit jobs to the scheduler;
  • Generate job scripts (templating, sweeping, etc.);
  • Install necessary scientific software;
  • Monitor jobs and recover from failures, fault detection, “smart” reruns;
  • Handle data: streaming, staging, etc.;
  • Log processes and track data provenance;
  • Enable sharing of data, results, and workflows, with security and monitoring of access policies;
  • Provide performance analysis and prediction.

Note

Workflow management systems have been developed in many scientific and technical communities. We will focus on the ones that are useful in a cluster computing context.

The list of existing workflow management systems is very long (300+ items!). On top of that, some potentially lesser-known UNIX commands can also be very useful for managing jobs. This document aims at offering users some guidance on choosing the right tool.

We will focus on tools that are

  • relevant to HPC environments (not cloud, K8s, Hadoop, etc.);
  • standalone (not language-specific libraries);
  • based on a simple domain-specific language (no XML or other convoluted language);
  • general purpose (not reserved to ‘omics’ for instance);
  • mature, easy to install, and backed by an active community.

Introduction

Workflows exist in many areas: business, healthcare, administration, IT operations, machine learning, the Internet of Things, etc. Specific sets of tools are available in each of these areas, some of which can be useful in an HPC context, but they will not be addressed here.

From a cluster computing point of view, a workflow will be considered to be a collection of possibly interdependent jobs that are all part of the same study.

If we represent each job with a dot and each dependency between jobs with an arrow, and arrange them from top to bottom, we can distinguish between “wide”, “deep”, and “cyclic” workflows.

Wide workflows are composed of many jobs that are independent of one another, plus a few dependent jobs, for instance to consolidate the results. Deep workflows are composed of many jobs, each depending only on the previous one. Cyclic workflows are more specific in the sense that they express the dependency of a job on itself, which must be understood as the same job being re-submitted multiple times. For each type of workflow, specific techniques and tools exist.

[Figure: examples of wide, deep, and cyclic workflows (wftypes.png)]

Wide workflows

Typical studies include:

  • parameter sweep: each job runs an experiment with a specific value of a parameter and the aim is to find the optimum of a target variable;
  • file collection processing: each job processes a data file independently, either to transform it or to extract a summary value from its data;
  • line-in-file processing: each job runs an experiment using parameter values stored in a file listing all experiments.

Most wide workflows are made of multiple independent jobs and one “merging” or “summarizing” job, which is often performed interactively rather than submitted to the scheduler.

Slurm tips

The canonical way to deal with a wide workflow in Slurm is through job arrays. Job arrays are very useful to manage multiple jobs at once, but they are meant for jobs that have a non-negative, bounded, integer parameter and are otherwise completely identical.

The main option to deal with a parameter that does not fit the format expected by Slurm (e.g. a list of files, a sequence of real values, etc.) is to use a Bash array. The template submission script would then look like this:

#!/bin/bash
#SBATCH --array=0-8 # Creates a 9-job array, with ranks (task IDs) 0 to 8
FILES=(*.csv)       # Creates a Bash array with the list of CSV files in the working directory
PARAMS=(0.1 0.2 0.3 0.1 0.2 0.3 0.1 0.2 0.3)
                    # Creates a Bash array with real-valued numbers
./program --param=${PARAMS[$SLURM_ARRAY_TASK_ID]} ${FILES[$SLURM_ARRAY_TASK_ID]}
                    # Runs the program on the nth file in FILES with the nth value in PARAMS,
                    # n being the rank of the job in the array.

When the jobs are not identical, for instance when the number of CPUs increases with the parameter, as would be natural in a scaling study, the submission can be automated with Linux tools such as xargs or GNU Parallel, taking advantage of the --wrap option of sbatch. Examples:

$ find . -name \*.csv | xargs -I {} sbatch --wrap "./program {}"
                  # Submit a series of jobs that run the program on each CSV file found in the current directory
$ parallel sbatch --wrap "./program --param={}" ::: 0.1 0.2 0.3 0.1 0.2 0.3  0.1 0.2 0.3
                  # Submit a series of jobs that run the program for each value in the list
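
For a scaling study where the resources requested grow together with the parameter, a minimal sketch could look like the following (the --nthreads option of the program is an assumption; adapt it to however your program selects its number of threads):

$ for n in 1 2 4 8 16; do sbatch --cpus-per-task=$n --wrap "./program --nthreads=$n"; done
                  # Submit one job per CPU count, requesting that many CPUs and telling
                  # the (hypothetical) program to use the same number of threads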

More information: WorkflowsWithBasicTools.pdf

Helper tool: atools

To further alleviate the shortcomings of native Slurm job arrays, the atools suite comes in very handy. atools is a collection of utilities that allow you to better manage the parameters of the jobs, summarize the outcome of the jobs, identify and re-submit failed jobs, etc.; a brief usage sketch is given after the list below.

Tools include

  • aenv sets up job parameters as environment variables based on the job rank and a file describing all parameter values
  • alog fills a log file tracking which jobs in the array completed, failed, or are still pending, etc.
  • arange reports on the job array progress and can also re-submit failed jobs
  • areduce merges the outputs of all jobs in the array into a “reduced” value, à la Hadoop
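
As a hedged illustration (the data.csv file and its param and file columns are hypothetical, and the exact flags may differ between atools versions; check the atools documentation), aenv and alog are typically combined in the job script like this:

#!/bin/bash
#SBATCH --array=1-100
source <(aenv --data data.csv)   # turn the columns of row $SLURM_ARRAY_TASK_ID into variables
alog --state start               # record that this array task has started
./program --param=$param $file   # $param and $file come from the columns of data.csv
alog --state end --exit $?       # record the exit status of this array task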

More information: https://atools.readthedocs.io/en/latest/

Deep workflows

Typical use cases:

  • pre- and post-processing jobs: additional jobs are needed alongside the main job to prepare the inputs and wrap up the outputs.
  • data transformation pipelines: each job runs a step in a transformation pipeline, consuming data from the previous step and making it digestible for the next one.

Such jobs are dependent on one another; job N+1 cannot start before job N has successfully completed.

Slurm tips

The canonical way to instruct Slurm about dependencies between jobs is with the --dependency option as in the following examples:

$ sbatch preprocess.sh
Submitted batch job 1
$ sbatch -d afterok:1 process1.sh
Submitted batch job 2
$ sbatch -d afterok:1 process2.sh
Submitted batch job 3
$ sbatch -d afterok:2:3 postprocess.sh
$ sbatch -d afterany:1:2:3 cleanup.sh
$ sbatch -d afternotok:1?afternotok:2?afternotok:3 cancel.sh

One problem is that the ID of the job being depended upon must be known before submitting the dependent job (job names unfortunately cannot be used). In the case of a linear chain of dependencies, the submission can be automated with a script like the following (chainsubmit.sh):

#!/bin/bash

# Submit the first script and capture its job ID (--parsable prints only the ID,
# possibly followed by ";cluster" on a federated cluster, hence the ${ID%%;*} below)
ID=$(sbatch --parsable $1)
shift
# Submit each remaining script with a dependency on the previously submitted one
for script in "$@"; do
  ID=$(sbatch --parsable --dependency=afterok:${ID%%;*} $script)
done

That script can then be used to submit multiple chained job scripts:

./chainsubmit.sh job1.slurm job2.slurm job3.slurm job4.slurm

Here is a variation of that script to use when the job submission script job.slurm accepts a parameter:

#!/bin/bash

# First argument: the submission script; remaining arguments: one parameter per job
SCRIPT=${1?Usage: $0 script.slurm arg1 arg2 ...}
shift
ARG=$1
# Submit the first job with the first parameter
ID=$(sbatch --parsable $SCRIPT $ARG)
shift
# Submit the remaining jobs, each depending on the previous one
for ARG in "$@"; do
  ID=$(sbatch --job-name=$ARG --parsable --dependency=afterok:${ID%%;*} $SCRIPT $ARG)
done

Use it like this:

./chainsubmit.sh job.slurm 2001 2002 2003 2004

to submit job.slurm four times, once for each argument from 2001 to 2004, each job depending on the previous one.

If the steps in the workflow are short enough that, together, they fit within the maximum allowed walltime of the cluster, you can also pack them inside a single submission script, which will effectively run them in order. In that case, you can furthermore use GNU Make (the same make utility you have probably already used to build software, its primary purpose) to build complex workflows with dependencies. Just create a Makefile and invoke the make command inside the submission script. Make sure to use the -j option of make to run independent steps in parallel, matching it with the number of CPUs you requested.
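
As a hedged sketch (the step programs and file names are hypothetical), a Makefile with two independent steps and a final merge could look like this (note that recipe lines must start with a tab):

# Makefile
all: results.csv

results.csv: step1.out step2.out
	./merge step1.out step2.out > results.csv

step1.out: input.dat
	./step1 input.dat > step1.out

step2.out: input.dat
	./step2 input.dat > step2.out

and the corresponding submission script simply runs make with a parallelism matching the allocation:

#!/bin/bash
#SBATCH --cpus-per-task=2
make -j $SLURM_CPUS_PER_TASK   # step1 and step2 can run simultaneously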

More information: WorkflowsWithBasicTools.pdf

sdag

To alleviate the shortcomings of native Slurm job dependencies, the sdag tool comes in very handy. It uses a workflow description syntax similar to that of DAGMan, a well-known tool in high-throughput computing, related to HTCondor.

# File flow.dag
JOB A1 jobA.sbatch
JOB B2 jobB.sbatch
JOB C3 jobC.sbatch
JOB D4 jobD.sbatch

PARENT A1 CHILD B2 C3
PARENT B2 C3 CHILD D4

The above example expresses the dependencies between four jobs (A1, B2, C3, D4), described by four distinct submission scripts (jobA.sbatch, ..., jobD.sbatch), as a diamond-shaped graph:

[Figure: diamond-shaped dependency graph between jobs A1, B2, C3, and D4 (diamond.png)]

By running sdag flow.dag, you will submit all four jobs with the appropriate --dependency=afterok... options.

Some caveats though: sdag does not handle federated clusters, does not handle re-submissions, and requires Python 2.7.

More information: https://github.com/abdulrahmanazab/sdag

Auto-requeuing jobs

Typical use case:

  • studies that are too long for the maximum wall time of the clusters (see the checkpointing video tutorial)
  • studies that cannot be parallelized efficiently

Slurm tips

There are two main problems to resolve in this case:

  1. make sure the program stops in time for the wrap-up operations (checkpointing) to complete properly (this may include letting the program run until the end of its current iteration), and
  2. make the job re-enter the queue.

The two main options for stopping the program are

  • using the timeout command, which runs a command for a maximum duration. Use this when the program regularly writes its status to disk and can be interrupted without notification. Set a duration a bit shorter than the requested run time.
  • using UNIX signals. Use this when the program is designed to handle signals and perform a status dump to disk upon receiving one. Slurm will send a signal shortly before the end of the allocation if you specify the --signal option, as sketched below.
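
A minimal sketch of the signal-based approach (the program name, its --state option, and its reaction to SIGUSR1 are assumptions):

#!/bin/bash
#SBATCH --time=01:00:00
#SBATCH --signal=USR1@300    # deliver SIGUSR1 to the job steps about 5 minutes before the time limit

# The (hypothetical) program dumps its state to state.dat when it receives SIGUSR1.
# Launching it with srun makes it a job step, so it actually receives the signal.
srun ./program --state=state.dat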

When the program has stopped properly, and is ready to run again (restart), the job can be submitted to the queue again. You can either

  • re-submit a new, identical, job with sbatch $0
  • re-queue the same job (and keep the same job ID) with scontrol requeue $SLURM_JOB_ID. Restarted jobs will have the environment variable SLURM_RESTART_COUNT set to the number of times the job has been restarted.

In both cases, you might want to use the --open-mode=append option to prevent Slurm from truncating the output file upon job restart.
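
Putting these pieces together, a hedged sketch of a self-requeuing job script could look like this (the program, its --state option, and the 55-minute margin are assumptions; depending on the cluster configuration, the job may also need to be submitted with the --requeue option):

#!/bin/bash
#SBATCH --time=01:00:00
#SBATCH --open-mode=append   # keep appending to the same output file across restarts

echo "Restart number: ${SLURM_RESTART_COUNT:-0}"

# Run the (hypothetical) program for a bit less than the requested time;
# timeout exits with status 124 when it had to interrupt the command.
timeout 55m ./program --state=state.dat
if [ $? -eq 124 ]; then
    # The computation is not finished yet: put the same job back in the queue.
    scontrol requeue $SLURM_JOB_ID
fi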

Note that Slurm can be configured to automatically re-queue a job if it terminates with a specific return code; see the RequeueExit and RequeueExitHold configuration options.

Maestro

Maestro removes the need to write the submission script altogether and instead expects a YAML description of the set of jobs that must be run, along with inline documentation of the study. Maestro then submits the jobs, is able to report on their progress, and organises the outputs so that files from distinct studies do not get mixed together.

An example study description is as follows:

description:
    name: maestro_test
    description: A simple study to run a program on files

study:
    - name: run_program
      description: run program on the files
      run:
         cmd: |
            ./program --param=$(PARAM)

global.parameters:
    PARAM:
        values: [0.1, 0.2, 0.3, 0.1, 0.2, 0.3, 0.1, 0.2, 0.3]
        label: PARAM.%%

More complete examples can include multiple tasks with inter-dependencies, and Slurm-specific options for some of the tasks. Maestro also provides the pgen and pargs mechanisms to build arbitrary parameter collections using the Python language.

A study is run with maestro run ... and monitored with maestro status .... Job outputs are organised in directories named automatically after the study name and the submission date and time. The study description and parameters are also copied alongside the results to ease organisation and provenance tracking.
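
As an illustration (the output directory name below is made up; the actual name is generated from the study name and the submission timestamp):

$ maestro run study.yaml
                  # Submit the study described in study.yaml
$ maestro status ./maestro_test_20240101-120000
                  # Report on the progress of the jobs of that study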

More information: https://maestrowf.readthedocs.io/en/latest/

Generic DAGs

The tools presented in this section differ from the ones in the previous sections in the sense that they manage all dependencies and parametrized jobs by themselves, without relying on Slurm-specific features. Like Maestro, they submit and then monitor the jobs, and take care of ordering, re-submission, logging, etc.

SlurmDagman

SlurmDagman uses the same HTCondor-based workflow description as sdag with additional options and parameters:

JOB dag-node slurm-submission-file
VARS dag-node var-name=var-value [...]
RETRY dag-node max-retries
PARENT parent-dag-node[,...] CHILD child-dag-node[,...]
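
Based on this grammar, the diamond-shaped workflow from the sdag example might, for instance, be written as follows (the variable name and the retry count are purely illustrative):

# File flow.dag
JOB A1 jobA.sbatch
JOB B2 jobB.sbatch
JOB C3 jobC.sbatch
JOB D4 jobD.sbatch

VARS A1 input=data.csv
RETRY D4 3

PARENT A1 CHILD B2,C3
PARENT B2,C3 CHILD D4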

SlurmDagman also includes a tool to translate DAGMan options that are specific to HTCondor into their Slurm equivalents.

More information: https://andrestanasijczuk.github.io/SlurmDagman/

Makeflow

Makeflow uses a syntax similar to that of GNU Make to describe dependencies. The main difference with the DAGMan syntax is that dependencies are not expressed between jobs but rather between data (files). Makeflow then computes the dependencies between jobs from the dependencies between files.

output: inputs
    command to generate output from inputs
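
For instance, the diamond-shaped workflow used earlier might be sketched in Makeflow as follows (the programs and file names are hypothetical):

a.out: input.dat
    ./jobA input.dat > a.out

b.out: a.out
    ./jobB a.out > b.out

c.out: a.out
    ./jobC a.out > c.out

d.out: b.out c.out
    ./jobD b.out c.out > d.out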

Makeflow also ships with a tool to build a visual representation of the workflows. It can also work with a JSON representation of the workflow, which makes it easy to create workflows programmatically.

More information: http://ccl.cse.nd.edu/software/makeflow/

Generic Cyclic workflows

Cylc is a workflow manager specifically designed for cyclic workflows. Jobs in the workflow can be re-submitted until sufficient work has been done.

The workflow description language is a bit more complex. The example below builds the same diamond-shaped workflow as previously, with the additional rule that when D4 is finished, a new cycle starting at A1 can begin.

[scheduler]
    allow implicit tasks = True
[scheduling]
    cycling mode = integer
    initial cycle point = 1
    final cycle point = 10
    [[graph]]
        P1 = """
            A1 => B2 & C3
            B2 & C3 => D4
            D4[-P1] => A1
        """

A workflow is started with cylc play, possibly specifying an initial cycle point, as in cylc play --start-cycle-point=5.

Further reading

References:

  • Curcin, Vasa & Ghanem, Moustafa (2009). Scientific workflow systems: can one size fit all? Cairo International Biomedical Engineering Conference 2008, 1-9. doi:10.1109/CIBEC.2008.4786077
  • Deelman, Ewa, Gannon, Dennis, Shields, Matthew & Taylor, Ian (2009). Workflows and e-Science: an overview of workflow system features and capabilities. Future Generation Computer Systems, 25, 524-540. doi:10.1016/j.future.2008.06.012
  • Liu, Ji, Pacitti, Esther, Valduriez, Patrick & Mattoso, Marta (2015). A survey of data-intensive scientific workflow management. Journal of Grid Computing, 13. doi:10.1007/s10723-015-9329-8
  • Badia, Rosa M., Ayguade, E. & Labarta, Jesús (2017). Workflows for science: a challenge when facing the convergence of HPC and Big Data. Supercomputing Frontiers and Innovations, 4, 27-47. doi:10.14529/jsfi170102
  • Ferreira da Silva, Rafael, Filgueira, Rosa, Pietri, Ilia, Jiang, Ming, Sakellariou, Rizos & Deelman, Ewa (2017). A characterization of workflow management systems for extreme-scale applications. Future Generation Computer Systems, 75. doi:10.1016/j.future.2017.02.026
  • Deelman, Ewa, Peterka, Tom, Altintas, Ilkay, Carothers, Christopher, Dam, Kerstin, Moreland, Kenneth, Parashar, Manish, Ramakrishnan, Lavanya, Taufer, Michela & Vetter, Jeffrey (2017). The future of scientific workflows. The International Journal of High Performance Computing Applications, 32. doi:10.1177/1094342017704893
