# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2010-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""Engine: A collection of fundamental functions for initializing and runningcalculations."""importosimportreimportsysimportjsonimporttimeimportpickleimportsocketimportsignalimportgetpassimportloggingimportplatformimportfunctools#import multiprocessing.poolfromos.pathimportgetsizefromdatetimeimportdatetime,timezoneimportpsutilimporth5pyimportnumpytry:fromsetproctitleimportsetproctitleexceptImportError:defsetproctitle(title):"Do nothing"fromurllib.requestimporturlopen,Requestfromopenquake.baselib.python3compatimportdecodefromopenquake.baselibimportparallel,general,config,slurm,workerpoolaswfromopenquake.commonlib.oqvalidationimportOqParamfromopenquake.commonlibimportreadinput,logsfromopenquake.calculatorsimportbasefromopenquake.calculators.baseimportexpose_outputsUTC=timezone.utcUSER=getpass.getuser()OQ_API='https://api.openquake.org'MB=1024**2_PID=os.getpid()# the PID_PPID=os.getppid()# the controlling terminal PIDGET_JOBS='''--- executing or submittedSELECT * FROM job WHERE status IN ('executing', 'submitted')AND host=?x AND is_running=1 AND pid > 0 ORDER BY id'''
def get_zmq_ports():
    """
    :returns: an array with the receiver ports
    """
    # the configuration stores the range as a string like "1907-1920"
    lo, hi = (int(port) for port in
              config.dbserver.receiver_ports.split('-'))
    return numpy.arange(lo, hi)
def set_concurrent_tasks_default(calc):
    """
    Look at the number of available workers and update the parameter
    OqParam.concurrent_tasks.default.
    Abort the calculations if no workers are available. Do nothing for trivial
    distributions.

    :param calc: a calculator instance with a .datastore attribute
    """
    dist = parallel.oq_distribute()
    if dist in ('zmq', 'slurm'):
        master = w.WorkerMaster(calc.datastore.calc_id)
        # each element returned by wait() is a triple (host, running, total)
        num_workers = sum(total for host, running, total in master.wait())
        if num_workers == 0:
            logging.critical("No live compute nodes, aborting calculation")
            logs.dbcmd('finish', calc.datastore.calc_id, 'failed')
            sys.exit(1)
        # twice the number of workers is a good default for the task count
        parallel.Starmap.CT = num_workers * 2
        OqParam.concurrent_tasks.default = num_workers * 2
    else:
        num_workers = parallel.Starmap.num_cores
    if dist == 'no':
        logging.warning('Disabled distribution')
    else:
        # fixed: the original format string 'Using %d%s workers' was
        # missing the space between the count and the distribution name
        logging.warning('Using %d %s workers', num_workers, dist)
class MasterKilled(KeyboardInterrupt):
    """Exception raised when a job is killed manually"""
def manage_signals(job_id, signum, _stack):
    """
    Convert a SIGTERM into a SystemExit exception and a SIGINT/SIGHUP into
    a MasterKilled exception with an appropriate error message.

    :param int signum: the number of the received signal
    :param _stack: the current frame object, ignored
    """
    if signum == signal.SIGINT:
        raise MasterKilled('The openquake master process was killed manually')
    elif signum == signal.SIGTERM:
        sys.exit(f'Killed {job_id}')
    elif hasattr(signal, 'SIGHUP') and signum == signal.SIGHUP:
        # there is no SIGHUP on Windows; kill the calculation only when
        # os.getppid() != _PPID, i.e. the controlling terminal died;
        # in the workers, do nothing
        if os.getppid() != _PPID:
            raise MasterKilled(
                'The openquake master lost its controlling terminal')
def register_signals(job_id):
    # Install the manage_signals callback for SIGTERM, SIGINT and, where
    # available, SIGHUP. When using the Django development server this
    # module is imported by a thread, so one gets a
    # `ValueError: signal only works in main thread` that can be safely
    # ignored.
    handler = functools.partial(manage_signals, job_id)
    try:
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, handler)
        hup = getattr(signal, 'SIGHUP', None)
        # skip SIGHUP when missing (Windows) or when running with 'nohup',
        # which sets the handler to SIG_IGN
        if hup is not None and signal.getsignal(hup) != signal.SIG_IGN:
            signal.signal(hup, handler)
    except ValueError:
        pass
