Source code for openquake.calculators.event_based_damage
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os.path
import logging

import numpy
import pandas

from openquake.baselib import hdf5, general
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import scientific, connectivity
from openquake.commonlib import datastore, calc
from openquake.calculators import base
from openquake.calculators.event_based_risk import EventBasedRiskCalculator
from openquake.calculators.post_risk import (
    get_loss_builder, fix_dtypes, PostRiskCalculator)
from openquake.calculators.export import DISPLAY_NAME

# short aliases for the numpy dtypes used throughout this module
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
def zero_dmgcsq(A, R, crmodel):
    """
    Build an all-zero accumulator for damages and consequences.

    :param A: number of assets
    :param R: number of realizations
    :param crmodel: a CompositeRiskModel providing loss_types and get_dmg_csq
    :returns: an array of zeros of shape (A, R, L, Dc)
    """
    dmg_csq = crmodel.get_dmg_csq()
    L = len(crmodel.loss_types)
    Dc = len(dmg_csq) + 1  # damages + consequences (extra slot for no_damage)
    return numpy.zeros((A, R, L, Dc), F32)
def damage_from_gmfs(gmfslices, oqparam, dstore, monitor):
    """
    Worker task: read slices of gmf_data and compute the damage on them.

    :param gmfslices: an array (S, 3) with S slices (start, stop, weight)
    :param oqparam: OqParam instance
    :param dstore: DataStore instance from which to read the GMFs
    :param monitor: a Monitor instance
    :returns: a dictionary of arrays, the output of event_based_damage
    """
    if dstore.parent:
        dstore.parent.open('r')
    dfs = []
    with dstore, monitor('reading data', measuremem=True):
        # each slice selects a contiguous range of rows in gmf_data
        for gmfslice in gmfslices:
            slc = slice(gmfslice[0], gmfslice[1])
            dfs.append(dstore.read_df('gmf_data', slc=slc))
        df = pandas.concat(dfs)
    return event_based_damage(df, oqparam, dstore, monitor)
def _update(asset_df, gmf_df, aggids, allrlzs, sec_sims, crmodel,
            ci, R, Dc, dmgcsq, dddict):
    """
    Compute damage distributions and consequences for the assets on a
    single site and accumulate them into `dmgcsq` (per asset/rlz) and
    `dddict` (per event/aggregation key). Mutates `dmgcsq` and `dddict`
    in place.

    :param asset_df: DataFrame of assets on the current site
    :param gmf_df: DataFrame of GMFs on the current site
    :param aggids: aggregation key indices per asset (used when oq.K)
    :param allrlzs: array eid -> rlz_id (only used when R > 1)
    :param sec_sims: secondary simulations items (prob_field, num_sims)
    :param crmodel: a CompositeRiskModel
    :param ci: dict consequence name -> column index in the d3 array
    :param R: number of realizations
    :param Dc: number of damage states + consequences
    :param dmgcsq: accumulator array of shape (A, R, L, Dc)
    :param dddict: AccumDict (eid, kid) -> array (L, Dc)
    """
    oq = crmodel.oqparam
    loss_types = oq.loss_types
    eids = gmf_df.eid.to_numpy()
    if R > 1:
        rlzs = allrlzs[eids]
    # a random generator is needed both for secondary simulations and
    # for sampling discrete damage distributions
    if sec_sims or not oq.float_dmg_dist:
        rng = scientific.MultiEventRNG(oq.master_seed, numpy.unique(eids))
    for prob_field, num_sims in sec_sims:
        probs = gmf_df[prob_field].to_numpy()   # LiqProb
        if not oq.float_dmg_dist:
            # mean over the secondary simulations
            dprobs = rng.boolean_dist(probs, num_sims).mean(axis=1)
    for taxo, adf in asset_df.groupby('taxonomy'):
        out = crmodel.get_output(adf, gmf_df)
        aids = adf.index.to_numpy()
        assets = adf.to_records()
        if oq.float_dmg_dist:
            number = assets['value-number']
        else:
            number = U32(assets['value-number'])
        for lti, lt in enumerate(loss_types):
            fractions = out[lt]
            Asid, E, D = fractions.shape
            assert len(eids) == E
            d3 = numpy.zeros((Asid, E, Dc), F32)
            if oq.float_dmg_dist:
                d3[:, :, :D] = fractions
                for a in range(Asid):
                    d3[a] *= number[a]
            else:
                # this is a performance disaster; for instance
                # the Messina test in oq-risk-tests becomes 12x
                # slower even if it has only 25_736 assets
                d3[:, :, :D] = rng.discrete_dmg_dist(
                    eids, fractions, number)

            # secondary perils and consequences
            for a, asset in enumerate(assets):
                if sec_sims:
                    for d in range(1, D):
                        # doing the mean on the secondary simulations
                        if oq.float_dmg_dist:
                            d3[a, :, d] *= probs
                        else:
                            d3[a, :, d] *= dprobs
                csq = crmodel.compute_csq(
                    asset, d3[a, :, :D] / number[a], lt, oq.time_event)
                for name, values in csq.items():
                    d3[a, :, ci[name]] = values
            if R == 1:
                dmgcsq[aids, 0, lti] += d3.sum(axis=1)
            else:
                for e, rlz in enumerate(rlzs):
                    dmgcsq[aids, rlz, lti] += d3[:, e]
            tot = d3.sum(axis=0)  # sum on the assets
            for e, eid in enumerate(eids):
                dddict[eid, oq.K][lti] += tot[e]
                if oq.K:
                    for kids in aggids:
                        for a, aid in enumerate(aids):
                            dddict[eid, kids[aid]][lti] += d3[a, e]
