# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2014-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportjsonimportshutilimporttempfileimportitertoolsimportcollectionsimportnumpyimportpandasfromopenquake.baselibimporthdf5,writers,general,node,configfromopenquake.baselib.python3compatimportdecodefromopenquake.hazardlibimportnrmlfromopenquake.hazardlib.statsimportcompute_stats2fromopenquake.risklibimportscientificfromopenquake.calculators.extractimportextract,sanitize,avglossesfromopenquake.calculatorsimportpost_riskfromopenquake.calculators.exportimportexport,loss_curvesfromopenquake.calculators.export.hazardimportsavezfromopenquake.commonlib.utilimportget_assets,compose_arraysOutput=collections.namedtuple('Output','ltype path array')F32=numpy.float32F64=numpy.float64U16=numpy.uint16U32=numpy.uint32stat_dt=numpy.dtype([('mean',F32),('stddev',F32)])
def get_aggtags(dstore):
    """
    :returns: a list of tag tuples, one per aggregation key;
        when an aggregate_by was given, a final '*total*' row is appended
    """
    if 'agg_keys' not in dstore:  # no aggregate_by
        return [()]
    # there was an aggregate_by: one row per key, e.g. b'a,b' -> ['a', 'b']
    tags = [row.decode('utf8').split(',') for row in dstore['agg_keys'][:]]
    # append the total row, one '*total*' per tagname
    tags.append(('*total*',) * len(tags[0]))
    return tags
def _aggrisk(oq, aggids, aggtags, agg_values, aggrisk, md, dest):
    """
    Export the aggrisk DataFrame into one CSV per aggregation level.

    :param oq: OqParam instance
    :param aggids: arrays of aggregation IDs, one per tagname combination
    :param aggtags: list of tag tuples, one per aggregation ID
    :param agg_values: aggregated exposure values; the last row is the total
    :param aggrisk: DataFrame with columns agg_id, loss_id (and possibly
        rlz_id) plus the data columns to export
    :param md: metadata dictionary written as CSV comment
    :param dest: destination filename template with a ``{}`` placeholder
    :returns: the list of exported filenames
    """
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    # data columns to export, excluding the index-like columns
    cols = [col for col in aggrisk.columns
            if col not in {'agg_id', 'rlz_id', 'loss_id'}]
    # consequence columns, i.e. everything that is not a damage state
    csqs = [col for col in cols if not col.startswith('dmg_')]
    manyrlzs = hasattr(aggrisk, 'rlz_id') and len(aggrisk.rlz_id.unique()) > 1
    fnames = []
    K = len(agg_values) - 1  # agg_id of the total aggregation
    pairs = [([], aggrisk.agg_id == K)]  # full aggregation
    for tagnames, agg_ids in zip(oq.aggregate_by, aggids):
        pairs.append((tagnames, numpy.isin(aggrisk.agg_id, agg_ids)))
    for tagnames, ok in pairs:
        out = general.AccumDict(accum=[])
        for (agg_id, lid), df in aggrisk[ok].groupby(['agg_id', 'loss_id']):
            n = len(df)
            loss_type = scientific.LOSSTYPE[lid]
            if loss_type == 'occupants':
                # disambiguate with the time of the event, e.g. occupants_night
                loss_type += '_' + oq.time_event
            if loss_type == 'claim':  # temporary hack
                continue
            out['loss_type'].extend([loss_type] * n)
            if tagnames:
                # one column per tagname with the corresponding tag value
                for tagname, tag in zip(tagnames, aggtags[agg_id]):
                    out[tagname].extend([tag] * n)
            if manyrlzs:
                out['rlz_id'].extend(df.rlz_id)
            for col in cols:
                if col in csqs:  # normally csqs = ['loss']
                    # export both the absolute value and the ratio with
                    # respect to the aggregated exposure value
                    aval = scientific.get_agg_value(
                        col, agg_values, agg_id, loss_type, oq.time_event)
                    out[col + '_value'].extend(df[col])
                    out[col + '_ratio'].extend(df[col] / aval)
                else:  # in ScenarioDamageTestCase:test_case_12
                    out[col].extend(df[col])
        # rename dmg_1, dmg_2, ... into the limit state names
        dsdic = {'dmg_0': 'no_damage'}
        for s, ls in enumerate(oq.limit_states, 1):
            dsdic['dmg_%d' % s] = ls
        df = pandas.DataFrame(out).rename(columns=dsdic)
        fname = dest.format('-'.join(tagnames))
        writer.save(df, fname, comment=md)
        fnames.append(fname)
    return fnames