def poll_queue(job_id, poll_time):
    """
    Check the queue of executing/submitted jobs and exit when there is
    a free slot.
    """
    try:
        host = socket.gethostname()
    except Exception:  # gaierror
        host = None
    offset = config.distribution.serialize_jobs - 1
    if offset < 0:
        return  # job serialization disabled: nothing to wait for
    announced = False
    while True:
        running = logs.dbcmd(GET_JOBS, host)
        ahead = [job.id for job in running if job.id < job_id - offset]
        if not ahead:
            break
        if not announced:
            logs.dbcmd('update_job', job_id,
                       {'status': 'submitted', 'pid': _PID})
            announced = True
            # the logging is not yet initialized, so use a print
            print('Waiting for jobs %s' % ' '.join(map(str, ahead)))
        time.sleep(poll_time)
def run_calc(log):
    """
    Run a calculation.

    :param log: LogContext of the current job
    """
    register_signals(log.calc_id)
    setproctitle('oq-job-%d' % log.calc_id)
    with log:
        # check the available memory before starting
        while True:
            used_mem = psutil.virtual_memory().percent
            if used_mem < 80:  # continue if little memory is in use
                break
            logging.info('Memory occupation %d%%, the user should free '
                         'some memory', used_mem)
            time.sleep(5)
        oqparam = log.get_oqparam()
        calc = base.calculators(oqparam, log.calc_id)
        try:
            hostname = socket.gethostname()
        except Exception:  # gaierror
            hostname = 'localhost'
        logging.info('%s@%s running %s [--hc=%s]',
                     USER, hostname, calc.oqparam.inputs['job_ini'],
                     calc.oqparam.hazard_calculation_id)
        obsolete_msg = check_obsolete_version(oqparam.calculation_mode)
        # NB: the warning should not be logged for users with
        # an updated LTS version
        if obsolete_msg:
            logging.warning(obsolete_msg)
        calc.from_engine = True
        set_concurrent_tasks_default(calc)
        t0 = time.time()
        calc.run(shutdown=True)
        # store the output records in the database so they become visible
        logging.info('Exposing the outputs to the database')
        expose_outputs(calc.datastore)
        calc.datastore.close()
        outs = '\n'.join(logs.dbcmd('list_outputs', log.calc_id, False))
        logging.info(outs)
        path = calc.datastore.filename
        size = general.humansize(getsize(path))
        logging.info('Stored %s on %s in %d seconds',
                     size, path, time.time() - t0)
        # sanity check to make sure that the logging on file is working
        if (log.log_file and log.log_file != os.devnull and
                getsize(log.log_file) == 0):
            logging.warning('The log file %s is empty!?' % log.log_file)
        return calc
def check_directories(calc_id):
    """
    Make sure that the datadir and the scratch_dir (if any) are writeable

    :param calc_id: calculation ID, used to locate the scratch directory
    :raises AssertionError: if one of the directories does not exist
    """
    datadir = logs.get_datadir()
    scratch_dir = parallel.scratch_dir(calc_id)
    # NB: renamed the loop variable from `dir`, which shadowed the builtin
    for path in (datadir, scratch_dir):
        assert os.path.exists(path), path
        fname = os.path.join(path, 'check')
        open(fname, 'w').close()  # check writeable
        os.remove(fname)
def create_jobs(job_inis, log_level=logging.INFO, log_file=None,
                user_name=USER, hc_id=None, host=None):
    """
    Create job records on the database.

    :param job_inis: a list of pathnames or a list of dictionaries
    :returns: a list of LogContext objects
    """
    try:
        host = socket.gethostname()
    except Exception:  # gaierror
        host = None
    jobs = []
    for job_ini in job_inis:
        if isinstance(job_ini, dict):
            params = job_ini
        else:
            # NB: `get_params` must NOT log, since the logging is not
            # configured yet, otherwise the log will disappear :-(
            params = readinput.get_params(job_ini)
        ctx = logs.init(params, None, log_level, log_file,
                        user_name, hc_id, host)
        jobs.append(ctx)
    check_directories(jobs[0].calc_id)
    return jobs