def event_based_damage(df, oq, dstore, monitor):
    """
    Compute the damage distributions for the given GMFs.

    :param df: a DataFrame of GMFs with fields sid, eid, gmv_X, ...
    :param oq: parameters coming from the job.ini
    :param dstore: a DataStore instance
    :param monitor: a Monitor instance
    :returns: (damages (eid, kid) -> LDc plus damages (A, Dc))
    """
    mon_risk = monitor('computing risk', measuremem=False)
    with monitor('reading gmf_data'):
        if oq.parentdir:
            dstore = datastore.read(oq.hdf5path, parentdir=oq.parentdir)
        else:
            dstore.open('r')
        assetcol = dstore['assetcol']
        if oq.K:
            # TODO: move this in the controller!
            aggids, _ = assetcol.build_aggids(
                oq.aggregate_by, oq.max_aggregations)
        else:
            aggids = numpy.zeros(len(assetcol), U16)
        crmodel = monitor.read('crmodel')
        sec_sims = oq.secondary_simulations.items()
        dmg_csq = crmodel.get_dmg_csq()
        # column index of each damage/consequence in the d3 arrays;
        # index 0 is reserved for no_damage
        ci = {dc: i + 1 for i, dc in enumerate(dmg_csq)}
        dmgcsq = zero_dmgcsq(len(assetcol), oq.R, crmodel)
        _A, R, L, Dc = dmgcsq.shape
        if R > 1:
            allrlzs = dstore['events']['rlz_id']
        else:
            allrlzs = [0]
        assert len(oq.loss_types) == L
    with mon_risk:
        dddict = general.AccumDict(accum=numpy.zeros((L, Dc), F32))  # eid, kid
        for sid, asset_df in assetcol.to_dframe().groupby('site_id'):
            # working one site at the time
            gmf_df = df[df.sid == sid]
            if len(gmf_df) == 0:
                continue
            _update(asset_df, gmf_df, aggids, allrlzs, sec_sims, crmodel,
                    ci, R, Dc, dmgcsq, dddict)
    return _dframe(dddict, ci, oq.loss_types), dmgcsq
