How the parallelization works in the engine¶
The engine builds on top of existing parallelization libraries. Specifically, on a single machine it is based on multiprocessing, which is part of the Python standard library, while on a cluster it is based on the combination celery/rabbitmq, which are well-known and maintained tools.
While the parallelization used by the engine may look trivial in theory (it only addresses embarrassingly parallel problems, not true concurrency) in practice it is far from being so. For instance a crucial feature that the GEM staff requires is the ability to kill (revoke) a running calculation without affecting other calculations that may be running concurrently.
Because of this requirement, we abandoned concurrent.futures, which is also in the standard library but lacks the ability to kill the pool of processes, an ability that multiprocessing provides with the Pool.terminate method. For the same reason, we discarded dask, which is a lot more powerful than celery but lacks the revoke functionality.
Using a real cluster scheduling mechanism (like SLURM) would be of course better, but we do not want to impose on our users a specific cluster architecture. celery/rabbitmq have the advantage of being simple to install and manage. Still, the architecture of the engine parallelization library is such that it is very simple to replace celery/rabbitmq with other parallelization mechanisms: people interested in doing so should just contact us.
Another tricky aspect of parallelizing large scientific calculations is that the amount of data returned can exceed the 4 GB limit of Python pickles: in this case one gets ugly runtime errors. The solution we found is to make it possible to yield partial results from a task: in this way, instead of returning say 40 GB from one task, one can yield 40 partial results of 1 GB each, thus bypassing the 4 GB limit. It is up to the implementor to code the task carefully. In order to do so, it is essential to have in place some monitoring mechanism measuring how much data is returned from a task, as well as other essential information like how much memory is allocated and how long it takes to run a task.
To this aim the OpenQuake engine offers a Monitor class (located in openquake.baselib.performance) which is well integrated with the parallelization framework, so much so that every task gets a Monitor object, a context manager that can be used to measure time and memory of specific parts of a task. Moreover, the monitor automatically measures time and memory for the whole task, as well as the size of the returned output (or outputs). Such information is stored in an HDF5 file that you must pass to the monitor when instantiating it. The engine automatically does that for you by passing the pathname of the datastore.
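For illustration, here is a minimal sketch of how a task could time its own sub-operations, assuming the usual engine idiom where calling the monitor with a label returns a child monitor usable as a context manager; preprocess and compute are hypothetical helpers, not engine functions:

def my_task(sources, monitor):  # hypothetical task; the engine supplies the monitor
    with monitor('preprocessing'):   # time/memory of this block is recorded separately
        data = preprocess(sources)
    with monitor('heavy computation'):
        result = compute(data)
    return result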
In OpenQuake a task is just a Python function (or generator) with positional arguments, where the last argument is a Monitor instance. For instance the rupture generator task in an event based calculation is coded more or less like this:
def sample_ruptures(sources, num_samples, monitor):  # simplified code
    eb_ruptures = []
    for src in sources:
        for rup, n_occ in src.sample_ruptures(num_samples):
            ebr = EBRupture(rup, src.id, grp_id, n_occ)
            eb_ruptures.append(ebr)
            if len(eb_ruptures) > MAX_RUPTURES:
                # yield a partial result to avoid running out of memory
                yield eb_ruptures
                eb_ruptures.clear()
    if eb_ruptures:
        yield eb_ruptures
If you know that there is no risk of running out of memory and/or exceeding the pickle limit, you can just use a regular function and return a single result instead of yielding partial results. This is the case when computing the hazard curves, because the algorithm considers one rupture at a time and does not accumulate ruptures in memory, unlike what happens when sampling the ruptures in event based.
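For illustration, a hypothetical task of this kind might look as follows; compute_curves and the argument names are placeholders, not actual engine functions:

def classical(sources, gsims, monitor):  # hypothetical non-generator task
    # the result stays small (one set of curves per source), so returning
    # it in one piece is safe: no risk of hitting the pickle limit
    result = {}
    for src in sources:
        result[src.id] = compute_curves(src, gsims)  # hypothetical helper
    return result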
If you have ever coded in celery, you will see that the OpenQuake engine concept of task is different: there is no @task decorator and, while in the end engine tasks will become celery tasks, this is hidden from the developer. The reason is that we needed a celery-independent abstraction layer to make it possible to use different kinds of parallelization frameworks.
From the point of view of the coder, in the engine there is no difference between a task running on a cluster using celery and a task running locally using multiprocessing.Pool: they are coded the same, but depending on a configuration parameter in openquake.cfg (distribute=celery or distribute=processpool) the engine will treat them differently.
You can also set an environment variable OQ_DISTRIBUTE, which takes precedence over openquake.cfg, to specify which kind of distribution you want to use (celery or processpool): this is mostly used when debugging, when you typically insert a breakpoint in the task and then run the calculation with

$ OQ_DISTRIBUTE=no oq run job.ini
no is a perfectly valid distribution mechanism in which there is actually no distribution and all the tasks run sequentially on the same core. Having this functionality is invaluable for debugging.
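For instance, to inspect the sample_ruptures task shown above, you could temporarily add a breakpoint (breakpoint() is the standard Python built-in) and then run sequentially with OQ_DISTRIBUTE=no as above, so that the debugger prompt appears directly in your terminal:

def sample_ruptures(sources, num_samples, monitor):  # simplified code
    breakpoint()  # drops into pdb; usable only because the task runs in-process
    ...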
Another tricky bit of real life parallelism in Python is that forking does not play well with the HDF5 library: so in the engine we use multiprocessing in spawn mode, not in fork mode. Fortunately this feature became available in Python 3 and it made our life a lot happier. Before, it was extremely easy to run into unspecified behavior, meaning that reading an HDF5 file from a forked process could
- work perfectly well
- read bogus numbers
- cause a segmentation fault
and all three of these things could happen unpredictably at any moment, depending on the machine where the calculation was running, the load on the machine, and any kind of environmental circumstances.
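In standard library terms (this is a minimal illustration of the start method, not engine code), selecting spawn looks like this:

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # 'spawn' starts each worker from a fresh interpreter, so nothing
    # (e.g. an open HDF5 file handle) is inherited from the parent via fork
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool() as pool:
        print(pool.map(square, range(10)))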
Also, while with the newest HDF5 libraries it is possible to use a Single Writer Multiple Reader (SWMR) architecture, and we are actually using it - even if it is sometimes tricky to use correctly - the performance is not always good. So, when it matters, we use a two-file approach which is simple and very effective: we read from one file (with multiple readers) and we write to the other file (with a single writer). This approach bypasses all the limitations of the SWMR mode in HDF5 and did not require a large refactoring of our existing code.
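As a rough sketch of the two-file pattern in plain h5py/multiprocessing terms (the file and dataset names are made up, and this is not the engine's actual implementation):

import multiprocessing
import numpy
import h5py

def read_chunk(args):
    path, key = args
    # any number of reader processes can open the input file read-only,
    # since no process is writing to it
    with h5py.File(path, 'r') as f:
        return f[key][:].sum()

if __name__ == '__main__':
    keys = ['chunk%d' % i for i in range(4)]  # assumed to exist in input.hdf5
    ctx = multiprocessing.get_context('spawn')
    with ctx.Pool() as pool:
        sums = pool.map(read_chunk, [('input.hdf5', k) for k in keys])
    # the results go to a *different* file, written by a single writer
    with h5py.File('output.hdf5', 'w') as out:
        out['sums'] = numpy.array(sums)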
Another tricky point in cluster situations is that rabbitmq is not good at transferring gigabytes of data: it was meant to manage lots of small messages, but here we are perverting it to manage huge messages, i.e. the large arrays coming from scientific calculations.
Hence, since recent versions of the engine we no longer return data from the tasks via celery/rabbitmq: instead, we use zeromq. This is hidden from the user, but internally the engine keeps track of all tasks that were submitted and waits until they send the message that they have finished. If one task runs out of memory badly and never sends that message, the engine may hang, waiting for the results of a task that does not exist anymore. You have to be careful. What we did in our cluster is to set a memory limit on the celery user with the cgroups technology, so that an out-of-memory task normally fails in a clean way with a Python MemoryError, sends the message that it has finished, and nothing particularly bad happens. Still, in situations of very heavy load the OOM killer may enter into action aggressively and the main calculation may hang: in such cases you need a good sysadmin who has put in place some monitoring mechanism, so that you can see if the OOM killer entered into action and then kill the main process of the calculation by hand. There is not much more that can be done for really huge calculations that stress the hardware as much as possible. You must be prepared for failures.
How to use openquake.baselib.parallel¶
Suppose you want to code a character-counting algorithm, which is a textbook exercise in parallel computing, and suppose that you want to store information about the performance of the algorithm. Then you should use the OpenQuake Monitor class, as well as the utility openquake.commonlib.datastore.hdf5new that builds an empty datastore for you. Having done that, the openquake.baselib.parallel.Starmap class can take care of the parallelization for you, as in the following example:
import os
import sys
import pathlib
import collections
from openquake.baselib.performance import Monitor
from openquake.baselib.parallel import Starmap
from openquake.commonlib.datastore import hdf5new


def count(text):
    c = collections.Counter()
    for word in text.split():
        c += collections.Counter(word)
    return c


def main(dirname):
    dname = pathlib.Path(dirname)
    with hdf5new() as hdf5:  # create a new datastore
        monitor = Monitor('count', hdf5)  # create a new monitor
        iterargs = ((open(dname/fname, encoding='utf-8').read(),)
                    for fname in os.listdir(dname)
                    if fname.endswith('.rst'))  # read the docs
        c = collections.Counter()  # initially empty counter
        for counter in Starmap(count, iterargs, monitor):
            c += counter
        print(c)  # total counts
        print('Performance info stored in', hdf5)


if __name__ == '__main__':
    main(sys.argv[1])  # pass the directory where the .rst files are
The name Starmap was chosen because it looks very similar to how multiprocessing.Pool.starmap works, the only apparent difference being the additional monitor argument:

pool.starmap(func, iterargs) -> Starmap(func, iterargs, monitor)
In reality the Starmap has a few other differences:

- it does not use the multiprocessing mechanism to return the results; it uses zmq instead;
- thanks to that, it can be extended to generator functions and can yield partial results, thus overcoming the limitations of multiprocessing;
- the Starmap has a .submit method and it is actually more similar to concurrent.futures than to multiprocessing.
Here is how you would write the same example by using .submit:
def main(dirname):
    dname = pathlib.Path(dirname)
    with hdf5new() as hdf5:
        smap = Starmap(count, monitor=Monitor('count', hdf5))
        for fname in os.listdir(dname):
            if fname.endswith('.rst'):
                smap.submit(open(dname/fname, encoding='utf-8').read())
        c = collections.Counter()
        for counter in smap:
            c += counter
The difference with concurrent.futures is that the Starmap takes care of all submitted tasks, so you do not need to use something like concurrent.futures.as_completed: you can just loop on the Starmap object to get the results from the various tasks.
The .submit approach is more general: for instance you could define more than one Starmap object at the same time and submit some tasks with one starmap and some others with another starmap: this may help in parallelizing complex situations where it is expensive to use a single starmap. However, there is a limit on the number of starmaps that can be alive at the same moment.
Moreover the Starmap has a .shutdown method that allows you to shut down the underlying pool.
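As a sketch of these last two points, reusing the imports and the count function from the example above (word_count is a made-up second task, and the exact shutdown signature is assumed):

def word_count(text):  # hypothetical second task
    return collections.Counter(text.split())


def main2(dirname):
    dname = pathlib.Path(dirname)
    with hdf5new() as hdf5:
        smap1 = Starmap(count, monitor=Monitor('count', hdf5))
        smap2 = Starmap(word_count, monitor=Monitor('word_count', hdf5))
        for fname in os.listdir(dname):
            if fname.endswith('.rst'):
                text = open(dname/fname, encoding='utf-8').read()
                smap1.submit(text)  # character counts go to one starmap
                smap2.submit(text)  # word counts go to the other
        chars = sum(smap1, collections.Counter())
        words = sum(smap2, collections.Counter())
        smap1.shutdown()  # release the underlying pool
        smap2.shutdown()
        print(chars, words)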
The idea is to submit the text of each file - here I am considering .rst files, like the ones composing this manual - and then loop over the results of the Starmap. This is very similar to how concurrent.futures works.