def start_workers(job_id, dist, nodes):
    """
    Start the workers via the DbServer or via slurm
    """
    if dist == 'slurm':
        slurm.start_workers(job_id, nodes)
        slurm.wait_workers(job_id, nodes)
    elif dist == 'zmq':
        print('Starting the workers %s' % config.zworkers.host_cores)
        # ask the DbServer to spawn the worker pool
        logs.dbcmd('workers_start', config.zworkers)  # start the workers
def stop_workers(job_id):
    """
    Stop the workers spawned by the current job via the WorkerMaster
    """
    outcome = w.WorkerMaster(job_id).stop()
    print(outcome)
def watchdog(calc_id, pid, timeout):
    """
    If the job takes longer than the timeout, kills it

    :param calc_id: ID of the job to monitor
    :param pid: process ID to kill on timeout
    :param timeout: maximum duration in seconds
    """
    while True:
        time.sleep(30)  # poll the database every half minute
        [(start, status)] = logs.dbcmd(
            'SELECT start_time, status FROM job WHERE id=?x', calc_id)
        if status != 'executing':
            break
        # NB: fixed a bug here: `.seconds` is only the seconds *component*
        # of the timedelta (it wraps around at 86400), so the timeout was
        # never triggered for jobs running longer than one day;
        # `.total_seconds()` is the real elapsed time
        elif (datetime.now() - start).total_seconds() > timeout:
            os.kill(pid, signal.SIGTERM)
            logs.dbcmd('finish', calc_id, 'aborted')
            break