def _dframe(adic, ci, loss_types):
    """
    Convert {(eid, kid): dd} into a DataFrame with columns
    (agg_id, event_id, loss_id) plus one column per damage/consequence.

    :param adic: AccumDict (eid, kid) -> array (L, Dc)
    :param ci: dict damage/consequence name -> column index
    :param loss_types: the loss types of the calculation
    """
    dic = general.AccumDict(accum=[])
    for (eid, kid), dd in sorted(adic.items()):
        for li, lt in enumerate(loss_types):
            dic['agg_id'].append(kid)
            dic['event_id'].append(eid)
            dic['loss_id'].append(scientific.LOSSID[lt])
            for sname, si in ci.items():
                dic[sname].append(dd[li, si])
    fix_dtypes(dic)
    return pandas.DataFrame(dic)
def create_avg_losses(self):
    """
    Do nothing: there are no losses in the DamageCalculator
    """
def execute(self):
    """
    Compute risk from GMFs or ruptures depending on what is stored.

    :returns: the result of the reduction of the worker tasks via combine
    :raises ValueError: if discrete_damage_distribution is set but the
        exposure contains non-integer asset numbers
    """
    oq = self.oqparam
    number = self.assetcol['value-number']
    num_floats = (U32(number) != number).sum()
    if oq.discrete_damage_distribution and num_floats:
        # NB: the original message misspelled the parameter name as
        # "dicrete_damage_distribution"; fixed to match the real parameter
        raise ValueError(
            'The exposure contains %d non-integer asset numbers: '
            'you cannot use discrete_damage_distribution=true' % num_floats)
    oq.R = self.R  # 1 if collect_rlzs
    oq.float_dmg_dist = not oq.discrete_damage_distribution
    if oq.hazard_calculation_id:
        oq.parentdir = os.path.dirname(self.datastore.ppath)
    if oq.investigation_time:  # event based
        self.builder = get_loss_builder(self.datastore, oq)  # check
    self.dmgcsq = zero_dmgcsq(len(self.assetcol), self.R, self.crmodel)
    smap = calc.starmap_from_gmfs(damage_from_gmfs, oq, self.datastore,
                                  self._monitor)
    smap.monitor.save('assets', self.assetcol.to_dframe('id'))
    smap.monitor.save('crmodel', self.crmodel)
    return smap.reduce(self.combine)
def combine(self, acc, res):
    """
    Combine the results and grow risk_by_event with fields
    (event_id, agg_id, loss_id) and (dmg_0, dmg_1, dmg_2, ...).

    :param acc: unused
    :param res: DataFrame with fields (event_id, agg_id, loss_id, dmg1 ...)
        plus array with damages and consequences of shape (A, Dc)
    """
    df, dmgcsq = res
    self.dmgcsq += dmgcsq  # accumulate per-asset damages across tasks
    with self.monitor('saving risk_by_event', measuremem=True):
        for name in df.columns:
            dset = self.datastore['risk_by_event/' + name]
            hdf5.extend(dset, df[name].to_numpy())
    return 1
def post_execute(self, dummy):
    """
    Store damages-rlzs/stats, aggrisk and aggcurves.

    :param dummy: unused result of execute
    :raises ValueError: if the sites in gmf_data are disjoint from the
        site collection
    """
    oq = self.oqparam
    # no damage check, perhaps the sites were disjoint from gmf_data
    if self.dmgcsq[:, :, :, 1:].sum() == 0:
        haz_sids = self.datastore['gmf_data/sid'][:]
        count = numpy.isin(haz_sids, self.sitecol.sids).sum()
        if count == 0:
            raise ValueError('The sites in gmf_data are disjoint from the '
                             'site collection!?')
        else:
            logging.warning(
                'There is no damage, perhaps the hazard is too small?')
        return
    prc = PostRiskCalculator(oq, self.datastore.calc_id)
    prc.assetcol = self.assetcol
    if hasattr(self, 'exported'):
        prc.exported = self.exported
    with prc.datastore:
        prc.run(exports='')
    _A, _R, L, _Dc = self.dmgcsq.shape
    D = len(self.crmodel.damage_states)
    # fix no_damage distribution for events with zero damage
    number = self.assetcol['value-number']
    for r in range(self.R):
        ne = prc.num_events[r]
        for li in range(L):
            self.dmgcsq[:, r, li, 0] = (  # no damage
                number * ne - self.dmgcsq[:, r, li, 1:D].sum(axis=1))
        self.dmgcsq[:, r] /= ne  # average over the events
    self.datastore['damages-rlzs'] = self.dmgcsq
    set_rlzs_stats(self.datastore, 'damages-rlzs',
                   asset_id=self.assetcol['id'],
                   rlz=numpy.arange(self.R),
                   loss_type=oq.loss_types,
                   dmg_state=['no_damage'] + self.crmodel.get_dmg_csq())
    if (hasattr(oq, 'infrastructure_connectivity_analysis')
            and oq.infrastructure_connectivity_analysis):
        logging.info('Running connectivity analysis')
        conn_results = connectivity.analysis(self.datastore)
        self._store_connectivity_analysis_results(conn_results)
