# Source code for openquake.commonlib.concurrent_futures_process_mpatch

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

try:
    # prctl is an optional dependency: when it cannot be imported the
    # monkeypatch below degrades to a no-op.
    import prctl
except ImportError:
    def concurrent_futures_process_monkeypatch():
        """Do nothing: prctl is unavailable, so the stock worker is kept."""
else:
    import signal
    from concurrent.futures import process

    # The worker shipped with concurrent.futures. If this module is executed
    # again after the patch was applied (e.g. importlib.reload), unwrap the
    # previously installed wrapper so that it is never chained onto itself.
    _stock_process_worker = getattr(
        process._process_worker, '_stock_worker', process._process_worker)

    def _process_worker(call_queue, result_queue, *args, **kwargs):
        """
        Run the stock concurrent.futures worker loop, after arranging for
        this worker process to be killed when its parent dies.

        This worker is run in a separate process.

        Delegating to the stock worker (instead of carrying a copy of its
        loop) keeps the patch in step with whatever signature, result
        protocol and shutdown sentinel the running Python version uses.

        Args:
            call_queue: A multiprocessing.Queue of _CallItems that will be
                read and evaluated by the worker.
            result_queue: A multiprocessing.Queue of _ResultItems that will
                be written to by the worker.
            *args, **kwargs: Any further arguments the running Python
                version passes to its worker (newer versions add e.g.
                ``initializer`` and ``initargs``); forwarded untouched.

        Returns:
            Whatever the stock worker returns.
        """
        # Ask the kernel to deliver SIGKILL to this process when its parent
        # exits, so that workers do not outlive the process that owns them.
        prctl.set_pdeathsig(signal.SIGKILL)
        return _stock_process_worker(call_queue, result_queue,
                                     *args, **kwargs)

    # Remember what was wrapped; read back by the getattr() above.
    _process_worker._stock_worker = _stock_process_worker

    def concurrent_futures_process_monkeypatch():
        """
        Replace the worker function used by concurrent.futures process
        pools with :func:`_process_worker`.

        Only pools that start their worker processes after this call are
        affected.
        """
        process._process_worker = _process_worker