# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import getpass
import logging
from openquake.baselib import sap, config, datastore, parallel
from openquake.baselib.general import safeprint, start_many
from openquake.hazardlib import valid
from openquake.commonlib import logs, oqvalidation
from openquake.engine import engine as eng
from openquake.engine.export import core
from openquake.engine.utils import confirm
from openquake.engine.tools.make_html_report import make_report
from openquake.server import dbserver
from openquake.commands.abort import abort
DEFAULT_EXPORTS = 'csv,xml,rst'
HAZARD_CALCULATION_ARG = "--hazard-calculation-id"
MISSING_HAZARD_MSG = "Please specify '%s=<id>'" % HAZARD_CALCULATION_ARG
ZMQ = os.environ.get(
'OQ_DISTRIBUTE', config.distribution.oq_distribute) == 'zmq'
[docs]def get_job_id(job_id, username=None):
job = logs.dbcmd('get_job', job_id, username)
if not job:
sys.exit('Job %s not found' % job_id)
return job.id
[docs]def run_jobs(job_inis, log_level='info', log_file=None, exports='',
username=getpass.getuser(), **kw):
"""
Run jobs using the specified config file and other options.
:param str job_inis:
A list of paths to .ini files.
:param str log_level:
'debug', 'info', 'warn', 'error', or 'critical'
:param str log_file:
Path to log file.
:param exports:
A comma-separated string of export types requested by the user.
:param username:
Name of the user running the job
:param kw:
Extra parameters like hazard_calculation_id and calculation_mode
"""
dist = parallel.oq_distribute()
jobparams = []
multi = kw.pop('multi', None)
for job_ini in job_inis:
# NB: the logs must be initialized BEFORE everything
job_id = logs.init('job', getattr(logging, log_level.upper()))
with logs.handle(job_id, log_level, log_file):
oqparam = eng.job_from_file(os.path.abspath(job_ini), job_id,
username, **kw)
if (not jobparams and not multi
and 'hazard_calculation_id' not in kw):
kw['hazard_calculation_id'] = job_id
jobparams.append((job_id, oqparam))
jobarray = len(jobparams) > 1 and multi
try:
eng.poll_queue(job_id, poll_time=15)
# wait for an empty slot or a CTRL-C
except BaseException:
# the job aborted even before starting
for job_id, oqparam in jobparams:
logs.dbcmd('finish', job_id, 'aborted')
return jobparams
else:
for job_id, oqparam in jobparams:
dic = {'status': 'executing', 'pid': eng._PID}
if jobarray:
dic['hazard_calculation_id'] = jobparams[0][0]
logs.dbcmd('update_job', job_id, dic)
try:
if dist == 'zmq' and config.zworkers['host_cores']:
logging.info('Asking the DbServer to start the workers')
logs.dbcmd('zmq_start') # start the zworkers
logs.dbcmd('zmq_wait') # wait for them to go up
allargs = [(job_id, oqparam, exports, log_level, log_file)
for job_id, oqparam in jobparams]
if jobarray:
with start_many(eng.run_calc, allargs):
pass
else:
for args in allargs:
eng.run_calc(*args)
finally:
if dist == 'zmq' and config.zworkers['host_cores']:
logging.info('Stopping the zworkers')
logs.dbcmd('zmq_stop')
elif dist.startswith('celery'):
eng.celery_cleanup(config.distribution.terminate_workers_on_revoke)
return jobparams
[docs]def del_calculation(job_id, confirmed=False):
"""
Delete a calculation and all associated outputs.
"""
if logs.dbcmd('get_job', job_id) is None:
print('There is no job %d' % job_id)
return
if confirmed or confirm(
'Are you sure you want to (abort and) delete this calculation and '
'all associated outputs?\nThis action cannot be undone. (y/n): '):
try:
abort(job_id)
resp = logs.dbcmd('del_calc', job_id, getpass.getuser())
except RuntimeError as err:
safeprint(err)
else:
if 'success' in resp:
print('Removed %d' % job_id)
else:
print(resp['error'])
@sap.Script # do not use sap.script, other oq engine will break
def engine(log_file, no_distribute, yes, config_file, make_html_report,
upgrade_db, db_version, what_if_I_upgrade, run,
list_hazard_calculations, list_risk_calculations,
delete_calculation, delete_uncompleted_calculations,
hazard_calculation_id, list_outputs, show_log,
export_output, export_outputs, exports='',
log_level='info', multi=False, reuse_hazard=False, param=''):
"""
Run a calculation using the traditional command line API
"""
if not run:
# configure a basic logging
logs.init()
if config_file:
config.read(os.path.abspath(os.path.expanduser(config_file)),
soft_mem_limit=int, hard_mem_limit=int, port=int,
multi_user=valid.boolean,
serialize_jobs=valid.boolean, strict=valid.boolean,
code=exec)
if no_distribute:
os.environ['OQ_DISTRIBUTE'] = 'no'
# check if the datadir exists
datadir = datastore.get_datadir()
if not os.path.exists(datadir):
os.makedirs(datadir)
dbserver.ensure_on()
# check if we are talking to the right server
err = dbserver.check_foreign()
if err:
sys.exit(err)
if upgrade_db:
msg = logs.dbcmd('what_if_I_upgrade', 'read_scripts')
if msg.startswith('Your database is already updated'):
pass
elif yes or confirm('Proceed? (y/n) '):
logs.dbcmd('upgrade_db')
sys.exit(0)
if db_version:
safeprint(logs.dbcmd('db_version'))
sys.exit(0)
if what_if_I_upgrade:
safeprint(logs.dbcmd('what_if_I_upgrade', 'extract_upgrade_scripts'))
sys.exit(0)
# check if the db is outdated
outdated = logs.dbcmd('check_outdated')
if outdated:
sys.exit(outdated)
# hazard or hazard+risk
if hazard_calculation_id == -1:
# get the latest calculation of the current user
hc_id = get_job_id(hazard_calculation_id, getpass.getuser())
elif hazard_calculation_id:
# make it possible to use calculations made by another user
hc_id = get_job_id(hazard_calculation_id)
else:
hc_id = None
if run:
pars = dict(p.split('=', 1) for p in param.split(',')) if param else {}
if reuse_hazard:
pars['csm_cache'] = datadir
if hc_id:
pars['hazard_calculation_id'] = str(hc_id)
oqvalidation.OqParam.check(pars)
log_file = os.path.expanduser(log_file) \
if log_file is not None else None
job_inis = [os.path.expanduser(f) for f in run]
pars['multi'] = multi
run_jobs(job_inis, log_level, log_file, exports, **pars)
# hazard
elif list_hazard_calculations:
for line in logs.dbcmd(
'list_calculations', 'hazard', getpass.getuser()):
safeprint(line)
elif delete_calculation is not None:
del_calculation(delete_calculation, yes)
# risk
elif list_risk_calculations:
for line in logs.dbcmd('list_calculations', 'risk', getpass.getuser()):
safeprint(line)
# export
elif make_html_report:
safeprint('Written %s' % make_report(make_html_report))
sys.exit(0)
elif list_outputs is not None:
hc_id = get_job_id(list_outputs)
for line in logs.dbcmd('list_outputs', hc_id):
safeprint(line)
elif show_log is not None:
hc_id = get_job_id(show_log)
for line in logs.dbcmd('get_log', hc_id):
safeprint(line)
elif export_output is not None:
output_id, target_dir = export_output
dskey, calc_id, datadir = logs.dbcmd('get_output', int(output_id))
for line in core.export_output(
dskey, calc_id, datadir, os.path.expanduser(target_dir),
exports or DEFAULT_EXPORTS):
safeprint(line)
elif export_outputs is not None:
job_id, target_dir = export_outputs
hc_id = get_job_id(job_id)
for line in core.export_outputs(
hc_id, os.path.expanduser(target_dir),
exports or DEFAULT_EXPORTS):
safeprint(line)
elif delete_uncompleted_calculations:
logs.dbcmd('delete_uncompleted_calculations', getpass.getuser())
else:
engine.parentparser.prog = 'oq engine'
engine.parentparser.print_usage()
engine._add('log_file', '--log-file', '-L', help='''\
Location where to store log messages; if not specified, log messages
will be printed to the console (to stderr)''')
engine._add('no_distribute', '--no-distribute', '--nd', help='''\
Disable calculation task distribution and run the
computation in a single process. This is intended for
use in debugging and profiling.''', action='store_true')
engine.flg('yes', 'Automatically answer "yes" when asked to confirm an action')
engine.opt('config_file', 'Custom openquake.cfg file, to override default '
'configurations')
engine._add('make_html_report', '--make-html-report', '--r',
help='Build an HTML report of the computation at the given date',
metavar='YYYY-MM-DD|today')
engine.flg('upgrade_db', 'Upgrade the openquake database')
engine.flg('db_version', 'Show the current version of the openquake database')
engine.flg('what_if_I_upgrade', 'Show what will happen to the openquake '
'database if you upgrade')
engine._add('run', '--run', help='Run a job with the specified config file',
metavar='JOB_INI', nargs='+')
engine._add('list_hazard_calculations', '--list-hazard-calculations', '--lhc',
help='List hazard calculation information', action='store_true')
engine._add('list_risk_calculations', '--list-risk-calculations', '--lrc',
help='List risk calculation information', action='store_true')
engine._add('delete_calculation', '--delete-calculation', '--dc',
help='Delete a calculation and all associated outputs',
metavar='CALCULATION_ID', type=int)
engine._add('delete_uncompleted_calculations',
'--delete-uncompleted-calculations', '--duc',
help='Delete all the uncompleted calculations',
action='store_true')
engine._add('hazard_calculation_id', '--hazard-calculation-id', '--hc',
help='Use the given job as input for the next job')
engine._add('list_outputs', '--list-outputs', '--lo',
help='List outputs for the specified calculation',
metavar='CALCULATION_ID')
engine._add('show_log', '--show-log', '--sl',
help='Show the log of the specified calculation',
metavar='CALCULATION_ID')
engine._add('export_output', '--export-output', '--eo',
nargs=2, metavar=('OUTPUT_ID', 'TARGET_DIR'),
help='Export the desired output to the specified directory')
engine._add('export_outputs', '--export-outputs', '--eos',
nargs=2, metavar=('CALCULATION_ID', 'TARGET_DIR'),
help='Export all of the calculation outputs to the '
'specified directory')
engine.opt('exports', 'Comma-separated string specifing the export formats, '
'in order of priority')
engine.opt('log_level', 'Defaults to "info"',
choices=['debug', 'info', 'warn', 'error', 'critical'])
engine.flg('multi', 'Run multiple job.inis in parallel')
engine.flg('reuse_hazard', 'Read the source models from the cache (if any)')
engine._add('param', '--param', '-p',
help='Override parameters specified with the syntax '
'NAME1=VALUE1,NAME2=VALUE2,...')