def _store_connectivity_analysis_results(self, conn_results):
    """
    Store the outputs of the infrastructure connectivity analysis
    (average losses, per-event losses and per-node losses) as
    DataFrames in the datastore.

    :param conn_results: dict of results from connectivity.analysis;
        only the keys that are present are stored
    """
    avg_dict = {}
    if 'avg_connectivity_loss_eff' in conn_results:
        avg_dict['efl'] = [conn_results['avg_connectivity_loss_eff']]
    if 'avg_connectivity_loss_pcl' in conn_results:
        avg_dict['pcl'] = [conn_results['avg_connectivity_loss_pcl']]
    if 'avg_connectivity_loss_wcl' in conn_results:
        avg_dict['wcl'] = [conn_results['avg_connectivity_loss_wcl']]
    if 'avg_connectivity_loss_ccl' in conn_results:
        avg_dict['ccl'] = [conn_results['avg_connectivity_loss_ccl']]
    if avg_dict:
        avg_df = pandas.DataFrame(data=avg_dict)
        self.datastore.create_df(
            'infra-avg_loss', avg_df,
            display_name=DISPLAY_NAME['infra-avg_loss'])
        # NB: fixed typo "avarage" -> "average" in the log message
        logging.info('Stored average connectivity loss (infra-avg_loss)')
    if 'event_connectivity_loss_eff' in conn_results:
        self.datastore.create_df(
            'infra-event_efl',
            conn_results['event_connectivity_loss_eff'],
            display_name=DISPLAY_NAME['infra-event_efl'])
        logging.info('Stored efficiency loss by event (infra-event_efl)')
    if 'event_connectivity_loss_pcl' in conn_results:
        self.datastore.create_df(
            'infra-event_pcl',
            conn_results['event_connectivity_loss_pcl'],
            display_name=DISPLAY_NAME['infra-event_pcl'])
        logging.info(
            'Stored partial connectivity loss by event (infra-event_pcl)')
    if 'event_connectivity_loss_wcl' in conn_results:
        self.datastore.create_df(
            'infra-event_wcl',
            conn_results['event_connectivity_loss_wcl'],
            display_name=DISPLAY_NAME['infra-event_wcl'])
        logging.info(
            'Stored weighted connectivity loss by event (infra-event_wcl)')
    if 'event_connectivity_loss_ccl' in conn_results:
        self.datastore.create_df(
            'infra-event_ccl',
            conn_results['event_connectivity_loss_ccl'],
            display_name=DISPLAY_NAME['infra-event_ccl'])
        logging.info(
            'Stored complete connectivity loss by event (infra-event_ccl)')
    if 'taz_cl' in conn_results:
        self.datastore.create_df(
            'infra-taz_cl', conn_results['taz_cl'],
            display_name=DISPLAY_NAME['infra-taz_cl'])
        logging.info('Stored connectivity loss of TAZ nodes (taz_cl)')
    if 'dem_cl' in conn_results:
        self.datastore.create_df(
            'infra-dem_cl', conn_results['dem_cl'],
            display_name=DISPLAY_NAME['infra-dem_cl'])
        logging.info('Stored connectivity loss of demand nodes (dem_cl)')
    if 'node_el' in conn_results:
        self.datastore.create_df(
            'infra-node_el', conn_results['node_el'],
            display_name=DISPLAY_NAME['infra-node_el'])
        logging.info('Stored efficiency loss of nodes (node_el)')