Source code for openquake.calculators.event_based_damage
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os.path
import logging
import numpy
import pandas

from openquake.baselib import hdf5, general
from openquake.hazardlib.stats import compute_stats2
from openquake.risklib import scientific, connectivity
from openquake.commonlib import datastore, calc
from openquake.calculators import base
from openquake.calculators.event_based_risk import EventBasedRiskCalculator
from openquake.calculators.post_risk import (
    get_loss_builder, fix_dtypes, PostRiskCalculator)
from openquake.calculators.export import DISPLAY_NAME

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
def zero_dmgcsq(A, R, L, crmodel):
    """
    :returns: an array of zeros of shape (P, A, R, L, Dc)
    """
    dmg_csq = crmodel.get_dmg_csq()
    Dc = len(dmg_csq) + 1  # damages + consequences
    P = len(crmodel.perils)
    return numpy.zeros((P, A, R, L, Dc), F32)
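A minimal sketch, not part of the module, of how the accumulator returned by zero_dmgcsq is indexed; the toy dimensions below stand in for perils (P), assets (A), realizations (R), loss types (L) and damage states plus consequences (Dc):

# illustrative only: toy dimensions, no real crmodel involved
import numpy

P, A, R, L, Dc = 1, 4, 2, 1, 5
dmgcsq = numpy.zeros((P, A, R, L, Dc), numpy.float32)

# accumulate the damage of assets 0 and 2 under realization 1,
# mirroring dmgcsq[:, aids, rlz] += ... in event_based_damage below
aids = [0, 2]
dmgcsq[:, aids, 1] += numpy.ones((P, len(aids), L, Dc), numpy.float32)
assert dmgcsq[:, :, 1].sum() == P * len(aids) * L * Dc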
def damage_from_gmfs(gmfslices, oqparam, dstore, monitor):
    """
    :param gmfslices: an array (S, 3) with S slices (start, stop, weight)
    :param oqparam: OqParam instance
    :param dstore: DataStore instance from which to read the GMFs
    :param monitor: a Monitor instance
    :returns: a dictionary of arrays, the output of event_based_damage
    """
    if dstore.parent:
        dstore.parent.open('r')
    dfs = []
    with dstore, monitor('reading data', measuremem=True):
        for gmfslice in gmfslices:
            slc = slice(gmfslice[0], gmfslice[1])
            dfs.append(dstore.read_df('gmf_data', slc=slc))
        df = pandas.concat(dfs)
    return event_based_damage(df, oqparam, dstore, monitor)
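A rough sketch of the slicing pattern above, with a toy DataFrame in place of the gmf_data dataset and plain pandas slicing in place of dstore.read_df; the column names are assumptions:

# illustrative only: stand-in for the gmf_data table (the engine stores
# sid, eid and one column per intensity measure type)
import numpy
import pandas

gmf_data = pandas.DataFrame({'sid': [0, 0, 1, 1, 2, 2],
                             'eid': [0, 1, 0, 1, 0, 1]})
gmfslices = numpy.array([[0, 3, 1.], [3, 6, 1.]])  # (start, stop, weight) rows

dfs = [gmf_data.iloc[int(start):int(stop)] for start, stop, _weight in gmfslices]
df = pandas.concat(dfs)
assert len(df) == len(gmf_data)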
def event_based_damage(df, oq, dstore, monitor):
    """
    :param df: a DataFrame of GMFs with fields sid, eid, imt, ...
    :param oq: parameters coming from the job.ini
    :param dstore: a DataStore instance
    :param monitor: a Monitor instance
    :returns: (damages (eid, kid) -> LDc plus damages (A, Dc))
    """
    mon = monitor('computing dds', measuremem=False)
    with monitor('reading gmf_data'):
        if oq.parentdir:
            dstore = datastore.read(oq.hdf5path, parentdir=oq.parentdir)
        else:
            dstore.open('r')
        assetcol = dstore['assetcol']
    crmodel = monitor.read('crmodel')
    aggids = monitor.read('aggids')
    dmgcsq = zero_dmgcsq(len(assetcol), oq.R, oq.L, crmodel)
    P, _A, R, L, Dc = dmgcsq.shape
    D = len(crmodel.damage_states)
    rlzs = dstore['events']['rlz_id']
    dddict = general.AccumDict(accum=numpy.zeros((L, Dc), F32))  # eid, kid
    for sid, asset_df in assetcol.to_dframe().groupby('site_id'):
        # working one site at a time
        gmf_df = df[df.sid == sid]
        if len(gmf_df) == 0:
            continue
        eids = gmf_df.eid.to_numpy()
        E = len(eids)
        if not oq.float_dmg_dist:
            rng = scientific.MultiEventRNG(
                oq.master_seed, numpy.unique(eids))
        else:
            rng = None
        for taxo, adf in asset_df.groupby('taxonomy'):
            aids = adf.index.to_numpy()
            A = len(aids)
            with mon:
                rc = scientific.RiskComputer(crmodel, taxo)
                dd5 = rc.get_dd5(adf, gmf_df, rng, Dc - D, crmodel)
                # dd5 has shape (P, A, E, L, Dc)
            if R == 1:  # possibly because of collect_rlzs
                dmgcsq[:, aids, 0] += dd5.sum(axis=2)
            else:
                for e, rlz in enumerate(rlzs[eids]):
                    dmgcsq[:, aids, rlz] += dd5[:, :, e]
            if P > 1:
                dd4 = numpy.empty(dd5.shape[1:])
                for li in range(L):
                    for a in range(A):
                        for e in range(E):
                            dd4[a, e, li, :D] = scientific.compose_dds(
                                dd5[:, a, e, li, :D])
                            dd4[a, e, li, D:] = dd5[:, a, e, li, D:].max(axis=0)
            else:
                dd4 = dd5[0]
            tot = dd4.sum(axis=0)  # (E, L, Dc)
            for e, eid in enumerate(eids):
                dddict[eid, oq.K] += tot[e]
                if oq.K:
                    for kids in aggids:
                        for a, aid in enumerate(aids):
                            dddict[eid, kids[aid]] += dd4[a, e]
    csqidx = {dc: i + 1 for i, dc in enumerate(crmodel.get_dmg_csq())}
    return _dframe(dddict, csqidx, oq.loss_types), dmgcsq
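The per-event accumulation into dddict can be pictured with a plain defaultdict standing in for general.AccumDict; the dimensions and the extra aggregation key below are purely illustrative:

# illustrative only: (eid, kid) -> (L, Dc) accumulation
import numpy
from collections import defaultdict

L, Dc, K = 1, 5, 2  # toy loss types, damage+consequence columns, "total" key
dddict = defaultdict(lambda: numpy.zeros((L, Dc), numpy.float32))

eids = [10, 11, 12]
tot = numpy.ones((len(eids), L, Dc), numpy.float32)  # per-event site totals
for e, eid in enumerate(eids):
    dddict[eid, K] += tot[e]      # always accumulate under the total key K
    dddict[eid, 0] += tot[e] / 2  # plus a hypothetical sub-aggregation key
assert dddict[10, K].shape == (L, Dc)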
def _dframe(dddic, csqidx, loss_types):
    # convert {(eid, kid): dd} into a DataFrame (agg_id, event_id, loss_id)
    dic = general.AccumDict(accum=[])
    for (eid, kid), dd in sorted(dddic.items()):
        for li, lt in enumerate(loss_types):
            dic['agg_id'].append(kid)
            dic['event_id'].append(eid)
            dic['loss_id'].append(scientific.LOSSID[lt])
            for cname, ci in csqidx.items():
                dic[cname].append(dd[li, ci])
    fix_dtypes(dic)
    return pandas.DataFrame(dic)
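A self-contained sketch of the long format produced by _dframe, with a toy dictionary, hypothetical consequence column names and a stand-in for scientific.LOSSID:

# illustrative only: one row per (event, aggregation key, loss type)
import numpy
import pandas

dddic = {(0, 1): numpy.array([[10., 3., 2., 1.]]),
         (1, 1): numpy.array([[12., 1., 0., 0.5]])}
csqidx = {'dmg_1': 1, 'dmg_2': 2, 'losses': 3}  # hypothetical column names
loss_id_of = {'structural': 0}                  # stand-in for scientific.LOSSID

rows = []
for (eid, kid), dd in sorted(dddic.items()):
    for li, lt in enumerate(['structural']):    # single toy loss type
        row = dict(agg_id=kid, event_id=eid, loss_id=loss_id_of[lt])
        row.update({cname: dd[li, ci] for cname, ci in csqidx.items()})
        rows.append(row)
df = pandas.DataFrame(rows)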
@base.calculators.add('event_based_damage', 'scenario_damage')
class DamageCalculator(EventBasedRiskCalculator):
    """
    Damage calculator
    """

    def create_avg_losses(self):
        """
        Do nothing: there are no losses in the DamageCalculator
        """
    def execute(self):
        """
        Compute risk from GMFs or ruptures depending on what is stored
        """
        oq = self.oqparam
        number = self.assetcol['value-number']
        num_floats = (U32(number) != number).sum()
        if oq.discrete_damage_distribution and num_floats:
            raise ValueError(
                'The exposure contains %d non-integer asset numbers: '
                'you cannot use discrete_damage_distribution=true' % num_floats)
        oq.R = self.R  # 1 if collect_rlzs
        oq.float_dmg_dist = not oq.discrete_damage_distribution
        if oq.hazard_calculation_id:
            oq.parentdir = os.path.dirname(self.datastore.ppath)
        if oq.investigation_time:  # event based
            self.builder = get_loss_builder(self.datastore, oq)  # check
        self.dmgcsq = zero_dmgcsq(
            len(self.assetcol), self.R, oq.L, self.crmodel)
        if oq.K:
            aggids, _ = self.assetcol.build_aggids(oq.aggregate_by)
        else:
            aggids = 0
        smap = calc.starmap_from_gmfs(
            damage_from_gmfs, oq, self.datastore, self._monitor)
        smap.monitor.save('aggids', aggids)
        smap.monitor.save('assets', self.assetcol.to_dframe('id'))
        smap.monitor.save('crmodel', self.crmodel)
        return smap.reduce(self.combine)
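The non-integer check at the top of execute can be reproduced in isolation; the 'value-number' values below are invented:

# illustrative only: detect fractional asset numbers
import numpy

U32 = numpy.uint32
number = numpy.array([1., 2., 2.5])  # toy 'value-number' column
num_floats = (U32(number) != number).sum()
assert num_floats == 1  # would trigger the ValueError above when
                        # discrete_damage_distribution is true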
    def combine(self, acc, res):
        """
        :param acc: unused
        :param res: DataFrame with fields (event_id, agg_id, loss_id, dmg1 ...)
            plus an array of damages and consequences of shape (P, A, R, L, Dc)

        Combine the results and grow risk_by_event with fields
        (event_id, agg_id, loss_id) and (dmg_0, dmg_1, dmg_2, ...)
        """
        df, dmgcsq = res
        self.dmgcsq += dmgcsq
        with self.monitor('saving risk_by_event', measuremem=True):
            for name in df.columns:
                dset = self.datastore['risk_by_event/' + name]
                hdf5.extend(dset, df[name].to_numpy())
        return 1
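A sketch, using plain h5py rather than openquake.baselib.hdf5.extend, of how a risk_by_event column dataset grows as task results come in; the file name and dtype are assumptions:

# illustrative only: a resizable dataset appended to chunk by chunk
import numpy
import h5py

with h5py.File('example.hdf5', 'w') as f:
    dset = f.create_dataset('risk_by_event/event_id', (0,),
                            maxshape=(None,), dtype=numpy.uint32)
    new = numpy.array([0, 1, 2], numpy.uint32)
    dset.resize((len(dset) + len(new),))
    dset[-len(new):] = new
    assert len(dset) == 3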
    def post_execute(self, dummy):
        """
        Store damages-rlzs/stats, aggrisk and aggcurves
        """
        oq = self.oqparam
        # no-damage check: perhaps the sites were disjoint from gmf_data
        if self.dmgcsq[:, :, :, :, 1:].sum() == 0:
            haz_sids = self.datastore['gmf_data/sid'][:]
            count = numpy.isin(haz_sids, self.sitecol.sids).sum()
            if count == 0:
                raise ValueError('The sites in gmf_data are disjoint from the '
                                 'site collection!?')
            else:
                logging.warning(
                    'There is no damage, perhaps the hazard is too small?')
            return
        prc = PostRiskCalculator(oq, self.datastore.calc_id)
        prc.assetcol = self.assetcol
        if hasattr(self, 'exported'):
            prc.exported = self.exported
        prc.pre_execute()
        res = prc.execute()
        prc.post_execute(res)
        P, _A, _R, L, _Dc = self.dmgcsq.shape
        D = len(self.crmodel.damage_states)
        # fix no_damage distribution for events with zero damage
        number = self.assetcol['value-number']
        for p in range(P):
            for r in range(self.R):
                self.dmgcsq[p, :, r] /= prc.num_events[r]
                ndamaged = self.dmgcsq[p, :, r, :, 1:D].sum(axis=2)  # (A, L)
                for li in range(L):
                    # set no_damage
                    self.dmgcsq[p, :, r, li, 0] = number - ndamaged[:, li]
        assert (self.dmgcsq >= 0).all()  # sanity check
        self.datastore['damages-rlzs'] = arr = self.crmodel.to_multi_damage(
            self.dmgcsq)
        s = oq.hazard_stats()
        if s and self.R > 1:
            _statnames, statfuncs = zip(*s.items())
            weights = self.datastore['weights'][:]
            self.datastore.hdf5.create_dataset(
                'damages-stats', data=compute_stats2(arr, statfuncs, weights))
        if oq.infrastructure_connectivity_analysis:
            logging.info('Running connectivity analysis')
            results = connectivity.analysis(self.datastore)
            self._store_connectivity_analysis_results(results)
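A toy check of the no_damage normalisation above, ignoring perils and consequence columns: divide the accumulated damage by the number of events of the realization, then set the no_damage column so that each row sums back to the asset number:

# illustrative only: (A, L, D) with a single loss type and D=3 damage states
import numpy

number = numpy.array([1., 2.])              # toy value-number per asset
num_events = 4
dmgcsq = numpy.zeros((2, 1, 3), numpy.float32)
dmgcsq[:, 0, 1:] = [[2., 0.], [4., 4.]]     # damaged counts summed over events

dmgcsq /= num_events
ndamaged = dmgcsq[:, :, 1:].sum(axis=2)     # shape (A, L)
dmgcsq[:, 0, 0] = number - ndamaged[:, 0]   # no_damage column
assert (dmgcsq >= 0).all()
assert numpy.allclose(dmgcsq.sum(axis=2)[:, 0], number)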