# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2013-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importreimportjsonimportcopyimportfunctoolsimportcollectionsimportnumpyimportpandasfromopenquake.baselibimporthdf5fromopenquake.baselib.nodeimportNodefromopenquake.baselib.generalimportAccumDict,cached_propertyfromopenquake.hazardlibimportnrml,InvalidFilefromopenquake.hazardlib.sourcewriterimportobj_to_nodefromopenquake.risklibimportscientificU8=numpy.uint8U16=numpy.uint16U32=numpy.uint32F32=numpy.float32F64=numpy.float64lts=numpy.concatenate([scientific.LOSSTYPE,scientific.PERILTYPE])LTYPE_REGEX='|'.join(ltforltinltsif'+'notinltand'_ins'notinlt)RISK_TYPE_REGEX=re.compile(r'(%s)_([\w_]+)'%LTYPE_REGEX)def_assert_equal(d1,d2):d1.pop('loss_type',None)d2.pop('loss_type',None)assertsorted(d1)==sorted(d2),(sorted(d1),sorted(d2))fork,vind1.items():ifisinstance(v,dict):_assert_equal(v,d2[k])else:assertv==d2[k],(v,d2[k])
def get_risk_files(inputs):
    """
    :param inputs: a dictionary key -> path name
    :returns: a dictionary "peril/kind/cost_type" -> fname
    """
    rfs = {}
    job_ini = inputs['job_ini']
    for key in sorted(inputs):
        if key == 'fragility':
            # backward compatibility for .ini files with key fragility_file
            # instead of structural_fragility_file
            rfs['groundshaking/fragility/structural'] = inputs[
                'structural_fragility'] = inputs[key]
            del inputs['fragility']
        elif key.endswith(('_fragility', '_vulnerability',
                           '_vulnerability_retrofitted')):
            match = RISK_TYPE_REGEX.match(key)
            if match:
                kind = match.group(2)  # fragility or vulnerability
                value = inputs[key]
                if isinstance(value, dict):  # cost_type -> fname
                    peril = match.group(1)
                    for cost_type, fname in value.items():
                        rfs[f'{peril}/{kind}/{cost_type}'] = fname
                else:
                    # single file: the peril defaults to groundshaking
                    cost_type = match.group(1)
                    rfs[f'groundshaking/{kind}/{cost_type}'] = value
            else:
                raise ValueError('Invalid key in %s: %s_file' %
                                 (job_ini, key))
    return rfs
@obj_to_node.add('VulnerabilityFunction')
def build_vf_node(vf):
    """
    Convert a VulnerabilityFunction object into a Node suitable
    for XML conversion.
    """
    nodes = [Node('imls', {'imt': vf.imt}, vf.imls),
             Node('meanLRs', {}, vf.mean_loss_ratios),
             Node('covLRs', {}, vf.covs)]
    return Node(
        'vulnerabilityFunction',
        {'id': vf.id, 'dist': vf.distribution_name}, nodes=nodes)
def group_by_lt(funclist):
    """
    Converts a list of objects with attribute .loss_type into a dictionary
    peril -> loss_type -> risk_function
    """
    dic = AccumDict(accum=[])  # peril -> lt -> rf
    for rf in funclist:
        dic[rf.loss_type].append(rf)
    for lt, lst in dic.items():
        if len(lst) == 1:
            dic[lt] = lst[0]
        elif lst[1].kind == 'fragility':
            # pair (consequence function, fragility function list);
            # see EventBasedDamageTestCase.test_case_11
            cf, ffl = lst
            ffl.cf = cf
            dic[lt] = ffl
        elif lst[1].kind == 'vulnerability_retrofitted':
            # pair (vulnerability, retrofitted vulnerability)
            vf, retro = lst
            vf.retro = retro
            dic[lt] = vf
        else:
            raise RuntimeError(lst)
    return dic
class RiskFuncList(list):
    """
    A list of risk functions with attributes .id, .loss_type, .kind
    """
    # NOTE(review): callers elsewhere use .groupby_id() and .limit_states on
    # instances of this class; those are not visible in this chunk — confirm
    # they are defined in the full file
