# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import getpass
import logging
from openquake.baselib import config
from openquake.baselib.general import safeprint
from openquake.hazardlib import valid
from openquake.commonlib import logs, datastore
from openquake.engine.engine import create_jobs, run_jobs
from openquake.engine.export import core
from openquake.engine.utils import confirm
from openquake.engine.tools.make_html_report import make_report
from openquake.server import dbserver
from openquake.commands.abort import main as abort
DEFAULT_EXPORTS = 'csv,xml,rst'
HAZARD_CALCULATION_ARG = "--hazard-calculation-id"
MISSING_HAZARD_MSG = "Please specify '%s=<id>'" % HAZARD_CALCULATION_ARG
ZMQ = os.environ.get(
'OQ_DISTRIBUTE', config.distribution.oq_distribute) == 'zmq'
[docs]def get_job_id(job_id, username=None):
job = logs.dbcmd('get_job', job_id, username)
if not job:
sys.exit('Job %s not found' % job_id)
return job.id
[docs]def del_calculation(job_id, confirmed=False):
"""
Delete a calculation and all associated outputs.
"""
job = logs.dbcmd('get_job', job_id)
if not job:
print('There is no job %d' % job_id)
return
if confirmed or confirm(
'Are you sure you want to (abort and) delete this calculation and '
'all associated outputs?\nThis action cannot be undone. (y/n): '):
try:
abort([job.id])
resp = logs.dbcmd('del_calc', job.id, getpass.getuser(), False)
except RuntimeError as err:
safeprint(err)
else:
if 'success' in resp:
os.remove(resp['hdf5path'])
print('Removed %d' % job.id)
else:
print(resp['error'])
[docs]def main(
no_distribute=False,
yes=False,
upgrade_db=False,
db_version=False,
what_if_I_upgrade=False,
list_hazard_calculations=False,
list_risk_calculations=False,
delete_uncompleted_calculations=False,
multi=False,
reuse_input=False,
*,
log_file=None,
make_html_report=None,
run=None,
delete_calculation: int = None,
hazard_calculation_id: int = None,
list_outputs: int = None,
show_log=None,
export_output=None,
export_outputs=None,
param='',
config_file=None,
exports='',
log_level='info',
sample_sources=False,):
"""
Run a calculation using the traditional command line API
"""
user_name = getpass.getuser()
if not run:
# configure a basic logging
logging.basicConfig(level=logging.INFO)
if config_file:
config.read(os.path.abspath(os.path.expanduser(config_file)),
limit=int, soft_mem_limit=int, hard_mem_limit=int,
port=int, serialize_jobs=valid.boolean,
strict=valid.boolean, code=exec)
if no_distribute:
os.environ['OQ_DISTRIBUTE'] = 'no'
if sample_sources:
assert 0 < float(sample_sources) < 1
os.environ['OQ_SAMPLE_SOURCES'] = sample_sources
# check if the datadir exists
datadir = datastore.get_datadir()
if not os.path.exists(datadir):
os.makedirs(datadir)
fname = os.path.expanduser(config.dbserver.file)
if os.environ.get('OQ_DATABASE', config.dbserver.host) == 'local':
if not os.path.exists(fname):
upgrade_db = True # automatically creates the db
yes = True
else:
dbserver.ensure_on()
# check that we are talking to the right server
err = dbserver.check_foreign()
if err:
sys.exit(err)
if upgrade_db:
msg = logs.dbcmd('what_if_I_upgrade', 'read_scripts')
if msg.startswith('Your database is already updated'):
pass
elif yes or confirm('Proceed? (y/n) '):
logs.dbcmd('upgrade_db')
if not run:
sys.exit(0)
if db_version:
safeprint(logs.dbcmd('db_version'))
sys.exit(0)
if what_if_I_upgrade:
safeprint(logs.dbcmd('what_if_I_upgrade', 'extract_upgrade_scripts'))
sys.exit(0)
# check if the db is outdated
outdated = logs.dbcmd('check_outdated')
if outdated:
sys.exit(outdated)
# hazard or hazard+risk
if hazard_calculation_id == -1:
# get the latest calculation of the current user
hc_id = get_job_id(hazard_calculation_id, user_name)
elif hazard_calculation_id:
# make it possible to use calculations made by another user
hc_id = get_job_id(hazard_calculation_id)
else:
hc_id = None
if run:
pars = dict(p.split('=', 1) for p in param.split(',')) if param else {}
if reuse_input:
pars['cachedir'] = datadir
log_file = os.path.expanduser(log_file) \
if log_file is not None else None
job_inis = [os.path.expanduser(f) for f in run]
jobs = create_jobs(job_inis, log_level, log_file, user_name,
hc_id, multi)
for job in jobs:
job.params.update(pars)
job.params['exports'] = exports
run_jobs(jobs)
# hazard
elif list_hazard_calculations:
for line in logs.dbcmd(
'list_calculations', 'hazard', getpass.getuser()):
safeprint(line)
elif delete_calculation is not None:
del_calculation(delete_calculation, yes)
# risk
elif list_risk_calculations:
for line in logs.dbcmd('list_calculations', 'risk', getpass.getuser()):
safeprint(line)
# export
elif make_html_report:
safeprint('Written %s' % make_report(make_html_report))
sys.exit(0)
elif list_outputs is not None:
hc_id = get_job_id(list_outputs)
for line in logs.dbcmd('list_outputs', hc_id):
safeprint(line)
elif show_log is not None:
hc_id = get_job_id(show_log)
for line in logs.dbcmd('get_log', hc_id):
safeprint(line)
elif export_output is not None:
output_id, target_dir = export_output
dskey, calc_id, datadir = logs.dbcmd('get_output', int(output_id))
for line in core.export_output(
dskey, calc_id, datadir, os.path.expanduser(target_dir),
exports or DEFAULT_EXPORTS):
safeprint(line)
elif export_outputs is not None:
job_id, target_dir = export_outputs
hc_id = get_job_id(job_id)
for line in core.export_outputs(
hc_id, os.path.expanduser(target_dir),
exports or DEFAULT_EXPORTS):
safeprint(line)
elif delete_uncompleted_calculations:
logs.dbcmd('delete_uncompleted_calculations', getpass.getuser())
else:
print("Please pass some option, see oq engine --help")
# flags
main.no_distribute = dict(abbrev='--nd', help='''\
Disable calculation task distribution and run the
computation in a single process. This is intended for
use in debugging and profiling.''')
main.yes = 'Automatically answer "yes" when asked to confirm an action'
main.upgrade_db = 'Upgrade the openquake database'
main.db_version = 'Show the current version of the openquake database'
main.what_if_I_upgrade = (
'Show what will happen to the openquake database if you upgrade')
main.list_hazard_calculations = dict(
abbrev='--lhc', help='List hazard calculation information')
main.list_risk_calculations = dict(
abbrev='--lrc', help='List risk calculation information')
main.delete_uncompleted_calculations = dict(
abbrev='--duc', help='Delete all the uncompleted calculations')
main.multi = 'Run multiple job.inis in parallel'
main.reuse_input = 'Read the CompositeSourceModel from the cache (if any)'
# options
main.log_file = dict(
abbrev='-L', help='''\
Location where to store log messages; if not specified, log messages
will be printed to the console (to stderr)''')
main.make_html_report = dict(
abbrev='--r', metavar='YYYY-MM-DD|today',
help='Build an HTML report of the computation at the given date')
main.run = dict(abbrev='--run',
help='Run a job with the specified config file',
metavar='JOB_INI', nargs='+')
main.delete_calculation = dict(
abbrev='--dc',
help='Delete a calculation and all associated outputs',
metavar='CALCULATION_ID')
main.hazard_calculation_id = dict(
abbrev='--hc', help='Use the given job as input for the next job')
main.list_outputs = dict(
abbrev='--lo', help='List outputs for the specified calculation',
metavar='CALCULATION_ID')
main.show_log = dict(
abbrev='--sl', help='Show the log of the specified calculation',
metavar='CALCULATION_ID')
main.export_output = dict(
abbrev='--eo', nargs=2, metavar=('OUTPUT_ID', 'TARGET_DIR'),
help='Export the desired output to the specified directory')
main.export_outputs = dict(
abbrev='--eos', nargs=2, metavar=('CALCULATION_ID', 'TARGET_DIR'),
help='Export all of the calculation outputs to the specified directory')
main.param = dict(
abbrev='-p', help='Override parameters specified with the syntax '
'NAME1=VALUE1,NAME2=VALUE2,...')
main.config_file = ('Custom openquake.cfg file, to override default '
'configurations')
main.exports = ('Comma-separated string specifing the export formats, '
'in order of priority')
main.log_level = dict(help='Defaults to "info"',
choices=['debug', 'info', 'warn', 'error', 'critical'])
main.sample_sources = dict(abbrev='--ss',
help="Sample fraction in the range 0..1")