# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
TODO: write documentation.
"""
from __future__ import print_function
import os
import sys
import time
import socket
import inspect
import logging
import operator
import traceback
import functools
import multiprocessing.dummy
from concurrent.futures import as_completed, ProcessPoolExecutor, Future
import numpy
from openquake.baselib import hdf5
from openquake.baselib.python3compat import pickle
from openquake.baselib.performance import Monitor, virtual_memory
from openquake.baselib.general import (
block_splitter, split_in_blocks, AccumDict, humansize)
# process pool shared by the whole module (and by TaskManager)
executor = ProcessPoolExecutor()
# the num_tasks_hint is chosen to be 5 times bigger than the number of
# cores; it is a heuristic number to get a good distribution;
# it has no more significance than that
executor.num_tasks_hint = executor._max_workers * 5

# distribution mode, read from the environment once at import time
# (see also oq_distribute, which re-reads it at each call)
OQ_DISTRIBUTE = os.environ.get('OQ_DISTRIBUTE', 'futures').lower()
if OQ_DISTRIBUTE == 'celery':
    from celery.result import ResultSet
    from celery import Celery
    from celery.task import task
    from openquake.engine.celeryconfig import BROKER_URL, CELERY_RESULT_BACKEND
    app = Celery('openquake', backend=CELERY_RESULT_BACKEND, broker=BROKER_URL)
elif OQ_DISTRIBUTE == 'ipython':
    import ipyparallel as ipp
def oq_distribute():
    """
    Return the current value of the variable OQ_DISTRIBUTE; if undefined,
    return 'futures'.
    """
    dist = os.environ.get('OQ_DISTRIBUTE', 'futures')
    return dist.lower()
def check_mem_usage(monitor=Monitor(),
                    soft_percent=90, hard_percent=100):
    """
    Display a warning if we are running out of memory; raise a
    MemoryError above the hard limit.

    :param monitor: a Monitor used to send the warning
    :param soft_percent: memory percentage triggering a warning
    :param hard_percent: memory percentage triggering a MemoryError
    """
    percent = virtual_memory().percent
    if percent > hard_percent:
        msg = ('Using more memory than allowed by configuration '
               '(Used: %d%% / Allowed: %d%%)! Shutting down.' %
               (percent, hard_percent))
        raise MemoryError(msg)
    if percent > soft_percent:
        monitor.send('warn', 'Using over %d%% of the memory in %s!',
                     percent, socket.gethostname())
def safely_call(func, args, pickle=False):
    """
    Call the given function with the given arguments safely, i.e.
    by trapping the exceptions. Return a pair (result, exc_type)
    where exc_type is None if no exceptions occur, otherwise it
    is the exception class and the result is a string containing
    error message and traceback.

    :param func: the function to call
    :param args: the arguments
    :param pickle:
        if set, the input arguments are unpickled and the return value
        is pickled; otherwise they are left unchanged
    """
    # measure the total time (and memory) spent in the task, including
    # the unpickling of the arguments
    with Monitor('total ' + func.__name__, measuremem=True) as child:
        if pickle:  # measure the unpickling time too
            args = [a.unpickle() for a in args]
        if args and isinstance(args[-1], Monitor):
            # the last argument is a monitor passed by the controller:
            # attach the timing child to it
            mon = args[-1]
            mon.children.append(child)  # child is a child of mon
            child.hdf5path = mon.hdf5path
        else:  # no monitor was passed, use the child itself
            mon = child
        check_mem_usage(mon)  # check if too much memory is used
        # make mon.flush raise if called inside the worker
        mon.flush = NoFlush(mon, func.__name__)
        try:
            got = func(*args)
            if inspect.isgenerator(got):
                # consume the generator inside the monitored block
                got = list(got)
            res = got, None, mon
        except:  # deliberately bare: the contract is to trap everything
            etype, exc, tb = sys.exc_info()
            tb_str = ''.join(traceback.format_tb(tb))
            # return the traceback as a string plus the exception class,
            # so the controller can re-raise it
            res = ('\n%s%s: %s' % (tb_str, etype.__name__, exc), etype, mon)
    # NB: flush must not be called in the workers - they must not
    # have access to the datastore - so we remove it
    rec_delattr(mon, 'flush')
    if pickle:  # it is impossible to measure the pickling time :-(
        res = Pickled(res)
    return res
class Pickled(object):
    """
    An utility to manually pickling/unpickling objects.
    The reason is that celery does not use the HIGHEST_PROTOCOL,
    so relying on celery is slower. Moreover Pickled instances
    have a nice string representation and length giving the size
    of the pickled bytestring.

    :param obj: the object to pickle
    """
    def __init__(self, obj):
        self.clsname = obj.__class__.__name__
        # monitors carry a calc_id attribute; remember it for the repr
        self.calc_id = str(getattr(obj, 'calc_id', ''))
        self.pik = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)

    def __repr__(self):
        """String representation of the pickled object"""
        size = humansize(len(self))
        return '<Pickled %s %s %s>' % (self.clsname, self.calc_id, size)

    def __len__(self):
        """Length of the pickled bytestring"""
        return len(self.pik)

    def unpickle(self):
        """Unpickle the underlying object"""
        return pickle.loads(self.pik)
def get_pickled_sizes(obj):
    """
    Return the pickled sizes of an object and its direct attributes,
    ordered by decreasing size. Here is an example:

    >> total_size, partial_sizes = get_pickled_sizes(Monitor(''))
    >> total_size
    345
    >> partial_sizes
    [('_procs', 214), ('exc', 4), ('mem', 4), ('start_time', 4),
    ('_start_time', 4), ('duration', 4)]

    Notice that the sizes depend on the operating system and the machine.
    """
    pairs = [(name, len(Pickled(value)))
             for name, value in getattr(obj, '__dict__', {}).items()]
    pairs.sort(key=operator.itemgetter(1), reverse=True)
    return len(Pickled(obj)), pairs
def pickle_sequence(objects):
    """
    Convert an iterable of objects into a list of pickled objects.
    If the iterable contains copies, the pickling will be done only once.
    If the iterable contains objects already pickled, they will not be
    pickled again.

    :param objects: a sequence of objects to pickle
    """
    pickled = {}  # id(obj) -> Pickled instance
    result = []
    for obj in objects:
        key = id(obj)
        try:
            pik = pickled[key]
        except KeyError:  # first time we see this object
            pik = pickled[key] = (
                obj if isinstance(obj, Pickled) else Pickled(obj))
        result.append(pik)
    return result
class IterResult(object):
    """
    :param futures:
        an iterator over futures
    :param taskname:
        the name of the task
    :param num_tasks:
        the total number of expected futures (None if unknown)
    :param progress:
        a logging function for the progress report
    """
    # record type used to store per-task performance info in the datastore
    task_data_dt = numpy.dtype(
        [('taskno', numpy.uint32), ('weight', numpy.float32),
         ('duration', numpy.float32)])

    def __init__(self, futures, taskname, num_tasks=None,
                 progress=logging.info):
        self.futures = futures
        self.name = taskname
        self.num_tasks = num_tasks
        if self.name.startswith("_"):  # private task, log only in debug
            self.progress = logging.debug
        else:
            self.progress = progress
        self.sent = 0  # set in TaskManager.submit_all
        self.received = []  # sizes of the pickled results, in bytes
        if self.num_tasks:
            # generator logging the percentage of completed tasks
            self.log_percent = self._log_percent()
            next(self.log_percent)  # prime the generator
    def _log_percent(self):
        # generator yielding the number of tasks processed so far;
        # as a side effect it logs the percentage when it increases
        yield 0
        done = 1
        prev_percent = 0
        while done < self.num_tasks:
            percent = int(float(done) / self.num_tasks * 100)
            if percent > prev_percent:
                self.progress('%s %3d%%', self.name, percent)
                prev_percent = percent
            yield done
            done += 1
        self.progress('%s 100%%', self.name)
        yield done
    def __iter__(self):
        # iterate over the results; re-raise remote errors, keep track
        # of the amount of data received and log the progress
        self.received = []
        for fut in self.futures:
            check_mem_usage()  # log a warning if too much memory is used
            if hasattr(fut, 'result'):
                result = fut.result()
            else:  # already a plain result triple, not a future
                result = fut
            if hasattr(result, 'unpickle'):
                # result is a Pickled triple: record its size
                self.received.append(len(result))
                val, etype, mon = result.unpickle()
            else:
                val, etype, mon = result
            if etype:
                # the remote task failed: val is the error string
                raise etype(val)
            if self.num_tasks:
                next(self.log_percent)
            self.save_task_data(mon)
            yield val
        if self.received:
            self.progress('Received %s of data, maximum per task %s',
                          humansize(sum(self.received)),
                          humansize(max(self.received)))
    def save_task_data(self, mon):
        # save (task number, weight, duration) in the datastore; only
        # monitors carrying a .weight attribute are recorded
        if hasattr(mon, 'weight'):
            duration = mon.children[0].duration  # the task is the first child
            tup = (mon.task_no, mon.weight, duration)
            data = numpy.array([tup], self.task_data_dt)
            hdf5.extend3(mon.hdf5path, 'task_info/' + self.name, data)
        mon.flush()
    def reduce(self, agg=operator.add, acc=None):
        # fold the results into the accumulator with the given aggregation
        # function; if acc is None the first result becomes the accumulator
        for result in self:
            if acc is None:  # first time
                acc = result
            else:
                acc = agg(acc, result)
        return acc
    @classmethod
    def sum(cls, iresults):
        """
        Sum the data transfer information of a set of results
        """
        res = object.__new__(cls)
        res.received = []
        res.sent = 0
        for iresult in iresults:
            res.received.extend(iresult.received)
            res.sent += iresult.sent
            name = iresult.name.split('#', 1)[0]
            if hasattr(res, 'name'):
                # all the results must refer to the same task name
                assert res.name.split('#', 1)[0] == name, (res.name, name)
            else:
                res.name = iresult.name.split('#')[0]
        return res
class TaskManager(object):
    """
    A manager to submit several tasks of the same type.
    The usage is::

      tm = TaskManager(do_something, logging.info)
      tm.send(arg1, arg2)
      tm.send(arg3, arg4)
      print tm.reduce()

    Progress report is built-in.
    """
    executor = executor  # the module-level ProcessPoolExecutor by default
    task_ids = []  # celery task IDs, class-level on purpose

    @classmethod
    def restart(cls):
        """Shutdown the executor and replace it with a fresh one"""
        cls.executor.shutdown()
        cls.executor = ProcessPoolExecutor()

    @classmethod
    def starmap(cls, task, task_args, name=None):
        """
        Spawn a bunch of tasks with the given list of arguments

        :returns: a TaskManager object with a .result method.
        """
        self = cls(task, name)
        self.task_args = task_args
        return self

    @classmethod
    def apply(cls, task, task_args,
              concurrent_tasks=executor.num_tasks_hint,
              maxweight=None,
              weight=lambda item: 1,
              key=lambda item: 'Unspecified',
              name=None):
        """
        Apply a task to a tuple of the form (sequence, \*other_args)
        by first splitting the sequence in chunks, according to the weight
        of the elements and possibly to a key (see :function:
        `openquake.baselib.general.split_in_blocks`).
        Then reduce the results with an aggregation function.
        The chunks which are generated internally can be seen directly (
        useful for debugging purposes) by looking at the attribute `._chunks`,
        right after the `apply` function has been called.

        :param task: a task to run in parallel
        :param task_args: the arguments to be passed to the task function
        :param agg: the aggregation function
        :param acc: initial value of the accumulator (default empty AccumDict)
        :param concurrent_tasks: hint about how many tasks to generate
        :param maxweight: if not None, used to split the tasks
        :param weight: function to extract the weight of an item in arg0
        :param key: function to extract the kind of an item in arg0
        """
        arg0 = task_args[0]  # this is assumed to be a sequence
        args = task_args[1:]
        if maxweight:
            chunks = block_splitter(arg0, maxweight, weight, key)
        else:
            chunks = split_in_blocks(arg0, concurrent_tasks or 1, weight, key)
        return cls.starmap(task, [(chunk,) + args for chunk in chunks], name)

    def __init__(self, oqtask, name=None):
        self.task_func = oqtask
        self.name = name or oqtask.__name__
        self.results = []
        self.sent = AccumDict()
        self.distribute = oq_distribute()
        # the argument names are used to report the data transfer per arg
        self.argnames = inspect.getargspec(self.task_func).args
        if self.distribute == 'ipython' and isinstance(
                self.executor, ProcessPoolExecutor):
            # replace the class-level executor with an ipyparallel one,
            # but only the first time
            client = ipp.Client()
            self.__class__.executor = client.executor()

    def progress(self, *args):
        """
        Log in INFO mode regular tasks and in DEBUG private tasks
        """
        if self.name.startswith('_'):
            logging.debug(*args)
        else:
            logging.info(*args)

    def submit(self, *args):
        """
        Submit a function with the given arguments to the process pool
        and add a Future to the list `.results`. If the attribute
        distribute is set, the function is run in process and the
        result is returned.

        :returns: a dictionary argname -> size of the pickled argument
        """
        check_mem_usage()
        # log a warning if too much memory is used
        if self.distribute == 'no':
            sent = {}
            res = safely_call(self.task_func, args)
        else:
            piks = pickle_sequence(args)
            sent = {arg: len(p) for arg, p in zip(self.argnames, piks)}
            res = self._submit(piks)
        self.sent += sent
        self.results.append(res)
        return sent

    def _submit(self, piks):
        # dispatch the pickled arguments to the configured backend
        if self.distribute == 'celery':
            res = safe_task.delay(self.task_func, piks, True)
            self.task_ids.append(res.task_id)
            return res
        else:  # submit tasks by using the ProcessPoolExecutor or ipyparallel
            return self.executor.submit(
                safely_call, self.task_func, piks, True)

    def _iterfutures(self):
        # compatibility wrapper for different concurrency frameworks
        if self.distribute == 'no':
            # the results were computed in-process; wrap them in futures
            for result in self.results:
                fut = Future()
                fut.set_result(result)
                yield fut
        elif self.distribute == 'celery':
            rset = ResultSet(self.results)
            for task_id, result_dict in rset.iter_native():
                idx = self.task_ids.index(task_id)
                self.task_ids.pop(idx)
                fut = Future()
                fut.set_result(result_dict['result'])
                # work around a celery bug
                del app.backend._cache[task_id]
                yield fut
        else:  # future interface
            for fut in as_completed(self.results):
                yield fut

    def reduce(self, agg=operator.add, acc=None):
        """
        Loop on a set of results and update the accumulator
        by using the aggregation function.

        :param agg: the aggregation function, (acc, val) -> new acc
        :param acc: the initial value of the accumulator
        :returns: the final value of the accumulator
        """
        if acc is None:
            acc = AccumDict()
        iter_result = self.submit_all()
        for res in iter_result:
            acc = agg(acc, res)
        self.results = []
        return acc

    def wait(self):
        """
        Wait until all the task terminate. Discard the results.

        :returns: the total number of tasks that were spawned
        """
        # NB: fixed a bug here: the original code was
        # `self.reduce(self, lambda acc, res: acc + 1, 0)`, passing
        # `self` as the aggregation function and one argument too many,
        # which raised a TypeError as soon as wait() was called
        return self.reduce(lambda acc, res: acc + 1, 0)

    def submit_all(self):
        """
        :returns: an IterResult object
        """
        try:
            nargs = len(self.task_args)
        except TypeError:  # generators have no len
            nargs = ''
        if nargs == 1:
            # no need to spawn a subprocess for a single task
            [args] = self.task_args
            self.progress('Executing a single task in process')
            return IterResult([safely_call(self.task_func, args)], self.name)
        task_no = 0
        for args in self.task_args:
            task_no += 1
            if task_no == 1:  # first time
                self.progress('Submitting %s "%s" tasks', nargs, self.name)
            if isinstance(args[-1], Monitor):  # add incremental task number
                args[-1].task_no = task_no
                weight = getattr(args[0], 'weight', None)
                if weight:
                    args[-1].weight = weight
            self.submit(*args)
        if not task_no:
            self.progress('No %s tasks were submitted', self.name)
        ir = IterResult(self._iterfutures(), self.name, task_no, self.progress)
        ir.sent = self.sent  # for information purposes
        if self.sent:
            self.progress('Sent %s of data in %d task(s)',
                          humansize(sum(self.sent.values())),
                          ir.num_tasks)
        return ir

    def __iter__(self):
        return iter(self.submit_all())
# convenient aliases; NB: `apply` deliberately shadows the builtin
starmap = TaskManager.starmap
apply = TaskManager.apply
def do_not_aggregate(acc, value):
    """
    Do nothing aggregation function.

    :param acc: the accumulator
    :param value: the value to accumulate
    :returns: the accumulator unchanged
    """
    # the value is intentionally ignored
    return acc
class NoFlush(object):
    """
    Callable replacing the ``flush`` method of a monitor inside a worker:
    calling it raises a RuntimeError, since the workers must not have
    access to the datastore. Instantiated by :func:`safely_call`.
    """
    def __init__(self, monitor, taskname):
        self.monitor = monitor
        self.taskname = taskname

    def __call__(self):
        msg = ('Monitor(%r).flush() must not be called '
               'by %s!' % (self.monitor.operation, self.taskname))
        raise RuntimeError(msg)
def rec_delattr(mon, name):
    """
    Delete attribute from a monitor recursively

    :param mon: a monitor-like object with a .children list
    :param name: the name of the attribute to remove
    """
    # depth-first: remove the attribute from the children first
    for submon in mon.children:
        rec_delattr(submon, name)
    try:
        delattr(mon, name)
    except AttributeError:  # the instance did not have the attribute
        pass
if OQ_DISTRIBUTE == 'celery':
    # celery task wrapping safely_call, routed to the 'celery' queue
    safe_task = task(safely_call, queue='celery')
def _wakeup(sec):
"""Waiting functions, used to wake up the process pool"""
time.sleep(sec)
def wakeup_pool():
    """
    This is used at startup, only when the ProcessPoolExecutor is used,
    to fork the processes before loading any big data structure.
    """
    if oq_distribute() != 'futures':
        return  # nothing to wake up without the ProcessPoolExecutor
    args = ((.2,) for _ in range(executor._max_workers))
    list(starmap(_wakeup, args))  # consume to wait for the tasks
class Starmap(object):
    """
    Apply `safely_call(func, args)` to a list of argument tuples via a
    pool built on demand by `poolfactory`; the results can be aggregated
    with the `reduce` method. Subclasses override `poolfactory`/`pool`.
    """
    poolfactory = None  # to be overridden
    pool = None  # to be overridden

    @classmethod
    def apply(cls, func, args, concurrent_tasks=executor._max_workers * 5,
              weight=lambda item: 1, key=lambda item: 'Unspecified'):
        # split the first argument in chunks and spawn a task per chunk
        chunks = split_in_blocks(args[0], concurrent_tasks, weight, key)
        return cls(func, (((chunk,) + args[1:]) for chunk in chunks))

    def __init__(self, func, iterargs):
        # build the pool at first instantiation only
        if self.__class__.pool is None:
            self.__class__.pool = self.poolfactory()
        self.func = func
        allargs = list(iterargs)
        self.num_tasks = len(allargs)
        logging.info('Starting %d tasks', self.num_tasks)
        self.imap = self.pool.imap_unordered(
            functools.partial(safely_call, func), allargs)

    def reduce(self, agg=operator.add, acc=None, progress=logging.info):
        # consume the results and fold them into the accumulator
        if acc is None:
            acc = AccumDict()
        ires = IterResult(
            self.imap, self.func.__name__, self.num_tasks, progress)
        return functools.reduce(agg, ires, acc)
class Serialmap(Starmap):
    """
    A sequential Starmap, useful for debugging purpose.
    """
    def __init__(self, func, iterargs):
        self.func = func
        allargs = list(iterargs)
        self.num_tasks = len(allargs)
        logging.info('Starting %d tasks', self.num_tasks)
        # run everything in the current process, no pool involved
        self.imap = list(map(functools.partial(safely_call, func), allargs))
class Threadmap(Starmap):
    """
    MapReduce implementation based on threads. For instance

    >>> from collections import Counter
    >>> c = Threadmap(Counter, [('hello',), ('world',)]).reduce(acc=Counter())
    """
    # following the same convention of the standard library, num_proc * 5
    poolfactory = staticmethod(functools.partial(
        multiprocessing.dummy.Pool, executor._max_workers * 5))
    pool = None  # built at instantiation time
class Processmap(Starmap):
    """
    MapReduce implementation based on processes. For instance

    >>> from collections import Counter
    >>> c = Processmap(Counter, [('hello',), ('world',)]).reduce(acc=Counter())
    """
    # each task runs in a forked process of the multiprocessing pool
    poolfactory = staticmethod(multiprocessing.Pool)
    pool = None  # built at instantiation time