def get_risk_functions(oqparam):
    """
    :param oqparam:
        an OqParam instance
    :returns:
        a list of risk functions
    """
    job_ini = oqparam.inputs['job_ini']
    rmodels = AccumDict()  # (peril, loss_type, kind) -> rmodel
    for key, fname in get_risk_files(oqparam.inputs).items():
        # ex. key = groundshaking/vulnerability/structural
        peril, kind, loss_type = key.split('/')
        rmodel = nrml.to_python(fname)
        if len(rmodel) == 0:
            raise InvalidFile(f'{job_ini}: {fname} is empty!')
        rmodels[peril, loss_type, kind] = rmodel
        if rmodel.lossCategory is None:  # NRML 0.4
            continue
        cost_type = str(rmodel.lossCategory)
        rmodel_kind = rmodel.__class__.__name__
        kind_ = kind.replace('_retrofitted', '')  # strip retrofitted
        # sanity check: the model class must match the kind in the key
        if not rmodel_kind.lower().startswith(kind_):
            raise ValueError(
                f'Error in the file "{key}_file={fname}": is '
                f'of kind {rmodel_kind}, '
                f'expected {kind.capitalize() + "Model"}')
        # sanity check: lossCategory in the file must match the loss type
        if cost_type != loss_type:
            raise ValueError(
                f'Error in the file "{key}_file={fname}": lossCategory is of '
                f'type "{rmodel.lossCategory}", expected "{loss_type}"')
    cl_risk = oqparam.calculation_mode in ('classical', 'classical_risk')
    rlist = RiskFuncList()
    rlist.limit_states = []
    for (peril, loss_type, kind), rm in sorted(rmodels.items()):
        if kind == 'fragility':
            for (imt, riskid), ffl in sorted(rm.items()):
                if not rlist.limit_states:
                    rlist.limit_states.extend(rm.limitStates)
                # we are rejecting the case of loss types with different
                # limit states; this may change in the future
                assert rlist.limit_states == rm.limitStates, (
                    rlist.limit_states, rm.limitStates)
                ffl.peril = peril
                ffl.loss_type = loss_type
                ffl.kind = kind
                rlist.append(ffl)
        else:  # vulnerability, vulnerability_retrofitted
            # only for classical_risk reduce the loss_ratios
            # to make sure they are strictly increasing
            for (imt, riskid), rf in sorted(rm.items()):
                rf = rf.strictly_increasing() if cl_risk else rf
                rf.peril = peril
                rf.loss_type = loss_type
                rf.kind = kind
                rlist.append(rf)
    return rlist
def rescale(curves, values):
    """
    Multiply the losses in each curve of kind (losses, poes) by the
    corresponding value.

    :param curves: an array of shape (A, 2, C)
    :param values: an array of shape (A,)
    """
    A, _, C = curves.shape
    assert A == len(values), (A, len(values))
    # NOTE(review): loss_poe_dt is not defined in this chunk — presumably a
    # module-level structured dtype with fields 'loss' and 'poe'; confirm
    array = numpy.zeros((A, C), loss_poe_dt)
    array['loss'] = [c * v for c, v in zip(curves[:, 0], values)]
    array['poe'] = curves[:, 1]
    return array
