# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2014-2023 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportsysimportgetpassimportloggingfromopenquake.baselibimportconfigfromopenquake.baselib.generalimportsafeprintfromopenquake.hazardlibimportvalidfromopenquake.commonlibimportlogs,datastorefromopenquake.engine.engineimportcreate_jobs,run_jobsfromopenquake.engine.exportimportcorefromopenquake.engine.utilsimportconfirmfromopenquake.engine.tools.make_html_reportimportmake_reportfromopenquake.serverimportdbserverfromopenquake.commands.abortimportmainasabortDEFAULT_EXPORTS='csv,xml,rst'HAZARD_CALCULATION_ARG="--hazard-calculation-id"MISSING_HAZARD_MSG="Please specify '%s=<id>'"%HAZARD_CALCULATION_ARGZMQ=os.environ.get('OQ_DISTRIBUTE',config.distribution.oq_distribute)=='zmq'
[docs]defget_job_id(job_id,username=None):job=logs.dbcmd('get_job',job_id,username)ifnotjob:sys.exit('Job %s not found'%job_id)returnjob.id
[docs]defdel_calculation(job_id,confirmed=False):""" Delete a calculation and all associated outputs. """job=logs.dbcmd('get_job',job_id)ifnotjob:print('There is no job %d'%job_id)returnifconfirmedorconfirm('Are you sure you want to (abort and) delete this calculation and ''all associated outputs?\nThis action cannot be undone. (y/n): '):try:abort([job.id])resp=logs.dbcmd('del_calc',job.id,getpass.getuser(),False)exceptRuntimeErroraserr:safeprint(err)else:if'success'inresp:os.remove(resp['hdf5path'])print('Removed %d'%job.id)else:print(resp['error'])
[docs]defmain(no_distribute=False,yes=False,upgrade_db=False,db_version=False,what_if_I_upgrade=False,list_hazard_calculations=False,list_risk_calculations=False,delete_uncompleted_calculations=False,multi=False,reuse_input=False,*,log_file=None,make_html_report=None,run=None,delete_calculation:int=None,hazard_calculation_id:int=None,list_outputs:int=None,show_log=None,export_output=None,export_outputs=None,param='',config_file=None,exports='',log_level='info',sample_sources=False,):""" Run a calculation using the traditional command line API """user_name=getpass.getuser()ifnotrun:# configure a basic logginglogging.basicConfig(level=logging.INFO)ifconfig_file:config.read(os.path.abspath(os.path.expanduser(config_file)),limit=int,soft_mem_limit=int,hard_mem_limit=int,port=int,serialize_jobs=valid.boolean,strict=valid.boolean,code=exec)ifno_distribute:os.environ['OQ_DISTRIBUTE']='no'ifsample_sources:assert0<float(sample_sources)<1os.environ['OQ_SAMPLE_SOURCES']=sample_sources# check if the datadir existsdatadir=datastore.get_datadir()ifnotos.path.exists(datadir):os.makedirs(datadir)fname=os.path.expanduser(config.dbserver.file)ifos.environ.get('OQ_DATABASE',config.dbserver.host)=='local':ifnotos.path.exists(fname):upgrade_db=True# automatically creates the dbyes=Trueelse:dbserver.ensure_on()# check that we are talking to the right servererr=dbserver.check_foreign()iferr:sys.exit(err)ifupgrade_db:msg=logs.dbcmd('what_if_I_upgrade','read_scripts')ifmsg.startswith('Your database is already updated'):passelifyesorconfirm('Proceed? (y/n) '):logs.dbcmd('upgrade_db')ifnotrun:sys.exit(0)ifdb_version:safeprint(logs.dbcmd('db_version'))sys.exit(0)ifwhat_if_I_upgrade:safeprint(logs.dbcmd('what_if_I_upgrade','extract_upgrade_scripts'))sys.exit(0)# check if the db is outdatedoutdated=logs.dbcmd('check_outdated')ifoutdated:sys.exit(outdated)# hazard or hazard+riskifhazard_calculation_id==-1:# get the latest calculation of the current userhc_id=get_job_id(hazard_calculation_id,user_name)elifhazard_calculation_id:# make it possible to use calculations made by another userhc_id=get_job_id(hazard_calculation_id)else:hc_id=Noneifrun:pars=dict(p.split('=',1)forpinparam.split(','))ifparamelse{}ifreuse_input:pars['cachedir']=datadirlog_file=os.path.expanduser(log_file) \
iflog_fileisnotNoneelseNonejob_inis=[os.path.expanduser(f)forfinrun]jobs=create_jobs(job_inis,log_level,log_file,user_name,hc_id,multi)forjobinjobs:job.params.update(pars)job.params['exports']=exportsrun_jobs(jobs)# hazardeliflist_hazard_calculations:forlineinlogs.dbcmd('list_calculations','hazard',getpass.getuser()):safeprint(line)elifdelete_calculationisnotNone:del_calculation(delete_calculation,yes)# riskeliflist_risk_calculations:forlineinlogs.dbcmd('list_calculations','risk',getpass.getuser()):safeprint(line)# exportelifmake_html_report:safeprint('Written %s'%make_report(make_html_report))sys.exit(0)eliflist_outputsisnotNone:hc_id=get_job_id(list_outputs)forlineinlogs.dbcmd('list_outputs',hc_id):safeprint(line)elifshow_logisnotNone:hc_id=get_job_id(show_log)forlineinlogs.dbcmd('get_log',hc_id):safeprint(line)elifexport_outputisnotNone:output_id,target_dir=export_outputdskey,calc_id,datadir=logs.dbcmd('get_output',int(output_id))forlineincore.export_output(dskey,calc_id,datadir,os.path.expanduser(target_dir),exportsorDEFAULT_EXPORTS):safeprint(line)elifexport_outputsisnotNone:job_id,target_dir=export_outputshc_id=get_job_id(job_id)forlineincore.export_outputs(hc_id,os.path.expanduser(target_dir),exportsorDEFAULT_EXPORTS):safeprint(line)elifdelete_uncompleted_calculations:logs.dbcmd('delete_uncompleted_calculations',getpass.getuser())else:print("Please pass some option, see oq engine --help")
# flagsmain.no_distribute=dict(abbrev='--nd',help='''\Disable calculation task distribution and run thecomputation in a single process. This is intended foruse in debugging and profiling.''')main.yes='Automatically answer "yes" when asked to confirm an action'main.upgrade_db='Upgrade the openquake database'main.db_version='Show the current version of the openquake database'main.what_if_I_upgrade=('Show what will happen to the openquake database if you upgrade')main.list_hazard_calculations=dict(abbrev='--lhc',help='List hazard calculation information')main.list_risk_calculations=dict(abbrev='--lrc',help='List risk calculation information')main.delete_uncompleted_calculations=dict(abbrev='--duc',help='Delete all the uncompleted calculations')main.multi='Run multiple job.inis in parallel'main.reuse_input='Read the CompositeSourceModel from the cache (if any)'# optionsmain.log_file=dict(abbrev='-L',help='''\Location where to store log messages; if not specified, log messageswill be printed to the console (to stderr)''')main.make_html_report=dict(abbrev='--r',metavar='YYYY-MM-DD|today',help='Build an HTML report of the computation at the given date')main.run=dict(abbrev='--run',help='Run a job with the specified config file',metavar='JOB_INI',nargs='+')main.delete_calculation=dict(abbrev='--dc',help='Delete a calculation and all associated outputs',metavar='CALCULATION_ID')main.hazard_calculation_id=dict(abbrev='--hc',help='Use the given job as input for the next job')main.list_outputs=dict(abbrev='--lo',help='List outputs for the specified calculation',metavar='CALCULATION_ID')main.show_log=dict(abbrev='--sl',help='Show the log of the specified calculation',metavar='CALCULATION_ID')main.export_output=dict(abbrev='--eo',nargs=2,metavar=('OUTPUT_ID','TARGET_DIR'),help='Export the desired output to the specified directory')main.export_outputs=dict(abbrev='--eos',nargs=2,metavar=('CALCULATION_ID','TARGET_DIR'),help='Export all of the calculation outputs to the specified directory')main.param=dict(abbrev='-p',help='Override parameters specified with the syntax ''NAME1=VALUE1,NAME2=VALUE2,...')main.config_file=('Custom openquake.cfg file, to override default ''configurations')main.exports=('Comma-separated string specifing the export formats, ''in order of priority')main.log_level=dict(help='Defaults to "info"',choices=['debug','info','warn','error','critical'])main.sample_sources=dict(abbrev='--ss',help="Sample fraction in the range 0..1")