@export.add(('aggrisk', 'csv'))
def export_aggrisk(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    assetcol = dstore['assetcol']
    metadata = dstore.metadata
    rit = oq.risk_investigation_time or oq.investigation_time
    metadata.update(investigation_time=oq.investigation_time,
                    risk_investigation_time=rit)
    aggids, aggtags = assetcol.build_aggids(oq.aggregate_by)
    return _aggrisk(oq, aggids, aggtags,
                    assetcol.get_agg_values(oq.aggregate_by),
                    dstore.read_df('aggrisk'), metadata,
                    dstore.build_fname('aggrisk-{}', '', 'csv'))
@export.add(('aggrisk-stats', 'csv'), ('aggcurves-stats', 'csv'))
def export_aggrisk_stats(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    key = ekey[0].split('-')[0]  # aggrisk or aggcurves
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    dest = dstore.build_fname(key + '-stats-{}', '', 'csv')
    dataf = extract(dstore, 'risk_stats/' + key)
    assetcol = dstore['assetcol']
    agg_values = assetcol.get_agg_values(oq.aggregate_by)
    K = len(agg_values) - 1  # agg_id of the total aggregation
    aggids, aggtags = assetcol.build_aggids(oq.aggregate_by)
    pairs = [([], dataf.agg_id == K)]  # full aggregation
    for tagnames, agg_ids in zip(oq.aggregate_by, aggids):
        pairs.append((tagnames, numpy.isin(dataf.agg_id, agg_ids)))
    fnames = []
    for tagnames, ok in pairs:
        df = dataf[ok].copy()
        if tagnames:
            # replace the agg_id column with one column per tagname
            tagvalues = numpy.array([aggtags[agg_id] for agg_id in df.agg_id])
            for n, name in enumerate(tagnames):
                df[name] = tagvalues[:, n]
        del df['agg_id']
        fname = dest.format('-'.join(tagnames))
        writer.save(df, fname, df.columns, comment=dstore.metadata)
        fnames.append(fname)
    return fnames
@export.add(('mmi_tags', 'csv'))
def export_mmi_tags(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    dest = dstore.build_fname('mmi_tags', '', 'csv')
    frame = extract(dstore, 'mmi_tags')
    csv = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    csv.save(frame, dest, frame.columns, comment=dstore.metadata)
    return [dest]
def _get_data(dstore, dskey, loss_types, stats):
    """
    Read (or compute) the average losses for the given loss types.

    :param dstore: datastore object
    :param dskey: a key like 'avg_losses-stats' or 'avg_losses-rlzs'
    :param loss_types: list of loss type names
    :param stats: an ordered dict stat_name -> stat_func
    :returns: a triple (name, value, rlzs_or_stats) where value has
        shape (A, S, L) for statistics or (A, R, L) for realizations
    """
    name, kind = dskey.split('-')  # i.e. ('avg_losses', 'stats')
    if kind == 'stats':
        try:
            weights = dstore['weights'][()]
        except KeyError:
            # there is a single realization, like in classical_risk/case_2
            weights = [1.]
        if dskey in set(dstore):  # precomputed
            rlzs_or_stats = list(stats)
            value = avglosses(dstore, loss_types, 'stats')  # shape (A, S, L)
        elif dstore['oqparam'].collect_rlzs:
            # NOTE(review): with collect_rlzs the rlzs values are exported
            # under the stat names; confirm this is the intended convention
            rlzs_or_stats = list(stats)
            value = avglosses(dstore, loss_types, 'rlzs')
        else:  # compute the statistics on the fly from the realizations
            rlzs_or_stats, statfuncs = zip(*stats.items())
            value = compute_stats2(
                avglosses(dstore, loss_types, 'rlzs'), statfuncs, weights)
    else:  # kind == 'rlzs'
        value = avglosses(dstore, loss_types, kind)  # shape (A, R, L)
        R = value.shape[1]
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    return name, value, rlzs_or_stats


# this is used by event_based_risk, classical_risk and scenario_risk
@export.add(('avg_losses-rlzs', 'csv'), ('avg_losses-stats', 'csv'))
def export_avg_losses(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    ltypes = oq.ext_loss_types
    dtlist = [(lt, F32) for lt in ltypes]
    name, value, rlzs_or_stats = _get_data(
        dstore, ekey[0], ltypes, oq.hazard_stats())
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    assets = get_assets(dstore)
    md = dstore.metadata
    md.update(investigation_time=oq.investigation_time,
              risk_investigation_time=oq.risk_investigation_time or
              oq.investigation_time)
    # value has shape (A, R|S, L): iterate on the second axis
    for ros, values in zip(rlzs_or_stats, value.transpose(1, 0, 2)):
        dest = dstore.build_fname(name, ros, 'csv')
        losses = numpy.zeros(len(values), dtlist)
        for li, lt in enumerate(ltypes):
            losses[lt] = values[:, li]
        writer.save(compose_arrays(assets, losses), dest, comment=md,
                    renamedict=dict(id='asset_id'))
    return writer.getsaved()
@export.add(('src_loss_table', 'csv'))
def export_src_loss_table(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    comment = dstore.metadata
    comment.update(investigation_time=oq.investigation_time,
                   risk_investigation_time=oq.risk_investigation_time or
                   oq.investigation_time)
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    # one file per loss type in the src_loss_table group
    for ltype in dstore['src_loss_table']:
        wrapper = hdf5.ArrayWrapper.from_(dstore['src_loss_table/' + ltype])
        dest = dstore.build_fname('src_loss_' + ltype, '', 'csv')
        writer.save(wrapper.to_dframe(), dest, comment=comment)
    return writer.getsaved()
# this is used by all GMF-based risk calculators
# NB: it exports only the event loss table, i.e. the totals
@export.add(('risk_by_event', 'csv'))
def export_event_loss_table(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    dest = dstore.build_fname('risk_by_event', '', 'csv')
    md = dstore.metadata
    if 'scenario' not in oq.calculation_mode:
        md.update(dict(investigation_time=oq.investigation_time,
                       risk_investigation_time=oq.risk_investigation_time or
                       oq.investigation_time))
    events = dstore.read_df('events', 'id')
    R = post_risk.fix_investigation_time(oq, dstore)
    if oq.investigation_time:
        # effective time spanned by all the stochastic event sets
        eff_time = oq.investigation_time * oq.ses_per_logic_tree_path * R
    K = dstore.get_attr('risk_by_event', 'K', 0)  # total aggregation id
    try:
        lstates = dstore.get_attr('risk_by_event', 'limit_states').split()
    except KeyError:  # ebrisk, no limit states
        lstates = []
    # keep only the rows of the total aggregation (agg_id == K)
    df = dstore.read_df('risk_by_event', 'agg_id', dict(agg_id=K))
    df['loss_type'] = scientific.LOSSTYPE[df.loss_id.to_numpy()]
    if 'variance' in df.columns:
        del df['variance']
    # rename dmg_1, dmg_2, ... into the limit state names
    ren = {'dmg_%d' % i: lstate for i, lstate in enumerate(lstates, 1)}
    df.rename(columns=ren, inplace=True)
    df = df.join(events, on='event_id')
    if 'ses_id' in df.columns:
        del df['ses_id']
    if oq.collect_rlzs:
        df['rlz_id'] = 0
    try:
        # post-loss-amplification factor, if the dataset is present
        pla_factor = scientific.pla_factor(
            dstore.read_df('post_loss_amplification'))
    except KeyError:
        pla_factor = None
    if 'loss' in df.columns:  # missing for damage
        dfs = []
        for (loss_id, rlz), d in df.groupby(['loss_id', 'rlz_id']):
            d = d.sort_values('loss')
            if pla_factor:
                # NOTE(review): eff_time is defined only when
                # oq.investigation_time is set; presumably pla_factor is
                # None in scenario calculations — confirm
                eperiods = eff_time / numpy.arange(len(d), 0., -1)
                d['pla_loss'] = pla_factor(eperiods) * d.loss
            dfs.append(d)
        df = pandas.concat(dfs)
    else:
        df = df.sort_values(['loss_id', 'event_id'])
    del df['rlz_id']
    del df['loss_id']
    if 'scenario' in oq.calculation_mode:
        del df['rup_id']
    if 'year' in df.columns:
        del df['year']
    writer.save(df, dest, comment=md)
    return writer.getsaved()
def_compact(array):# convert an array of shape (a, e) into an array of shape (a,)dt=array.dtypea,e=array.shapelst=[]fornameindt.names:lst.append((name,(dt[name],e)))returnarray.view(numpy.dtype(lst)).reshape(a)# this is used by classical_risk
@export.add(('loss_curves-rlzs', 'csv'), ('loss_curves-stats', 'csv'),
            ('loss_curves', 'csv'))
def export_loss_curves(ekey, dstore):
    """Export the loss curves by delegating to the LossCurveExporter."""
    key = ekey[0]
    # the kind ('rlzs' or 'stats') follows the '/' or '-' separator
    kind = key.split('/', 1)[1] if '/' in key else key.split('-', 1)[1]
    return loss_curves.LossCurveExporter(dstore).export('csv', kind)
# used by classical_risk
@export.add(('loss_maps-rlzs', 'csv'), ('loss_maps-stats', 'csv'))
def export_loss_maps_csv(ekey, dstore):
    """
    Export the loss maps, one CSV file per realization or statistic.

    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    kind = ekey[0].split('-')[1]  # rlzs or stats
    assets = get_assets(dstore)
    value = get_loss_maps(dstore, kind)
    oq = dstore['oqparam']
    if kind == 'rlzs':
        rlzs_or_stats = dstore['full_lt'].get_realizations()
    else:
        rlzs_or_stats = oq.hazard_stats()
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    md = dstore.metadata
    for i, ros in enumerate(rlzs_or_stats):
        if hasattr(ros, 'ordinal'):  # is a realization
            ros = 'rlz-%d' % ros.ordinal
        fname = dstore.build_fname('loss_maps', ros, ekey[1])
        md.update(dict(kind=ros,
                       risk_investigation_time=oq.risk_investigation_time or
                       oq.investigation_time))
        writer.save(compose_arrays(assets, value[:, i]), fname,
                    comment=md, renamedict=dict(id='asset_id'))
    return writer.getsaved()
# used by classical_risk
@export.add(('loss_maps-rlzs', 'npz'), ('loss_maps-stats', 'npz'))
def export_loss_maps_npz(ekey, dstore):
    """Export the loss maps as a single .npz file, one array per kind."""
    kind = ekey[0].split('-')[1]  # rlzs or stats
    assets = get_assets(dstore)
    value = get_loss_maps(dstore, kind)
    num_paths = dstore['full_lt'].get_num_paths()
    if kind == 'rlzs':
        keys = ['rlz-%03d' % r for r in range(num_paths)]
    else:
        keys = dstore['oqparam'].hazard_stats()
    fname = dstore.export_path('%s.%s' % ekey)
    arrays = {ros: compose_arrays(assets, value[:, i])
              for i, ros in enumerate(keys)}
    savez(fname, **arrays)
    return [fname]
def modal_damage_array(data, dstates):
    """
    Determine, for each record, the damage state with the highest
    probability, one output column per (peril, loss type) pair.
    """
    # group the damage columns by loss type (and peril, if present)
    grouped = {}
    for name in data.dtype.names:  # '[peril-]ltype-dstate'
        parts = name.split('-')
        if len(parts) == 3:
            peril, ltype, _dstate = parts
            key = f'modal-ds-{peril}~{ltype}'
        else:
            ltype, _dstate = parts
            key = 'modal-ds-' + ltype
        if ltype != 'no_damage':
            grouped.setdefault(key, []).append(data[name])
    # index of the most probable damage state for each record
    modal = {key: numpy.array(cols).argmax(axis=0)
             for key, cols in grouped.items()}
    out = numpy.zeros(len(data), [(key, object) for key in modal])
    for key, idx in modal.items():
        out[key] = dstates[idx]
    return out
# used by event_based_damage, scenario_damage, classical_damage
@export.add(('damages-rlzs', 'csv'), ('damages-stats', 'csv'))
def export_damages_csv(ekey, dstore):
    """
    Export the damages (and consequences), one CSV file per
    realization or statistic.

    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    # damage states: no_damage plus the limit states of the risk model
    dmgstates = numpy.concatenate(
        [['no_damage'], dstore.getitem('crm').attrs['limit_states']])
    ebd = oq.calculation_mode == 'event_based_damage'
    rlzs = dstore['full_lt'].get_realizations()
    orig = dstore[ekey[0]][:]  # shape (A, R, L, D, P)
    writer = writers.CsvWriter(fmt='%.6E')
    assets = get_assets(dstore)
    md = dstore.metadata
    if oq.investigation_time:
        rit = oq.risk_investigation_time or oq.investigation_time
        md.update(dict(investigation_time=oq.investigation_time,
                       risk_investigation_time=rit))
    R = 1 if oq.collect_rlzs else len(rlzs)
    if ekey[0].endswith('stats'):
        rlzs_or_stats = oq.hazard_stats()
    else:
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    name = ekey[0].split('-')[0]
    if oq.calculation_mode != 'classical_damage':
        name = 'avg_' + name
    csqs = tuple(dstore.getitem('crm').attrs['consequences'])
    for i, ros in enumerate(rlzs_or_stats):
        if ebd:  # export only the consequences from damages-rlzs, i == 0
            if len(csqs) == 0:  # no consequences, export nothing
                return []
            # NOTE(review): rate converts the event counts into averages
            # via oq.time_ratio — confirm the intended units
            rate = len(dstore['events']) * oq.time_ratio / len(rlzs)
            data = orig[:, i]
            # keep only the consequence columns
            dtlist = [(col, F32) for col in data.dtype.names
                      if col.endswith(csqs)]
            damages = numpy.zeros(len(data), dtlist)
            for csq, _ in dtlist:
                damages[csq] = data[csq] * rate
            fname = dstore.build_fname('avg_risk', ros, ekey[1])
        else:  # scenario_damage, classical_damage
            if oq.modal_damage_state:
                # export only the damage state of highest probability
                damages = modal_damage_array(orig[:, i], dmgstates)
            else:
                damages = orig[:, i]
            fname = dstore.build_fname(name, ros, ekey[1])
        arr = compose_arrays(assets, damages)
        writer.save(arr, fname, comment=md, renamedict=dict(id='asset_id'))
    return writer.getsaved()
def_to_loss_maps(array,loss_maps_dt):# convert a 4D array into a 2D array of dtype loss_maps_dtA,R,_C,_LI=array.shapelm=numpy.zeros((A,R),loss_maps_dt)forli,nameinenumerate(loss_maps_dt.names):forp,poeinenumerate(loss_maps_dt[name].names):lm[name][poe]=array[:,:,p,li]returnlm
def get_loss_maps(dstore, kind):
    """
    :param dstore: a DataStore instance
    :param kind: 'rlzs' or 'stats'
    """
    oq = dstore['oqparam']
    name = 'loss_maps-%s' % kind
    if name in dstore:  # event_based risk
        return _to_loss_maps(dstore[name][()], oq.loss_maps_dt())
    name = 'loss_curves-%s' % kind
    if name in dstore:  # classical_risk
        # the loss maps are built on the fly from the loss curves;
        # NB: the local is named `curves` and not `loss_curves` to avoid
        # shadowing the module openquake.calculators.export.loss_curves
        curves = dstore[name]
        loss_maps = scientific.broadcast(
            scientific.loss_maps, curves, oq.conditional_loss_poes)
        return loss_maps
    raise KeyError('loss_maps/loss_curves missing in %s' % dstore)
def get_paths(rlz):
    """
    :param rlz: a logic tree realization (composite or simple)
    :returns: a dict {'source_model_tree_path': string, 'gsim_tree_path': string}
    """
    if hasattr(rlz, 'sm_lt_path'):  # composite realization
        smpath = '_'.join(rlz.sm_lt_path)
        gspath = '_'.join(rlz.gsim_lt_path)
    else:  # simple GSIM realization
        smpath = ''
        gspath = '_'.join(rlz.lt_path)
    return {'source_model_tree_path': smpath, 'gsim_tree_path': gspath}