class RiskModel(object):
    """
    Base class. Can be used in the tests as a mock.

    :param taxonomy: a taxonomy string
    :param risk_functions: a dict peril -> (loss_type, kind) -> risk_function
    """
    time_event = None  # used in scenario_risk
    compositemodel = None  # set by get_crmodel
    alias = None  # set in save_crmodel

    def __init__(self, calcmode, taxonomy, risk_functions, **kw):
        self.calcmode = calcmode
        self.taxonomy = taxonomy
        self.risk_functions = risk_functions
        vars(self).update(kw)  # updates risk_investigation_time too
        steps = kw.get('lrem_steps_per_interval')
        if calcmode in ('classical', 'classical_risk'):
            self.loss_ratios = {
                lt: tuple(vf.mean_loss_ratios_with_steps(steps))
                for lt, vf in risk_functions['groundshaking'].items()}
        if calcmode == 'classical_bcr':
            self.loss_ratios_orig = {}
            self.loss_ratios_retro = {}
            for lt, vf in risk_functions['groundshaking'].items():
                self.loss_ratios_orig[lt] = tuple(
                    vf.mean_loss_ratios_with_steps(steps))
                self.loss_ratios_retro[lt] = tuple(
                    vf.retro.mean_loss_ratios_with_steps(steps))

        # set imt_by_lt
        self.imt_by_lt = {}  # dictionary loss_type -> imt
        for lt, rf in risk_functions['groundshaking'].items():
            if rf.kind in ('vulnerability', 'fragility'):
                self.imt_by_lt[lt] = rf.imt

    @property
    def loss_types(self):
        """
        The list of loss types in the underlying vulnerability functions,
        in lexicographic order
        """
        return sorted(self.risk_functions['groundshaking'])

    def __call__(self, assets, gmf_df, rndgen=None):
        # dispatch on the calculation mode: each mode has a method of the
        # same name defined below
        meth = getattr(self, self.calcmode)
        res = {(peril, lt): meth(peril, lt, assets, gmf_df, rndgen)
               for peril in self.risk_functions for lt in self.loss_types}
        # for event_based_risk `res` is loss_type -> DataFrame(eid, aid, loss)
        # NOTE(review): PerilDict is not defined in this chunk — confirm it
        # is imported/defined elsewhere in the full module
        return PerilDict(res)

    def __toh5__(self):
        return self.risk_functions, {'taxonomy': self.taxonomy}

    def __fromh5__(self, dic, attrs):
        vars(self).update(attrs)
        assert 'groundshaking' in dic, list(dic)
        self.risk_functions = dic

    def __repr__(self):
        return '<%s%s>' % (self.__class__.__name__, self.taxonomy)

    # ######################## calculation methods ######################### #

    def classical_risk(self, peril, loss_type, assets, hazard_curve,
                       rng=None):
        """
        :param str loss_type:
            the loss type considered
        :param assets:
            assets is an iterator over A
            :class:`openquake.risklib.scientific.Asset` instances
        :param hazard_curve:
            an array of poes
        :param eps:
            ignored, here only for API compatibility with other calculators
        :returns:
            a composite array (loss, poe) of shape (A, C)
        """
        n = len(assets)
        vf = self.risk_functions[peril][loss_type]
        lratios = self.loss_ratios[loss_type]
        imls = self.hazard_imtls[vf.imt]
        poes = hazard_curve[self.hazard_imtls(vf.imt)]
        if loss_type == 'occupants':
            values = assets['occupants_avg'].to_numpy()
        else:
            values = assets['value-' + loss_type].to_numpy()
        rtime = self.risk_investigation_time or self.investigation_time
        # all assets share the same taxonomy, so the loss ratio curve is
        # computed once and replicated n times
        lrcurves = numpy.array(
            [scientific.classical(vf, imls, poes, lratios,
                                  self.investigation_time, rtime)] * n)
        return rescale(lrcurves, values)

    def classical_bcr(self, peril, loss_type, assets, hazard, rng=None):
        """
        :param loss_type: the loss type
        :param assets: a list of N assets of the same taxonomy
        :param hazard: a dictionary col -> hazard curve
        :param _eps: dummy parameter, unused
        :returns: a list of triples (eal_orig, eal_retro, bcr_result)
        """
        if loss_type != 'structural':
            raise NotImplementedError(
                'retrofitted is not defined for ' + loss_type)
        n = len(assets)
        self.assets = assets
        vf = self.risk_functions[peril][loss_type]
        imls = self.hazard_imtls[vf.imt]
        poes = hazard[self.hazard_imtls(vf.imt)]
        rtime = self.risk_investigation_time or self.investigation_time
        # loss curves with the original and with the retrofitted functions
        curves_orig = functools.partial(
            scientific.classical, vf, imls,
            loss_ratios=self.loss_ratios_orig[loss_type],
            investigation_time=self.investigation_time,
            risk_investigation_time=rtime)
        curves_retro = functools.partial(
            scientific.classical, vf.retro, imls,
            loss_ratios=self.loss_ratios_retro[loss_type],
            investigation_time=self.investigation_time,
            risk_investigation_time=rtime)
        original_loss_curves = numpy.array([curves_orig(poes)] * n)
        retrofitted_loss_curves = numpy.array([curves_retro(poes)] * n)
        eal_original = numpy.array([scientific.average_loss(lc)
                                    for lc in original_loss_curves])
        eal_retrofitted = numpy.array([scientific.average_loss(lc)
                                       for lc in retrofitted_loss_curves])
        bcr_results = [
            scientific.bcr(eal_original[i], eal_retrofitted[i],
                           self.interest_rate, self.asset_life_expectancy,
                           asset['value-' + loss_type], asset['retrofitted'])
            for i, asset in enumerate(assets.to_records())]
        return list(zip(eal_original, eal_retrofitted, bcr_results))

    def classical_damage(self, peril, loss_type, assets, hazard_curve,
                         rng=None):
        """
        :param loss_type: the loss type
        :param assets: a list of N assets of the same taxonomy
        :param hazard_curve: a dictionary col -> hazard curve
        :returns:
            an array of N x D elements where N is the number of points
            and D the number of damage states.
        """
        ffl = self.risk_functions[peril][loss_type]
        imls = self.hazard_imtls[ffl.imt]
        poes = hazard_curve[self.hazard_imtls(ffl.imt)]
        rtime = self.risk_investigation_time or self.investigation_time
        damage = scientific.classical_damage(
            ffl, imls, poes, investigation_time=self.investigation_time,
            risk_investigation_time=rtime,
            steps_per_interval=self.steps_per_interval)
        # scale the damage fractions by the number of units in each asset
        damages = numpy.array([a['value-number'] * damage
                               for a in assets.to_records()])
        return damages

    def event_based_risk(self, peril, loss_type, assets, gmf_df, rndgen):
        """
        :returns: a DataFrame with columns eid, eid, loss
        """
        imt = self.imt_by_lt[loss_type]
        col = self.alias.get(imt, imt)  # GMF column for the relevant IMT
        sid = assets['site_id']
        if loss_type == 'occupants':
            val = assets['occupants_%s' % self.time_event].to_numpy()
        else:
            val = assets['value-' + loss_type].to_numpy()
        asset_df = pandas.DataFrame(dict(aid=assets.index, val=val), sid)
        vf = self.risk_functions[peril][loss_type]
        return vf(asset_df, gmf_df, col, rndgen,
                  self.minimum_asset_loss.get(loss_type, 0.))

    scenario = ebrisk = scenario_risk = event_based_risk

    def scenario_damage(self, peril, loss_type, assets, gmf_df, rng=None):
        """
        :param loss_type: the loss type
        :param assets: a list of A assets of the same taxonomy
        :param gmf_df: a DataFrame of GMFs
        :param epsilons: dummy parameter, unused
        :returns:
            an array of shape (A, E, D) elements where N is the number of
            points, E the number of events and D the number of damage states.
        """
        imt = self.imt_by_lt[loss_type]
        col = self.alias.get(imt, imt)
        gmvs = gmf_df[col].to_numpy()
        ffs = self.risk_functions[peril][loss_type]
        damages = scientific.scenario_damage(ffs, gmvs).T
        # all assets share the same taxonomy -> same damage distribution
        return numpy.array([damages] * len(assets))

    event_based_damage = scenario_damage
