Part 1: clean start on SLURM clusters with Python

Author

Romain Ligneul

Note

This tutorial was written to help my colleagues access our local cluster CRNL (FR), but as long as you have a cluster equipped with SLURM, the tutorial should also work smoothly on your infrastructure.

This tutorial will help you to set up your personal Python environment and hopefully improve your way of using it on a compute cluster.

Install miniconda to manage your virtual environment

Installing miniconda is not incompatible with using venv later on.

Open a terminal and type the following commands:
cd ~
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc

Follow the instructions and say yes to everything (you may press Ctrl+C once, followed by Enter, to skip the text faster).

NB: In JupyterLab, you can open a new terminal at ‘File->New->Terminal’. You can then bring this terminal just below your notebook by clicking on its tab and dragging it toward the lower part of the window.

Create your first conda environment

Still in the terminal, type:
conda create -n crnlenv python=3.9
conda activate crnlenv

The crnlenv virtual environment is active! Everything installed from now on will only be accessible while crnlenv is activated.
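If you want to confirm from Python itself which environment is in use, a minimal check could look like the sketch below. It relies only on the standard library. `CONDA_DEFAULT_ENV` is the variable that conda sets on activation, and the helper name `describe_environment` is made up for this illustration.

```python
import os
import sys


def describe_environment():
    """Return basic facts about the interpreter that runs this code."""
    return {
        # full path of the Python executable (expected to sit inside the crnlenv folder)
        "executable": sys.executable,
        # root folder of the environment the interpreter belongs to
        "prefix": sys.prefix,
        # name set by `conda activate`; None if no conda environment is active
        "conda_env": os.environ.get("CONDA_DEFAULT_ENV"),
        # major.minor version of the interpreter
        "python_version": "%d.%d" % sys.version_info[:2],
    }


info = describe_environment()
for key, value in info.items():
    print(key + ": " + str(value))
```

If `conda_env` does not show crnlenv, the environment is probably not activated in the terminal (or not selected as the kernel) you are running this from.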

Make your conda environment visible to JupyterLab

Still in the terminal, type:
conda install ipykernel
python -m ipykernel install --user --name crnlenv --display-name "Python (crnlenv)"

These commands allow your kernel to be accessed from JupyterLab, not only from the command line. If you create more conda environments / kernels, you will also have to run these lines for each of them, with the corresponding name.
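To check that the kernel was registered, one option is to list the kernels that Jupyter can find. The sketch below assumes that the `jupyter_client` package is importable (it normally comes with ipykernel) and falls back to an empty result otherwise. The function name `list_jupyter_kernels` is hypothetical.

```python
def list_jupyter_kernels():
    """Return {kernel name: folder holding its kernel.json}, or {} if jupyter_client is missing."""
    try:
        from jupyter_client.kernelspec import KernelSpecManager
    except ImportError:
        # jupyter_client is not installed in this environment
        return {}
    return KernelSpecManager().find_kernel_specs()


kernels = list_jupyter_kernels()
for name, path in sorted(kernels.items()):
    print(name + " -> " + path)
print("crnlenv registered: " + str("crnlenv" in kernels))
```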

Populate your conda environment / kernel with essential tools

Install a package that allows you to submit your jobs to Slurm easily from any Jupyter notebook
conda install submitit

Install numpy
conda install numpy

Install memory_profiler
pip install memory_profiler -U

Later on you could install various other tools in your virtual environment, but the priority is to check that you can use the cluster and distribute your jobs.

NB: if you wonder why we install sometimes with conda and sometimes with pip, the answer is: pip almost always works, but when a package is also available through conda, the conda version may be better integrated with the rest of the environment in some cases.
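Whichever installer was used, you can verify from Python that the packages installed above are visible in the current environment. This is a small sketch based on the standard `importlib.metadata` module (Python 3.8+). The helper name `installed_versions` is an assumption of this example.

```python
from importlib import metadata


def installed_versions(package_names):
    """Map each package name to its installed version, or None if it is not found."""
    versions = {}
    for name in package_names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


report = installed_versions(["submitit", "numpy", "memory_profiler"])
for name, version in report.items():
    print(name + ": " + (version if version is not None else "NOT FOUND"))
```

A "NOT FOUND" line usually means that the package was installed while another environment was active. It can also mean that the distribution is registered under a slightly different name.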

Let’s start computing

Since we registered the environment as a Jupyter kernel above, you should be able to see crnlenv in JupyterLab if you go to “Kernel->Change Kernel”.

Select it and then restart the kernel (“Kernel->Restart Kernel”) to continue this tutorial.

On the top right of this window, you should see something like “Python (crnlenv)”. It means your notebook is running in the right virtual environment!

From now on, you will execute the code cells below, in order. You can do it either by pressing the play button (at the top of the notebook) or by clicking in the target cell and pressing Shift+Enter.

You may also want to check the tutorials of the module submitit used here.

###### Import packages/modules
import submitit
# memory profiler to evaluate how much your jobs demand
from memory_profiler import memory_usage
# import garbage collector: it is sometimes useful to trigger the garbage collector manually with gc.collect()
import gc
# import other modules
import time
###### Define a function that should run on the cluster

# this specific function is very dumb and only for demonstration purposes
# we will just feed it with a number and a string, but we could pass any object to it (filepath, DataFrames, etc.)
# here, the function only returns one value but it could return several (return result1, result2)
def yourFunction(argument1, argument2):

    # print something to the log
    print('I am running with argument1=' + str(argument1))
    
    # sleep for the duration specified by argument1
    # just to illustrate the parallel processing implemented
    time.sleep(argument1)
    
    # we simply duplicate argument2 as a function of argument1 and return it as our results
    results=''
    for i in range(argument1):
        results=results+'_'+argument2
    return results
# check time and memory usage of your function
# ideally, try to test it with the input values that will produce the biggest memory consumption
start_time = time.time()
mem_usage=memory_usage((yourFunction, (3,'consumption',)))
end_time = time.time()
print('Maximum memory usage (in MiB): %s' % max(mem_usage))
print('Maximum memory usage (in GiB): %s' % (max(mem_usage)/1024))
print('Time taken (in s): %s' % (end_time-start_time))
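The peak measured above can guide the memory you request for each job. The sketch below is one possible rule of thumb, not a cluster policy: it converts a peak in MiB (the unit reported by `memory_usage`) into a whole number of GB with some headroom. The function name `suggest_mem_gb` and the default `safety_factor` of 1.5 are assumptions made for this illustration.

```python
import math


def suggest_mem_gb(peak_mib, safety_factor=1.5):
    """Turn a measured peak (in MiB) into a whole number of GB to request."""
    if peak_mib < 0 or safety_factor < 1:
        raise ValueError("peak_mib must be >= 0 and safety_factor must be >= 1")
    # convert MiB to GiB, add headroom, and round up to the next integer
    peak_gib = peak_mib / 1024
    return max(1, math.ceil(peak_gib * safety_factor))


# small job: rounds up to the 1 GB minimum
print(suggest_mem_gb(120))
# larger job: about 2.9 GiB measured, with headroom
print(suggest_mem_gb(3000))
```

The returned value could then be passed as `mem_gb` when configuring the jobs.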
#### Set some environment variables for our jobs
### for some reason, some default values are set on the cluster, which do not match 
### each other and submitit will complain
import os
os.environ['SLURM_CPUS_PER_TASK'] = '1'
os.environ['SLURM_TRES_PER_TASK'] = os.environ['SLURM_CPUS_PER_TASK']
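To see which Slurm-related values are defined in your session, you can list every environment variable whose name starts with `SLURM_`. This is a minimal sketch. The helper name `slurm_variables` is hypothetical, and on a machine without Slurm it simply prints nothing apart from the variables set in the cell above.

```python
import os


def slurm_variables(environ=None):
    """Return the SLURM_* entries of an environment mapping, sorted by name."""
    environ = os.environ if environ is None else environ
    return {key: value for key, value in sorted(environ.items()) if key.startswith("SLURM_")}


for key, value in slurm_variables().items():
    print(key + "=" + value)
```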
#### define an array in which each item will be associated with an independent job on the cluster
#### when you execute this cell, the jobs are sent to the cluster 

# here we define an array of numbers: since this array feeds the first argument of yourFunction
# and yourFunction waits for as many seconds as its first argument, the jobs will not return in submission order
# (the output of the second call arrives about 20s after that of the first one!)
array_parallel=[1, 20, 2, 5]

# define an additional parameter to be passed to the function
additional_parameter='whatever'

# define a variable that will be outside the scope of the function
variable_outside_scope = 'something'

# initialize a list in which our returning jobs will be stored
joblist=[]

# loop over array_parallel
print('#### Start submitting jobs #####')
jcount=0
for i, value in enumerate(array_parallel):
    
  # executor is the submission interface (logs are dumped in the folder)
  executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')
  
  # set memory, timeout in min, and partition for running the job
  # if you expect your job to take longer or to require more memory, you will need to increase the corresponding values
  # however, note that increasing mem_gb too much is an antisocial, selfish behavior :)
  executor.update_parameters(mem_gb=1, timeout_min=5, slurm_partition="CPU")
  
  # actually submit the job: note that "value" corresponds to the item of array_parallel in this iteration
  job = executor.submit(yourFunction, value, additional_parameter)
  
  # add info about job submission order
  job.job_initial_indice=i 
  
  # print the ID of your job
  print("submit job " + str(job.job_id))

  # append the job to the joblist
  joblist.append(job)

  # increase the job count
  jcount=jcount+1


### now that the loop has ended, we wait for the jobs to return
print('#### Start waiting for jobs to return #####')

# decide whether we clean our job live or not
clean_jobs_live=False

# create a list to store finished jobs (optional, and depends on whether we need to cleanup job live)
finished_list=[]
finished_order=[]

### now we will keep looking for a new finished job until all jobs are done:
njobs_finished=0
while njobs_finished<jcount:
  doneIdx=-1
  for j, job in enumerate(joblist):
    if job.done():
      doneIdx=j
      break
  if doneIdx>=0:
    print(str(1+njobs_finished)+' of ' + str(jcount))
    # report last job finished
    print("last job finished: " + str(job.job_id))
    # obtain result from job
    job_result=job.result()
    # do some processing with this job (we could obviously do print(job.result()) directly!)
    print(job_result)
    # decide what to do with the finished job object
    if clean_jobs_live:
      # delete the job object
      del job
      # collect all the garbage immediately to spare memory
      gc.collect()
    else:
      # if we decided to keep the jobs in a list for further processing, add this job to the finished job list
      finished_list.append(job)
      finished_order.append(job.job_initial_indice)
    # increment the count of finished jobs
    njobs_finished=njobs_finished+1
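    # remove this finished job from the initial joblist
    joblist.pop(doneIdx)
  else:
    # no job has finished yet: pause briefly before polling again instead of spinning the CPU
    time.sleep(1)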
    
print('#### All jobs completed #####')
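The waiting pattern used above (poll every pending job, collect the ones that are done, pause, repeat) can be tried without a cluster. The sketch below swaps submitit for the standard library's `concurrent.futures.ThreadPoolExecutor`, so `Future.done()` and `Future.result()` stand in for `job.done()` and `job.result()`. The names `fake_job` and `durations` are made up for this illustration, and threads on your own machine are of course not cluster jobs.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_job(duration, label):
    """Stand-in for a cluster job: sleep, then return a small result."""
    time.sleep(duration)
    return label * 2


# short durations (in seconds) that mimic the uneven run times used above
durations = [0.05, 0.4, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=len(durations)) as pool:
    # remember the submission index of every pending job
    pending = {i: pool.submit(fake_job, d, "x") for i, d in enumerate(durations)}
    return_order = []
    while pending:
        # find the jobs that finished since the last poll
        done_now = [i for i, future in pending.items() if future.done()]
        for i in done_now:
            pending.pop(i).result()
            return_order.append(i)
        if not done_now:
            # nothing new: wait a little before polling again
            time.sleep(0.01)

print("submission indices in order of return: " + str(return_order))
```

The printed list shows the submission indices in the order in which the jobs actually returned, which generally differs from the submission order.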
### If we chose to keep our jobs for subsequent processing, it will often be crucial to reorder them according to their initial
### submission order, rather than their return order (from the cluster). Here we only keep the results of the jobs
if not clean_jobs_live:
  # sort the finished jobs by the submission index stored in job_initial_indice, then collect their results
  finished_results = [job.result() for job in sorted(finished_list, key=lambda j: j.job_initial_indice)]
  print('Concatenated results obtained by applying yourFunction() to all items in array_parallel:')
  print(finished_results)
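The reordering step is easy to get wrong, so here is a small cluster-free sketch of the idea using plain lists. It assumes that item k of `finished_items` came from the job submitted at position `finished_order[k]`. The function name `reorder_by_submission` is hypothetical.

```python
def reorder_by_submission(finished_items, finished_order):
    """Put items collected in return order back into submission order."""
    if sorted(finished_order) != list(range(len(finished_items))):
        raise ValueError("finished_order must contain each submission index exactly once")
    ordered = [None] * len(finished_items)
    for item, initial_index in zip(finished_items, finished_order):
        # place each item at the position it had at submission time
        ordered[initial_index] = item
    return ordered


# the job submitted second (index 1) returned first
print(reorder_by_submission(["b", "a"], [1, 0]))
# return order 0, 2, 3, 1 for four jobs
print(reorder_by_submission(["r0", "r2", "r3", "r1"], [0, 2, 3, 1]))
```

Checking the function on a two-item case such as `[1, 0]` is useful, because some incorrect index manipulations happen to give the right answer for particular return orders.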

Next part

Click here to go to Part 2
