Source code for openquake.commands.engine

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import getpass
import logging
from openquake.baselib import sap, config, datastore, parallel
from openquake.baselib.general import safeprint, start_many
from openquake.hazardlib import valid
from openquake.commonlib import logs, oqvalidation
from openquake.engine import engine as eng
from openquake.engine.export import core
from openquake.engine.utils import confirm
from openquake.engine.tools.make_html_report import make_report
from openquake.server import dbserver
from openquake.commands.abort import abort


DEFAULT_EXPORTS = 'csv,xml,rst'
HAZARD_CALCULATION_ARG = "--hazard-calculation-id"
MISSING_HAZARD_MSG = "Please specify '%s=<id>'" % HAZARD_CALCULATION_ARG
ZMQ = os.environ.get(
    'OQ_DISTRIBUTE', config.distribution.oq_distribute) == 'zmq'


def get_job_id(job_id, username=None):
    job = logs.dbcmd('get_job', job_id, username)
    if not job:
        sys.exit('Job %s not found' % job_id)
    return job.id
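
# Illustrative usage (not part of the original module): as done in engine()
# below, passing -1 together with a username resolves to the latest job of
# that user, while a concrete ID is checked for existence, e.g.
#
#     get_job_id(-1, getpass.getuser())  # latest job of the current user
#     get_job_id(42)                     # exits if job 42 does not exist
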
def run_jobs(job_inis, log_level='info', log_file=None, exports='',
             username=getpass.getuser(), **kw):
    """
    Run jobs using the specified config file and other options.

    :param str job_inis:
        A list of paths to .ini files.
    :param str log_level:
        'debug', 'info', 'warn', 'error', or 'critical'
    :param str log_file:
        Path to log file.
    :param exports:
        A comma-separated string of export types requested by the user.
    :param username:
        Name of the user running the job
    :param kw:
        Extra parameters like hazard_calculation_id and calculation_mode
    """
    dist = parallel.oq_distribute()
    jobparams = []
    multi = kw.pop('multi', None)
    for job_ini in job_inis:
        # NB: the logs must be initialized BEFORE everything
        job_id = logs.init('job', getattr(logging, log_level.upper()))
        with logs.handle(job_id, log_level, log_file):
            oqparam = eng.job_from_file(os.path.abspath(job_ini), job_id,
                                        username, **kw)
        if (not jobparams and not multi
                and 'hazard_calculation_id' not in kw):
            kw['hazard_calculation_id'] = job_id
        jobparams.append((job_id, oqparam))
    jobarray = len(jobparams) > 1 and multi
    try:
        eng.poll_queue(job_id, poll_time=15)
        # wait for an empty slot or a CTRL-C
    except BaseException:
        # the job aborted even before starting
        for job_id, oqparam in jobparams:
            logs.dbcmd('finish', job_id, 'aborted')
        return jobparams
    else:
        for job_id, oqparam in jobparams:
            dic = {'status': 'executing', 'pid': eng._PID}
            if jobarray:
                dic['hazard_calculation_id'] = jobparams[0][0]
            logs.dbcmd('update_job', job_id, dic)
    try:
        if dist == 'zmq' and config.zworkers['host_cores']:
            logging.info('Asking the DbServer to start the workers')
            logs.dbcmd('zmq_start')  # start the zworkers
            logs.dbcmd('zmq_wait')  # wait for them to go up
        allargs = [(job_id, oqparam, exports, log_level, log_file)
                   for job_id, oqparam in jobparams]
        if jobarray:
            with start_many(eng.run_calc, allargs):
                pass
        else:
            for args in allargs:
                eng.run_calc(*args)
    finally:
        if dist == 'zmq' and config.zworkers['host_cores']:
            logging.info('Stopping the zworkers')
            logs.dbcmd('zmq_stop')
        elif dist.startswith('celery'):
            eng.celery_cleanup(config.distribution.terminate_workers_on_revoke)
    return jobparams
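
# Usage sketch (illustrative, not part of the module): running one job.ini
# programmatically. Assumes the DbServer can be started and that
# '/tmp/job.ini' (a placeholder path) is a valid job file.
#
#     from openquake.server import dbserver
#     from openquake.commands.engine import run_jobs
#
#     dbserver.ensure_on()  # make sure the DbServer is running
#     for job_id, oqparam in run_jobs(['/tmp/job.ini'], exports='csv'):
#         print('finished job', job_id, oqparam.calculation_mode)
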
def del_calculation(job_id, confirmed=False):
    """
    Delete a calculation and all associated outputs.
    """
    if logs.dbcmd('get_job', job_id) is None:
        print('There is no job %d' % job_id)
        return

    if confirmed or confirm(
            'Are you sure you want to (abort and) delete this calculation and '
            'all associated outputs?\nThis action cannot be undone. (y/n): '):
        try:
            abort(job_id)
            resp = logs.dbcmd('del_calc', job_id, getpass.getuser())
        except RuntimeError as err:
            safeprint(err)
        else:
            if 'success' in resp:
                print('Removed %d' % job_id)
            else:
                print(resp['error'])
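
# Usage sketch (illustrative): removing a calculation without the interactive
# confirmation prompt; 1234 is a placeholder job ID.
#
#     del_calculation(1234, confirmed=True)
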
@sap.Script  # do not use sap.script, otherwise oq engine will break
def engine(log_file, no_distribute, yes, config_file, make_html_report,
           upgrade_db, db_version, what_if_I_upgrade,
           run, list_hazard_calculations, list_risk_calculations,
           delete_calculation, delete_uncompleted_calculations,
           hazard_calculation_id, list_outputs, show_log, export_output,
           export_outputs, exports='', log_level='info', multi=False,
           reuse_hazard=False, param=''):
    """
    Run a calculation using the traditional command line API
    """
    if not run:
        # configure a basic logging
        logs.init()

    if config_file:
        config.read(os.path.abspath(os.path.expanduser(config_file)),
                    soft_mem_limit=int, hard_mem_limit=int, port=int,
                    multi_user=valid.boolean, serialize_jobs=valid.boolean,
                    strict=valid.boolean, code=exec)

    if no_distribute:
        os.environ['OQ_DISTRIBUTE'] = 'no'

    # check if the datadir exists
    datadir = datastore.get_datadir()
    if not os.path.exists(datadir):
        os.makedirs(datadir)

    dbserver.ensure_on()
    # check if we are talking to the right server
    err = dbserver.check_foreign()
    if err:
        sys.exit(err)

    if upgrade_db:
        msg = logs.dbcmd('what_if_I_upgrade', 'read_scripts')
        if msg.startswith('Your database is already updated'):
            pass
        elif yes or confirm('Proceed? (y/n) '):
            logs.dbcmd('upgrade_db')
        sys.exit(0)

    if db_version:
        safeprint(logs.dbcmd('db_version'))
        sys.exit(0)

    if what_if_I_upgrade:
        safeprint(logs.dbcmd('what_if_I_upgrade', 'extract_upgrade_scripts'))
        sys.exit(0)

    # check if the db is outdated
    outdated = logs.dbcmd('check_outdated')
    if outdated:
        sys.exit(outdated)

    # hazard or hazard+risk
    if hazard_calculation_id == -1:
        # get the latest calculation of the current user
        hc_id = get_job_id(hazard_calculation_id, getpass.getuser())
    elif hazard_calculation_id:
        # make it possible to use calculations made by another user
        hc_id = get_job_id(hazard_calculation_id)
    else:
        hc_id = None

    if run:
        pars = dict(p.split('=', 1) for p in param.split(',')) if param else {}
        if reuse_hazard:
            pars['csm_cache'] = datadir
        if hc_id:
            pars['hazard_calculation_id'] = str(hc_id)
        oqvalidation.OqParam.check(pars)
        log_file = os.path.expanduser(log_file) \
            if log_file is not None else None
        job_inis = [os.path.expanduser(f) for f in run]
        pars['multi'] = multi
        run_jobs(job_inis, log_level, log_file, exports, **pars)

    # hazard
    elif list_hazard_calculations:
        for line in logs.dbcmd(
                'list_calculations', 'hazard', getpass.getuser()):
            safeprint(line)
    elif delete_calculation is not None:
        del_calculation(delete_calculation, yes)
    # risk
    elif list_risk_calculations:
        for line in logs.dbcmd('list_calculations', 'risk', getpass.getuser()):
            safeprint(line)

    # export
    elif make_html_report:
        safeprint('Written %s' % make_report(make_html_report))
        sys.exit(0)

    elif list_outputs is not None:
        hc_id = get_job_id(list_outputs)
        for line in logs.dbcmd('list_outputs', hc_id):
            safeprint(line)
    elif show_log is not None:
        hc_id = get_job_id(show_log)
        for line in logs.dbcmd('get_log', hc_id):
            safeprint(line)

    elif export_output is not None:
        output_id, target_dir = export_output
        dskey, calc_id, datadir = logs.dbcmd('get_output', int(output_id))
        for line in core.export_output(
                dskey, calc_id, datadir, os.path.expanduser(target_dir),
                exports or DEFAULT_EXPORTS):
            safeprint(line)

    elif export_outputs is not None:
        job_id, target_dir = export_outputs
        hc_id = get_job_id(job_id)
        for line in core.export_outputs(
                hc_id, os.path.expanduser(target_dir),
                exports or DEFAULT_EXPORTS):
            safeprint(line)

    elif delete_uncompleted_calculations:
        logs.dbcmd('delete_uncompleted_calculations', getpass.getuser())
    else:
        engine.parentparser.prog = 'oq engine'
        engine.parentparser.print_usage()


engine._add('log_file', '--log-file', '-L', help='''\
Location where to store log messages; if not specified, log messages
will be printed to the console (to stderr)''')
engine._add('no_distribute', '--no-distribute', '--nd', help='''\
Disable calculation task distribution and run the computation
in a single process. This is intended for use in debugging and profiling.''',
            action='store_true')
engine.flg('yes', 'Automatically answer "yes" when asked to confirm an '
           'action')
engine.opt('config_file', 'Custom openquake.cfg file, to override default '
           'configurations')
engine._add('make_html_report', '--make-html-report', '--r',
            help='Build an HTML report of the computation at the given date',
            metavar='YYYY-MM-DD|today')
engine.flg('upgrade_db', 'Upgrade the openquake database')
engine.flg('db_version', 'Show the current version of the openquake database')
engine.flg('what_if_I_upgrade', 'Show what will happen to the openquake '
           'database if you upgrade')
engine._add('run', '--run', help='Run a job with the specified config file',
            metavar='JOB_INI', nargs='+')
engine._add('list_hazard_calculations', '--list-hazard-calculations', '--lhc',
            help='List hazard calculation information', action='store_true')
engine._add('list_risk_calculations', '--list-risk-calculations', '--lrc',
            help='List risk calculation information', action='store_true')
engine._add('delete_calculation', '--delete-calculation', '--dc',
            help='Delete a calculation and all associated outputs',
            metavar='CALCULATION_ID', type=int)
engine._add('delete_uncompleted_calculations',
            '--delete-uncompleted-calculations', '--duc',
            help='Delete all the uncompleted calculations',
            action='store_true')
engine._add('hazard_calculation_id', '--hazard-calculation-id', '--hc',
            help='Use the given job as input for the next job')
engine._add('list_outputs', '--list-outputs', '--lo',
            help='List outputs for the specified calculation',
            metavar='CALCULATION_ID')
engine._add('show_log', '--show-log', '--sl',
            help='Show the log of the specified calculation',
            metavar='CALCULATION_ID')
engine._add('export_output', '--export-output', '--eo', nargs=2,
            metavar=('OUTPUT_ID', 'TARGET_DIR'),
            help='Export the desired output to the specified directory')
engine._add('export_outputs', '--export-outputs', '--eos', nargs=2,
            metavar=('CALCULATION_ID', 'TARGET_DIR'),
            help='Export all of the calculation outputs to the '
            'specified directory')
engine.opt('exports', 'Comma-separated string specifying the export formats, '
           'in order of priority')
engine.opt('log_level', 'Defaults to "info"',
           choices=['debug', 'info', 'warn', 'error', 'critical'])
engine.flg('multi', 'Run multiple job.inis in parallel')
engine.flg('reuse_hazard', 'Read the source models from the cache (if any)')
engine._add('param', '--param', '-p',
            help='Override parameters specified with the syntax '
            'NAME1=VALUE1,NAME2=VALUE2,...')
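
# Command line usage sketches (illustrative, not part of the module), based on
# the flags registered above and exposed through the `oq engine` wrapper;
# job.ini, <calc_id> and <output_id> are placeholders:
#
#     oq engine --run job.ini                   # run a calculation
#     oq engine --run job_haz.ini job_risk.ini  # run hazard, then risk on it
#     oq engine --lhc                           # list hazard calculations
#     oq engine --lo <calc_id>                  # list outputs of a calculation
#     oq engine --eo <output_id> /tmp/out --exports csv  # export one output
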