# NB: the approach used here relies on the convention of having the
# names of the arguments of the RiskModel class to be equal to the
# names of the parameters in the oqparam object. This is seen as a
# feature, since it forces people to be consistent with the names,
# in the spirit of the 'convention over configuration' philosophy
def get_riskmodel(taxonomy, oqparam, risk_functions):
    """
    Return an instance of the correct risk model class, depending on the
    attribute `calculation_mode` of the object `oqparam`.

    :param taxonomy:
        a taxonomy string
    :param oqparam:
        an object containing the parameters needed by the RiskModel class
    :param extra:
        extra parameters to pass to the RiskModel class
    """
    extra = {'hazard_imtls': oqparam.imtls}
    extra['investigation_time'] = oqparam.investigation_time
    extra['risk_investigation_time'] = oqparam.risk_investigation_time
    extra['lrem_steps_per_interval'] = oqparam.lrem_steps_per_interval
    extra['steps_per_interval'] = oqparam.steps_per_interval
    extra['time_event'] = oqparam.time_event
    extra['minimum_asset_loss'] = oqparam.minimum_asset_loss
    if oqparam.calculation_mode == 'classical_bcr':
        extra['interest_rate'] = oqparam.interest_rate
        extra['asset_life_expectancy'] = oqparam.asset_life_expectancy
    return RiskModel(oqparam.calculation_mode, taxonomy, risk_functions,
                     **extra)
