# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""Engine: A collection of fundamental functions for initializing and running
calculations."""
import os
import re
import sys
import json
import signal
import traceback
import platform
try:
    from setproctitle import setproctitle
except ImportError:
    def setproctitle(title):
        "Do nothing"
from openquake.baselib.performance import Monitor
from openquake.baselib.python3compat import urlopen, Request, decode
from openquake.baselib import (
parallel, general, config, datastore, __version__, zeromq as z)
from openquake.commonlib.oqvalidation import OqParam
from openquake.commonlib import readinput, logs
from openquake.calculators import base, views, export

OQ_API = 'https://api.openquake.org'
TERMINATE = config.distribution.terminate_workers_on_revoke
USE_CELERY = os.environ.get('OQ_DISTRIBUTE') == 'celery'
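
# the distribution mode is selected via the OQ_DISTRIBUTE environment
# variable; for instance a zmq run could be started with something like
# `OQ_DISTRIBUTE=zmq oq engine --run job.ini` (an illustrative invocation)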
if parallel.oq_distribute() == 'zmq':

    def set_concurrent_tasks_default():
        """
        Set the default for concurrent_tasks based on the available
        worker pools.
        """
        num_workers = 0
        w = config.zworkers
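        # host_cores is a comma-separated list of "hostname n_cores" pairs,
        # e.g. "192.168.0.1 4,192.168.0.2 8" (an illustrative value; the
        # real one comes from the zworkers section of the configuration)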
        for host, _cores in [hc.split() for hc in w.host_cores.split(',')]:
            url = 'tcp://%s:%s' % (host, w.ctrl_port)
            with z.Socket(url, z.zmq.REQ, 'connect') as sock:
                if not general.socket_ready(url):
                    logs.LOG.warn('%s is not running', host)
                    continue
                num_workers += sock.send('get_num_workers')
        OqParam.concurrent_tasks.default = num_workers * 3
        logs.LOG.info('Using %d zmq workers', num_workers)
elif USE_CELERY:
    import celery.task.control

    def set_concurrent_tasks_default():
        """
        Set the default for concurrent_tasks based on the number of
        available celery workers.
        """
        stats = celery.task.control.inspect(timeout=1).stats()
        if not stats:
            sys.exit("No live compute nodes, aborting calculation")
        num_cores = sum(stats[k]['pool']['max-concurrency'] for k in stats)
        OqParam.concurrent_tasks.default = num_cores * 3
        logs.LOG.info(
            'Using %s, %d cores', ', '.join(sorted(stats)), num_cores)

    def celery_cleanup(terminate, task_ids=()):
        """
        Release the resources used by an openquake job.
        In particular, revoke the running tasks (if any).

        :param bool terminate: the celery revoke command terminate flag
        :param task_ids: celery task IDs
        """
        # using the celery API, revoke (and possibly terminate) any running
        # tasks associated with the current job
        if task_ids:
            logs.LOG.warn('Revoking %d tasks', len(task_ids))
        else:  # this is normal when OQ_DISTRIBUTE=no
            logs.LOG.debug('No task to revoke')
        for tid in task_ids:
            celery.task.control.revoke(tid, terminate=terminate)
            logs.LOG.debug('Revoked task %s', tid)


def expose_outputs(dstore):
    """
    Build a correspondence between the outputs in the datastore and the
    ones in the database.

    :param dstore: datastore
    """
    oq = dstore['oqparam']
    exportable = set(ekey[0] for ekey in export.export)
    calcmode = oq.calculation_mode
    dskeys = set(dstore) & exportable  # exportable datastore keys
    dskeys.add('fullreport')
    rlzs = dstore['csm_info'].rlzs
    if len(rlzs) > 1:
        dskeys.add('realizations')
    # expose gmf_data only if smaller than 10 MB
    if oq.ground_motion_fields and calcmode == 'event_based':
        nbytes = dstore['gmf_data'].attrs['nbytes']
        if nbytes < 10 * 1024 ** 2:
            dskeys.add('gmf_data')
    if 'scenario' not in calcmode:  # export sourcegroups.csv
        dskeys.add('sourcegroups')
    hdf5 = dstore.hdf5
    if (len(rlzs) == 1 and 'poes' in hdf5) or 'hcurves' in hdf5:
        dskeys.add('hcurves')
    if oq.uniform_hazard_spectra:
        dskeys.add('uhs')  # export them
    if oq.hazard_maps:
        dskeys.add('hmaps')  # export them
    if 'avg_losses-stats' in dstore or (
            'avg_losses-rlzs' in dstore and len(rlzs)):
        dskeys.add('avg_losses-stats')
    if 'curves-stats' in dstore:
        logs.LOG.warn('loss curves are exportable with oq export')
    if oq.conditional_loss_poes:  # expose loss_maps outputs
        if 'loss_curves-stats' in dstore:
            dskeys.add('loss_maps-stats')
    if 'all_loss_ratios' in dskeys:
        dskeys.remove('all_loss_ratios')  # export only specific IDs
    if 'ruptures' in dskeys and 'scenario' in calcmode:
        exportable.remove('ruptures')  # do not export, as requested by Vitor
    if 'rup_loss_table' in dskeys:  # keep it hidden for the moment
        dskeys.remove('rup_loss_table')
    logs.dbcmd('create_outputs', dstore.calc_id, sorted(dskeys & exportable))
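
# A minimal sketch of the expected flow (the calculation ID 42 is a
# placeholder, and datastore.read is assumed to accept a calculation ID):
#
#   from openquake.baselib import datastore
#   dstore = datastore.read(42)  # datastore of a completed calculation
#   expose_outputs(dstore)       # fill the outputs table in the database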


class MasterKilled(KeyboardInterrupt):
    "Exception raised when a job is killed manually"


def raiseMasterKilled(signum, _stack):
    """
    When a SIGTERM is received, raise the MasterKilled
    exception with an appropriate error message.

    :param int signum: the number of the received signal
    :param _stack: the current frame object, ignored
    """
    if signum in (signal.SIGTERM, signal.SIGINT):
        msg = 'The openquake master process was killed manually'
    else:
        msg = 'Received a signal %d' % signum
    if sys.version_info >= (3, 5, 0):
        # Python 2 is buggy and this code would hang when using futures
        for pid in parallel.executor.pids:
            try:
                os.kill(pid, signal.SIGKILL)  # SIGTERM is not enough :-(
            except OSError:  # pid not found
                pass
    raise MasterKilled(msg)


# register the raiseMasterKilled callback for SIGTERM and SIGINT;
# when using the Django development server this module is imported by a
# thread, so one gets a `ValueError: signal only works in main thread`
# that can be safely ignored
try:
    signal.signal(signal.SIGTERM, raiseMasterKilled)
    signal.signal(signal.SIGINT, raiseMasterKilled)
except ValueError:
    pass


def job_from_file(cfg_file, username, hazard_calculation_id=None):
    """
    Create a full job profile from a job config file.

    :param str cfg_file:
        Path to a job.ini file
    :param str username:
        The user who will own this job profile and all results
    :param hazard_calculation_id:
        ID of a previous calculation or None
    :returns:
        a pair (job_id, oqparam)
    """
    oq = readinput.get_oqparam(cfg_file, hc_id=hazard_calculation_id)
    job_id = logs.dbcmd('create_job', oq.calculation_mode, oq.description,
                        username, datastore.get_datadir(),
                        hazard_calculation_id)
    return job_id, oq
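
# For instance (a sketch, not executed at import time; the path and the
# username are placeholders, and a running dbserver is assumed):
#
#   job_id, oqparam = job_from_file('/path/to/job.ini', 'michele')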


def run_calc(job_id, oqparam, log_level, log_file, exports,
             hazard_calculation_id=None, **kw):
    """
    Run a calculation.

    :param job_id:
        ID of the current job
    :param oqparam:
        :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param str log_level:
        The desired logging level. Valid choices are 'debug', 'info',
        'progress', 'warn', 'error', and 'critical'.
    :param str log_file:
        Complete path (including file name) to the file where the logs will
        be written; if `None`, logging will just be printed to standard
        output
    :param exports:
        A comma-separated string of export types
    :param hazard_calculation_id:
        ID of a previous calculation to start from, or None
    :returns:
        the calculator object, with a .datastore attribute
    """
    setproctitle('oq-job-%d' % job_id)
    monitor = Monitor('total runtime', measuremem=True)
    with logs.handle(job_id, log_level, log_file):  # run the job
        if os.environ.get('OQ_DISTRIBUTE') in ('zmq', 'celery'):
            set_concurrent_tasks_default()
        msg = check_obsolete_version(oqparam.calculation_mode)
        if msg:
            logs.LOG.warn(msg)
        calc = base.calculators(oqparam, monitor, calc_id=job_id)
        monitor.hdf5path = calc.datastore.hdf5path
        calc.from_engine = True
        tb = 'None\n'
        try:
            logs.dbcmd('set_status', job_id, 'executing')
            _do_run_calc(calc, exports, hazard_calculation_id, **kw)
            duration = monitor.duration
            expose_outputs(calc.datastore)
            monitor.flush()
            records = views.performance_view(calc.datastore)
            logs.dbcmd('save_performance', job_id, records)
            calc.datastore.close()
            logs.LOG.info('Calculation %d finished correctly in %d seconds',
                          job_id, duration)
            logs.dbcmd('finish', job_id, 'complete')
        except:  # catch everything, including MasterKilled
            tb = traceback.format_exc()
            try:
                logs.LOG.critical(tb)
                logs.dbcmd('finish', job_id, 'failed')
            except:  # an OperationalError may always happen
                sys.stderr.write(tb)
            raise
        finally:
            # if there was an error in the calculation, this part may fail;
            # in such a situation, we simply log the cleanup error without
            # taking further action, so that the real error can propagate
            try:
                if USE_CELERY:
                    celery_cleanup(TERMINATE, parallel.Starmap.task_ids)
            except:
                # log the finalization error only if there is no real error
                if tb == 'None\n':
                    logs.LOG.error('finalizing', exc_info=True)
    return calc


def _do_run_calc(calc, exports, hazard_calculation_id, **kw):
    with calc._monitor:
        calc.run(exports=exports, hazard_calculation_id=hazard_calculation_id,
                 close=False, **kw)  # don't close the datastore too soon
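
# Putting the pieces together, a full calculation can be run
# programmatically with something like the following sketch (placeholders
# as in the job_from_file example above; a running dbserver is assumed):
#
#   job_id, oqparam = job_from_file('/path/to/job.ini', 'michele')
#   calc = run_calc(job_id, oqparam, 'info', None, '')
#   print(calc.datastore.hdf5path)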


def version_triple(tag):
    """
    :returns: a triple of integers from a version tag
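
    For instance:

    >>> version_triple('v2.6.0')
    (2, 6, 0)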
"""
groups = re.match(r'v?(\d+)\.(\d+)\.(\d+)', tag).groups()
return tuple(int(n) for n in groups)


def check_obsolete_version(calculation_mode='WebUI'):
    """
    Check if there is a newer version of the engine.

    :param calculation_mode:
        - the calculation mode when called from the engine
        - the string 'WebUI' when called from the WebUI (the default)
    :returns:
        - a message if the running version of the engine is obsolete
        - the empty string if the engine is up to date
        - None if the check could not be performed (e.g. the API server
          is unreachable or the version tag is malformed)
    """
    if os.environ.get('JENKINS_URL') or os.environ.get('TRAVIS'):
        # avoid flooding our API server with requests from CI systems
        return
    headers = {'User-Agent': 'OpenQuake Engine %s;%s;%s' %
               (__version__, calculation_mode, platform.platform())}
    try:
        req = Request(OQ_API + '/engine/latest', headers=headers)
        # NB: a timeout < 1 does not work
        data = urlopen(req, timeout=1).read()  # bytes
        tag_name = json.loads(decode(data))['tag_name']
        current = version_triple(__version__)
        latest = version_triple(tag_name)
    except:  # page not available or wrong version tag
        return
    if current < latest:
        return ('Version %s of the engine is available, but you are '
                'still using version %s' % (tag_name, __version__))
    else:
        return ''
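
# the endpoint is expected to return a JSON document with at least a
# "tag_name" field, for instance (an illustrative payload, not the full
# response):
#
#   {"tag_name": "v2.6.0"}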