# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2010-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""Set up some system-wide loggers"""importosimportreimportgetpassimportloggingfromdatetimeimportdatetime,timezonefromopenquake.baselibimportconfig,zeromq,parallel,workerpoolaswfromopenquake.commonlibimportreadinput,dbapiUTC=timezone.utcLEVELS={'debug':logging.DEBUG,'info':logging.INFO,'warn':logging.WARNING,'error':logging.ERROR,'critical':logging.CRITICAL}SIMPLE_TYPES=(str,int,float,bool,datetime,list,tuple,dict,type(None))CALC_REGEX=r'(calc|cache)_(\d+)\.hdf5'MODELS=[]# to be populated in get_tag
def get_tag(job_ini):
    """
    :returns: the name of the model if job_ini belongs to the mosaic_dir
    """
    if not MODELS:  # lazily populate the model codes the first time
        MODELS.extend(readinput.read_mosaic_df(buffer=.1).code)
    # a mosaic path looks like /home/michele/mosaic/EUR/in/job.ini,
    # so the model code (e.g. EUR) is the third component from the end
    parts = job_ini.split('/')
    return parts[-3] if len(parts) > 3 and parts[-3] in MODELS else ''
def dbcmd(action, *args):
    """
    A dispatcher to the database server.

    :param string action: database action to perform
    :param tuple args: arguments
    """
    # make sure the passed arguments are simple (i.e. not Django
    # QueryDict that cannot be deserialized without settings.py)
    for arg in args:
        if type(arg) not in SIMPLE_TYPES:
            raise TypeError(f'{arg} is not a simple type')
    dbhost = os.environ.get('OQ_DATABASE', config.dbserver.host)
    if dbhost == '127.0.0.1' and getpass.getuser() != 'openquake':
        # access the database directly
        if action.startswith('workers_'):
            master = w.WorkerMaster(-1)  # current job
            return getattr(master, action[8:])()  # workers_(stop|kill)
        from openquake.server.db import actions
        try:
            func = getattr(actions, action)
        except AttributeError:
            # not a known action: assume it is a raw SQL command
            return dbapi.db(action, *args)
        else:
            return func(dbapi.db, *args)
    # send a command to the database
    tcp = 'tcp://%s:%s' % (dbhost, config.dbserver.port)
    sock = zeromq.Socket(tcp, zeromq.zmq.REQ, 'connect',
                         timeout=600)  # when the system is loaded
    with sock:
        res = sock.send((action,) + args)
    if isinstance(res, parallel.Result):
        return res.get()
    return res
def dblog(level: str, job_id: int, task_no: int, msg: str):
    """
    Store a log record for the given job and task on the database,
    timestamped with the current UTC time.
    """
    return dbcmd('log', job_id, datetime.now(UTC), level,
                 'task #%d' % task_no, msg)
def get_datadir():
    """
    Extracts the path of the directory where the openquake data are stored
    from the environment ($OQ_DATADIR) or from the shared_dir in the
    configuration file.
    """
    # the environment variable takes precedence over the configuration
    env_dir = os.environ.get('OQ_DATADIR')
    if env_dir:
        return env_dir
    shared_dir = config.directory.shared_dir
    if not shared_dir:
        # no shared_dir configured: use the home of the user
        return os.path.join(os.path.expanduser('~'), 'oqdata')
    # special case for /opt/openquake/openquake -> /opt/openquake
    return os.path.join(shared_dir, getpass.getuser(), 'oqdata').replace(
        'openquake/openquake', 'openquake')
def get_calc_ids(datadir=None):
    """
    Extract the available calculation IDs from the datadir, in order.
    """
    datadir = datadir or get_datadir()
    if not os.path.exists(datadir):
        return []
    # collect the IDs of the calc_XXX.hdf5 / cache_XXX.hdf5 files
    matches = (re.match(CALC_REGEX, fname) for fname in os.listdir(datadir))
    return sorted({int(mo.group(2)) for mo in matches if mo})
def get_last_calc_id(datadir=None):
    """
    Extract the latest calculation ID from the given directory.
    If none is found, return 0.
    """
    ids = get_calc_ids(datadir or get_datadir())
    return ids[-1] if ids else 0
def _update_log_record(self, record):
    """
    Massage a log record before emitting it. Intended to be used by the
    custom log handlers defined in this module: it guarantees that the
    record carries a ``hostname`` and a ``job_id`` attribute.
    """
    # default hostname placeholder when the record does not carry one
    record.hostname = getattr(record, 'hostname', '-')
    if not hasattr(record, 'job_id'):
        record.job_id = self.job_id  # taken from the handler
