# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import io
import os
import csv
import sys
import abc
import pdb
import json
import time
import inspect
import logging
import operator
import traceback
import getpass
from datetime import datetime
from shapely import wkt, geometry
import psutil
import numpy
import pandas
from PIL import Image
import configparser
import collections

from openquake.commands.plot_assets import main as plot_assets
from openquake.baselib import general, hdf5, config
from openquake.baselib import parallel
from openquake.baselib.performance import Monitor, idx_start_stop
from openquake.hazardlib import (
    InvalidFile, valid, geo, site, stats, logictree, source_reader)
from openquake.hazardlib.site_amplification import Amplifier
from openquake.hazardlib.site_amplification import AmplFunction
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.calc.filters import SourceFilter, getdefault
from openquake.hazardlib.source import rupture
from openquake.hazardlib.shakemap.maps import get_sitecol_shakemap
from openquake.hazardlib.shakemap.gmfs import to_gmfs
from openquake.risklib import riskinput, riskmodels, reinsurance
from openquake.commonlib import readinput, datastore, logs
from openquake.calculators.export import export as exp
from openquake.calculators import getters, postproc

get_taxonomy = operator.attrgetter('taxonomy')
get_weight = operator.attrgetter('weight')
get_imt = operator.attrgetter('imt')

calculators = general.CallableDict(operator.attrgetter('calculation_mode'))
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
TWO16 = 2 ** 16
TWO32 = 2 ** 32
EBDOC = ('https://docs.openquake.org/oq-engine/master/manual/user-guide/'
         'advanced/advanced-calculations.html#understanding-the-hazard')
stats_dt = numpy.dtype([('mean', F32), ('std', F32),
                        ('min', F32), ('max', F32), ('len', U16)])
USER = getpass.getuser()
MB = 1024 ** 2
def check_imtls(this, parent):
    """
    Check that the hazard_imtls of a calculation are compatible with the
    levels of the parent calculation
    """
    for imt, imls in this.items():
        if len(imls) != len(parent[imt]) or (
                F32(imls) != F32(parent[imt])).any():
            raise ValueError('The intensity measure levels %s are different '
                             'from the parent levels %s for %s' %
                             (imls, parent[imt], imt))
# this is used for the minimum_intensity dictionaries
def consistent(dic1, dic2):
    """
    Check if two dictionaries with default are consistent:

    >>> consistent({'PGA': 0.05, 'SA(0.3)': 0.05}, {'default': 0.05})
    True
    >>> consistent({'SA(0.3)': 0.1, 'SA(0.6)': 0.05},
    ...            {'default': 0.1, 'SA(0.3)': 0.1, 'SA(0.6)': 0.05})
    True
    """
    if dic1 == dic2:
        return True
    v1 = set(dic1.values())
    v2 = set(dic2.values())
    missing = set(dic2) - set(dic1) - {'default'}
    if len(v1) == 1 and len(v2) == 1 and v1 == v2:
        # {'PGA': 0.05, 'SA(0.3)': 0.05} is consistent with {'default': 0.05}
        return True
    return not missing
class InvalidCalculationID(Exception):
    """
    Raised when running a post-calculation on top of an incompatible
    pre-calculation
    """
def set_array(longarray, shortarray):
    """
    :param longarray: a numpy array of floats of length L >= l
    :param shortarray: a numpy array of floats of length l

    Fill `longarray` with the values of `shortarray`, starting from the
    left. If `shortarray` is shorter than `longarray`, then the remaining
    elements on the right are filled with `numpy.nan` values.
    """
    longarray[:len(shortarray)] = shortarray
    longarray[len(shortarray):] = numpy.nan
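# Illustrative sketch (not part of the original module): how set_array
# fills a 5-element float buffer from a 3-element array; the names
# `long` and `short` are hypothetical.
#
#   >>> long = numpy.zeros(5)
#   >>> short = numpy.array([1., 2., 3.])
#   >>> set_array(long, short)
#   >>> long  # -> [1., 2., 3., nan, nan]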
def csv2peril(fname, name, sitecol, tofloat, asset_hazard_distance):
    """
    Converts a CSV file into a peril array of length N
    """
    data = []
    with open(fname) as f:
        for row in csv.DictReader(f):
            intensity = tofloat(row['intensity'])
            if intensity > 0:
                data.append((valid.longitude(row['lon']),
                             valid.latitude(row['lat']),
                             intensity))
    data = numpy.array(data, [('lon', float), ('lat', float),
                              ('number', float)])
    logging.info('Read %s with %d rows' % (fname, len(data)))
    if len(data) != len(numpy.unique(data[['lon', 'lat']])):
        raise InvalidFile('There are duplicated points in %s' % fname)
    try:
        distance = asset_hazard_distance[name]
    except KeyError:
        distance = asset_hazard_distance['default']
    sites, filtdata, _discarded = geo.utils.assoc(
        data, sitecol, distance, 'filter')
    peril = numpy.zeros(len(sitecol), float)
    peril[sites.sids] = filtdata['number']
    return peril
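# Illustrative sketch (assumption, derived from the fields read above): the
# CSV read by csv2peril is expected to provide `lon`, `lat` and `intensity`
# columns, one row per affected point, e.g.
#
#   lon,lat,intensity
#   81.18,29.11,0.34
#   81.25,29.11,0.51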
def wkt2peril(fname, name, sitecol):
    """
    Converts a WKT file into a peril array of length N
    """
    with open(fname) as f:
        header = next(f)  # skip header
        if header != 'geom\n':
            raise ValueError('%s has header %r, should be geom instead'
                             % (fname, header))
        text = f.read()
    if not text.startswith('"'):
        raise ValueError('The geometry must be quoted in %s : "%s..."'
                         % (fname, text.split('(')[0]))
    geom = wkt.loads(text.strip('"'))  # strip quotes
    peril = numpy.zeros(len(sitecol), float)
    for sid, lon, lat in sitecol.complete.array[['sids', 'lon', 'lat']]:
        peril[sid] = geometry.Point(lon, lat).within(geom)
    return peril
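# Illustrative sketch (assumption, derived from the checks above): the WKT
# file read by wkt2peril contains a single `geom` header followed by one
# quoted geometry, e.g.
#
#   geom
#   "POLYGON((81.0 29.0, 81.5 29.0, 81.5 29.5, 81.0 29.5, 81.0 29.0))"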
class BaseCalculator(metaclass=abc.ABCMeta):
    """
    Abstract base class for all calculators.

    :param oqparam: OqParam object
    :param monitor: monitor object
    :param calc_id: numeric calculation ID
    """
    precalc = None
    accept_precalc = []
    from_engine = False  # set by engine.run_calc
    is_stochastic = False  # True for scenario and event based calculators

    def __init__(self, oqparam, calc_id):
        self.oqparam = oqparam
        self.datastore = datastore.new(calc_id, oqparam)
        self.engine_version = logs.dbcmd('engine_version')
        if os.environ.get('OQ_APPLICATION_MODE') == 'AELO':
            self.aelo_version = get_aelo_version()
        # save the version in the monitor, to be used in the version
        # check in the workers
        self._monitor = Monitor(
            '%s.run' % self.__class__.__name__, measuremem=True,
            h5=self.datastore,
            version=self.engine_version
            if parallel.oq_distribute() == 'zmq' else None)
        self._monitor.filename = self.datastore.filename
        # NB: using h5=self.datastore.hdf5 would mean losing the performance
        # info about Calculator.run since the file will be closed later on
    def pre_checks(self):
        """
        Checks to run after the pre_execute but before the execute
        """
    def monitor(self, operation='', **kw):
        """
        :returns: a new Monitor instance
        """
        mon = self._monitor(operation, h5=self.datastore.hdf5)
        self._monitor.calc_id = mon.calc_id = self.datastore.calc_id
        vars(mon).update(kw)
        return mon
    def save_params(self, **kw):
        """
        Update the current calculation parameters and save engine_version
        """
        if ('hazard_calculation_id' in kw and
                kw['hazard_calculation_id'] is None):
            del kw['hazard_calculation_id']
        vars(self.oqparam).update(**kw)
        if isinstance(self.oqparam.risk_imtls, dict):
            # always except in case_shakemap
            self.datastore['oqparam'] = self.oqparam
        attrs = self.datastore['/'].attrs
        attrs['engine_version'] = self.engine_version
        if hasattr(self, 'aelo_version'):
            attrs['aelo_version'] = self.aelo_version
        attrs['date'] = datetime.now().isoformat()[:19]
        if 'checksum32' not in attrs:
            attrs['input_size'] = size = self.oqparam.get_input_size()
            attrs['checksum32'] = check = readinput.get_checksum32(
                self.oqparam, self.datastore.hdf5)
            logging.info(f'Checksum of the inputs: {check} '
                         f'(total size {general.humansize(size)})')
    def check_precalc(self, precalc_mode):
        """
        Defensive programming against users providing an incorrect
        pre-calculation ID (with ``--hazard-calculation-id``).

        :param precalc_mode: calculation_mode of the previous calculation
        """
        calc_mode = self.oqparam.calculation_mode
        ok_mode = self.accept_precalc
        if calc_mode != precalc_mode and precalc_mode not in ok_mode:
            raise InvalidCalculationID(
                'In order to run a calculation of kind %r, '
                'you need to provide a calculation of kind %r, '
                'but you provided a %r instead' %
                (calc_mode, ok_mode, precalc_mode))
    def run(self, pre_execute=True, concurrent_tasks=None, remove=False,
            shutdown=False, **kw):
        """
        Run the calculation and return the exported outputs.

        :param pre_execute: set it to False to avoid running pre_execute
        :param concurrent_tasks: set it to 0 to disable parallelization
        :param remove: set it to True to remove the temporary hdf5 file (if any)
        :param shutdown: set it to True to shutdown the ProcessPool
        """
        oq = self.oqparam
        with self._monitor:
            self._monitor.username = kw.get('username', '')
            if concurrent_tasks is None:  # use the job.ini parameter
                ct = oq.concurrent_tasks
            else:  # used the parameter passed in the command-line
                ct = concurrent_tasks
            if ct == 0:  # disable distribution temporarily
                oq_distribute = os.environ.get('OQ_DISTRIBUTE')
                os.environ['OQ_DISTRIBUTE'] = 'no'
            if ct != oq.concurrent_tasks:
                # save the used concurrent_tasks
                oq.concurrent_tasks = ct
            if self.precalc is None:
                logging.info('Running %s with concurrent_tasks = %d',
                             self.__class__.__name__, ct)
            self.save_params(**kw)
            try:
                if pre_execute:
                    self.pre_execute()
                self.result = self.execute()
                if self.result is not None:
                    self.post_execute(self.result)
                # FIXME: this part can be called multiple times, for instance
                # for EventBasedCalculator, EventBasedRiskCalculator
                self.post_process()
                self.export(kw.get('exports', ''))
            except Exception as exc:
                if kw.get('pdb'):  # post-mortem debug
                    tb = sys.exc_info()[2]
                    traceback.print_tb(tb)
                    pdb.post_mortem(tb)
                else:
                    raise exc from None
            finally:
                if shutdown:
                    parallel.Starmap.shutdown()
                # cleanup globals
                if ct == 0:  # restore OQ_DISTRIBUTE
                    if oq_distribute is None:  # was not set
                        del os.environ['OQ_DISTRIBUTE']
                    else:
                        os.environ['OQ_DISTRIBUTE'] = oq_distribute
                # remove temporary hdf5 file, if any
                if os.path.exists(self.datastore.tempname):
                    if remove and oq.calculation_mode != 'preclassical':
                        # removing in preclassical with multiFaultSources
                        # would break --hc which is reading the temp file
                        os.remove(self.datastore.tempname)
        return getattr(self, 'exported', {})
    def core_task(*args):
        """
        Core routine running on the workers.
        """
        raise NotImplementedError
    @abc.abstractmethod
    def execute(self):
        """
        Execution phase. Usually will run in parallel the core
        function and return a dictionary with the results.
        """
    @abc.abstractmethod
    def post_execute(self, result):
        """
        Post-processing phase of the aggregated output. It must be
        overridden with the export code. It will return a dictionary
        of output files.
        """
    def gzip_inputs(self):
        """
        Gzipping the inputs and saving them in the datastore
        """
        logging.info('gzipping the input files')
        fnames = readinput.get_input_files(self.oqparam)
        self.datastore.store_files(fnames)
    def export(self, exports=None):
        """
        Export all the outputs in the datastore in the given export formats.
        Individual outputs are not exported if there are multiple realizations.
        """
        self.exported = getattr(self, 'exported', {})
        if isinstance(exports, tuple):
            fmts = exports
        elif exports:  # is a string
            fmts = exports.split(',')
        elif isinstance(self.oqparam.exports, (tuple, list)):
            fmts = self.oqparam.exports
        else:  # is a string
            fmts = self.oqparam.exports.split(',')
        keys = set(self.datastore) | {'fullreport'}
        has_hcurves = ('hcurves-stats' in self.datastore or
                       'hcurves-rlzs' in self.datastore)
        if has_hcurves:
            keys.add('hcurves')
        if 'ruptures' in self.datastore and len(self.datastore['ruptures']):
            keys.add('event_based_mfd')
        elif 'ruptures' in keys:
            keys.remove('ruptures')
        for fmt in fmts:
            if not fmt:
                continue
            if fmt == 'csv':
                self._export(('realizations', fmt))
            for key in sorted(keys):  # top level keys
                if 'rlzs' in key and self.R > 1:
                    if (key[:-4] + 'stats') in self.datastore:
                        continue  # skip individual curves
                self._export((key, fmt))
            if has_hcurves and self.oqparam.hazard_maps:
                self._export(('hmaps', fmt))
            if has_hcurves and self.oqparam.uniform_hazard_spectra:
                self._export(('uhs', fmt))
    def _export(self, ekey):
        if ekey not in exp or self.exported.get(ekey):  # already exported
            return
        with self.monitor('export'):
            try:
                self.exported[ekey] = fnames = exp(ekey, self.datastore)
            except Exception as exc:
                fnames = []
                logging.error('Could not export %s: %s', ekey, exc)
            if fnames:
                logging.info('exported %s: %s', ekey[0], fnames)

    def __repr__(self):
        return '<%s#%d>' % (self.__class__.__name__, self.datastore.calc_id)
def check_time_event(oqparam, occupancy_periods):
    """
    Check the `time_event` parameter in the datastore, by comparing
    with the periods found in the exposure.
    """
    time_event = oqparam.time_event
    if time_event != 'avg' and time_event not in occupancy_periods:
        raise ValueError(
            'time_event is %s in %s, but the exposure contains %s' %
            (time_event, oqparam.inputs['job_ini'],
             ', '.join(occupancy_periods)))
def check_amplification(ampl_df, sitecol):
    """
    Make sure the amplification codes in the site collection match the
    ones in the amplification table.

    :param ampl_df: the amplification table as a pandas DataFrame
    :param sitecol: the site collection
    """
    codeset = set(ampl_df.index)
    if len(codeset) == 1:
        # there is a single amplification function, there is no need to
        # extend the sitecol with an ampcode field
        return
    codes = set(sitecol.ampcode)
    missing = codes - codeset
    if missing:
        raise ValueError('The site collection contains references to missing '
                         'amplification functions: %s' %
                         b' '.join(missing).decode('utf8'))
class HazardCalculator(BaseCalculator):
    """
    Base class for hazard calculators based on source models
    """
    af = None
    amplifier = None
    def src_filter(self):
        """
        :returns: a SourceFilter
        """
        oq = self.oqparam
        if getattr(self, 'sitecol', None):
            sitecol = self.sitecol.complete
        else:  # can happen to the ruptures-only calculator
            sitecol = None
        return SourceFilter(sitecol, oq.maximum_distance)
    @property
    def E(self):
        """
        :returns: the number of stored events
        """
        try:
            return len(self.datastore['events'])
        except KeyError:
            return 0

    @property
    def N(self):
        """
        :returns: the number of sites
        """
        if hasattr(self, 'sitecol'):
            return len(self.sitecol) if self.sitecol else 0
        if 'sitecol' not in self.datastore:
            return 0
        return len(self.datastore['sitecol'])

    @property
    def few_sites(self):
        """
        :returns: True if there are less than max_sites_disagg
        """
        return len(self.sitecol.complete) <= self.oqparam.max_sites_disagg
    def check_overflow(self):
        """Overridden in event based"""
    def check_floating_spinning(self):
        oq = self.oqparam
        f, s = self.csm.get_floating_spinning_factors()
        if f != 1:
            logging.info('Rupture floating factor = %s', f)
        if s != 1:
            logging.info('Rupture spinning factor = %s', s)
        if (f * s >= 1.5 and oq.no_pointsource_distance and
                ('classical' in oq.calculation_mode or
                 'disaggregation' in oq.calculation_mode)):
            logging.info(
                'You are not using the pointsource_distance approximation:\n'
                'https://docs.openquake.org/oq-engine/advanced/general.html#'
                'pointsource-distance')
        elif 'classical' in oq.calculation_mode:
            if oq.ps_grid_spacing:
                logging.info('Using pointsource_distance=%s + %d',
                             oq.pointsource_distance, int(oq.ps_grid_spacing))
            else:
                logging.info('Using pointsource_distance=%s',
                             oq.pointsource_distance)
    def check_consequences(self):
        oq = self.oqparam
        names = self.assetcol.array.dtype.names
        for consequence in self.crmodel.get_consequences():
            if consequence == 'homeless':
                if 'value-residents' not in names:
                    msg = '<field oq="residents" input="OCCUPANTS_PER_ASSET"/>'
                    fnames = '\n'.join(oq.inputs['exposure'])
                    raise InvalidFile(
                        "%s: Missing %s in the exposureFields node" %
                        (fnames, msg))
    def read_inputs(self):
        """
        Read risk data and sources if any
        """
        oq = self.oqparam
        dist = parallel.oq_distribute()
        avail = psutil.virtual_memory().available / 1024**3
        required = .25 * (1 if dist == 'no' else parallel.Starmap.num_cores)
        if (dist == 'processpool' and avail < required and
                sys.platform != 'darwin'):
            # macos tells that there is no memory when there is, so we
            # must not enter in SLOW MODE there
            msg = ('Entering SLOW MODE. You have %.1f GB available, '
                   'but the engine would like at least 0.25 GB per core, '
                   'i.e. %.1f GB: '
                   'https://github.com/gem/oq-engine/blob/master/doc/faq.md'
                   ) % (avail, required)
            if oq.concurrent_tasks:
                oq.concurrent_tasks = 0
                logging.warning(msg)
            else:
                raise MemoryError('You have only %.1f GB available' % avail)
        self._read_risk1()
        self._read_risk2()
        self._read_risk3()
        if hasattr(self, 'assetcol'):
            self.check_consequences()
        self.check_overflow()  # check if self.sitecol is too large
        if ('amplification' in oq.inputs and
                oq.amplification_method == 'kernel'):
            logging.info('Reading %s', oq.inputs['amplification'])
            df = AmplFunction.read_df(oq.inputs['amplification'])
            check_amplification(df, self.sitecol)
            self.af = AmplFunction.from_dframe(df)
        if (oq.calculation_mode == 'disaggregation' and
                oq.max_sites_disagg < len(self.sitecol)):
            raise ValueError('Please set max_sites_disagg=%d in %s' %
                             (len(self.sitecol), oq.inputs['job_ini']))
        if ('source_model_logic_tree' in oq.inputs and
                oq.hazard_calculation_id is None):
            with self.monitor('composite source model', measuremem=True):
                self.csm = csm = readinput.get_composite_source_model(
                    oq, self.datastore)
                self.datastore['full_lt'] = self.full_lt = csm.full_lt
                oq.mags_by_trt = csm.get_mags_by_trt(oq.maximum_distance)
                assert oq.mags_by_trt, 'Filtered out all magnitudes!'
                for trt in oq.mags_by_trt:
                    mags = numpy.array(oq.mags_by_trt[trt])
                    self.datastore['source_mags/' + trt] = mags
                    interp = oq.maximum_distance(trt)
                    if len(interp.x) > 2:
                        md = '%s->%d, ... %s->%d, %s->%d' % (
                            interp.x[0], interp.y[0],
                            interp.x[-2], interp.y[-2],
                            interp.x[-1], interp.y[-1])
                        logging.info('max_dist %s: %s', trt, md)
        self.init()  # do this at the end of pre-execute
        self.pre_checks()
        if oq.calculation_mode == 'multi_risk':
            self.gzip_inputs()
        # check DEFINED_FOR_REFERENCE_VELOCITY
        if self.amplifier:
            gsim_lt = readinput.get_gsim_lt(oq)
            self.amplifier.check(self.sitecol.vs30, oq.vs30_tolerance,
                                 gsim_lt.values)
    def import_perils(self):  # called in pre_execute
        """
        Read the hazard fields as csv files, associate them to the sites
        and create suitable `gmf_data` and `events`.
        """
        oq = self.oqparam
        perils, fnames = zip(*oq.inputs['multi_peril'].items())
        N = len(self.sitecol)
        data = {'sid': self.sitecol.sids,
                'eid': numpy.zeros(N, numpy.uint32)}
        names = []
        for name, fname in zip(perils, fnames):
            tofloat = (valid.positivefloat if name == 'ASH'
                       else valid.probability)
            with open(fname) as f:
                header = next(f)
            if 'geom' in header:
                peril = wkt2peril(fname, name, self.sitecol)
            else:
                peril = csv2peril(fname, name, self.sitecol, tofloat,
                                  oq.asset_hazard_distance)
            if peril.sum() == 0:
                logging.warning('No sites were affected by %s' % name)
            data[name] = peril
            names.append(name)
        self.datastore['events'] = numpy.zeros(1, rupture.events_dt)
        create_gmf_data(self.datastore, [], names, data, N)
    def pre_execute(self):
        """
        Check if there is a previous calculation ID.
        If yes, read the inputs by retrieving the previous calculation;
        if not, read the inputs directly.
        """
        oq = self.oqparam
        self.t0 = time.time()
        if 'gmfs' in oq.inputs or 'multi_peril' in oq.inputs:
            # read hazard from files
            assert not oq.hazard_calculation_id, (
                'You cannot use --hc together with gmfs_file')
            with self.monitor('importing inputs', measuremem=True):
                self.read_inputs()
            if 'gmfs' in oq.inputs:
                self.datastore['full_lt'] = logictree.FullLogicTree.fake()
                if oq.inputs['gmfs'][0].endswith('.csv'):
                    eids = import_gmfs_csv(self.datastore, oq, self.sitecol)
                elif oq.inputs['gmfs'][0].endswith('.hdf5'):
                    eids = import_gmfs_hdf5(self.datastore, oq)
                else:
                    raise NotImplementedError(
                        'Importer for %s' % oq.inputs['gmfs'])
                E = len(eids)
                if hasattr(oq, 'number_of_ground_motion_fields'):
                    if oq.number_of_ground_motion_fields != E:
                        raise RuntimeError(
                            'Expected %d ground motion fields, found %d' %
                            (oq.number_of_ground_motion_fields, E))
                else:  # set the number of GMFs from the file
                    oq.number_of_ground_motion_fields = E
            else:
                self.import_perils()
            self.save_crmodel()
        elif 'hazard_curves' in oq.inputs:  # read hazard from file
            assert not oq.hazard_calculation_id, (
                'You cannot use --hc together with hazard_curves')
            haz_sitecol = readinput.get_site_collection(
                oq, self.datastore.hdf5)
            self.load_crmodel()  # must be after get_site_collection
            self.read_exposure(haz_sitecol)  # define .assets_by_site
            _mesh, pmap = readinput.get_pmap_from_csv(
                oq, oq.inputs['hazard_curves'])
            df = (~pmap).to_dframe()
            self.datastore.create_df('_rates', df)
            slices = numpy.array([(0, 0, len(df))], getters.slice_dt)
            self.datastore['_rates/slice_by_idx'] = slices
            self.datastore['assetcol'] = self.assetcol
            self.datastore['full_lt'] = logictree.FullLogicTree.fake()
            self.datastore['trt_rlzs'] = U32([[0]])
            self.save_crmodel()
            self.datastore.swmr_on()
        elif oq.hazard_calculation_id:
            self.pre_execute_from_parent()
        elif self.__class__.precalc:
            calc = calculators[self.__class__.precalc](
                self.oqparam, self.datastore.calc_id)
            calc.from_engine = self.from_engine
            calc.run(remove=False)
            calc.datastore.close()
            for name in (
                    'csm param sitecol assetcol crmodel realizations '
                    'max_gb max_weight '
                    'amplifier policy_df treaty_df full_lt exported '
                    'trt_rlzs gids').split():
                if hasattr(calc, name):
                    setattr(self, name, getattr(calc, name))
        else:
            with self.monitor('importing inputs', measuremem=True):
                self.read_inputs()
                self.save_crmodel()
    def pre_execute_from_parent(self):
        """
        Read data from the parent calculation and perform some checks
        """
        oq = self.oqparam
        parent = datastore.read(oq.hazard_calculation_id)
        oqparent = parent['oqparam']
        if 'weights' in parent:
            weights = numpy.unique(parent['weights'][:])
            if (oq.job_type == 'risk' and oq.collect_rlzs and
                    len(weights) > 1):
                raise ValueError(
                    'collect_rlzs=true can be specified only if '
                    'the realizations have identical weights')
        if oqparent.imtls:
            check_imtls(oq.imtls, oqparent.imtls)
        self.check_precalc(oqparent.calculation_mode)
        self.datastore.parent = parent
        # copy missing parameters from the parent
        if 'concurrent_tasks' not in vars(self.oqparam):
            self.oqparam.concurrent_tasks = (
                self.oqparam.__class__.concurrent_tasks.default)
        params = {name: value for name, value in
                  vars(parent['oqparam']).items()
                  if name not in vars(self.oqparam) and
                  name != 'ground_motion_fields'}
        if params:
            self.save_params(**params)
        with self.monitor('importing inputs', measuremem=True):
            self.read_inputs()
        oqp = parent['oqparam']
        if oqp.investigation_time != oq.investigation_time:
            raise ValueError(
                'The parent calculation was using investigation_time=%s'
                ' != %s' % (oqp.investigation_time, oq.investigation_time))
        hstats, rstats = list(oqp.hazard_stats()), list(oq.hazard_stats())
        if hstats != rstats:
            raise ValueError('The parent calculation had stats %s != %s' %
                             (hstats, rstats))
        sec_imts = set(oq.sec_imts)
        missing_imts = set(oq.risk_imtls) - sec_imts - set(oqp.imtls)
        if oqp.imtls and missing_imts:
            raise ValueError(
                'The parent calculation is missing the IMT(s) %s' %
                ', '.join(missing_imts))
        self.save_crmodel()
    def init(self):
        """
        To be overridden to initialize the datasets needed by the calculation
        """
        oq = self.oqparam
        if not oq.risk_imtls:
            if self.datastore.parent:
                oq.risk_imtls = (
                    self.datastore.parent['oqparam'].risk_imtls)
        if hasattr(self, 'csm'):
            self.check_floating_spinning()
        elif 'full_lt' in self.datastore:
            # for instance in classical damage case_8a
            pass
        else:  # build a fake; used by risk-from-file calculators
            self.datastore['full_lt'] = logictree.FullLogicTree.fake()
    @general.cached_property
    def R(self):
        """
        :returns: the number of realizations
        """
        if self.oqparam.collect_rlzs and self.oqparam.job_type == 'risk':
            return 1
        elif 'weights' in self.datastore:
            return len(self.datastore['weights'])
        try:
            return self.csm.full_lt.get_num_paths()
        except AttributeError:  # no self.csm
            return self.datastore['full_lt'].get_num_paths()
    def read_exposure(self, haz_sitecol):  # after load_risk_model
        """
        Read the exposure, the risk models and update the attributes
        .sitecol, .assetcol
        """
        oq = self.oqparam
        self.sitecol, self.assetcol, discarded, exposure = \
            readinput.get_sitecol_assetcol(
                oq, haz_sitecol, self.crmodel.loss_types, self.datastore)
        # this is overriding the sitecol in test_case_miriam
        self.datastore['sitecol'] = self.sitecol
        if len(discarded):
            self.datastore['discarded'] = discarded
            if 'scenario' in oq.calculation_mode:
                # this is normal for the case of scenario from rupture
                logging.info('%d assets were discarded because too far '
                             'from the rupture; use `oq show discarded` '
                             'to show them and `oq plot_assets` to plot '
                             'them' % len(discarded))
            elif not oq.discard_assets:  # raise an error
                self.datastore['assetcol'] = self.assetcol
                raise RuntimeError(
                    '%d assets were discarded; use `oq show discarded` to'
                    ' show them and `oq plot_assets` to plot them' %
                    len(discarded))
        if 'insurance' in oq.inputs:
            self.load_insurance_data(oq.inputs['insurance'].items())
        elif 'reinsurance' in oq.inputs:
            self.load_insurance_data(oq.inputs['reinsurance'].items())
        return exposure
    def load_insurance_data(self, lt_fnames):
        """
        Read the insurance files and populate the policy_df
        """
        oq = self.oqparam
        policy_acc = general.AccumDict(accum=[])
        # here is an example of policy_idx: {'?': 0, 'B': 1, 'A': 2}
        if 'reinsurance' in oq.inputs:
            loss_type = list(lt_fnames)[0][0]
            policy_df, treaty_df, fieldmap = readinput.get_reinsurance(
                oq, self.assetcol)
            treaties = set(treaty_df.id)
            assert len(treaties) == len(treaty_df), 'Not unique treaties'
            self.datastore.create_df('treaty_df', treaty_df,
                                     field_map=json.dumps(fieldmap))
            self.treaty_df = treaty_df
            # add policy_grp column
            for _, pol in policy_df.iterrows():
                grp = reinsurance.build_policy_grp(pol, treaty_df)
                policy_acc['policy_grp'].append(grp)
            for col in policy_df.columns:
                policy_acc[col].extend(policy_df[col])
            policy_acc['loss_type'].extend([loss_type] * len(policy_df))
        else:  # insurance
            policy_idx = self.assetcol.tagcol.policy_idx
            for loss_type, fname in lt_fnames:
                # `deductible` and `insurance_limit` as fractions
                policy_df = pandas.read_csv(fname, keep_default_na=False)
                policy_df['policy'] = [
                    policy_idx[pol] for pol in policy_df.policy]
                for col in ['deductible', 'insurance_limit']:
                    reinsurance.check_fractions(
                        [col], [policy_df[col].to_numpy()], fname)
                for col in policy_df.columns:
                    policy_acc[col].extend(policy_df[col])
                policy_acc['loss_type'].extend([loss_type] * len(policy_df))
        assert policy_acc
        self.policy_df = pandas.DataFrame(policy_acc)
        self.datastore.create_df('policy', self.policy_df)
    def load_crmodel(self):
        # to be called before read_exposure
        # NB: this is called even if there is no risk model
        """
        Read the risk models and set the attribute .crmodel.
        The crmodel can be empty for hazard calculations.
        Save the loss ratios (if any) in the datastore.
        """
        oq = self.oqparam
        self.crmodel = readinput.get_crmodel(oq)
        if not self.crmodel:
            parent = self.datastore.parent
            if 'crm' in parent:
                self.crmodel = riskmodels.CompositeRiskModel.read(parent, oq)
            return
        if oq.ground_motion_fields and not oq.imtls:
            raise InvalidFile('No intensity_measure_types specified in %s'
                              % self.oqparam.inputs['job_ini'])
        self.save_params()  # re-save oqparam
    def save_crmodel(self):
        """
        Save the risk models in the datastore
        """
        if len(self.crmodel):
            # NB: the alias dict must be filled after import_gmf
            alias = {imt: 'gmv_%d' % i for i, imt in enumerate(
                self.oqparam.get_primary_imtls())}
            for rm in self.crmodel._riskmodels.values():
                rm.alias = alias
            logging.info('Storing risk model')
            attrs = self.crmodel.get_attrs()
            self.datastore.create_df('crm', self.crmodel.to_dframe(),
                                     'gzip', **attrs)
            if len(self.crmodel.tmap_df):
                self.datastore.create_df('taxmap', self.crmodel.tmap_df,
                                         'gzip')
    def _plot_assets(self):
        # called by post_risk in ARISTOTLE mode
        plt = plot_assets(self.datastore.calc_id, show=False,
                          assets_only=True)
        bio = io.BytesIO()
        plt.savefig(bio, format='png', bbox_inches='tight')
        fig_path = 'png/assets.png'
        logging.info(f'Saving {fig_path} into the datastore')
        self.datastore[fig_path] = Image.open(bio)

    def _read_risk1(self):
        # read the risk model (if any) and then the site collection,
        # possibly extracted from the exposure
        oq = self.oqparam
        self.load_crmodel()  # must be called first
        if (not oq.imtls and 'shakemap' not in oq.inputs and
                'ins_loss' not in oq.inputs and oq.ground_motion_fields):
            raise InvalidFile('There are no intensity measure types in %s' %
                              oq.inputs['job_ini'])
        elif oq.hazard_calculation_id:
            haz_sitecol = read_parent_sitecol(oq, self.datastore)
        else:
            if ('gmfs' in oq.inputs and
                    oq.inputs['gmfs'][0].endswith('.hdf5')):
                haz_sitecol, _ = site.merge_sitecols(
                    oq.inputs['gmfs'], oq.mosaic_model, check_gmfs=True)
            else:
                haz_sitecol = readinput.get_site_collection(
                    oq, self.datastore.hdf5)
            if hasattr(self, 'rup'):
                # for scenario we reduce the site collection to the sites
                # within the maximum distance from the rupture
                haz_sitecol, _dctx = self.cmaker.filter(
                    haz_sitecol, self.rup)
                haz_sitecol.make_complete()
        oq_hazard = (self.datastore.parent['oqparam']
                     if self.datastore.parent else None)
        if ('exposure' in oq.inputs and
                'assetcol' not in self.datastore.parent):
            exposure = self.read_exposure(haz_sitecol)
            self.datastore['assetcol'] = self.assetcol
            self.datastore['exposure'] = exposure
            if hasattr(exposure, 'exposures'):
                self.datastore.getitem('assetcol')['exposures'] = \
                    numpy.array(exposure.exposures, hdf5.vstr)
        elif 'assetcol' in self.datastore.parent:
            logging.info('Reusing hazard exposure')
            haz_sitecol = read_parent_sitecol(oq, self.datastore)
            assetcol = self.datastore.parent['assetcol']
            assetcol.update_tagcol(oq.aggregate_by)
            if oq.region:
                region = wkt.loads(oq.region)
                self.sitecol = haz_sitecol.within(region)
            if oq.shakemap_id or 'shakemap' in oq.inputs or oq.shakemap_uri:
                self.sitecol, self.assetcol = store_gmfs_from_shakemap(
                    self, haz_sitecol, assetcol)
                self.datastore['sitecol'] = self.sitecol
                self.datastore['assetcol'] = self.assetcol
            elif hasattr(self, 'sitecol') and general.not_equal(
                    self.sitecol.sids, haz_sitecol.sids):
                self.assetcol = assetcol.reduce(self.sitecol)
                self.datastore['assetcol'] = self.assetcol
                logging.info('Extracted %d/%d assets',
                             len(self.assetcol), len(assetcol))
            else:
                self.assetcol = assetcol
                self.sitecol = haz_sitecol
                if ('site_id' in oq.aggregate_by and
                        'site_id' not in assetcol.tagcol.tagnames):
                    assetcol.tagcol.add_tagname('site_id')
                    assetcol.tagcol.site_id.extend(range(self.N))
        else:  # no exposure
            if oq.hazard_calculation_id:
                # NB: this is tested in event_based case_27 and case_31
                child = readinput.get_site_collection(
                    oq, self.datastore.hdf5)
                assoc_dist = (oq.region_grid_spacing * 1.414
                              if oq.region_grid_spacing
                              else 5)  # Graeme's 5km
                # keep the sites of the parent close to the sites of the child
                self.sitecol, _array, _discarded = geo.utils.assoc(
                    child, haz_sitecol, assoc_dist, 'filter')
                self.datastore['sitecol'] = self.sitecol
            else:  # base case
                self.sitecol = haz_sitecol
        if self.sitecol and oq.imtls:
            logging.info('Read N=%d hazard sites and L=%d hazard levels',
                         len(self.sitecol), oq.imtls.size)
        if (oq.calculation_mode.startswith(('event_based', 'ebrisk')) and
                self.N > 1000 and len(oq.min_iml) == 0):
            oq.raise_invalid(f'minimum_intensity must be set, see {EBDOC}')
        if oq_hazard:
            parent = self.datastore.parent
            if 'assetcol' in parent:
                check_time_event(oq, parent['assetcol'].occupancy_periods)
            elif oq.job_type == 'risk' and 'exposure' not in oq.inputs:
                raise ValueError('Missing exposure both in hazard and risk!')
            if (oq_hazard.time_event != 'avg' and
                    oq_hazard.time_event != oq.time_event):
                raise ValueError(
                    'The risk configuration file has time_event=%s but the '
                    'hazard was computed with time_event=%s' % (
                        oq.time_event, oq_hazard.time_event))

    def _read_risk2(self):
        oq = self.oqparam
        if oq.job_type == 'risk':
            if not hasattr(self, 'assetcol'):
                oq.raise_invalid('missing exposure')
            taxidx = self.assetcol.get_taxidx()
            # i.e. {'Concrete1': 1, 'Wood1': 2}
            tmap_df = readinput.taxonomy_mapping(oq, taxidx)
            self.crmodel.set_tmap(tmap_df, taxidx)
            risk_ids = set(tmap_df.risk_id)
            # check that we are covering all the taxonomies in the exposure
            # (exercised in EventBasedRiskTestCase::test_missing_taxonomy)
            missing = risk_ids - set(self.crmodel.riskids)
            if self.crmodel and missing:
                # in scenario_damage/case_14 the fragility model contains
                # 'CR+PC/LDUAL/HBET:8.19/m ' with a trailing space while
                # tmap.risk_id is extracted from the exposure and has no space
                raise RuntimeError(
                    'The tmap.risk_id %s are not in the CompositeRiskModel'
                    % missing)
            self.crmodel.check_risk_ids(oq.inputs)
            if len(self.crmodel.riskids) > len(risk_ids):
                logging.info(
                    'Reducing risk model from %d to %d risk functions',
                    len(self.crmodel.riskids), len(risk_ids))
                self.crmodel = self.crmodel.reduce(risk_ids)
            self.crmodel.tmap_df = tmap_df

    def _read_risk3(self):
        oq = self.oqparam
        if 'station_data' in oq.inputs:
            logging.info('Reading station data from %s',
                         oq.inputs['station_data'])
            # NB: get_station_data is extending the complete sitecol
            # which then is associated to the site parameters below
            self.station_data, self.observed_imts = \
                readinput.get_station_data(
                    oq, self.sitecol, duplicates_strategy='avg')
            self.datastore.create_df('station_data', self.station_data)
            oq.observed_imts = self.observed_imts
        if hasattr(self, 'sitecol') and self.sitecol and not oq.ruptures_hdf5:
            if 'site_model' in oq.inputs or oq.impact:
                assoc_dist = (oq.region_grid_spacing * 1.414
                              if oq.region_grid_spacing
                              else 5)  # Graeme's 5km
                sm = readinput.get_site_model(oq, self.datastore.hdf5)
                if oq.prefer_global_site_params:
                    self.sitecol.set_global_params(oq)
                else:  # use the site model parameters
                    self.sitecol.assoc(sm, assoc_dist)
                if oq.override_vs30:
                    # override vs30, z1pt0 and z2pt5
                    names = self.sitecol.array.dtype.names
                    self.sitecol.array['vs30'] = oq.override_vs30
                    if 'z1pt0' in names:
                        self.sitecol.calculate_z1pt0()
                    if 'z2pt5' in names:
                        self.sitecol.calculate_z2pt5()
                self.datastore['sitecol'] = self.sitecol
            if self.sitecol is not self.sitecol.complete:
                self.datastore['complete'] = self.sitecol.complete
        elif 'complete' in self.datastore.parent:
            # fix: the sitecol is not complete
            self.sitecol.complete = self.datastore.parent['complete']
        # store amplification functions if any
        if 'amplification' in oq.inputs:
            logging.info('Reading %s', oq.inputs['amplification'])
            df = AmplFunction.read_df(oq.inputs['amplification'])
            check_amplification(df, self.sitecol)
            if oq.amplification_method == 'kernel':
                # TODO: need to add additional checks on the main calculation
                # methodology since the kernel method is currently tested only
                # for classical PSHA
                self.af = AmplFunction.from_dframe(df)
            else:
                self.amplifier = Amplifier(oq.imtls, df, oq.soil_intensities)
        # manage secondary perils
        sec_perils = oq.get_sec_perils()
        for sp in sec_perils:
            sp.prepare(self.sitecol)  # add columns as needed
        if sec_perils:
            self.datastore['sitecol'] = self.sitecol
        mal = {lt: getdefault(oq.minimum_asset_loss, lt)
               for lt in oq.loss_types}
        if mal:
            logging.info('minimum_asset_loss=%s', mal)
        oq._amplifier = self.amplifier
        oq._sec_perils = sec_perils
        # compute exposure stats
        if hasattr(self, 'assetcol'):
            save_agg_values(
                self.datastore, self.assetcol, oq.loss_types,
                oq.aggregate_by, oq.max_aggregations)
        if 'post_loss_amplification' in oq.inputs:
            df = pandas.read_csv(
                oq.inputs['post_loss_amplification']).sort_values(
                'return_period')
            self.datastore.create_df('post_loss_amplification', df)
    def store_rlz_info(self, rel_ruptures):
        """
        Save info about the composite source model inside the full_lt dataset

        :param rel_ruptures: dictionary TRT -> number of relevant ruptures
        """
        if hasattr(self, 'full_lt'):  # no scenario
            if self.full_lt.get_num_paths() == 0:
                raise RuntimeError('Empty logic tree: too much filtering?')
        else:  # scenario
            self.full_lt = self.datastore['full_lt']
        if 'weights' not in self.datastore:
            self.datastore['weights'] = F32(
                [rlz.weight[-1] for rlz in self.full_lt.get_realizations()])
        if rel_ruptures:
            self.check_discardable(rel_ruptures)
    def check_discardable(self, rel_ruptures):
        """
        Check if logic tree reduction is possible
        """
        keep_trts = set()
        nrups = []
        for grp_id, trt_smrs in enumerate(self.csm.get_trt_smrs()):
            trti, _smrs = numpy.divmod(trt_smrs, 2**24)
            trt = self.full_lt.trts[trti[0]]
            nr = rel_ruptures.get(grp_id, 0)
            nrups.append(nr)
            if nr:
                keep_trts.add(trt)
        self.datastore['est_rups_by_grp'] = U32(nrups)
        discard_trts = set(self.full_lt.trts) - keep_trts
        if discard_trts and self.oqparam.calculation_mode == 'disaggregation':
            self.oqparam.discard_trts = discard_trts
        elif discard_trts:
            msg = ('No sources for some TRTs: you should set\n'
                   'discard_trts = %s\nin %s') % (
                ', '.join(discard_trts), self.oqparam.inputs['job_ini'])
            logging.warning(msg)
    # to be called after csm.fix_src_offset()
    def store_source_info(self, source_data):
        """
        Save (eff_ruptures, num_sites, calc_time) inside the source_info
        """
        # called first in preclassical, then called again in classical
        first_time = 'source_info' not in self.datastore
        if first_time:
            source_reader.create_source_info(self.csm, self.datastore.hdf5)
        self.csm.update_source_info(source_data)
        recs = [tuple(row) for row in self.csm.source_info.values()]
        self.datastore['source_info'][:] = numpy.array(
            recs, source_reader.source_info_dt)
        # sanity check on the total weight
        totw = sum(src.weight for src in self.csm.get_sources())
        saved = sum(row[source_reader.WEIGHT]
                    for row in self.csm.source_info.values())
        numpy.testing.assert_allclose(totw, saved, atol=1E-3)
    def post_process(self):
        """
        Run postprocessing function, if any
        """
        oq = self.oqparam
        if oq.postproc_func:
            modname, funcname = oq.postproc_func.rsplit('.', 1)
            mod = getattr(postproc, modname)
            func = getattr(mod, funcname)
            if 'csm' in inspect.getfullargspec(func).args:
                if hasattr(self, 'csm'):  # already there
                    csm = self.csm
                else:  # read the csm from the parent calculation
                    csm = self.datastore.parent['_csm']
                    csm.full_lt = self.datastore.parent['full_lt'].init()
                oq.postproc_args['csm'] = csm
            func(self.datastore, **oq.postproc_args)
class RiskCalculator(HazardCalculator):
    """
    Base class for all risk calculators. A risk calculator must set the
    attributes .crmodel, .sitecol, .assetcol, .riskinputs in the
    pre_execute phase.
    """
    def build_riskinputs(self):
        """
        :returns: a list of RiskInputs objects, sorted by IMT.
        """
        logging.info('Building risk inputs from %d realization(s)', self.R)
        imtset = set(self.oqparam.imtls) | set(self.oqparam.sec_imts)
        if not set(self.oqparam.risk_imtls) & imtset:
            rsk = ', '.join(self.oqparam.risk_imtls)
            haz = ', '.join(imtset)
            raise ValueError('The IMTs in the risk models (%s) are disjoint '
                             "from the IMTs in the hazard (%s)" % (rsk, haz))
        if len(self.crmodel.tmap_df) == 0:
            taxonomies = self.assetcol.tagcol.taxonomy[1:]
            taxidx = {taxo: i for i, taxo in enumerate(taxonomies, 1)}
            self.crmodel.tmap_df = readinput.taxonomy_mapping(
                self.oqparam, taxidx)
        with self.monitor('building riskinputs'):
            if self.oqparam.hazard_calculation_id:
                dstore = self.datastore.parent
            else:
                dstore = self.datastore
            riskinputs = self._gen_riskinputs(dstore)
        assert riskinputs
        logging.info('Built %d risk inputs', len(riskinputs))
        self.acc = None
        return riskinputs
    # used only for classical_risk and classical_damage
    def _gen_riskinputs(self, dstore):
        out = []
        asset_df = self.assetcol.to_dframe('site_id')
        getterdict = getters.CurveGetter.build(dstore)
        for sid, assets in asset_df.groupby(asset_df.index):
            getter = getterdict[sid]  # hcurves, shape (R, N)
            for slc in general.split_in_slices(
                    len(assets), self.oqparam.assets_per_site_limit):
                out.append(riskinput.RiskInput(getter, assets[slc]))
                if slc.stop - slc.start >= TWO16:
                    logging.error('There are %d assets on site #%d!',
                                  slc.stop - slc.start, sid)
        return out
    def execute(self):
        """
        Parallelize on the riskinputs and returns a dictionary of results.
        Require a `.core_task` to be defined with signature
        (riskinputs, crmodel, param, monitor).
        """
        if not hasattr(self, 'riskinputs'):  # in the reportwriter
            return
        ct = self.oqparam.concurrent_tasks or 1
        maxw = sum(ri.weight for ri in self.riskinputs) / ct
        self.datastore.swmr_on()
        smap = parallel.Starmap(
            self.core_task.__func__, h5=self.datastore.hdf5)
        smap.monitor.save('crmodel', self.crmodel)
        for block in general.block_splitter(
                self.riskinputs, maxw, get_weight, sort=True):
            smap.submit((block, self.oqparam))
        return smap.reduce(self.combine, self.acc)
    def combine(self, acc, res):
        """
        Combine the outputs assuming acc and res are dictionaries
        """
        if res is None:
            raise MemoryError('You ran out of memory!')
        return acc + res
# NB: changes oq.imtls by side effect!
def import_gmfs_csv(dstore, oqparam, sitecol):
    """
    Import in the datastore a ground motion field CSV file.

    :param dstore: the datastore
    :param oqparam: an OqParam instance
    :param sitecol: the site collection
    :returns: event_ids
    """
    [fname] = oqparam.inputs['gmfs']
    dtdict = {'sid': U32, 'eid': U32,
              'custom_site_id': (numpy.bytes_, 8), None: F32}
    array = hdf5.read_csv(
        fname, dtdict,
        renamedict=dict(site_id='sid', event_id='eid', rlz_id='rlzi')).array
    names = array.dtype.names  # rlz_id, sid, ...
    if names[0] == 'rlzi':  # backward compatibility
        names = names[1:]  # discard the field rlzi
    names = [n for n in names if n != 'custom_site_id']
    imts = [name.lstrip('gmv_') for name in names
            if name not in ('sid', 'eid')]
    oqparam.hazard_imtls = {imt: [0] for imt in imts}
    missing = set(oqparam.imtls) - set(imts)
    if missing:
        raise ValueError('The calculation needs %s which is missing from %s'
                         % (', '.join(missing), fname))
    imt2idx = {imt: i for i, imt in enumerate(oqparam.imtls)}
    arr = numpy.zeros(len(array), oqparam.gmf_data_dt())
    for name in names:
        if name.startswith('gmv_'):
            try:
                m = imt2idx[name[4:]]
            except KeyError:  # the file contains more than enough IMTs
                pass
            else:
                arr[f'gmv_{m}'][:] = array[name]
        else:
            arr[name] = array[name]
    if 'sid' not in names:  # there is a custom_site_id instead
        customs = sitecol.complete.custom_site_id
        to_sid = {csi: sid for sid, csi in enumerate(customs)}
        for csi in numpy.unique(array['custom_site_id']):
            ok = array['custom_site_id'] == csi
            arr['sid'][ok] = to_sid[csi]
    n = len(numpy.unique(arr[['sid', 'eid']]))
    if n != len(array):
        raise ValueError('Duplicated site_id, event_id in %s' % fname)
    # store the events
    eids = numpy.unique(array['eid'])
    eids.sort()
    if eids[0] != 0:
        raise ValueError('The event_id must start from zero in %s' % fname)
    E = len(eids)
    events = numpy.zeros(E, rupture.events_dt)
    events['id'] = eids
    logging.info('Storing %d events, all relevant', E)
    dstore['events'] = events
    # store the GMFs
    dic = general.group_array(arr, 'sid')
    offset = 0
    gmvlst = []
    for sid in sitecol.complete.sids:
        n = len(dic.get(sid, []))
        if n:
            offset += n
            gmvs = dic[sid]
            gmvlst.append(gmvs)
    data = numpy.concatenate(gmvlst)
    data.sort(order='eid')
    create_gmf_data(dstore, oqparam.get_primary_imtls(),
                    oqparam.sec_imts, data=data, N=len(sitecol.complete))
    dstore['weights'] = numpy.ones(1)
    return eids
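# Illustrative sketch (assumption about the column naming, which may differ
# from real exports): a minimal GMF CSV accepted by import_gmfs_csv has one
# row per (site, event) pair and one gmv_<IMT> column per intensity measure
# type, e.g.
#
#   site_id,event_id,gmv_PGA,gmv_SA(0.3)
#   0,0,0.062,0.115
#   1,0,0.048,0.098
#   0,1,0.081,0.142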
def _getset_attrs(oq):
    # read effective_time, num_events and imts from oq.inputs['gmfs']
    # if the format of the file is old (v3.11) also sets the attributes
    # investigation_time and ses_per_logic_tree_path on `oq`
    num_sites = []
    num_events = []
    for fname in oq.inputs['gmfs']:
        with hdf5.File(fname, 'r') as f:
            try:
                attrs = dict(f['gmf_data'].attrs)
                num_events.append(attrs['num_events'])
            except KeyError:
                attrs = {}
                num_events.append(len(f['events']))
            num_sites.append(len(f['sitecol']))
    return dict(effective_time=attrs.get('effective_time'),
                num_events=num_events, num_sites=num_sites,
                imts=list(oq.imtls))
def import_sites_hdf5(dstore, fnames):
    """
    Import site collections by merging them.

    :returns: a list of dictionaries local_sid->global_sid for each sitecol
    """
    if len(fnames) == 1:
        with hdf5.File(fnames[0], 'r') as f:
            dstore['sitecol'] = f['sitecol']
        convs = []
    else:  # merge the sites
        dstore['sitecol'], convs = site.merge_sitecols(fnames)
    return convs
# tested in scenario_test/case_33
def import_ruptures_hdf5(h5, fnames):
    """
    Importing the ruptures and the events
    """
    size = sum(os.path.getsize(f) for f in fnames)
    logging.warning('Importing %d files, %s', len(fnames),
                    general.humansize(size))
    rups = []
    h5.create_dataset('events', (0,), rupture.events_dt, maxshape=(None,),
                      chunks=True, compression='gzip')
    h5.create_dataset('rupgeoms', (0,), hdf5.vfloat32, maxshape=(None,),
                      chunks=True)
    E = 0
    offset = 0
    for fileno, fname in enumerate(fnames):
        with hdf5.File(fname, 'r') as f:
            oq = f['oqparam']
            events = f['events'][:]
            events['id'] += E
            events['rup_id'] += offset
            E += len(events)
            hdf5.extend(h5['events'], events)
            arr = f['rupgeoms'][:]
            h5.save_vlen('rupgeoms', list(arr))
            rup = f['ruptures'][:]
            rup['id'] += offset
            rup['geom_id'] += offset
            offset += len(rup)
            rups.extend(rup)
            if oq.mosaic_model and 'full_lt' in f:
                h5[f'full_lt/{oq.mosaic_model}'] = f['full_lt']
    ruptures = numpy.array(rups, dtype=rups[0].dtype)
    ruptures['e0'][1:] = ruptures['n_occ'].cumsum()[:-1]
    h5.create_dataset('ruptures', data=ruptures, compression='gzip')
    h5.create_dataset('trt_smr_start_stop',
                      data=idx_start_stop(ruptures['trt_smr']))
def import_gmfs_hdf5(dstore, oqparam):
    """
    Import in the datastore a ground motion field HDF5 file.

    :param dstore: the datastore
    :param oqparam: an OqParam instance
    :returns: event_ids
    """
    # NB: once we tried to use ExternalLinks to avoid copying the GMFs,
    # but you cannot access an external link if the file it points to is
    # already open, therefore you cannot run in parallel two calculations
    # starting from the same GMFs; moreover a calc_XXX.hdf5 downloaded
    # from the webui would be small but orphan of the GMFs; moreover
    # users changing the name of the external file or changing the
    # ownership would break calc_XXX.hdf5; therefore we copy everything
    # even if bloated (also because of SURA issues having the external
    # file under NFS and calc_XXX.hdf5 in the local filesystem)
    if 'oqparam' not in dstore:
        dstore['oqparam'] = oqparam
    fnames = oqparam.inputs['gmfs']
    size = sum(os.path.getsize(f) for f in fnames)
    logging.warning('Importing %d files, %s', len(fnames),
                    general.humansize(size))
    attrs = _getset_attrs(oqparam)
    E = sum(attrs['num_events'])
    rups = []
    if len(fnames) == 1:
        with hdf5.File(fnames[0], 'r') as f:
            dstore['sitecol'] = f['sitecol']  # complete by construction
            f.copy('gmf_data', dstore.hdf5)
    else:
        # merge the sites and the gmfs, tested in scenario/case_33
        convs = import_sites_hdf5(dstore, fnames)
        create_gmf_data(dstore, oqparam.get_primary_imtls(), E=E,
                        R=oqparam.number_of_logic_tree_samples)
        num_ev_rup_site = []
        fileno = 0
        nE = 0
        for fname, conv, ne in zip(fnames, convs, attrs['num_events']):
            logging.warning('Importing %s', fname)
            with hdf5.File(fname, 'r') as f:
                fileno += 1
                size = len(f['gmf_data/sid'])
                logging.info('Reading {:_d} rows from {}'.format(
                    size, fname))
                sids = numpy.array(list(conv))
                for slc in general.gen_slices(0, size, 10_000_000):
                    df = f.read_df('gmf_data', slc=slc)
                    df = df[numpy.isin(df.sid, sids)]
                    for sid, idx in conv.items():
                        df.loc[df.sid == sid, 'sid'] = idx
                    df['eid'] += nE  # add an offset to the event IDs
                    for col in df.columns:
                        hdf5.extend(dstore[f'gmf_data/{col}'], df[col])
            nE += ne
            num_ev_rup_site.append((nE, len(rups), len(conv)))
    oqparam.hazard_imtls = {imt: [0] for imt in attrs['imts']}
    # store the events
    events = numpy.zeros(E, rupture.events_dt)
    rel = numpy.unique(dstore['gmf_data/eid'][:])
    e = len(rel)
    assert E >= e, (E, e)
    events['id'] = numpy.concatenate(
        [rel, numpy.arange(E - e) + rel.max() + 1])
    logging.info('Storing %d events, %d relevant', E, e)
    dstore['events'] = events
    n = oqparam.number_of_logic_tree_samples
    if n:
        dstore['weights'] = numpy.full(n, 1 / n)
    else:
        dstore['weights'] = numpy.ones(1)
    return events['id']
def create_gmf_data(dstore, prim_imts, sec_imts=(), data=None,
                    N=None, E=None, R=None):
    """
    Create and possibly populate the datasets in the gmf_data group
    """
    oq = dstore['oqparam']
    R = R or dstore['full_lt'].get_num_paths()
    M = len(prim_imts)
    if data is None:
        N = 0
    else:
        assert N is not None  # pass len(complete) here
    items = [('sid', U32 if N == 0 else data['sid']),
             ('eid', U32 if N == 0 else data['eid'])]
    for m in range(M):
        col = f'gmv_{m}'
        items.append((col, F32 if data is None else data[col]))
    for imt in sec_imts:
        items.append((str(imt), F32 if N == 0 else data[imt]))
    if oq.investigation_time:
        eff_time = oq.investigation_time * oq.ses_per_logic_tree_path * R
    else:
        eff_time = 0
    # not gzipping for speed
    dstore.create_df('gmf_data', items,
                     num_events=E or len(dstore['events']),
                     imts=' '.join(map(str, prim_imts)),
                     investigation_time=oq.investigation_time or 0,
                     effective_time=eff_time)
    if oq.mea_tau_phi:
        dstore.create_df('mea_tau_phi', GmfComputer.mtp_dt.descr,
                         compression='gzip')
    if data is not None:
        _df = pandas.DataFrame(dict(items))
        avg_gmf = numpy.zeros((2, N, M + len(sec_imts)), F32)
        for sid, df in _df.groupby(_df.sid):
            df.pop('eid')
            df.pop('sid')
            avg_gmf[:, sid] = stats.avg_std(df.to_numpy())
        dstore['avg_gmf'] = avg_gmf
def save_agg_values(dstore, assetcol, lossnames, aggby, maxagg):
    """
    Store agg_keys, agg_values.

    :returns: the aggkey dictionary key -> tags
    """
    if aggby:
        aggids, aggtags = assetcol.build_aggids(aggby, maxagg)
        logging.info('Storing %d aggregation keys', len(aggids))
        agg_keys = ['\t'.join(tags) for tags in aggtags]
        dstore['agg_keys'] = numpy.array(agg_keys, hdf5.vstr)
        if 'assetcol' not in set(dstore):
            dstore['assetcol'] = assetcol
    if assetcol.get_value_fields():
        dstore['agg_values'] = assetcol.get_agg_values(aggby, maxagg)
def store_gmfs(calc, sitecol, shakemap, gmf_dict):
    """
    Store a ShakeMap array as a gmf_data dataset.
    """
    logging.info('Building GMFs')
    oq = calc.oqparam
    with calc.monitor('building/saving GMFs'):
        if oq.site_effects == 'no':
            vs30 = None  # do not amplify
        elif oq.site_effects == 'shakemap':
            vs30 = shakemap['vs30']
        elif oq.site_effects == 'sitemodel':
            vs30 = sitecol.vs30
        imts, gmfs = to_gmfs(shakemap, gmf_dict, vs30, oq.truncation_level,
                             oq.number_of_ground_motion_fields,
                             oq.random_seed, oq.imtls)
        N, E, _M = gmfs.shape
        events = numpy.zeros(E, rupture.events_dt)
        events['id'] = numpy.arange(E, dtype=U32)
        calc.datastore['events'] = events
        # convert into an array of dtype gmv_data_dt
        lst = [(sitecol.sids[s], ei) + tuple(gmfs[s, ei])
               for ei, event in enumerate(events)
               for s in numpy.arange(N, dtype=U32)]
        oq.hazard_imtls = {str(imt): [0] for imt in imts}
        data = numpy.array(lst, oq.gmf_data_dt())
        create_gmf_data(calc.datastore, imts, data=data,
                        N=len(sitecol.complete), R=1)
        calc.datastore['full_lt'] = logictree.FullLogicTree.fake()
        calc.datastore['weights'] = numpy.ones(1)
def store_gmfs_from_shakemap(calc, haz_sitecol, assetcol):
    """
    Enabled only if there is a shakemap parameter in the job.ini.
    Download, unzip, parse USGS shakemap files and build a corresponding
    set of GMFs which are then filtered with the hazard site collection
    and stored in the datastore.
    """
    oq = calc.oqparam
    imtls = oq.imtls or calc.datastore.parent['oqparam'].imtls
    oq.risk_imtls = {imt: list(imls) for imt, imls in imtls.items()}
    logging.info('Getting/reducing shakemap')
    with calc.monitor('getting/reducing shakemap'):
        # for instance for the test case_shakemap the haz_sitecol
        # has sids in range(0, 26) while sitecol.sids is
        # [8, 9, 10, 11, 13, 15, 16, 17, 18];
        # the total assetcol has 26 assets on the total sites
        # and the reduced assetcol has 9 assets on the reduced sites
        if oq.shakemap_id:
            uridict = {'kind': 'usgs_id', 'id': oq.shakemap_id}
        elif 'shakemap' in oq.inputs:
            uridict = {'kind': 'file_npy', 'fname': oq.inputs['shakemap']}
        else:
            uridict = oq.shakemap_uri
        sitecol, shakemap, discarded = get_sitecol_shakemap(
            uridict, oq.risk_imtls, haz_sitecol,
            oq.asset_hazard_distance['default'])
        if len(discarded):
            calc.datastore['discarded'] = discarded
        assetcol.reduce_also(sitecol)
        calc.datastore['assetcol'] = assetcol
        logging.info('Extracted %d assets', len(assetcol))

    # assemble dictionary to decide on the calculation method for the gmfs
    if 'MMI' in oq.imtls:
        # calculations with MMI should be executed
        if len(oq.imtls) == 1:
            # only MMI intensities
            if oq.spatial_correlation != 'no' or oq.cross_correlation != 'no':
                logging.warning('Calculations with MMI intensities do not '
                                'support correlation. No correlations '
                                'are applied.')
            gmf_dict = {'kind': 'mmi'}
        else:
            # there are also other intensities than MMI
            raise RuntimeError(
                'There are the following intensities in your model: %s '
                'Models mixing MMI and other intensities are not supported. '
                % ', '.join(oq.imtls))
    else:
        # no MMI intensities, calculation with or without correlation
        if oq.impact:
            gmf_dict = {'kind': 'basic'}  # possibly add correlation
        elif oq.spatial_correlation != 'no' or oq.cross_correlation != 'no':
            # cross correlation and/or spatial correlation after S&H
            gmf_dict = {'kind': 'Silva&Horspool',
                        'spatialcorr': oq.spatial_correlation,
                        'crosscorr': oq.cross_correlation,
                        'cholesky_limit': oq.cholesky_limit}
        else:
            # no correlation required, basic calculation is faster
            gmf_dict = {'kind': 'basic'}
    store_gmfs(calc, sitecol, shakemap, gmf_dict)
    return sitecol, assetcol
def read_parent_sitecol(oq, dstore):
    """
    :returns: the hazard site collection in the parent calculation
    """
    with datastore.read(oq.hazard_calculation_id) as parent:
        if 'sitecol' in parent:
            haz_sitecol = parent['sitecol'].complete
        else:
            haz_sitecol = readinput.get_site_collection(oq, dstore.hdf5)
        if ('amplification' in oq.inputs and
                'ampcode' not in haz_sitecol.array.dtype.names):
            haz_sitecol.add_col('ampcode', site.ampcode_dt)
    return haz_sitecol
def create_risk_by_event(calc):
    """
    Create an empty risk_by_event with keys event_id, agg_id, loss_id
    and fields for damages, losses and consequences
    """
    oq = calc.oqparam
    dstore = calc.datastore
    try:
        K = len(dstore['agg_keys'])
    except KeyError:
        K = 0
    crmodel = calc.crmodel
    if 'risk' in oq.calculation_mode:
        fields = [('loss', F32)]
        descr = [('event_id', U32), ('agg_id', U32), ('loss_id', U8),
                 ('variance', F32)] + fields
        dstore.create_df('risk_by_event', descr, K=K,
                         L=len(oq.loss_types))
    else:  # damage + consequences
        dmgs = ' '.join(crmodel.damage_states[1:])
        descr = ([('event_id', U32), ('agg_id', U32), ('loss_id', U8)] +
                 [(dc, F32) for dc in crmodel.get_dmg_csq()])
        dstore.create_df('risk_by_event', descr, K=K,
                         L=len(oq.loss_types), limit_states=dmgs)
def run_calc(job_ini, **kw):
    """
    Helper to run calculations programmatically.

    :param job_ini: path to a job.ini file or dictionary of parameters
    :param kw: parameters to override
    :returns: a Calculator instance
    """
    with logs.init(job_ini) as log:
        log.params.update(kw)
        calc = calculators(log.get_oqparam(), log.calc_id)
        calc.run()
        return calc
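# Illustrative usage sketch (assumptions: 'job.ini' is an existing job file
# and the concurrent_tasks override shown here is hypothetical):
#
#   >>> from openquake.calculators.base import run_calc
#   >>> calc = run_calc('job.ini', concurrent_tasks=0)  # run serially
#   >>> calc.datastore.filename  # path of the generated calc_XXX.hdf5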
def expose_outputs(dstore, owner=USER, status='complete'):
    """
    Build a correspondence between the outputs in the datastore and the
    ones in the database.

    :param dstore: datastore
    """
    oq = dstore['oqparam']
    exportable = set(ekey[0] for ekey in exp)
    calcmode = oq.calculation_mode
    dskeys = set(dstore) & exportable  # exportable datastore keys
    dskeys.add('fullreport')
    if 'avg_gmf' in dskeys:
        dskeys.remove('avg_gmf')  # hide
    rlzs = dstore['full_lt'].rlzs
    if len(rlzs) > 1:
        dskeys.add('realizations')
    hdf5 = dstore.hdf5
    if 'hcurves-stats' in hdf5 or 'hcurves-rlzs' in hdf5:
        if oq.hazard_stats() or oq.individual_rlzs or len(rlzs) == 1:
            dskeys.add('hcurves')
        if oq.uniform_hazard_spectra:
            dskeys.add('uhs')  # export them
        if oq.hazard_maps:
            dskeys.add('hmaps')  # export them
    if len(rlzs) > 1 and not oq.collect_rlzs:
        if 'aggrisk' in dstore:
            dskeys.add('aggrisk-stats')
        if 'aggcurves' in dstore:
            dskeys.add('aggcurves-stats')
        if not oq.individual_rlzs:
            for out in ['avg_losses-rlzs', 'aggrisk', 'aggcurves']:
                if out in dskeys:
                    dskeys.remove(out)
    if 'curves-rlzs' in dstore and len(rlzs) == 1:
        dskeys.add('loss_curves-rlzs')
    if 'curves-stats' in dstore and len(rlzs) > 1:
        dskeys.add('loss_curves-stats')
    if oq.conditional_loss_poes:  # expose loss_maps outputs
        if 'loss_curves-stats' in dstore:
            dskeys.add('loss_maps-stats')
    if 'ruptures' in dskeys:
        if 'scenario' in calcmode or len(dstore['ruptures']) == 0:
            # do not export, as requested by Vitor
            exportable.remove('ruptures')
        else:
            dskeys.add('event_based_mfd')
    if 'hmaps' in dskeys and not oq.hazard_maps:
        dskeys.remove('hmaps')  # do not export the hazard maps
    if logs.dbcmd('get_job', dstore.calc_id) is None:
        # the calculation has not been imported in the db yet
        logs.dbcmd('import_job', dstore.calc_id, oq.calculation_mode,
                   oq.description + ' [parent]', owner, status,
                   oq.hazard_calculation_id, dstore.datadir)
    keysize = []
    for key in sorted(dskeys & exportable):
        try:
            size_mb = dstore.getsize(key) / MB
        except (KeyError, AttributeError):
            size_mb = -1
        if size_mb:
            keysize.append((key, size_mb))
    ds_size = dstore.getsize() / MB
    logs.dbcmd('create_outputs', dstore.calc_id, keysize, ds_size)