# used only in riskmodels_test
def get_riskcomputer(dic, alias, limit_states=()):
    # builds a RiskComputer instance from a suitable dictionary
    # (the inverse of RiskComputer.todict); bypasses __init__ on purpose
    rc = scientific.RiskComputer.__new__(scientific.RiskComputer)
    rc.D = len(limit_states) + 1  # damage states = no_damage + limit states
    rc.wdic = {}
    rfs = AccumDict(accum=[])  # riskid -> risk functions
    steps = dic.get('lrem_steps_per_interval', 1)
    lts = set()
    riskid_perils = set()
    perils = set()
    for rlk, func in dic['risk_functions'].items():
        # keys have the form "peril#loss_type#riskid"
        peril, lt, riskid = rlk.split('#')
        perils.add(peril)
        riskid_perils.add((riskid, peril))
        lts.add(lt)
        rf = hdf5.json_to_obj(json.dumps(func))
        if hasattr(rf, 'init'):
            rf.init()
        rf.loss_type = lt
        rf.peril = peril
        if getattr(rf, 'retro', False):
            # rebuild the retrofitted vulnerability function too
            rf.retro = hdf5.json_to_obj(json.dumps(rf.retro))
            rf.retro.init()
            rf.retro.loss_type = lt
        if hasattr(rf, 'array'):  # fragility
            rf = rf.build(limit_states)
        rfs[riskid].append(rf)
    lts = sorted(lts)
    mal = dic.setdefault('minimum_asset_loss', {lt: 0. for lt in lts})
    for riskid in rfs:
        by_peril = AccumDict(accum=[])
        for rf in rfs[riskid]:
            by_peril[rf.peril].append(rf)
        rm = RiskModel(dic['calculation_mode'], 'taxonomy',
                       {peril: group_by_lt(by_peril[peril])
                        for peril in by_peril},
                       lrem_steps_per_interval=steps,
                       minimum_asset_loss=mal)
        rm.alias = alias
        rc[riskid] = rm
    if 'wdic' in dic:
        # weights per (riskid, peril), keys of the form "riskid#peril"
        for rlt, weight in dic['wdic'].items():
            riskid, peril = rlt.split('#')
            rc.wdic[riskid, peril] = weight
    else:
        rc.wdic = {(riskid, peril): 1.
                   for riskid, peril in sorted(riskid_perils)}
    rc.P = len(perils)
    rc.loss_types = lts
    rc.minimum_asset_loss = mal
    rc.calculation_mode = dic['calculation_mode']
    return rc
class CompositeRiskModel(collections.abc.Mapping):
    """
    A container (riskid, kind) -> riskmodel

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param fragdict:
        a dictionary riskid -> loss_type -> fragility functions
    :param vulndict:
        a dictionary riskid -> loss_type -> vulnerability function
    :param consdict:
        a dictionary riskid -> loss_type -> consequence functions
    """
    tmap_df = ()  # to be set