class LogContext:
    """
    Context manager managing the logging functionality: it creates (or
    reuses) a job record in the database and attaches the log handlers
    on entry, removing them and closing the job on exit.
    """
    oqparam = None  # may be set externally (see get_oqparam)

    def __init__(self, params, log_level='info', log_file=None,
                 user_name=None, hc_id=None, host=None, tag=''):
        # sanity check: the 'job' table must exist in the database
        if not dbcmd("SELECT name FROM sqlite_master WHERE name='job'"):
            raise RuntimeError('You forgot to run oq engine --upgrade-db -y')
        self.log_level = log_level
        self.log_file = log_file
        self.user_name = user_name or getpass.getuser()
        self.params = params
        if 'inputs' not in self.params:  # for reaggregate
            self.tag = tag
        else:
            inputs = self.params['inputs']
            # infer the tag from the mosaic model, if any
            self.tag = tag or get_tag(inputs.get('job_ini', '<in-memory>'))
        if hc_id:
            self.params['hazard_calculation_id'] = hc_id
        calc_id = int(params.get('job_id', 0))
        if calc_id == 0:
            # no pre-existing job: create one in the database
            datadir = get_datadir()
            self.calc_id = dbcmd('create_job', datadir,
                                 self.params['calculation_mode'],
                                 self.params.get('description', 'test'),
                                 user_name, hc_id, host)
            path = os.path.join(datadir, 'calc_%d.hdf5' % self.calc_id)
            if os.path.exists(path):
                # sanity check on the calculation ID
                raise RuntimeError('There is a pre-existing file %s' % path)
            self.usedb = True
        else:  # assume the calc_id was already created in the db
            assert calc_id > 0, calc_id
            self.calc_id = calc_id
            self.usedb = True

    def get_oqparam(self, validate=True):
        """
        :returns: an OqParam instance
        """
        if self.oqparam:  # set by submit_job
            return self.oqparam
        return readinput.get_oqparam(self.params, validate=validate)

    def __enter__(self):
        # attach the handlers (database, stream and/or file) to the
        # root logger, formatting the messages with the calc_id and tag
        if not logging.root.handlers:  # first time
            level = LEVELS.get(self.log_level, self.log_level)
            logging.basicConfig(level=level, handlers=[])
        f = '[%(asctime)s #{}{}%(levelname)s] %(message)s'.format(
            self.calc_id, self.tag + ' ' if self.tag else '')
        self.handlers = [LogDatabaseHandler(self.calc_id)] \
            if self.usedb else []
        if self.log_file is None:
            # add a StreamHandler if not already there
            if not any(h for h in logging.root.handlers
                       if isinstance(h, logging.StreamHandler)):
                self.handlers.append(LogStreamHandler(self.calc_id))
        else:
            self.handlers.append(LogFileHandler(self.calc_id, self.log_file))
        for handler in self.handlers:
            handler.setFormatter(
                logging.Formatter(f, datefmt='%Y-%m-%d %H:%M:%S'))
            logging.root.addHandler(handler)
        if os.environ.get('NUMBA_DISABLE_JIT'):
            logging.warning('NUMBA_DISABLE_JIT is set')
        return self

    def __exit__(self, etype, exc, tb):
        # mark the job as aborted/failed/complete in the database and
        # detach the handlers added in __enter__
        if tb:
            if etype is SystemExit:
                dbcmd('finish', self.calc_id, 'aborted')
            else:
                # remove StreamHandler to avoid logging twice
                logging.root.removeHandler(self.handlers[-1])
                logging.exception(f'{etype.__name__}: {exc}')
                dbcmd('finish', self.calc_id, 'failed')
        else:
            dbcmd('finish', self.calc_id, 'complete')
        for handler in self.handlers:
            logging.root.removeHandler(handler)
        parallel.Starmap.shutdown()

    def __getstate__(self):  # ensure pickleability
        return dict(calc_id=self.calc_id, params=self.params,
                    usedb=self.usedb, log_level=self.log_level,
                    log_file=self.log_file, user_name=self.user_name,
                    oqparam=self.oqparam, tag=self.tag)

    def __repr__(self):
        hc_id = self.params.get('hazard_calculation_id')
        return '<%s#%d, hc_id=%s>' % (self.__class__.__name__,
                                      self.calc_id, hc_id)
def init(job_ini, dummy=None, log_level='info', log_file=None,
         user_name=None, hc_id=None, host=None, tag=''):
    """
    :param job_ini: path to the job.ini file or dictionary of parameters
    :param dummy: ignored parameter, exists for backward compatibility
    :param log_level: the log level as a string or number
    :param log_file: path to the log file (if any)
    :param user_name: user running the job (None means current user)
    :param hc_id: parent calculation ID (default None)
    :param host: machine where the calculation is running (default None)
    :param tag: tag (for instance the model name) to show before the
                log message
    :returns: a LogContext instance associated to a calculation ID
    """
    # older callers passed 'job' or 'calc' as first argument and the
    # real job_ini as second one; honor that calling convention
    if job_ini in ('job', 'calc'):
        job_ini = dummy
    params = job_ini if isinstance(job_ini, dict) else \
        readinput.get_params(job_ini)
    return LogContext(params, log_level, log_file, user_name,
                      hc_id, host, tag)