Part 2: Ethical and efficient practices on SLURM clusters

Author

Romain Ligneul

Note

This tutorial was written to help my colleagues access our local cluster at the CRNL (FR), but as long as your cluster is equipped with SLURM, the tutorial should work smoothly on your infrastructure as well.

If you managed to complete the first part of this tutorial, you can now pip install whatever you need in your virtual environment and do some computing. However, there is more to know.

Sharing the resources

Because it needs to remain highly flexible and adapted to a wide range of needs, the cluster is not very constrained with respect to resource allocation.

If you do not pay attention, you might monopolize all the CPUs or all the memory with your jobs, leaving nothing for your colleagues.

That is why it is important to evaluate the amount of memory you need and the maximum time that a bug-free job might take! Based on this information, you can set mem_gb and timeout_min (timeout in minutes) appropriately.

Similarly, you need to decide how many CPUs will actually be useful to you. Can you go with only one without losing much time? Then use only one. Does your computation time drop by a large factor when you use more? Then use more. But how can you know?
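For reference, these quantities map directly onto the executor parameters used throughout this tutorial; here is a minimal sketch (the values are placeholders, and the "CPU" partition name is the one used in the examples below):

import os
import submitit

# ask only for what a bug-free job actually needs: here 4 GB of RAM,
# a 30-minute time limit and a single CPU (placeholder values)
executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')
executor.update_parameters(mem_gb=4, timeout_min=30, slurm_partition="CPU", cpus_per_task=1)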

What follows should help you with all this.

Anticipating time-memory consumption

Hereafter, we use memory_usage, which has a slightly unusual way of passing arguments to its target function: all positional arguments (those without an = sign in the def) are passed together as a tuple, and all keyword arguments (key-value pairs) are passed together as a dict. For example, if we had a function defined as somefunc(a, b, c, file=None, index=-1), we could call:
mem_usage = memory_usage((somefunc, (0.1, 4, 0.88), {'file': 'whatever.csv', 'index': 0}))

###### simple memory/time check
from memory_profiler import memory_usage
import time

# define a single-threaded function
def duplicate_ones(a, n=100, x=0):
    # importing inside the function makes sure it is available
    # when the function is later shipped to a cluster job
    import time
    time.sleep(1)
    b = [a] * n
    b = [a] * n
    b = [a] * n
    time.sleep(1)
    return b

# duplicate ones a thousand times
print('Duplicate ones a thousand times')
start_time = time.time()
mem_usage=memory_usage((duplicate_ones,(1,), {'n' : int(1e3)}))
end_time = time.time()
print('Maximum memory usage (in MB): %s' % max(mem_usage))
print('Maximum memory usage (in GB): %s' % (max(mem_usage)/1000))
print('Time taken (in s): %s' % (end_time-start_time))

# duplicate ones 100 million times
print('Duplicate ones 100 million times')
start_time = time.time()
mem_usage=memory_usage((duplicate_ones,(1,), {'n' : int(1e8)}))
end_time = time.time()
print('Maximum memory usage (in MB): %s' % max(mem_usage))
print('Maximum memory usage (in GB): %s' % (max(mem_usage)/1000))
print('Time taken (in s): %s' % (end_time-start_time))

print('Do you notice the difference in time and memory due to the change in duplication size?')
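Once you know the peak memory of a representative run, you can turn it into a mem_gb setting. The suggest_mem_gb helper below is a hypothetical sketch: it rounds the measured peak up to the next gigabyte and adds a 1 GB margin for the Python runtime (the same margin recommended for MaxRSS later in this tutorial).

import math

def suggest_mem_gb(peak_mb, margin_gb=1):
    # round the measured peak (in MB) up to the next GB and add a safety margin
    return math.ceil(peak_mb / 1000) + margin_gb

print('Suggested mem_gb for the large run: %s' % suggest_mem_gb(max(mem_usage)))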

Evaluating CPU count needs

How can we evaluate whether our jobs will benefit from having more CPUs available to them? If you do not know whether your functions use parallelization, because you rely on high-level toolboxes, you can evaluate it empirically by comparing the time your jobs take depending on the number of CPUs you allow.

Let’s try first with our last function. It should take about 10s to run.

import os 
import submitit

# these commands may not be necessary, but they helped overcome an error initially
os.environ['SLURM_CPUS_PER_TASK'] = str(1)
os.environ['SLURM_TRES_PER_TASK'] = os.environ['SLURM_CPUS_PER_TASK']
    
# cpu counts to test
nCPUs_totest=[1, 4]

# loop over cpu counts
jcount=0
joblist=[]
start_time = time.time()
for i, cpus in enumerate(nCPUs_totest):
    executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')
    executor.update_parameters(mem_gb=4, timeout_min=5, slurm_partition="CPU", cpus_per_task=cpus)
    job = executor.submit(duplicate_ones, 1, int(1e8))
    job.n_cpus=cpus
    print("job with " + str(job.n_cpus) + " cpus submitted")
    joblist.append(job)
    jcount=jcount+1

# wait for job completion
njobs_finished = 0
while njobs_finished<jcount:
    doneIdx=-1
    time.sleep(1)
    for j, job in enumerate(joblist):
        if job.done():
            doneIdx=j
            break
    if doneIdx>=0:
        print(str(njobs_finished + 1) + ' job(s) finished out of ' + str(jcount))
        # report last job finished
        print("job with " + str(job.n_cpus) + " cpus returned in " + str(time.time()-start_time) + " seconds")
        joblist.pop(doneIdx)
        njobs_finished=njobs_finished+1

print('### Do you think that increasing the number of CPUs made a big difference? ###')

Now let’s redo exactly the same thing with a numpy function that may benefit from multiple CPUs (i.e. np.dot).

import numpy as np
import time

def mat_multiply(size):
  # Generate large random matrices
  A = np.random.rand(size, size)
  B = np.random.rand(size, size)

  # multiply the matrices (this is where numpy may use several threads)
  C = np.dot(A, B)
  
  return 'this function does not return anything special'
  
os.environ['SLURM_CPUS_PER_TASK'] = str(1)
os.environ['SLURM_TRES_PER_TASK'] = os.environ['SLURM_CPUS_PER_TASK']

# cpu counts to test
nCPUs_totest=[4, 4, 4, 1]

# define the max number of jobs that may run in parallel
maxjobs=2

# loop over cpu counts
jcount=0
joblist=[]
start_time = time.time()
for i, cpus in enumerate(nCPUs_totest):
    executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')
    executor.update_parameters(mem_gb=4, timeout_min=5, slurm_partition="CPU", cpus_per_task=cpus)
    # check how many jobs are still running (not done) and wait if adding one more would exceed our limit
    while sum(not job.done() for job in joblist) >= maxjobs:
        print('wait to submit new job')
        time.sleep(3)
    job = executor.submit(mat_multiply, 8000)
    time.sleep(0.5)
    job.n_cpus=cpus
    print("job with " + str(job.n_cpus) + " cpus submitted")
    joblist.append(job)
    jcount=jcount+1

# wait for job completion
njobs_finished = 0
while njobs_finished<jcount:
    doneIdx=-1
    time.sleep(1)
    for j, job in enumerate(joblist):
        if job.done():
            doneIdx=j
            break
    if doneIdx>=0:
        print(str(njobs_finished + 1) + ' job(s) finished out of ' + str(jcount))
        # report last job finished and print stats
        print("job with " + str(job.n_cpus) + " cpus returned in " + str(time.time()-start_time) + " seconds")
        print("job status: " + job.state)
        joblist.pop(doneIdx)
        njobs_finished=njobs_finished+1

print('\n### Do you think that increasing the number of CPUs made a big difference? ###')
print('\n### MaxRSS indicates the memory used ###')

Scaling up responsibly

In the loop above, you might have noticed something new: we have implemented another good practice by self-limiting the number of jobs we run in parallel on the cluster. It might be OK to launch 40 or even 100 parallel jobs if you are in a hurry, but the number of CPUs in the cluster is not infinite, and neither is the amount of memory.

Number of CPUs: you can get this information by running sinfo -o%C in your terminal, or !sinfo -o%C in the notebook.

Amount of memory: you can see this by running sinfo -o "%P %n %m" in your terminal (or with a ! in the notebook).

If it is a Sunday and nobody is using the cluster, it is probably fine to increase maxjobs to 100 or more (note that if you request 4 CPUs per task, you are actually requesting 400 CPUs overall!). But if it is 10.30pm on a Tuesday, using this parameter might be the same as walking to the coffee machine and taking all the coffee reserves to your office! So, get into the habit of setting your maxjobs-like parameter on a daily basis, after checking sinfo -o%C.

# check node and CPU information
print("### Node counts: \nA: currently in use \B available")
!sinfo -o%A
print("### CPU counts: \nA: core you currently use (notebook) \nI: available \nO: unavailable (maintenance, down, etc) \nT: total")
!sinfo -o%C

# check some stats of our last job
print('### CPU time and MaxRSS of our last job (about 1 GB should be added to your MaxRSS in order to safely cover the memory needs of the Python runtime) ###')
os.system(f'sacct -j {job.job_id} --format="CPUTime,MaxRSS"')
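If you prefer to script this check rather than read the sinfo output by eye, the sketch below shows one hypothetical way to derive a conservative maxjobs from the number of idle CPUs (the idle_cpus helper and the 50% margin are illustrative choices, and the "CPU" partition name matches the one used above).

import subprocess

def idle_cpus(partition="CPU"):
    # ask SLURM for CPU counts in "allocated/idle/other/total" format
    out = subprocess.run(["sinfo", "-h", "-p", partition, "-o", "%C"],
                         capture_output=True, text=True).stdout
    # sum the idle ("I") field over the lines returned for this partition
    return sum(int(line.split("/")[1]) for line in out.split() if line)

# keep a margin: only target half of the idle CPUs, divided by the CPUs requested per task
cpus_per_task = 4
maxjobs = max(1, (idle_cpus() // 2) // cpus_per_task)
print('Suggested maxjobs: %s' % maxjobs)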

A more compact approach

In the above examples, we have decomposed most operations using for loops in order to illustrate the different concepts. But with more advanced methods, we can make the code above much more compact.

The example below (taken from the submitit documentation) gets rid of the job submission loop and directly maps our input arrays to job submissions, using executor.map_array and some asynchronous operations. Note that such a compact approach might be more difficult to debug.

import asyncio

# just add a and b, multiply by c, and wait for b seconds
def simple_function(a, b, c):
    output=(a + b)*c
    time.sleep(b)
    return output

# define arrays matched in length for the iteration (if you have constant parameters, you can always duplicate them as done with "c" below)
a = [1, 2, 2, 1, 0, 1]
b = [10, 20, 30, 40, 30, 10]
c=[0.1]*len(b)

# make sure our arrays are matched in length
assert len(a)==len(b)==len(c)

# prepare executor
executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')

# define maxjobs to a low value to illustrate
maxjobs=3

# the parameter "slurm_array_parallelism" tells submitit to limit the number of concurrent jobs
executor.update_parameters(slurm_array_parallelism=maxjobs, mem_gb=2, timeout_min=4, slurm_partition="CPU", cpus_per_task=1)

# submit the jobs (note the .map_array command, which differs from the .submit command used above)
jobs = executor.map_array(simple_function, a, b, c)  # just a list of jobs

# print results as they become available
for aws in asyncio.as_completed([j.awaitable().result() for j in jobs]):
    result = await aws
    print("result of computation: " + str(result))
    arameter "slurm_array_parallelism" tells submitit to limit the number of concurrent jobs
executor.
# note that we use an asynchronous method based on asyncio here
# it essentially does something similar to what we were doing after
# "# wait for job completion", but in a much more compact way
# however, the reordering of outputs with respect to inputs is not implemented
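As noted in the comments, this asynchronous loop yields results in completion order rather than input order. If you need them matched to your inputs, a simple alternative once all jobs have completed is to call job.result(), which blocks until a job is finished and returns its output:

# collect the results in the same order as the inputs a, b and c
ordered_results = [job.result() for job in jobs]
print(ordered_results)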

Submitting and going home

Often, when we have very long jobs, we want to submit these jobs, go home and come back the next day or the next week to check the results of their computations.

In this case, we should not expect our notebook to still be alive when we come back. Instead, we should adopt the more standard approach of writing our results to disk and loading them in a new Jupyter session afterwards!

This is what we will simulate in the final example below.

# write job outputs to the tuto_output folder within our working directory (you could also use your home directory, ~ being synonymous with /home/username/)
job_output_folder=os.getcwd()+'/tuto_output/'

# make sure our output folder exists
if not os.path.exists(job_output_folder):
  os.makedirs(job_output_folder)

# add a and b, wait for b seconds and write the inputs and the result to the output folder given by c
def simple_function_write(a, b, c):
    output=(a + b)
    time.sleep(b)
    output_filepath=os.path.join(c, str(a) + '_' + str(b) + '.txt')
    with open(output_filepath, 'w') as file:
      file.write(f'{a}\n')
      file.write(f'{b}\n')
      file.write(f'{output}\n')
    
# define arrays matched in length for the iteration (if you have constant parameters, you can always duplicate them as done with "c" below)
a = [1, 2, 2, 1, 0, 1]
b = [10, 20, 30, 40, 30, 10]
c=[job_output_folder]*len(b)

# make sure our arrays are matched in length
assert len(a)==len(b)==len(c)

# prepare executor
executor = submitit.AutoExecutor(folder="joblogs")

# define maxjobs to a low value to illustrate
maxjobs=3

# the parameter "slurm_array_parallelism" tells submitit to limit the number of concurrent jobs
executor.update_parameters(slurm_array_parallelism=maxjobs, mem_gb=2, timeout_min=4, slurm_partition="CPU", cpus_per_task=1)

# submit the jobs (note the .map_array command, which differs from the .submit command used above)
jobs = executor.map_array(simple_function_write, a, b, c)  # just a list of jobs
print('### all jobs submitted ###')
print('the kernel will now be killed (and your notebook will crash) but you can see that your jobs keep running by typing squeue in the terminal')

# wait a little and kill manually the kernel process
time.sleep(3)
os.system('kill ' + str(os.getpid()))

It is good practice to run os.system('kill ' + str(os.getpid())) if you do not need the notebook anymore. Simply closing it may not interrupt the process and free the resources for other users.
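When you come back the next day, you can open a fresh notebook and simply read back what the jobs wrote to disk. Here is a minimal sketch assuming the tuto_output folder and the text files produced above:

import glob
import os

# in a fresh session, list the files written by our jobs and read them back
job_output_folder = os.path.join(os.getcwd(), 'tuto_output')
for output_filepath in sorted(glob.glob(os.path.join(job_output_folder, '*.txt'))):
    with open(output_filepath) as file:
        values = [float(line) for line in file]
    print(os.path.basename(output_filepath), values)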

Conclusion

Whether you need several CPUs, and how to set the memory and timeout parameters, depends on the functions you use.

If you are not sure, look in the documentation of your packages or test for a performance improvement as we just did!

Comments, questions?

If you have a GitHub account, you can comment below.