[docs]@classmethod# TODO: reading new-style consequences is missingdefread(cls,dstore,oqparam):""" :param dstore: a DataStore instance :returns: a :class:`CompositeRiskModel` instance """risklist=RiskFuncList()ifhasattr(dstore,'get_attr'):# missing only in Aristotle mode, where dstore is an hdf5.Filerisklist.limit_states=dstore.get_attr('crm','limit_states')df=dstore.read_df('crm')fori,rf_jsoninenumerate(df.riskfunc):rf=hdf5.json_to_obj(rf_json)try:rf.peril=df.loc[i].perilexceptAttributeError:# in engine < 3.22 the peril was not storedrf.peril='groundshaking'lt=rf.loss_typeifrf.kind=='fragility':# rf is a FragilityFunctionListrisklist.append(rf)else:# rf is a vulnerability functionrf.init()iflt.endswith('_retrofitted'):# strip _retrofitted, since len('_retrofitted') = 12rf.loss_type=lt[:-12]rf.kind='vulnerability_retrofitted'else:rf.loss_type=ltrf.kind='vulnerability'risklist.append(rf)crm=CompositeRiskModel(oqparam,risklist)if'taxmap'indstore:crm.tmap_df=dstore.read_df('taxmap')returncrm
    def __init__(self, oqparam, risklist, consdict=()):
        self.oqparam = oqparam
        self.risklist = risklist  # by taxonomy
        self.consdict = consdict or {}  # new style consequences, by anything
        self.init()
    def set_tmap(self, tmap_df, taxidx):
        """
        Set the attribute .tmap_df if the risk IDs in the taxonomy mapping
        are consistent with the fragility functions.
        """
        self.tmap_df = tmap_df
        if 'consequence' not in self.oqparam.inputs:
            return
        # flatten the consequence input (single file or list of files)
        csq_files = []
        for fnames in self.oqparam.inputs['consequence'].values():
            if isinstance(fnames, list):
                csq_files.extend(fnames)
            else:
                csq_files.append(fnames)
        for fname in csq_files:
            df = pandas.read_csv(fname)
            if 'peril' in df.columns:
                # validate the perils in the consequence files
                for line, peril in enumerate(df['peril'], 1):
                    if peril not in self.perils:
                        raise InvalidFile(
                            f'{fname}: unknown {peril=} at {line=}')
        cfs = '\n'.join(csq_files)
        df = self.tmap_df
        for peril in self.perils:
            for byname, coeffs in self.consdict.items():
                # ex. byname = "losses_by_taxonomy"
                if len(coeffs):
                    for per, risk_id, weight in zip(
                            df.peril, df.risk_id, df.weight):
                        if (per == '*' or per == peril) and risk_id != '?':
                            # check the consequence coefficients exist for
                            # the (risk_id, peril) pair
                            try:
                                coeffs[risk_id][peril]
                            except KeyError:
                                raise InvalidFile(
                                    f'Missing {risk_id=}, {peril=} in\n{cfs}')
    def check_risk_ids(self, inputs):
        """
        Check that there are no missing risk IDs for some risk functions
        """
        ids_by_kind = AccumDict(accum=set())
        for riskfunc in self.risklist:
            ids_by_kind[riskfunc.kind].add(riskfunc.id)
        kinds = tuple(ids_by_kind)  # vulnerability, fragility, ...
        fnames = [fname for kind, fname in inputs.items()
                  if kind.endswith(kinds)]
        if len(ids_by_kind) > 1:
            # all kinds must carry the same set of risk IDs
            k = next(iter(ids_by_kind))
            base_ids = set(ids_by_kind.pop(k))
            for kind, ids in ids_by_kind.items():
                if ids != base_ids:
                    raise NameError(
                        'Check in the files %s the IDs %s' %
                        (fnames, sorted(base_ids.symmetric_difference(ids))))
        if self._riskmodels:
            for peril in self.perils:
                # check imt_by_lt has consistent loss types for all taxonomies
                missing = AccumDict(accum=[])
                rms = []
                if len(self.tmap_df):
                    if len(self.tmap_df.peril.unique()) == 1:
                        risk_ids = self.tmap_df.risk_id
                    else:
                        risk_ids = self.tmap_df[
                            self.tmap_df.peril == peril].risk_id
                    for risk_id in risk_ids.unique():
                        rms.append(self._riskmodels[risk_id])
                else:
                    rms.extend(self._riskmodels.values())
                for rm in rms:
                    # NB: in event_based_risk/case_8 the loss types are
                    # area, number, occupants, residents
                    for lt in self.loss_types:
                        try:
                            rm.imt_by_lt[lt]
                        except KeyError:
                            key = '%s/%s/%s' % (peril, kinds[0], lt)
                            fname = self.oqparam._risk_files[key]
                            missing[fname].append(rm.taxonomy)
                if missing:
                    for fname, ids in missing.items():
                        raise InvalidFile(
                            '%s: missing %s%s' %
                            (fname, peril, ' '.join(ids)))
    def compute_csq(self, assets, dd5, tmap_df, oq):
        """
        :param assets: asset array
        :param dd5: distribution functions of shape (P, A, E, L, D)
        :param tmap_df: DataFrame corresponding to the given taxonomy
        :param oq: OqParam instance with .loss_types and .time_event
        :returns: a dict consequence_name, loss_type -> array[P, A, E]
        """
        # by construction all assets have the same taxonomy
        P, A, E, _L, _D = dd5.shape
        csq = AccumDict(accum=numpy.zeros((P, A, E)))
        for byname, coeffs in self.consdict.items():
            # ex. byname = "losses_by_taxonomy"
            if len(coeffs):
                consequence, _tagname = byname.split('_by_')
                for risk_id, df in tmap_df.groupby('risk_id'):
                    for li, lt in enumerate(oq.loss_types):
                        # dict loss_type -> coeffs for the given loss type
                        for pi, peril in enumerate(self.perils):
                            if len(df) == 1:
                                [w] = df.weight
                            else:  # assume one weight per peril
                                [w] = df[df.peril == peril].weight
                            # skip the no_damage state (index 0) in dd5
                            coeff = (dd5[pi, :, :, li, 1:] @
                                     coeffs[risk_id][peril][lt] * w)
                            cAE = scientific.consequence(
                                consequence, assets, coeff, lt, oq.time_event)
                            csq[consequence, li][pi] += cAE
        return csq
    def init(self):
        # Build the ._riskmodels dictionary and derived attributes from
        # .risklist, depending on the calculation mode
        oq = self.oqparam
        if self.risklist:
            self.perils = oq.set_risk_imts(self.risklist)
        self.damage_states = []
        self._riskmodels = {}  # riskid -> crmodel
        if oq.calculation_mode.endswith('_bcr'):
            # classical_bcr calculator
            for riskid, risk_functions in self.risklist.groupby_id().items():
                self._riskmodels[riskid] = get_riskmodel(
                    riskid, oq, risk_functions)
        elif (any(rf.kind == 'fragility' for rf in self.risklist) or
              'damage' in oq.calculation_mode):
            # classical_damage/scenario_damage calculator
            if oq.calculation_mode in ('classical', 'scenario'):
                # case when the risk files are in the job_hazard.ini file
                oq.calculation_mode += '_damage'
                if 'exposure' not in oq.inputs:
                    raise RuntimeError(
                        'There are risk files in %r but not '
                        'an exposure' % oq.inputs['job_ini'])
            self.damage_states = ['no_damage'] + list(
                self.risklist.limit_states)
            for riskid, ffs_by_lt in self.risklist.groupby_id().items():
                self._riskmodels[riskid] = get_riskmodel(
                    riskid, oq, ffs_by_lt)
        else:
            # classical, event based and scenario calculators
            for riskid, vfs in self.risklist.groupby_id().items():
                self._riskmodels[riskid] = get_riskmodel(riskid, oq, vfs)
        self.imtls = oq.imtls
        self.lti = {}  # loss_type -> idx
        self.covs = 0  # number of coefficients of variation
        # build a sorted list with all the loss_types contained in the model
        ltypes = set()
        for rm in self.values():
            ltypes.update(rm.loss_types)
        self.loss_types = sorted(ltypes)
        self.riskids = set()
        self.distributions = set()
        for riskid, rm in self._riskmodels.items():
            self.riskids.add(riskid)
            rm.compositemodel = self
            for lt, rf in rm.risk_functions.items():
                if hasattr(rf, 'distribution_name'):
                    self.distributions.add(rf.distribution_name)
                if hasattr(rf, 'init'):  # vulnerability function
                    if oq.ignore_covs:
                        rf.covs = numpy.zeros_like(rf.covs)
                    rf.init()
                # save the number of nonzero coefficients of variation
                if hasattr(rf, 'covs') and rf.covs.any():
                    self.covs += 1
        self.curve_params = self.make_curve_params()
        # possibly set oq.minimum_intensity
        iml = collections.defaultdict(list)
        # ._riskmodels is empty if read from the hazard calculation
        for riskid, rm in self._riskmodels.items():
            for lt, rf in rm.risk_functions['groundshaking'].items():
                iml[rf.imt].append(rf.imls[0])
        if oq.impact:
            pass  # don't set minimum_intensity
        elif sum(oq.minimum_intensity.values()) == 0 and iml:
            oq.minimum_intensity = {imt: min(ls) for imt, ls in iml.items()}
