Source code for openquake.calculators.event_based_risk
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import time
import os.path
import logging
import operator
from functools import partial
import numpy
import pandas
from scipy import sparse

from openquake.baselib import hdf5, performance, general, python3compat, config
from openquake.hazardlib import stats, InvalidFile
from openquake.commonlib.calc import starmap_from_gmfs, compactify3
from openquake.risklib.scientific import (
    total_losses, insurance_losses, MultiEventRNG, LOSSID)
from openquake.calculators import base, event_based
from openquake.calculators.post_risk import (
    PostRiskCalculator, post_aggregate, fix_dtypes, fix_investigation_time)

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
F32 = numpy.float32
F64 = numpy.float64
TWO16 = 2 ** 16
TWO32 = U64(2 ** 32)
get_n_occ = operator.itemgetter(1)

def fast_agg(keys, values, correl, li, acc):
    """
    :param keys: an array of N uint64 numbers encoding (event_id, agg_id)
    :param values: an array of (N, D) floats
    :param correl: True if there is asset correlation
    :param li: loss type index
    :param acc: dictionary unique key -> array(L, D)
    """
    ukeys, avalues = general.fast_agg2(keys, values)
    if correl:  # restore the variances
        avalues[:, 0] = avalues[:, 0] ** 2
    for ukey, avalue in zip(ukeys, avalues):
        acc[ukey][li] += avalue

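# --- illustrative sketch, not part of the engine module ---------------------
# fast_agg receives uint64 keys that pack (event_id, agg_id) as
# eid * 2**32 + agg_id; aggreg() below builds the keys this way and the event
# loss table unpacks them with divmod. Toy values, underscore names are for
# illustration only:
import numpy as _np

_eid, _agg_id = _np.uint64(123), _np.uint64(7)
_key = _eid * _np.uint64(2 ** 32) + _agg_id  # one uint64 per (event, agg)
assert divmod(_key, _np.uint64(2 ** 32)) == (_eid, _agg_id)
# -----------------------------------------------------------------------------
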
def average_losses(ln, alt, rlz_id, AR, collect_rlzs):
    """
    :returns: a sparse coo matrix with the losses per asset and realization
    """
    if collect_rlzs or len(numpy.unique(rlz_id)) == 1:
        ldf = pandas.DataFrame(
            dict(aid=alt.aid.to_numpy(), loss=alt.loss.to_numpy()))
        tot = ldf.groupby('aid').loss.sum()
        aids = tot.index.to_numpy()
        rlzs = numpy.zeros_like(tot)
        return sparse.coo_matrix((tot.to_numpy(), (aids, rlzs)), AR)
    else:
        ldf = pandas.DataFrame(
            dict(aid=alt.aid.to_numpy(), loss=alt.loss.to_numpy(),
                 rlz=rlz_id[U32(alt.eid)]))
        # NB: without the U32 here the SURA calculation would fail
        # with alt.eid being F64 (?)
        tot = ldf.groupby(['aid', 'rlz']).loss.sum()
        aids, rlzs = zip(*tot.index)
        return sparse.coo_matrix((tot.to_numpy(), (aids, rlzs)), AR)

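# --- illustrative sketch, not part of the engine module ---------------------
# average_losses builds a sparse (A, R) matrix of summed losses from an asset
# loss table; a minimal toy version of the groupby + coo_matrix pattern, with
# hypothetical numbers:
import pandas as _pd
from scipy import sparse as _sparse

_alt = _pd.DataFrame(dict(aid=[0, 0, 2], rlz=[0, 1, 0], loss=[1.0, 2.0, 5.0]))
_tot = _alt.groupby(['aid', 'rlz']).loss.sum()
_aids, _rlzs = zip(*_tot.index)
_coo = _sparse.coo_matrix((_tot.to_numpy(), (_aids, _rlzs)), shape=(3, 2))
# _coo.toarray() -> [[1., 2.], [0., 0.], [5., 0.]]
# -----------------------------------------------------------------------------
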
def debugprint(ln, asset_loss_table, adf):
    """
    Print risk_by_event in a reasonable format. To be used with --nd
    """
    if '+' in ln or ln == 'claim':
        df = asset_loss_table.set_index('aid').rename(columns={'loss': ln})
        df['asset_id'] = python3compat.decode(adf.id[df.index].to_numpy())
        del df['variance']
        print(df)

def aggreg(outputs, crmodel, ARK, aggids, rlz_id, ideduc, monitor):
    """
    :returns: (avg_losses, agg_loss_table)
    """
    mon_agg = monitor('aggregating losses', measuremem=False)
    mon_avg = monitor('averaging losses', measuremem=False)
    oq = crmodel.oqparam
    xtypes = oq.ext_loss_types
    if ideduc:
        xtypes.append('claim')
    loss_by_AR = {ln: [] for ln in xtypes}
    correl = int(oq.asset_correlation)
    (A, R, K), L = ARK, len(xtypes)
    acc = general.AccumDict(accum=numpy.zeros((L, 2)))  # u8idx->array
    value_cols = ['variance', 'loss']
    for out in outputs:
        for li, ln in enumerate(xtypes):
            if ln not in out or len(out[ln]) == 0:
                continue
            alt = out[ln]
            if oq.avg_losses:
                with mon_avg:
                    coo = average_losses(
                        ln, alt, rlz_id, (A, R), oq.collect_rlzs)
                    loss_by_AR[ln].append(coo)
            with mon_agg:
                if correl:  # use sigma^2 = (sum sigma_i)^2
                    alt['variance'] = numpy.sqrt(alt.variance)
                eids = alt.eid.to_numpy() * TWO32  # U64
                values = numpy.array([alt[col] for col in value_cols]).T
                # aggregate all assets
                fast_agg(eids + U64(K), values, correl, li, acc)
                if len(aggids):
                    # aggregate assets for each tag combination
                    aids = alt.aid.to_numpy()
                    for kids in aggids[:, aids]:
                        fast_agg(eids + U64(kids), values, correl, li, acc)
    lis = range(len(xtypes))
    with monitor('building event loss table', measuremem=True):
        dic = general.AccumDict(accum=[])
        for ukey, arr in acc.items():
            eid, kid = divmod(ukey, TWO32)
            for li in lis:
                if arr[li].any():
                    dic['event_id'].append(eid)
                    dic['agg_id'].append(kid)
                    dic['loss_id'].append(LOSSID[xtypes[li]])
                    for c, col in enumerate(['variance', 'loss']):
                        dic[col].append(arr[li, c])
        fix_dtypes(dic)
    return loss_by_AR, pandas.DataFrame(dic)

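# --- illustrative sketch, not part of the engine module ---------------------
# With asset correlation the variances are combined as
# sigma_tot^2 = (sum_i sigma_i)^2: aggreg() stores sqrt(variance), fast_agg
# sums the stored values, and fast_agg squares the sum back. A toy check of
# that identity (hypothetical numbers):
import numpy as _np

_variances = _np.array([4.0, 9.0])                # sigma_i = 2, 3
_correlated = _np.sum(_np.sqrt(_variances)) ** 2  # (2 + 3)^2 = 25
_uncorrelated = _variances.sum()                  # 4 + 9 = 13
assert _correlated == 25.0 and _uncorrelated == 13.0
# -----------------------------------------------------------------------------
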
def ebr_from_gmfs(sbe, oqparam, dstore, monitor):
    """
    :param sbe: composite array with fields 'start', 'stop'
    :param oqparam: OqParam instance
    :param dstore: DataStore instance from which to read the GMFs
    :param monitor: a Monitor instance
    :yields: dictionary of arrays, the output of event_based_risk
    """
    if dstore.parent:
        dstore.parent.open('r')
    gmfcols = oqparam.gmf_data_dt().names
    with dstore:
        # this is fast compared to reading the GMFs
        risk_sids = monitor.read('sids')
        s0, s1 = sbe[0]['start'], sbe[-1]['stop']
        t0 = time.time()
        haz_sids = dstore['gmf_data/sid'][s0:s1]
    dt = time.time() - t0
    idx, = numpy.where(numpy.isin(haz_sids, risk_sids))
    if len(idx) == 0:
        return {}
    # print('waiting %.1f' % dt)
    time.sleep(dt)
    with dstore, monitor('reading GMFs', measuremem=True):
        start, stop = idx.min(), idx.max() + 1
        dic = {}
        for col in gmfcols:
            if col == 'sid':
                dic[col] = haz_sids[idx]
            else:
                dset = dstore['gmf_data/' + col]
                dic[col] = dset[s0+start:s0+stop][idx - start]
        df = pandas.DataFrame(dic)
        del dic
    # if max_gmvs_chunk is too small, there is a huge data transfer in
    # avg_losses and the calculation may hang; if too large, run out of memory
    slices = performance.split_slices(
        df.eid.to_numpy(), oqparam.max_gmvs_chunk)
    for s0, s1 in slices:
        yield event_based_risk(df[s0:s1], oqparam, monitor)

def event_based_risk(df, oqparam, monitor):
    """
    :param df: a DataFrame of GMFs with fields sid, eid, gmv_X, ...
    :param oqparam: parameters coming from the job.ini
    :param monitor: a Monitor instance
    :returns: a dictionary of arrays
    """
    if os.environ.get('OQ_DEBUG_SITE'):
        print(df)

    with monitor('reading crmodel', measuremem=True):
        crmodel = monitor.read('crmodel')
        ideduc = monitor.read('assets/ideductible')
        aggids = monitor.read('aggids')
        rlz_id = monitor.read('rlz_id')
        weights = [1] if oqparam.collect_rlzs else monitor.read('weights')
    ARK = (oqparam.A, len(weights), oqparam.K)
    if oqparam.ignore_master_seed or oqparam.ignore_covs:
        rng = None
    else:
        rng = MultiEventRNG(oqparam.master_seed, df.eid.unique(),
                            int(oqparam.asset_correlation))

    outs = gen_outputs(df, crmodel, rng, monitor)
    avg, alt = aggreg(outs, crmodel, ARK, aggids, rlz_id, ideduc.any(),
                      monitor)
    return dict(avg=avg, alt=alt, gmf_bytes=df.memory_usage().sum())

def gen_outputs(df, crmodel, rng, monitor):
    """
    :param df: GMF dataframe (a slice of events)
    :param crmodel: CompositeRiskModel instance
    :param rng: random number generator
    :param monitor: Monitor instance
    :yields: one output per taxonomy and slice of events
    """
    mon_risk = monitor('computing risk', measuremem=False)
    fil_mon = monitor('filtering GMFs', measuremem=False)
    ass_mon = monitor('reading assets', measuremem=False)
    sids = df.sid.to_numpy()
    for s0, s1 in monitor.read('start-stop'):
        with ass_mon:
            assets = monitor.read(
                'assets', slice(s0, s1)).set_index('ordinal')
        if 'ID_0' not in assets.columns:
            assets['ID_0'] = 0
        for (id0, taxo), adf in assets.groupby(['ID_0', 'taxonomy']):
            # multiple countries are tested in impact/case_02
            country = crmodel.countries[id0]
            with fil_mon:
                # *crucial* for the performance of the next step
                gmf_df = df[numpy.isin(sids, adf.site_id.unique())]
            if len(gmf_df) == 0:  # common enough
                continue
            with mon_risk:
                [out] = crmodel.get_outputs(
                    adf, gmf_df, crmodel.oqparam._sec_losses, rng, country)
            yield out

def _tot_loss_unit_consistency(units, total_losses, loss_types):
    total_losses_units = set()
    for separate_lt in total_losses.split('+'):
        assert separate_lt in loss_types, (separate_lt, loss_types)
        for unit, lt in zip(units, loss_types):
            if separate_lt == lt:
                total_losses_units.add(unit)
    if len(total_losses_units) != 1:
        logging.warning(
            'The units of the single components of the total losses'
            ' are not homogeneous: %s' % total_losses_units)

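# --- illustrative sketch, not part of the engine module ---------------------
# _tot_loss_unit_consistency collects the unit of each component of
# total_losses and warns when more than one distinct unit is found; the core
# check boils down to this (toy values, underscore names for illustration):
_units = ['EUR', 'USD']
_loss_types = ['structural', 'contents']
_summed = 'structural+contents'.split('+')
_found = {u for u, lt in zip(_units, _loss_types) if lt in _summed}
assert len(_found) != 1  # heterogeneous units -> the helper above would warn
# -----------------------------------------------------------------------------
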
def set_oqparam(oq, assetcol, dstore):
    """
    Set the attributes .M, .K, .A, .ideduc, ._sec_losses
    """
    try:
        K = len(dstore['agg_keys'])
    except KeyError:
        K = 0
    sec_losses = []  # one insured loss for each loss type with a policy
    try:
        policy_df = dstore.read_df('policy')
    except KeyError:
        pass
    else:
        if 'reinsurance' not in oq.inputs:
            sec_losses.append(partial(insurance_losses, policy_df=policy_df))
    ideduc = assetcol['ideductible'].any()
    cc = dstore['exposure'].cost_calculator
    if oq.total_losses and oq.total_loss_types and cc.cost_types:
        # cc.cost_types is empty in scenario_damage/case_21 (consequences)
        units = cc.get_units(oq.total_loss_types)
        _tot_loss_unit_consistency(
            units.split(), oq.total_losses, oq.total_loss_types)
        sec_losses.append(
            partial(total_losses, kind=oq.total_losses, ideduc=ideduc))
    elif ideduc:
        # subtract the insurance deductible for a single loss_type
        [lt] = oq.loss_types
        sec_losses.append(partial(total_losses, kind=lt, ideduc=ideduc))

    oq._sec_losses = sec_losses
    oq.ideduc = int(ideduc)
    oq.M = len(oq.all_imts())
    oq.K = K
    oq.A = assetcol['ordinal'].max() + 1

def ebrisk(proxies, cmaker, sitecol, stations, dstore, monitor):
    """
    :param proxies: list of RuptureProxies with the same trt_smr
    :param cmaker: ContextMaker instance associated to the trt_smr
    :param stations: empty pair or (station_data, station_sitecol)
    :param monitor: a Monitor instance
    :yields: dictionaries of arrays
    """
    cmaker.oq.ground_motion_fields = True
    for block in general.block_splitter(
            proxies, 20_000, event_based.rup_weight):
        for dic in event_based.event_based(
                block, cmaker, sitecol, stations, dstore, monitor):
            if len(dic['gmfdata']):
                gmf_df = pandas.DataFrame(dic['gmfdata'])
                yield event_based_risk(gmf_df, cmaker.oq, monitor)

@base.calculators.add('ebrisk', 'scenario_risk', 'event_based_risk')
class EventBasedRiskCalculator(event_based.EventBasedCalculator):
    """
    Event based risk calculator generating event loss tables
    """
    core_task = ebrisk
    is_stochastic = True
    precalc = 'event_based'
    accept_precalc = ['scenario', 'event_based', 'event_based_risk', 'ebrisk']

    def save_tmp(self, monitor):
        """
        Save some useful data in the file calc_XXX_tmp.hdf5
        """
        oq = self.oqparam
        monitor.save('sids', self.sitecol.sids)
        adf = self.assetcol.to_dframe().sort_values(['taxonomy', 'ordinal'])
        # NB: this is subtle! without the second ordering by 'ordinal'
        # the asset dataframe will be ordered differently on AMD machines
        # with respect to Intel machines, depending on the machine, thus
        # causing different losses
        del adf['id']
        monitor.save('assets', adf)
        if 'ID_0' in self.assetcol.tagnames:
            self.crmodel.countries = self.assetcol.tagcol.ID_0
        else:
            self.crmodel.countries = ['?']
        # storing start-stop indices in a smart way, so that the assets are
        # read from the workers in chunks of at most 1 million elements
        tss = performance.idx_start_stop(adf.taxonomy.to_numpy())
        monitor.save('start-stop', compactify3(tss))
        monitor.save('crmodel', self.crmodel)
        monitor.save('rlz_id', self.rlzs)
        monitor.save('weights', self.datastore['weights'][:])
        if oq.K:
            aggids, _ = self.assetcol.build_aggids(oq.aggregate_by)
        else:
            aggids = ()
        monitor.save('aggids', aggids)

    def pre_execute(self):
        oq = self.oqparam
        if oq.calculation_mode == 'ebrisk':
            oq.ground_motion_fields = False
        parent = self.datastore.parent
        if parent:
            self.datastore['full_lt'] = parent['full_lt']
            self.parent_events = ne = len(parent['events'])
            logging.info('There are %d ruptures and %d events',
                         len(parent['ruptures']), ne)
        else:
            self.parent_events = None
        super().pre_execute()
        parentdir = (os.path.dirname(self.datastore.ppath)
                     if self.datastore.ppath else None)
        oq.hdf5path = self.datastore.filename
        oq.parentdir = parentdir
        logging.info('There are {:_d} ruptures and {:_d} events'.format(
            len(self.datastore['ruptures']), len(self.datastore['events'])))
        self.events_per_sid = numpy.zeros(self.N, U32)
        self.datastore.swmr_on()
        set_oqparam(oq, self.assetcol, self.datastore)
        self.A = A = len(self.assetcol)
        self.L = L = len(oq.loss_types)
        ELT = len(oq.ext_loss_types)
        if oq.calculation_mode == 'event_based_risk' and oq.avg_losses:
            R = 1 if oq.collect_rlzs else self.R
            logging.info('Transfering %s per core in avg_losses',
                         general.humansize(A * ELT * 8 * R))
            if A * ELT * 8 > int(config.memory.avg_losses_max):
                raise ValueError('For large exposures you must set '
                                 'avg_losses=false')
            elif A * ELT * self.R * 8 > int(config.memory.avg_losses_max):
                raise ValueError('For large exposures you must set '
                                 'collect_rlzs = true')
        if (oq.aggregate_by and self.E * A > oq.max_potential_gmfs and
                all(val == 0 for val in oq.minimum_asset_loss.values())):
            logging.warning('The calculation is really big; consider setting '
                            'minimum_asset_loss')
        base.create_risk_by_event(self)
        self.rlzs = self.datastore['events']['rlz_id']
        self.num_events = numpy.bincount(self.rlzs, minlength=self.R)
        self.xtypes = oq.ext_loss_types
        if self.assetcol['ideductible'].any():
            self.xtypes.append('claim')
        if oq.avg_losses:
            self.create_avg_losses()
        alt_nbytes = 4 * self.E * L
        if alt_nbytes / (oq.concurrent_tasks or 1) > TWO32:
            raise RuntimeError('The risk_by_event is too big to be transfer'
                               'ed with %d tasks' % oq.concurrent_tasks)

    def execute(self):
        """
        Compute risk from GMFs or ruptures depending on what is stored
        """
        oq = self.oqparam
        self.gmf_bytes = 0
        if oq.calculation_mode == 'ebrisk' or 'gmf_data' not in self.datastore:
            # start from ruptures
            if (oq.ground_motion_fields and
                    'gsim_logic_tree' not in oq.inputs and
                    oq.gsim == '[FromFile]'):
                raise InvalidFile('%s: missing gsim or gsim_logic_tree_file'
                                  % oq.inputs['job_ini'])
            elif not hasattr(oq, 'maximum_distance'):
                raise InvalidFile('Missing maximum_distance in %s'
                                  % oq.inputs['job_ini'])
            full_lt = self.datastore['full_lt']
            smap = event_based.starmap_from_rups(
                ebrisk, oq, full_lt, self.sitecol, self.datastore,
                self.save_tmp)
            smap.reduce(self.agg_dicts)
            if self.gmf_bytes == 0:
                raise RuntimeError(
                    'No GMFs were generated, perhaps they were '
                    'all below the minimum_intensity threshold')
            logging.info('Produced %s of GMFs',
                         general.humansize(self.gmf_bytes))
        else:  # start from GMFs
            smap = starmap_from_gmfs(ebr_from_gmfs, oq, self.datastore,
                                     self._monitor)
            self.save_tmp(smap.monitor)
            smap.reduce(self.agg_dicts)
        if self.parent_events:
            assert self.parent_events == len(self.datastore['events'])
        return 1

    def log_info(self, eids):
        """
        Printing some information about the risk calculation
        """
        logging.info('Processing {:_d} rows of gmf_data'.format(len(eids)))
        E = len(numpy.unique(eids))
        K = self.oqparam.K
        logging.info('Risk parameters (rel_E={:_d}, K={:_d}, L={})'.format(
            E, K, self.L))

    def post_execute(self, dummy):
        """
        Compute and store average losses from the risk_by_event dataset,
        and then loss curves and maps.
        """
        oq = self.oqparam
        K = self.datastore['risk_by_event'].attrs.get('K', 0)
        upper_limit = self.E * (K + 1) * len(self.xtypes)
        if upper_limit < 1E7:
            # sanity check on risk_by_event if not too large
            alt = self.datastore.read_df('risk_by_event')
            size = len(alt)
            assert size <= upper_limit, (size, upper_limit)
            # sanity check on uniqueness by (agg_id, loss_id, event_id)
            arr = alt[['agg_id', 'loss_id', 'event_id']].to_numpy()
            uni, cnt = numpy.unique(arr, axis=0, return_counts=True)
            if len(uni) < len(arr):
                dupl = uni[cnt > 1]  # (agg_id, loss_id, event_id)
                raise RuntimeError(
                    'risk_by_event contains %d duplicates for event %s' %
                    (len(arr) - len(uni), dupl[0, 2]))
        s = oq.hazard_stats()
        if s:
            _statnames, statfuncs = zip(*s.items())
            weights = self.datastore['weights'][:]
        if oq.avg_losses:
            for lt in self.xtypes:
                al = self.avg_losses[lt]  # shape (A, R)
                for r in range(self.R):
                    al[:, r] *= self.avg_ratio[r]
                name = 'avg_losses-rlzs/' + lt
                logging.info(f'Storing {name}')
                self.datastore[name][:] = al
                if s and self.R > 1:
                    self.datastore[name.replace('-rlzs', '-stats')][:] = \
                        stats.compute_stats2(al, statfuncs, weights)
        self.build_aggcurves()
        if oq.reaggregate_by:
            post_aggregate(self.datastore.calc_id,
                           ','.join(oq.reaggregate_by))
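
# --- illustrative sketch, not part of the engine module ---------------------
# numpy.unique with return_counts is how post_execute detects duplicated
# (agg_id, loss_id, event_id) triples in risk_by_event; a toy version with
# hypothetical values:
import numpy as _np

_arr = _np.array([[0, 1, 5], [0, 1, 5], [1, 1, 6]])
_uni, _cnt = _np.unique(_arr, axis=0, return_counts=True)
assert len(_uni) < len(_arr)            # there is at least one duplicate
assert (_uni[_cnt > 1] == [0, 1, 5]).all()  # the duplicated triple
# -----------------------------------------------------------------------------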