def run_jobs(jobctxs, concurrent_jobs=None, nodes=1, sbatch=False,
             precalc=False):
    """
    Run jobs using the specified config file and other options.

    :param jobctxs:
        List of LogContexts
    :param concurrent_jobs:
        How many jobs to run concurrently (default num_cores/4)
    :param nodes: number of slurm nodes to use
    :param sbatch: if True, submit via sbatch and do not wait
    :param precalc: if True, the first job is a precalculation
    :returns: the input jobctxs
    """
    dist = parallel.oq_distribute()
    if dist == 'slurm':
        # check the total number of required cores
        tot_cores = parallel.Starmap.num_cores * nodes
        max_cores = int(config.distribution.max_cores)
        if tot_cores > max_cores:
            # NB: fixed an operator precedence bug here: the original
            # `'...' % max_cores // parallel.Starmap.num_cores` applied
            # the `%` formatting before the `//`, raising a TypeError
            # (str // int) instead of the intended ValueError
            raise ValueError('You can use at most %d nodes' %
                             (max_cores // parallel.Starmap.num_cores))
    if concurrent_jobs is None:
        # // 8 is chosen so that the core occupation in cole is decent
        concurrent_jobs = parallel.Starmap.CT // 8 or 1
        if dist in ('slurm', 'zmq'):
            print(f'{concurrent_jobs=}')
    job_id = jobctxs[0].calc_id
    if precalc:
        # assume the first job is a precalculation from which the other starts
        for jobctx in jobctxs[1:]:
            jobctx.params['hazard_calculation_id'] = job_id
    else:
        for jobctx in jobctxs:
            hc_id = jobctx.params.get('hazard_calculation_id')
            if hc_id:
                job = logs.dbcmd('get_job', hc_id)
                ppath = job.ds_calc_dir + '.hdf5'
                if os.path.exists(ppath):
                    version = logs.dbcmd('engine_version')
                    with h5py.File(ppath, 'r') as f:
                        prev_version = f.attrs['engine_version']
                    if prev_version != version:
                        # here the logger is not initialized yet
                        print('Starting from a hazard (%d) computed with'
                              ' an obsolete version of the engine: %s' %
                              (hc_id, prev_version))
    if dist == 'slurm' and sbatch:
        pass  # do not wait in the job queue
    else:
        try:
            # wait for an empty slot or a CTRL-C
            poll_queue(job_id, poll_time=15)
        except BaseException:
            # the job aborted even before starting
            for job in jobctxs:
                logs.dbcmd('finish', job.calc_id, 'aborted')
            raise
    for job in jobctxs:
        dic = {'status': 'executing', 'pid': _PID,
               'start_time': datetime.now(UTC)}
        logs.dbcmd('update_job', job.calc_id, dic)
    try:
        if dist in ('zmq', 'slurm') and w.WorkerMaster(
                job_id).status() == []:
            start_workers(job_id, dist, nodes)

        # run the jobs sequentially or in parallel, with slurm or without
        if dist == 'slurm' and sbatch:
            scratch_dir = parallel.scratch_dir(job_id)
            with open(os.path.join(scratch_dir, 'jobs.pik'), 'wb') as f:
                pickle.dump(jobctxs, f)
            w.WorkerMaster(job_id).send_jobs()
            print('oq engine --show-log %d to see the progress' % job_id)
        elif len(jobctxs) > 1 and dist in ('zmq', 'slurm'):
            if precalc:
                run_calc(jobctxs[0])
                args = [(ctx,) for ctx in jobctxs[1:]]
            else:
                args = [(ctx,) for ctx in jobctxs]
            # with multiprocessing.pool.Pool(concurrent_jobs) as pool:
            #     pool.starmap(run_calc, args)
            parallel.multispawn(run_calc, args, concurrent_jobs)
        else:
            for jobctx in jobctxs:
                run_calc(jobctx)
    finally:
        if dist == 'zmq' or (dist == 'slurm' and not sbatch):
            stop_workers(job_id)
    return jobctxs
def version_triple(tag):
    """
    returns: a triple of integers from a version tag
    """
    # accept both 'X.Y.Z' and 'vX.Y.Z'; extra trailing text is ignored
    major, minor, patch = re.match(
        r'v?(\d+)\.(\d+)\.(\d+)', tag).groups()
    return (int(major), int(minor), int(patch))
def check_obsolete_version(calculation_mode='WebUI'):
    """
    Check if there is a newer version of the engine.

    :param calculation_mode:
         - the calculation mode when called from the engine
         - an empty string when called from the WebUI
    :returns:
        - a message if the running version of the engine is obsolete
        - the empty string if the engine is updated
        - None if the check could not be performed (i.e. github is down)
    """
    if os.environ.get('JENKINS_URL') or os.environ.get('CI'):
        # avoid flooding our API server with requests from CI systems
        return

    version = logs.dbcmd('engine_version')
    logging.info('Using engine version %s', version)
    # the User-Agent carries version/mode/platform info for API statistics
    headers = {'User-Agent': 'OpenQuake Engine %s;%s;%s;%s' %
               (version, calculation_mode, platform.platform(),
                config.distribution.oq_distribute)}
    try:
        req = Request(OQ_API + '/engine/latest', headers=headers)
        # NB: a timeout < 1 does not work
        data = urlopen(req, timeout=1).read()  # bytes
        tag_name = json.loads(decode(data))['tag_name']
        current = version_triple(version)
        latest = version_triple(tag_name)
    except Exception:  # page not available or wrong version tag
        msg = ('An error occurred while calling %s/engine/latest to check'
               ' if the installed version of the engine is up to date.' %
               OQ_API)
        logging.warning(msg)
        return
    if current < latest:
        return ('Version %s of the engine is available, but you are '
                'still using version %s' % (tag_name, version))
    else:
        return ''
if __name__ == '__main__':
    # run LogContexts stored in jobs.pik, called by job.yaml or slurm
    with open(sys.argv[1], 'rb') as f:
        jobctxs = pickle.load(f)
    try:
        if len(jobctxs) > 1 and jobctxs[0].multi:
            # spawn the jobs in parallel, limiting the concurrency
            parallel.multispawn(run_calc, [(ctx,) for ctx in jobctxs],
                                parallel.Starmap.CT // 10 or 1)
        else:
            # run the jobs sequentially
            for jobctx in jobctxs:
                run_calc(jobctx)
    except Exception:
        # mark any still-pending jobs as failed before re-raising
        ids = [jc.calc_id for jc in jobctxs]
        rows = logs.dbcmd("SELECT id FROM job WHERE id IN (?X) "
                          "AND status IN ('created', 'executing')", ids)
        for jid, in rows:
            logs.dbcmd("set_status", jid, 'failed')
        raise
    finally:
        stop_workers(jobctxs[0].calc_id)