###### Import packages/modules
import submitit
# memory profiler to evaluate how much your jobs demand
from memory_profiler import memory_usage
# import garbage collector: it is sometimes useful to trigger the garbage collector manually with gc.collect()
import gc
# import other modules
import time
Part 1: clean start on SLURM clusters with Python
This tutorial was written to help my colleagues access our local cluster CRNL (FR), but as long as you have a cluster equipped with SLURM, the tutorial should also work smoothly on your infrastructure.
This tutorial will help you to set up your personal Python environment and hopefully improve your way of using it on a compute cluster.
Install miniconda to manage your virtual environment
Installing miniconda is not incompatible with using venv later on.
Open a terminal and type the following commands:
cd ~
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
source ~/.bashrc
Follow the instructions and say yes to everything (you may press Ctrl+C once, followed by Enter, to skip the text faster).
NB: In JupyterLab, you can open a new terminal at ‘File->New->Terminal’. You can then bring this terminal just below your notebook by clicking on its tab and dragging it toward the lower part of the window.
Create your first conda environment
Still in the terminal, type:
conda create -n crnlenv python=3.9
conda activate crnlenv
The crnlenv virtual environment is now active! Everything installed from now on will only be accessible when crnlenv is activated.
Make your conda environment visible to JupyterLab
Still in the terminal, type:
conda install ipykernel
python -m ipykernel install --user --name crnlenv --display-name "Python (crnlenv)"
These commands make your kernel accessible from JupyterLab, not only from the command line. If you create more conda environments / kernels, you will have to run these lines for each of them.
Populate your conda environment / kernel with essential tools
Install a package that lets you submit your jobs to Slurm easily from any Jupyter notebook
conda install -c conda-forge submitit
Install numpy
conda install numpy
Install memory_profiler
pip install memory_profiler -U
Later on you could install various other tools in your virtual environment, but the priority is to check that you can use the cluster and distribute your jobs.
NB: if you wonder why we alternate between conda and pip, the answer is: pip works for almost every package, but when a package is also available through conda, the conda version is sometimes "better" installed (for example, conda also takes care of non-Python dependencies).
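Before moving on, it can be worth confirming that the packages installed above are actually visible to the active environment. The snippet below is a minimal sketch using only the standard library; the helper name `missing_packages` is hypothetical and not part of any of the installed tools.

```python
import importlib.util

def missing_packages(names):
    """Return the names in `names` that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Packages installed in the steps above; an empty list means all were found.
required = ["submitit", "numpy", "memory_profiler"]
print("Missing packages:", missing_packages(required))
```

If the list is not empty, the most likely cause is that the install commands were run while another environment was active.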
Let’s start computing
Since we registered the environment as a Jupyter kernel above, you should be able to see crnlenv in JupyterLab if you go to “Kernel->Change Kernel”.
Select it and then restart the kernel (“Kernel->Restart Kernel”) to continue this tutorial.
On the top right of this window, you should see something like “Python (crnlenv)”. It means your notebook is running in the right virtual environment!
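If you prefer to double-check from code rather than from the kernel name, the sketch below prints where the running interpreter lives. The function name `describe_environment` is hypothetical. Note that `CONDA_DEFAULT_ENV` is only set when conda activated the environment in the process that launched the kernel, so `sys.prefix` is usually the more reliable indicator inside a notebook.

```python
import os
import sys

def describe_environment():
    """Collect a few facts about the interpreter running this code."""
    return {
        "executable": sys.executable,
        "prefix": sys.prefix,
        "conda_env": os.environ.get("CONDA_DEFAULT_ENV", "(not set)"),
    }

for key, value in describe_environment().items():
    print(f"{key}: {value}")
```

With the right kernel selected, the prefix would normally end with the environment name (crnlenv in this tutorial).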
From now on, you will execute the code cells below, in order. You can do it either by pressing the play button (at the top of the notebook) or by clicking in the target cell and pressing Shift+Enter.
You may also want to check the tutorials of the module submitit used here.
###### Define a function that should run on the cluster
# this specific function is very dumb and only for demonstration purposes
# we will just feed it with a number and a string, but we could pass any object to it (filepath, DataFrames, etc.)
# here, the function only returns one value but it could return several (return result1, result2)
def yourFunction(argument1, argument2):
    # print something to the log
    print('I am running with argument1=' + str(argument1))
    # sleep for the duration specified by argument1
    # just to illustrate the parallel processing implemented
    time.sleep(argument1)
    # we simply duplicate argument2 as a function of argument1 and return it as our results
    results = ''
    for i in range(argument1):
        results = results + '_' + argument2
    return results
# check time and memory usage of your function
# ideally, try to test it with the input values that will produce the biggest memory consumption
start_time = time.time()
mem_usage = memory_usage((yourFunction, (3, 'consumption',)))
end_time = time.time()
# memory_usage reports values in MiB, so we divide by 1024 to get GiB
print('Maximum memory usage (in MiB): %s' % max(mem_usage))
print('Maximum memory usage (in GiB): %s' % (max(mem_usage)/1024))
print('Time taken (in s): %s' % (end_time-start_time))
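The measured peak is what you will later translate into the `mem_gb` value requested for each job. The helper below is a minimal sketch of one way to do that. The name `suggest_mem_gb` and the default safety factor of 1.5 are assumptions chosen for illustration, not a recommendation from submitit or Slurm. It takes the peak in MiB (the unit reported by `memory_usage`) and returns a whole number of GB with some headroom.

```python
import math

def suggest_mem_gb(peak_mib, safety_factor=1.5):
    """Turn a measured peak (in MiB) into a whole-number GB request with headroom."""
    if peak_mib < 0 or safety_factor < 1:
        raise ValueError("peak_mib must be >= 0 and safety_factor must be >= 1")
    # never request less than 1 GB, and always round up
    return max(1, math.ceil(peak_mib * safety_factor / 1024))

print(suggest_mem_gb(60))    # small job: the 1 GB floor applies
print(suggest_mem_gb(3000))  # 3000 MiB * 1.5 / 1024 is about 4.4, rounded up to 5
```

A modest margin like this keeps the job from being killed for exceeding its allocation without reserving much more memory than it needs.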
#### Set some environment variables for our jobs
### for some reason, some default values are set on the cluster, which do not match
### each other and submitit will complain
import os
os.environ['SLURM_CPUS_PER_TASK'] = '1'
os.environ['SLURM_TRES_PER_TASK'] = os.environ['SLURM_CPUS_PER_TASK']
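To see which Slurm-related values your session actually inherited before (or after) overriding them, you can list them. This is a small sketch using only the standard library; the helper name `slurm_variables` is hypothetical.

```python
import os

def slurm_variables(environ=os.environ):
    """Return the SLURM_* variables of an environment mapping, sorted by name."""
    return {k: environ[k] for k in sorted(environ) if k.startswith("SLURM_")}

# print every SLURM_* variable visible to this notebook (nothing is printed if none are set)
for name, value in slurm_variables().items():
    print(f"{name}={value}")
```

Comparing this output before and after the assignments above makes it easier to spot which defaults were in conflict on your cluster.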
#### define some array for which each item will be associated with an independent job on the cluster
#### when you execute this cell, the jobs are sent to the cluster
# here we define an array of numbers: since this array will be used to feed the first argument of yourFunction
# and yourFunction waits for as many seconds as its first argument, the jobs will return in the wrong order
# (with the output of the second call about 20s after the first one!)
array_parallel = [1, 20, 2, 5]
# define an additional parameter to be passed to the function
additional_parameter = 'whatever'
# define a variable that will be outside the scope of the function
variable_outside_scope = 'something'
# initialize a list in which our returning jobs will be stored
joblist = []
# loop over array_parallel
print('#### Start submitting jobs #####')
jcount = 0
for i, value in enumerate(array_parallel):
    # executor is the submission interface (logs are dumped in the folder)
    executor = submitit.AutoExecutor(folder=os.getcwd()+'/tuto_logs/')
    # set memory, timeout in min, and partition for running the job
    # if you expect your job to be longer or to require more memory: you will need to increase the corresponding values
    # however, note that increasing mem_gb too much is an antisocial selfish behavior :)
    executor.update_parameters(mem_gb=1, timeout_min=5, slurm_partition="CPU")
    # actually submit the job: note that "value" corresponds to that of array_parallel in this iteration
    job = executor.submit(yourFunction, value, additional_parameter)
    # add info about job submission order
    job.job_initial_indice = i
    # print the ID of your job
    print("submit job" + str(job.job_id))
    # append the job to the joblist
    joblist.append(job)
    # increase the job count
    jcount = jcount + 1
### now that the loop has ended we check whether any job is already done
print('#### Start waiting for jobs to return #####')
njobs_finished = sum(job.done() for job in joblist)
# decide whether we clean our job live or not
clean_jobs_live = False
# create a list to store finished jobs (optional, and depends on whether we need to clean up jobs live)
finished_list = []
finished_order = []
### now we will keep looking for a new finished job until all jobs are done:
njobs_finished = 0
while njobs_finished < jcount:
    doneIdx = -1
    for j, job in enumerate(joblist):
        if job.done():
            doneIdx = j
            break
    if doneIdx >= 0:
        print(str(1+njobs_finished) + ' on ' + str(jcount))
        # report last job finished
        print("last job finished: " + str(job.job_id))
        # obtain result from job
        job_result = job.result()
        # do some processing with this job (we could directly do print(job.result()) though obviously!)
        print(job_result)
        # decide what to do with the finished job object
        if clean_jobs_live:
            # delete the job object
            del job
            # collect all the garbage immediately to spare memory
            gc.collect()
        else:
            # if we decided to keep the jobs in a list for further processing, add it to the finished job list
            finished_list.append(job)
            finished_order.append(job.job_initial_indice)
        # increment the count of finished jobs
        njobs_finished = njobs_finished + 1
        # remove this finished job from the initial joblist
        joblist.pop(doneIdx)
    else:
        # no job has finished yet: wait a little before checking again instead of looping at full speed
        time.sleep(1)
print('#### All jobs completed #####')
### If we chose to keep our jobs for subsequent processing, it will often be crucial to reorder them according to their initial
### submission order, rather than their return order (from the cluster). Here we only keep the results of the jobs
if not clean_jobs_live:
    # pair each finished job with its submission index, then sort by that index
    ordered_jobs = sorted(zip(finished_order, finished_list), key=lambda pair: pair[0])
    finished_results = [job.result() for _, job in ordered_jobs]
    print('Concatenated results obtained by applying yourFunction() to all items in array_parallel:')
    print(finished_results)
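The reordering step can be tried without a cluster. The sketch below is a local simulation only: it replaces submitit jobs with standard-library threads (`concurrent.futures`), and `slow_echo` is a hypothetical stand-in for yourFunction with much shorter delays. It collects results in completion order, then sorts them by submission index.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_echo(delay, label):
    """Stand-in for yourFunction: wait `delay` seconds, then return a string."""
    time.sleep(delay)
    return f"{label}_{delay}"

delays = [0.05, 0.2, 0.1, 0.15]  # deliberately not in increasing order

with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    # remember the submission index of every future
    index_of = {pool.submit(slow_echo, d, "job"): i for i, d in enumerate(delays)}
    # collect (submission index, result) pairs in completion order
    finished = [(index_of[f], f.result()) for f in as_completed(index_of)]

return_order = [i for i, _ in finished]
# sort by submission index to restore the original order
ordered_results = [result for _, result in sorted(finished, key=lambda pair: pair[0])]

print("return order:", return_order)
print("results in submission order:", ordered_results)
```

The return order depends on timing, but the sorted results always line up with `delays`, which is the property you want before combining job outputs with the inputs that produced them.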
Comments, questions?
If you have a GitHub account, you can comment below.