[docs]defeid_dmg_dt(self):""" :returns: a dtype (eid, dmg) """L=len(self.lti)D=len(self.damage_states)returnnumpy.dtype([('eid',U32),('dmg',(F32,(L,D)))])
    def asset_damage_dt(self, float_dmg_dist):
        """
        :returns: a composite dtype with damages and consequences
        """
        # damages are floats if the damage distribution is continuous,
        # otherwise unsigned integers
        dt = F32 if float_dmg_dist else U32
        descr = ([('agg_id', U32), ('event_id', U32), ('loss_id', U8)] +
                 [(dc, dt) for dc in self.get_dmg_csq()])
        return numpy.dtype(descr)
    @cached_property
    def taxonomy_dict(self):
        """
        :returns: a dict taxonomy string -> taxonomy index
        """
        # .taxonomy must be set by the engine
        tdict = {taxo: idx for idx, taxo in enumerate(self.taxonomy)}
        return tdict
[docs]defget_consequences(self):""" :returns: the list of available consequences """csq=[]forconsequence_by_tagname,arrinself.consdict.items():iflen(arr):csq.append(consequence_by_tagname.split('_by_')[0])returncsq
[docs]defget_dmg_csq(self):""" :returns: damage states (except no_damage) plus consequences """D=len(self.damage_states)dmgs=['dmg_%d'%dfordinrange(1,D)]returndmgs+self.get_consequences()
    def multi_damage_dt(self):
        """
        :returns: composite datatype with fields peril-loss_type-damage_state
        """
        dcs = self.damage_states + self.get_consequences()
        if len(self.perils) == 1:
            # single peril: the field names omit the peril prefix
            lst = [(f'{ltype}-{dc}', F32)
                   for ltype in self.oqparam.loss_types
                   for dc in dcs]
            return numpy.dtype(lst)
        lst = []
        for peril in self.perils:
            for ltype in self.oqparam.loss_types:
                for dc in dcs:
                    lst.append((f'{peril}-{ltype}-{dc}', F32))
        return numpy.dtype(lst)
    def to_multi_damage(self, array5d):
        """
        :param array5d: array of shape (P, A, R, L, Dc)
        :returns: array of shape (A, R) of dtype multi_damage_dt
        """
        P, A, R, L, Dc = array5d.shape
        arr = numpy.zeros((A, R), self.multi_damage_dt())
        for a in range(A):
            for r in range(R):
                # flatten the (P, L, Dc) values into the composite record,
                # in the same order as the fields of multi_damage_dt
                lst = []
                for pi in range(P):
                    for li in range(L):
                        for di in range(Dc):
                            lst.append(array5d[pi, a, r, li, di])
                arr[a, r] = tuple(lst)
        return arr
    def make_curve_params(self):
        # the CurveParams are used only in classical_risk, classical_bcr
        # NB: populate the inner lists .loss_types too
        cps = []
        for lti, loss_type in enumerate(self.loss_types):
            if self.oqparam.calculation_mode in (
                    'classical', 'classical_risk'):
                curve_resolutions = set()
                lines = []
                allratios = []
                for taxo in sorted(self):
                    rm = self[taxo]
                    rf = rm.risk_functions['groundshaking'][loss_type]
                    if loss_type in rm.loss_ratios:
                        ratios = rm.loss_ratios[loss_type]
                        allratios.append(ratios)
                        curve_resolutions.add(len(ratios))
                        lines.append('%s%d' % (rf, len(ratios)))
                if len(curve_resolutions) > 1:
                    # number of loss ratios is not the same for all taxonomies:
                    # then use the longest array; see classical_risk case_5
                    allratios.sort(key=len)
                    for rm in self.values():
                        if rm.loss_ratios[loss_type] != allratios[-1]:
                            rm.loss_ratios[loss_type] = allratios[-1]
                            # logging.debug(
                            #     f'Redefining loss ratios for {rm}')
                cp = scientific.CurveParams(
                    lti, loss_type, max(curve_resolutions), allratios[-1],
                    True) if curve_resolutions else scientific.CurveParams(
                        lti, loss_type, 0, [], False)
            else:  # used only to store the association l -> loss_type
                cp = scientific.CurveParams(lti, loss_type, 0, [], False)
            cps.append(cp)
            self.lti[loss_type] = lti
        return cps
    def get_loss_ratios(self):
        """
        :returns:
            a 1-dimensional composite array with loss ratios by loss type
        """
        lst = [('user_provided', bool)]
        for cp in self.curve_params:
            lst.append((cp.loss_type, F32, len(cp.ratios)))
        loss_ratios = numpy.zeros(1, numpy.dtype(lst))
        for cp in self.curve_params:
            loss_ratios['user_provided'] = cp.user_provided
            loss_ratios[cp.loss_type] = tuple(cp.ratios)
        return loss_ratios
    def get_outputs(self, asset_df, haz, sec_losses=(), rndgen=None,
                    country='?'):
        """
        :param asset_df:
            a DataFrame of assets with the same taxonomy and country
        :param haz: a DataFrame of GMVs on the sites of the assets
        :param sec_losses: a list of functions
        :param rndgen: a MultiEventRNG instance
        :returns: a list of dictionaries loss_type-> output
        """
        # debugging aid, kept for reference:
        # rc.pprint()
        # dic = rc.todict()
        # rc2 = get_riskcomputer(dic)
        # dic2 = rc2.todict()
        # _assert_equal(dic, dic2)
        [taxidx] = asset_df.taxonomy.unique()
        rc = scientific.RiskComputer(self, taxidx, country)
        out = rc.output(asset_df, haz, sec_losses, rndgen)
        return list(out)
    def reduce(self, taxonomies):
        """
        :param taxonomies: a set of taxonomies
        :returns: a new CompositeRiskModel reduced to the given taxonomies
        """
        new = copy.copy(self)
        new._riskmodels = {}
        for riskid, rm in self._riskmodels.items():
            if riskid in taxonomies:
                new._riskmodels[riskid] = rm
                rm.compositemodel = new
        return new
    def to_dframe(self):
        """
        :returns: a DataFrame containing all risk functions
        """
        dic = {'peril': [], 'riskid': [], 'loss_type': [], 'riskfunc': []}
        for riskid, rm in self._riskmodels.items():
            for peril, rfdict in rm.risk_functions.items():
                for lt, rf in rfdict.items():
                    dic['peril'].append(peril)
                    dic['riskid'].append(riskid)
                    dic['loss_type'].append(lt)
                    # serialize the risk function to JSON for storage
                    dic['riskfunc'].append(hdf5.obj_to_json(rf))
        return pandas.DataFrame(dic)