# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
This module includes the scientific API of the oq-risklib
"""
import ast
import copy
import bisect
import itertools
import collections
from pprint import pprint
from functools import lru_cache

import numpy
import pandas
from numpy.testing import assert_equal
from scipy import interpolate, stats

from openquake.baselib import hdf5, general

F64 = numpy.float64
F32 = numpy.float32
U32 = numpy.uint32
U64 = numpy.uint64
U16 = numpy.uint16
U8 = numpy.uint8
TWO32 = 2 ** 32
KNOWN_CONSEQUENCES = ['loss', 'loss_aep', 'loss_oep', 'pla_loss',
                      'pla_loss_aep', 'pla_loss_oep', 'losses',
                      'collapsed', 'injured', 'fatalities', 'homeless',
                      'non_operational']
PERILTYPE = numpy.array(['groundshaking', 'liquefaction', 'landslide'])
LOSSTYPE = numpy.array('''\
business_interruption contents nonstructural structural
occupants occupants_day occupants_night occupants_transit
structural+nonstructural structural+contents nonstructural+contents
structural+nonstructural+contents
structural+nonstructural_ins structural+contents_ins nonstructural+contents_ins
structural+nonstructural+contents_ins
structural_ins nonstructural_ins
reinsurance claim area number residents
structural+business_interruption nonstructural+business_interruption
contents+business_interruption
structural+nonstructural+business_interruption
structural+contents+business_interruption
nonstructural+contents+business_interruption
structural+nonstructural+contents+business_interruption
contents_ins business_interruption_ins
structural_ins+nonstructural_ins structural_ins+contents_ins
structural_ins+business_interruption_ins nonstructural_ins+contents_ins
nonstructural_ins+business_interruption_ins
contents_ins+business_interruption_ins
structural_ins+nonstructural_ins+contents_ins
structural_ins+nonstructural_ins+business_interruption_ins
structural_ins+contents_ins+business_interruption_ins
nonstructural_ins+contents_ins+business_interruption_ins
structural_ins+nonstructural_ins+contents_ins+business_interruption_ins
'''.split())
TOTLOSSES = [lt for lt in LOSSTYPE if '+' in lt]
LOSSID = {lt: i for i, lt in enumerate(LOSSTYPE)}


def _reduce(nested_dic):
    # reduce lists of floats into empty lists inside a nested dictionary;
    # used for pretty printing purposes
    out = {}
    for k, dic in nested_dic.items():
        if isinstance(dic, list) and not isinstance(dic[0], (str, bytes)):
            out[k] = []
        elif isinstance(dic, dict):
            out[k] = _reduce(dic)
        else:
            out[k] = dic
    return out


def pairwise(iterable):
    """
    :param iterable: a sequence of N values (s0, s1, ...)
    :returns: N-1 pairs (s0, s1), (s1, s2), (s2, s3), ...

    >>> list(pairwise('ABC'))
    [('A', 'B'), ('B', 'C')]
    """
    a, b = itertools.tee(iterable)
    # b is one step ahead of a; if b is empty do not raise StopIteration
    next(b, None)
    return zip(a, b)  # if a is empty this returns an empty iterator


def fine_graining(points, steps):
    """
    :param points: a list of floats
    :param int steps: expansion steps (>= 2)

    >>> fine_graining([0, 1], steps=0)
    [0, 1]
    >>> fine_graining([0, 1], steps=1)
    [0, 1]
    >>> fine_graining([0, 1], steps=2)
    array([0. , 0.5, 1. ])
    >>> fine_graining([0, 1], steps=3)
    array([0.        , 0.33333333, 0.66666667, 1.        ])
    >>> fine_graining([0, 0.5, 0.7, 1], steps=2)
    array([0.  , 0.25, 0.5 , 0.6 , 0.7 , 0.85, 1.  ])

    N points become S * (N - 1) + 1 points with S > 0
    """
    if steps < 2:
        return points
    ls = numpy.concatenate([numpy.linspace(x, y, num=steps + 1)[:-1]
                            for x, y in pairwise(points)])
    return numpy.concatenate([ls, [points[-1]]])


# sampling functions
class Sampler(object):
    def __init__(self, distname, rng, lratios=(), cols=None):
        self.distname = distname
        self.rng = rng
        self.arange = numpy.arange(len(lratios))  # for the PM distribution
        self.lratios = lratios  # for the PM distribution
        self.cols = cols  # for the PM distribution

    def get_losses(self, df, covs):
        vals = df['val'].to_numpy()
        if not self.rng or not covs:  # fast lane
            losses = vals * df['mean'].to_numpy()
        else:  # slow lane
            losses = vals * getattr(self, 'sample' + self.distname)(df)
        return losses

    def samplePM(self, df):
        eids = df['eid'].to_numpy()
        allprobs = df[self.cols].to_numpy()
        pmf = []
        for eid, probs in zip(eids, allprobs):  # probs by asset
            if probs.sum() == 0:  # oq-risk-tests/case_1g
                # means are zeros for events below the threshold
                pmf.append(0)
            else:
                pmf.append(stats.rv_discrete(
                    name='pmf', values=(self.arange, probs),
                    seed=self.rng.master_seed + eid).rvs())
        return self.lratios[pmf]


#
# Input models
#
class VulnerabilityFunction(object):
    dtype = numpy.dtype([('iml', F64), ('loss_ratio', F64), ('cov', F64)])
    seed = None  # to be overridden
    kind = 'vulnerability'

    def __init__(self, vf_id, imt, imls, mean_loss_ratios, covs=None,
                 distribution="LN"):
        """
        A wrapper around a probabilistic distribution function
        (currently only the Log-normal ("LN") and Beta ("BT")
        distributions are supported among the continuous probability
        distributions). For specifying a discrete probability
        distribution refer to the class VulnerabilityFunctionWithPMF.

        It is meant to be pickleable to allow distributed computation.
        The only important method is `.__call__`, which applies the
        vulnerability function to a given set of ground motion fields
        and epsilons and returns a loss matrix with N x R elements.

        :param str vf_id: Vulnerability Function ID
        :param str imt: Intensity Measure Type as a string

        :param list imls: Intensity Measure Levels for the
            vulnerability function. All values must be >= 0.0, values
            must be arranged in ascending order with no duplicates

        :param list mean_loss_ratios: Mean Loss ratio values, equal in
            length to imls, where each value is >= 0.

        :param list covs: Coefficients of Variation. Equal in length
            to mean loss ratios. All values must be >= 0.0.

        :param str distribution: The probabilistic distribution
            related to this function.
        """
        self.id = vf_id
        self.imt = imt
        self._check_vulnerability_data(
            imls, mean_loss_ratios, covs, distribution)
        self.imls = numpy.array(imls)
        self.mean_loss_ratios = numpy.array(mean_loss_ratios)
        self.retro = False
        if covs is not None:
            self.covs = numpy.array(covs)
        else:
            self.covs = numpy.zeros(self.imls.shape)
        for lr, cov in zip(self.mean_loss_ratios, self.covs):
            if lr == 0 and cov > 0:
                msg = ("It is not valid to define a mean loss ratio = 0 "
                       "with a corresponding coefficient of variation > 0")
                raise ValueError(msg)
            if cov < 0:
                raise ValueError(
                    'Found a negative coefficient of variation in %s' %
                    self.covs)
            if distribution == 'BT':
                if lr == 0:  # possible with cov == 0
                    pass
                elif lr > 1:
                    raise ValueError(
                        'The meanLRs must be below 1, got %s' % lr)
                elif cov ** 2 > 1 / lr - 1:
                    # see https://github.com/gem/oq-engine/issues/4841
                    raise ValueError(
                        'The coefficient of variation %s > %s does not '
                        'satisfy the requirement 0 < sig < sqrt[mu × (1 - mu)]'
                        ' in %s' % (cov, numpy.sqrt(1 / lr - 1), self))
        self.distribution_name = distribution

    def init(self):
        # called by CompositeRiskModel and by __setstate__
        self.covs = F64(self.covs)
        self.mean_loss_ratios = F64(self.mean_loss_ratios)
        self._stddevs = self.covs * self.mean_loss_ratios
        self._mlr_i1d = interpolate.interp1d(self.imls, self.mean_loss_ratios)
        self._covs_i1d = interpolate.interp1d(self.imls, self.covs)

    def interpolate(self, gmf_df, col):
        """
        :param gmf_df: DataFrame of GMFs
        :param col: name of the column to consider
        :returns: DataFrame of interpolated loss ratios and covs
        """
        gmvs = gmf_df[col].to_numpy()
        dic = dict(eid=gmf_df.eid.to_numpy(),
                   mean=numpy.zeros(len(gmvs)),
                   cov=numpy.zeros(len(gmvs)))
        # gmvs are clipped to max(iml)
        gmvs_curve = numpy.piecewise(
            gmvs, [gmvs > self.imls[-1]], [self.imls[-1], lambda x: x])
        ok = gmvs_curve >= self.imls[0]  # indices over the minimum
        curve_ok = gmvs_curve[ok]
        dic['mean'][ok] = self._mlr_i1d(curve_ok)
        dic['cov'][ok] = self._cov_for(curve_ok)
        return pandas.DataFrame(dic, gmf_df.sid)

    def survival(self, loss_ratio, mean, stddev):
        """
        Compute the survival probability based on the underlying
        distribution.
        """
        # scipy does not handle correctly the limit case stddev = 0.
        # In that case, when `mean` > 0 the survival function
        # approaches a step function, otherwise (`mean` == 0) we
        # return 0
        if stddev == 0:
            return numpy.piecewise(
                loss_ratio, [loss_ratio > mean or not mean], [0, 1])
        if self.distribution_name == 'LN':
            variance = stddev ** 2.0
            sigma = numpy.sqrt(numpy.log((variance / mean ** 2.0) + 1.0))
            mu = mean ** 2.0 / numpy.sqrt(variance + mean ** 2.0)
            return stats.lognorm.sf(loss_ratio, sigma, scale=mu)
        elif self.distribution_name == 'BT':
            return stats.beta.sf(loss_ratio, *_alpha_beta(mean, stddev))
        else:
            raise NotImplementedError(self.distribution_name)
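
    # NB on the "LN" branch of `survival` above: sigma and the scale come
    # from matching the lognormal moments to the given mean and stddev;
    # with cov = stddev/mean, sigma = sqrt(log(1 + cov**2)) and
    # scale = mean/sqrt(1 + cov**2), which equals
    # mean**2/sqrt(variance + mean**2). Illustrative check (not part of
    # the module):
    #   >>> mean, stddev = 0.5, 0.1
    #   >>> sigma = numpy.sqrt(numpy.log(1 + (stddev / mean) ** 2))
    #   >>> scale = mean / numpy.sqrt(1 + (stddev / mean) ** 2)
    #   >>> stats.lognorm.mean(sigma, scale=scale)  # ~0.5
    #   >>> stats.lognorm.std(sigma, scale=scale)   # ~0.1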

    def __call__(self, asset_df, gmf_df, col, rng=None, minloss=0):
        """
        :param asset_df: a DataFrame with A assets
        :param gmf_df: a DataFrame of GMFs for the given assets
        :param col: GMF column associated to the IMT (i.e. "gmv_0")
        :param rng: a MultiEventRNG or None
        :returns: a DataFrame with columns eid, aid, loss
        """
        if asset_df is None:  # in the tests
            asset_df = pandas.DataFrame(dict(aid=0, val=1), [0])
        ratio_df = self.interpolate(gmf_df, col)  # really fast
        if self.distribution_name == 'PM':  # special case
            lratios = F64(self.loss_ratios)
            cols = [col for col in ratio_df.columns if isinstance(col, int)]
        else:
            lratios = ()
            cols = None
        df = ratio_df.join(asset_df, how='inner')
        sampler = Sampler(self.distribution_name, rng, lratios, cols)
        covs = not hasattr(self, 'covs') or self.covs.any()
        losses = sampler.get_losses(df, covs)
        ok = losses > minloss
        if self.distribution_name == 'PM':  # special case
            variances = numpy.zeros(len(losses))
        else:
            variances = (losses * df['cov'].to_numpy()) ** 2
        return pandas.DataFrame(dict(eid=df.eid[ok], aid=df.aid[ok],
                                     variance=variances[ok],
                                     loss=losses[ok]))

    def strictly_increasing(self):
        """
        :returns:
            a new vulnerability function that is strictly increasing.
            It is built by removing the pieces of the function where
            the mean loss ratio is constant.
        """
        imls, mlrs, covs = [], [], []
        previous_mlr = None
        for i, mlr in enumerate(self.mean_loss_ratios):
            if previous_mlr == mlr:
                continue
            else:
                mlrs.append(mlr)
                imls.append(self.imls[i])
                covs.append(self.covs[i])
                previous_mlr = mlr
        return self.__class__(self.id, self.imt, imls, mlrs, covs,
                              self.distribution_name)

    def mean_loss_ratios_with_steps(self, steps):
        """
        Split the mean loss ratios, producing a new set of loss ratios. The
        new set of loss ratios always includes 0.0 and 1.0

        :param int steps:
            the number of steps we make to go from one loss
            ratio to the next. For example, if we have [0.5, 0.7]::

             steps = 1 produces [0.0, 0.5, 0.7, 1]
             steps = 2 produces [0.0, 0.25, 0.5, 0.6, 0.7, 0.85, 1]
             steps = 3 produces [0.0, 0.17, 0.33, 0.5, 0.57, 0.63, 0.7,
                                 0.8, 0.9, 1]
        """
        loss_ratios = self.mean_loss_ratios
        if min(loss_ratios) > 0.0:
            # prepend with a zero
            loss_ratios = numpy.concatenate([[0.0], loss_ratios])
        if max(loss_ratios) < 1.0:
            # append a 1.0
            loss_ratios = numpy.concatenate([loss_ratios, [1.0]])
        return fine_graining(loss_ratios, steps)

    def _cov_for(self, imls):
        """
        Clip `imls` to the range associated with the support of the
        vulnerability function and return the corresponding covariance
        values by linear interpolation. For instance if the range is
        [0.005, 0.0269] and the imls are [0.0049, 0.006, 0.027], the
        clipped imls are [0.005, 0.006, 0.0269].
        """
        return self._covs_i1d(
            numpy.piecewise(
                imls,
                [imls > self.imls[-1], imls < self.imls[0]],
                [self.imls[-1], self.imls[0], lambda x: x]))

    def __getstate__(self):
        return (self.id, self.imt, self.imls, self.mean_loss_ratios,
                self.covs, self.distribution_name, self.retro)

    def __setstate__(self, state):
        self.id = state[0]
        self.imt = state[1]
        self.imls = state[2]
        self.mean_loss_ratios = state[3]
        self.covs = state[4]
        self.distribution_name = state[5]
        self.retro = state[6]
        self.init()

    def _check_vulnerability_data(self, imls, loss_ratios, covs,
                                  distribution):
        assert_equal(imls, sorted(set(imls)))
        assert all(x >= 0.0 for x in imls)
        assert covs is None or len(covs) == len(imls)
        assert len(loss_ratios) == len(imls)
        assert all(x >= 0.0 for x in loss_ratios)
        assert covs is None or all(x >= 0.0 for x in covs)
        assert distribution in ["LN", "BT", "PM"]

    @lru_cache()
    def loss_ratio_exceedance_matrix(self, loss_ratios):
        """
        Compute the LREM (Loss Ratio Exceedance Matrix).
        """
        # LREM has number of rows equal to the number of loss ratios
        # and number of columns equal to the number of imls
        lrem = numpy.empty((len(loss_ratios), len(self.imls)))
        for row, loss_ratio in enumerate(loss_ratios):
            for col, (mean_loss_ratio, stddev) in enumerate(
                    zip(self.mean_loss_ratios, self._stddevs)):
                lrem[row, col] = self.survival(
                    loss_ratio, mean_loss_ratio, stddev)
        return lrem

    @lru_cache()
    def mean_imls(self):
        """
        Compute the mean IMLs (Intensity Measure Levels) for the given
        vulnerability function, i.e. the midpoints between consecutive
        IMLs, extended by half an interval on both sides.
        """
        return numpy.array(
            [max(0, self.imls[0] - (self.imls[1] - self.imls[0]) / 2.)] +
            [numpy.mean(pair) for pair in pairwise(self.imls)] +
            [self.imls[-1] + (self.imls[-1] - self.imls[-2]) / 2.])


class VulnerabilityFunctionWithPMF(VulnerabilityFunction):
    """
    Vulnerability function with an explicit distribution of probabilities

    :param str vf_id: vulnerability function ID
    :param str imt: Intensity Measure Type
    :param imls: intensity measure levels (L)
    :param loss_ratios: an array of mean ratios (M)
    :param probs: a matrix of probabilities of shape (M, L)
    """
    def __init__(self, vf_id, imt, imls, loss_ratios, probs):
        self.id = vf_id
        self.imt = imt
        self.retro = False
        self._check_vulnerability_data(imls, loss_ratios, probs)
        self.imls = imls
        self.loss_ratios = loss_ratios
        self.probs = probs
        self.distribution_name = "PM"

        # to be set in .init(), called also by __setstate__
        (self._probs_i1d, self.distribution) = None, None
        self.init()

        ls = [('iml', F32)] + [('prob-%s' % lr, F32) for lr in loss_ratios]
        self._dtype = numpy.dtype(ls)

    def __getstate__(self):
        return (self.id, self.imt, self.imls, self.loss_ratios,
                self.probs, self.distribution_name, self.retro)

    def __setstate__(self, state):
        self.id = state[0]
        self.imt = state[1]
        self.imls = state[2]
        self.loss_ratios = state[3]
        self.probs = state[4]
        self.distribution_name = state[5]
        self.retro = state[6]
        self.init()

    def _check_vulnerability_data(self, imls, loss_ratios, probs):
        assert all(x >= 0.0 for x in imls)
        assert all(x >= 0.0 for x in loss_ratios)
        assert all(1.0 >= x >= 0.0 for y in probs for x in y)
        assert probs.shape[0] == len(loss_ratios)
        assert probs.shape[1] == len(imls)

    # MN: in the test gmvs_curve is of shape (5,), self.probs of shape (7, 8)
    # self.imls of shape (8,) and the returned means have shape (5, 7)

    def interpolate(self, gmf_df, col):
        """
        :param gmf_df: DataFrame of GMFs
        :param col: name of the column to consider
        :returns: DataFrame of interpolated probabilities
        """
        # gmvs are clipped to max(iml)
        M = len(self.probs)
        gmvs = gmf_df[col].to_numpy()
        dic = {m: numpy.zeros_like(gmvs) for m in range(M)}
        dic['eid'] = gmf_df.eid.to_numpy()
        gmvs_curve = numpy.piecewise(
            gmvs, [gmvs > self.imls[-1]], [self.imls[-1], lambda x: x])
        ok = gmvs_curve >= self.imls[0]  # indices over the minimum
        for m, probs in enumerate(self._probs_i1d(gmvs_curve[ok])):
            dic[m][ok] = probs
        return pandas.DataFrame(dic, gmf_df.sid)

    @lru_cache()
    def loss_ratio_exceedance_matrix(self, loss_ratios):
        """
        Compute the LREM (Loss Ratio Exceedance Matrix). Required for
        the Classical Risk and BCR Calculators. Currently left
        unimplemented as the PMF format is used only for the Scenario
        and Event Based Risk Calculators.

        :param loss_ratios: a sequence of loss ratios
        """
        # TODO: to be implemented if the classical risk calculator
        # needs to support the pmf vulnerability format

    def __repr__(self):
        return '<VulnerabilityFunctionWithPMF(%s, %s)>' % (self.id, self.imt)


# this is meant to be instantiated by riskmodels.get_risk_functions
class VulnerabilityModel(dict):
    """
    Container for a set of vulnerability functions. You can access each
    function given the IMT and taxonomy with the square bracket notation.

    :param str id: ID of the model
    :param str assetCategory: asset category (i.e. buildings, population)
    :param str lossCategory: loss type (i.e. structural, contents, ...)

    All such attributes are None for a vulnerability model coming from a
    NRML 0.4 file.
    """
    def __init__(self, id=None, assetCategory=None, lossCategory=None):
        self.id = id
        self.assetCategory = assetCategory
        self.lossCategory = lossCategory

    def __repr__(self):
        return '<%s %s %s>' % (
            self.__class__.__name__, self.lossCategory, sorted(self))


class FragilityFunctionContinuous(object):
    kind = 'fragility'

    def __init__(self, limit_state, mean, stddev, minIML, maxIML,
                 nodamage=0):
        self.limit_state = limit_state
        self.mean = mean
        self.stddev = stddev
        self.minIML = minIML
        self.maxIML = maxIML
        self.no_damage_limit = nodamage

    def __call__(self, imls):
        """
        Compute the Probability of Exceedance (PoE) for the given
        Intensity Measure Levels (IMLs).
        """
        # it is essential to make a copy of the intensity measure levels,
        # otherwise the minIML feature in continuous fragility functions
        # would change the levels, thus breaking case_master for
        # OQ_DISTRIBUTE=no
        if self.minIML or self.maxIML:
            imls = numpy.array(imls)
        variance = self.stddev ** 2.0
        sigma = numpy.sqrt(numpy.log((variance / self.mean ** 2.0) + 1.0))
        mu = self.mean ** 2.0 / numpy.sqrt(variance + self.mean ** 2.0)
        if self.maxIML:
            imls[imls > self.maxIML] = self.maxIML
        if self.minIML:
            imls[imls < self.minIML] = self.minIML
        result = stats.lognorm.cdf(imls, sigma, scale=mu)
        if self.no_damage_limit:
            result[imls <= self.no_damage_limit] = 0
        return result

    def __repr__(self):
        return '<%s(%s, %s, %s)>' % (
            self.__class__.__name__, self.limit_state, self.mean,
            self.stddev)


class FragilityFunctionDiscrete(object):
    kind = 'fragility'

    def __init__(self, limit_state, imls, poes, no_damage_limit=None):
        self.limit_state = limit_state
        self.imls = imls
        self.poes = poes
        if len(imls) != len(poes):
            raise ValueError('%s: %d levels but %d poes' % (
                limit_state, len(imls), len(poes)))
        self._interp = None
        self.no_damage_limit = no_damage_limit

    @property
    def interp(self):
        if self._interp is not None:
            return self._interp
        self._interp = interpolate.interp1d(self.imls, self.poes,
                                            bounds_error=False)
        return self._interp

    def __call__(self, imls):
        """
        Compute the Probability of Exceedance (PoE) for the given
        Intensity Measure Levels (IMLs).
        """
        highest_iml = self.imls[-1]
        imls = numpy.array(imls)
        if imls.sum() == 0.0:
            return numpy.zeros_like(imls)
        imls[imls > highest_iml] = highest_iml
        result = self.interp(imls)
        if self.no_damage_limit:
            result[imls < self.no_damage_limit] = 0
        return result

    # so that the curve is pickleable
    def __getstate__(self):
        return dict(limit_state=self.limit_state,
                    poes=self.poes, imls=self.imls, _interp=None,
                    no_damage_limit=self.no_damage_limit)

    def __eq__(self, other):
        return (self.poes == other.poes and self.imls == other.imls and
                self.no_damage_limit == other.no_damage_limit)

    def __ne__(self, other):
        return not self == other

    def __repr__(self):
        return '<%s(%s, %s, %s)>' % (
            self.__class__.__name__, self.limit_state, self.imls, self.poes)


class FragilityFunctionList(list):
    """
    A list of fragility functions with common attributes; there is a
    function for each limit state.
    """
    kind = 'fragility'

    # NB: the list is populated after instantiation by .append calls
    def __init__(self, array, **attrs):
        self.array = array
        vars(self).update(attrs)

    def mean_loss_ratios_with_steps(self, steps):
        """For compatibility with vulnerability functions"""
        return fine_graining(self.imls, steps)

    def build(self, limit_states, discretization=20, steps_per_interval=1):
        """
        :param limit_states: a sequence of limit states
        :param discretization: continuous fragility discretization parameter
        :param steps_per_interval: steps_per_interval parameter
        :returns: a populated FragilityFunctionList instance
        """
        new = copy.copy(self)
        new.clear()
        add_zero = (self.format == 'discrete' and
                    self.nodamage and self.nodamage <= self.imls[0])
        new.imls = build_imls(new, discretization)
        if steps_per_interval > 1:
            new._interp_imls = build_imls(  # passed to classical_damage
                new, discretization, steps_per_interval)
        for i, ls in enumerate(limit_states):
            data = self.array[i]
            if self.format == 'discrete':
                if add_zero:
                    if len(self.imls) == len(data):  # add no_damage
                        imls = [self.nodamage] + self.imls
                    else:  # already added
                        imls = self.imls
                    new.append(FragilityFunctionDiscrete(
                        ls, imls, numpy.concatenate([[0.], data]),
                        self.nodamage))
                else:
                    new.append(FragilityFunctionDiscrete(
                        ls, self.imls, data, self.nodamage))
            else:  # continuous
                new.append(FragilityFunctionContinuous(
                    ls, data[0], data[1],  # mean and stddev
                    self.minIML, self.maxIML, self.nodamage))
        return new


class ConsequenceModel(dict):
    """
    Dictionary of consequence functions. You can access each
    function given its name with the square bracket notation.

    :param str id: ID of the model
    :param str assetCategory: asset category (i.e. buildings, population)
    :param str lossCategory: loss type (i.e. structural, contents, ...)
    :param str description: description of the model
    :param limitStates: a list of limit state strings
    """
    kind = 'consequence'

    def __init__(self, id, assetCategory, lossCategory, description,
                 limitStates):
        self.id = id
        self.assetCategory = assetCategory
        self.lossCategory = lossCategory
        self.description = description
        self.limitStates = limitStates

    def __repr__(self):
        return '<%s %s %s %s>' % (
            self.__class__.__name__, self.lossCategory,
            ', '.join(self.limitStates), ' '.join(sorted(self)))


def build_imls(ff, continuous_fragility_discretization,
               steps_per_interval=0):
    """
    Build intensity measure levels from a fragility function. If the
    function is continuous, they are produced simply as a linear space
    between minIML and maxIML. If the function is discrete, they are
    generated with a complex logic depending on the noDamageLimit and
    the parameter steps per interval.

    :param ff: a fragility function object
    :param continuous_fragility_discretization: .ini file parameter
    :param steps_per_interval: .ini file parameter
    :returns: generated imls
    """
    if ff.format == 'discrete':
        imls = ff.imls
        if ff.nodamage and ff.nodamage < imls[0]:
            imls = [ff.nodamage] + imls
        if steps_per_interval > 1:
            gen_imls = fine_graining(imls, steps_per_interval)
        else:
            gen_imls = imls
    else:  # continuous
        gen_imls = numpy.linspace(ff.minIML, ff.maxIML,
                                  continuous_fragility_discretization)
    return gen_imls


# this is meant to be instantiated by riskmodels.get_fragility_model
class FragilityModel(dict):
    """
    Container for a set of fragility functions. You can access each
    function given the IMT and taxonomy with the square bracket notation.

    :param str id: ID of the model
    :param str assetCategory: asset category (i.e. buildings, population)
    :param str lossCategory: loss type (i.e. structural, contents, ...)
    :param str description: description of the model
    :param limitStates: a list of limit state strings
    """
    def __init__(self, id, assetCategory, lossCategory, description,
                 limitStates):
        self.id = id
        self.assetCategory = assetCategory
        self.lossCategory = lossCategory
        self.description = description
        self.limitStates = limitStates

    def __repr__(self):
        return '<%s %s %s %s>' % (
            self.__class__.__name__, self.lossCategory,
            self.limitStates, sorted(self))


# ########################### random generators ########################### #


def _alpha_beta(mean, stddev):
    c = (1 - mean) / stddev ** 2 - 1 / mean
    return c * mean ** 2, c * (mean - mean ** 2)
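
# Illustrative check of the moment matching above (not part of the module):
# with mean=0.5 and stddev=0.1, c = (1 - 0.5)/0.01 - 1/0.5 = 48, hence
# alpha = 48 * 0.25 = 12 and beta = 48 * (0.5 - 0.25) = 12; Beta(12, 12)
# indeed has mean 12/(12+12) = 0.5 and variance
# 12*12 / ((12+12)**2 * (12+12+1)) = 0.01, i.e. stddev 0.1.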


class MultiEventRNG(object):
    """
    An object ``MultiEventRNG(master_seed, eids, asset_correlation=0)``
    has a method ``.get(A, eids)`` which returns a matrix of (A, E)
    normally distributed random numbers. If the ``asset_correlation`` is 1
    the numbers are the same.

    >>> rng = MultiEventRNG(
    ...     master_seed=42, eids=[0, 1, 2], asset_correlation=1)
    >>> eids = numpy.array([1] * 3)
    >>> means = numpy.array([.5] * 3)
    >>> covs = numpy.array([.1] * 3)
    >>> rng.lognormal(eids, means, covs)
    array([0.38892466, 0.38892466, 0.38892466])
    >>> rng.beta(eids, means, covs)
    array([0.4372343 , 0.57308132, 0.56392573])
    >>> fractions = numpy.array([[[.8, .1, .1]]])
    >>> rng.discrete_dmg_dist([0], fractions, [10])
    array([[[8, 2, 0]]], dtype=uint32)
    """
    def __init__(self, master_seed, eids, asset_correlation=0):
        self.master_seed = master_seed
        self.asset_correlation = asset_correlation
        self.rng = {}
        for eid in eids:
            # NB: int below is necessary for totally mysterious reasons:
            # a calculation on cluster1 #41904 failed with a floating
            # point seed, but I cannot reproduce the issue
            ph = numpy.random.Philox(int(self.master_seed + eid))
            self.rng[eid] = numpy.random.Generator(ph)

    def _get_eps(self, eid, corrcache):
        if self.asset_correlation:
            try:
                return corrcache[eid]
            except KeyError:
                corrcache[eid] = eps = self.rng[eid].normal()
                return eps
        return self.rng[eid].normal()

    def lognormal(self, eids, means, covs):
        """
        :param eids: event IDs
        :param means: array of floats in the range 0..1
        :param covs: array of floats with the same shape
        :returns: array of floats
        """
        corrcache = {}
        eps = numpy.array([self._get_eps(eid, corrcache) for eid in eids])
        sigma = numpy.sqrt(numpy.log(1 + covs ** 2))
        div = numpy.sqrt(1 + covs ** 2)
        return means * numpy.exp(eps * sigma) / div

    # NB: asset correlation is ignored
    def beta(self, eids, means, covs):
        """
        :param eids: event IDs
        :param means: array of floats in the range 0..1
        :param covs: array of floats with the same shape
        :returns: array of floats following the beta distribution

        This function works properly even when some or all of the
        stddevs are zero: in that case it returns the means, since the
        distribution becomes extremely peaked. It also works properly
        when some or all of the means are zero, returning zero in that
        case.
        """
        # NB: you should not expect a smooth limit for cov -> 0, since the
        # random number generator will advance by a different number of
        # steps with cov == 0 and cov != 0
        res = numpy.array(means)
        ok = (means != 0) & (covs != 0)  # nonsingular values
        alpha, beta = _alpha_beta(means[ok], means[ok] * covs[ok])
        res[ok] = [self.rng[eid].beta(alpha[i], beta[i])
                   for i, eid in enumerate(eids[ok])]
        return res

    def discrete_dmg_dist(self, eids, fractions, numbers):
        """
        Convert fractions into discrete damage distributions using
        bincount and random.choice.

        :param eids: E event IDs
        :param fractions: array of shape (A, E, D)
        :param numbers: A asset numbers
        :returns: array of integers of shape (A, E, D)
        """
        A, E, D = fractions.shape
        assert len(eids) == E, (len(eids), E)
        assert len(numbers) == A, (len(numbers), A)
        ddd = numpy.zeros(fractions.shape, U32)
        for e, eid in enumerate(eids):
            choice = self.rng[eid].choice
            for a, n in enumerate(numbers):
                frac = fractions[a, e]  # shape D
                states = choice(D, n, p=frac / frac.sum())  # n states
                # ex. [0, 0, 0, 1, 0, 0, 0, 1, 0, 0], 8 times 0, 2 times 1
                ddd[a, e] = numpy.bincount(states, minlength=D)
        return ddd

    def boolean_dist(self, probs, num_sims):
        """
        Convert E probabilities into a matrix of (E, S) booleans, where
        S is the number of secondary simulations.

        >>> rng = MultiEventRNG(master_seed=42, eids=[0, 1, 2])
        >>> dist = rng.boolean_dist(probs=[.1, .2, 0.], num_sims=100)
        >>> dist.sum(axis=1)  # around 10% and 20% respectively
        array([12., 17.,  0.])
        """
        E = len(self.rng)
        assert len(probs) == E, (len(probs), E)
        booldist = numpy.zeros((E, num_sims))
        for e, eid, prob in zip(range(E), self.rng, probs):
            if prob > 0:
                booldist[e] = self.rng[eid].random(num_sims) < prob
        return booldist


def scenario_damage(fragility_functions, gmvs):
    """
    :param fragility_functions: a list of D - 1 fragility functions
    :param gmvs: an array of E ground motion values
    :returns: an array of (D, E) damage fractions
    """
    lst = [numpy.ones_like(gmvs)]
    for f, ff in enumerate(fragility_functions):  # D - 1 functions
        lst.append(ff(gmvs))
    lst.append(numpy.zeros_like(gmvs))
    # convert a (D + 1, E) array into a (D, E) array
    arr = pairwise_diff(numpy.array(lst))
    arr[arr < 1E-7] = 0  # sanity check
    return arr
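
# Illustrative example (not part of the module): with D = 3 damage states
# (no damage, moderate, collapse) there are D - 1 = 2 fragility functions;
# if for a given gmv the first returns PoE 0.8 and the second PoE 0.3, the
# bounding curves [1, 0.8, 0.3, 0] differenced pairwise give the damage
# fractions [0.2, 0.5, 0.3]: 20% no damage, 50% moderate, 30% collapse.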


#
# Classical Damage
#
def annual_frequency_of_exceedence(poe, t_haz):
    """
    :param poe: array of probabilities of exceedance in time t_haz
    :param t_haz: hazard investigation time
    :returns: array of frequencies (with +inf values where poe=1)
    """
    # replace 1 with the float64 number closest to 1 to avoid log(1 - 1)
    poe[poe == 1.] = .9999999999999999
    return -numpy.log(1 - poe) / t_haz


def probability_of_exceedance(afoe, t_risk):
    """
    :param afoe: array of annual frequencies of exceedance
    :param t_risk: risk investigation time
    :returns: array of probabilities of exceedance in time t_risk
    """
    return 1 - numpy.exp(-t_risk * afoe)
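
# Illustrative round trip (not part of the module): since
# afoe = -log(1 - poe)/t_haz and poe = 1 - exp(-t_risk * afoe), converting
# a PoE into a frequency and back with t_risk == t_haz recovers it:
#   >>> poes = numpy.array([0.1, 0.02])
#   >>> afoes = annual_frequency_of_exceedence(poes.copy(), t_haz=50)
#   >>> probability_of_exceedance(afoes, t_risk=50)  # ~[0.1, 0.02]
# (the .copy() is needed since annual_frequency_of_exceedence modifies
# its argument in place)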


def classical_damage(
        fragility_functions, hazard_imls, hazard_poes,
        investigation_time, risk_investigation_time,
        steps_per_interval=1):
    """
    :param fragility_functions:
        a list of fragility functions for each damage state
    :param hazard_imls:
        Intensity Measure Levels
    :param hazard_poes:
        hazard curve
    :param investigation_time:
        hazard investigation time
    :param risk_investigation_time:
        risk investigation time
    :param steps_per_interval:
        steps per interval
    :returns:
        an array of D probabilities of occurrence where D is the number
        of damage states
    """
    if steps_per_interval > 1:  # interpolate
        imls = numpy.array(fragility_functions._interp_imls)
        min_val, max_val = hazard_imls[0], hazard_imls[-1]
        assert min_val > 0, hazard_imls  # sanity check
        imls[imls < min_val] = min_val
        imls[imls > max_val] = max_val
        poes = interpolate.interp1d(hazard_imls, hazard_poes)(imls)
    else:
        imls = hazard_imls
        poes = numpy.array(hazard_poes)
    # convert the hazard probabilities of exceedance into annual
    # frequencies of exceedance, and then of occurrence
    afoes = annual_frequency_of_exceedence(poes, investigation_time)
    afoos = pairwise_diff(
        pairwise_mean([afoes[0]] + list(afoes) + [afoes[-1]]))
    poes_per_dmgstate = []
    for ff in fragility_functions:
        fx = afoos @ ff(imls)
        poe_per_dmgstate = 1. - numpy.exp(-fx * risk_investigation_time)
        poes_per_dmgstate.append(poe_per_dmgstate)
    poos = pairwise_diff([1] + poes_per_dmgstate + [0])
    return poos


#
# Classical Risk
#
def classical(vulnerability_function, hazard_imls, hazard_poes, loss_ratios,
              investigation_time, risk_investigation_time):
    """
    :param vulnerability_function:
        an instance of
        :py:class:`openquake.risklib.scientific.VulnerabilityFunction`
        representing the vulnerability function used to compute the curve
    :param hazard_imls:
        the hazard intensity measure levels
    :param hazard_poes:
        the hazard curve
    :param loss_ratios:
        a tuple of C loss ratios
    :param investigation_time:
        hazard investigation time
    :param risk_investigation_time:
        risk investigation time
    :returns:
        an array of shape (2, C)
    """
    assert len(hazard_imls) == len(hazard_poes), (
        len(hazard_imls), len(hazard_poes))
    vf = vulnerability_function
    imls = vf.mean_imls()
    lrem = vf.loss_ratio_exceedance_matrix(loss_ratios)

    # saturate imls to hazard imls
    min_val, max_val = hazard_imls[0], hazard_imls[-1]
    imls[imls < min_val] = min_val
    imls[imls > max_val] = max_val

    # interpolate the hazard curve
    poes = interpolate.interp1d(hazard_imls, hazard_poes)(imls)
    if abs((1 - poes).mean()) < 1E-4:  # flat curve
        raise ValueError(
            'The hazard curve is flat (all ones), probably due to '
            'a (hazard) investigation time that is too large')

    # convert the hazard probabilities of exceedance to annual
    # frequencies of exceedance, and then of occurrence
    afoes = annual_frequency_of_exceedence(poes, investigation_time)
    afoos = pairwise_diff(afoes)

    # compute the annual frequency of exceedance of the loss ratios
    # (lrem = loss ratio exceedance matrix)
    lr_afoes = numpy.empty(lrem.shape)
    for idx, afoo in enumerate(afoos):
        lr_afoes[:, idx] = lrem[:, idx] * afoo  # column * afoo
    lr_poes = probability_of_exceedance(
        lr_afoes.sum(axis=1), risk_investigation_time)

    return numpy.array([loss_ratios, lr_poes])


# used in classical_risk only
def conditional_loss_ratio(loss_ratios, poes, probability):
    """
    Return the loss ratio corresponding to the given PoE (Probability
    of Exceedance). We can have four cases:

    1. If `probability` is in `poes` it takes the bigger
       corresponding loss_ratios.

    2. If it is in `(poe1, poe2)` where both `poe1` and `poe2` are
       in `poes`, then we perform a linear interpolation on the
       corresponding losses.

    3. If the given probability is smaller than the lowest PoE
       defined, it returns the max loss ratio.

    4. If the given probability is greater than the highest PoE
       defined, it returns zero.

    :param loss_ratios: non-decreasing loss ratio values (float32)
    :param poes: non-increasing probabilities of exceedance values (float32)
    :param float probability: the probability value used to
        interpolate the loss curve
    """
    assert len(loss_ratios) >= 3, loss_ratios
    probability = numpy.float32(probability)
    if not isinstance(loss_ratios, numpy.ndarray):
        loss_ratios = numpy.float32(loss_ratios)
    if not isinstance(poes, numpy.ndarray):
        poes = numpy.float32(poes)
    rpoes = poes[::-1]
    if probability > poes[0]:  # max PoE
        return 0.0
    elif probability < poes[-1]:  # min PoE
        return loss_ratios[-1]
    if probability in poes:
        return loss_ratios[probability == poes].max()
    else:
        interval_index = bisect.bisect_right(rpoes, probability)
        if interval_index == len(poes):  # poes are all nan
            return float('nan')
        elif interval_index == 1:  # boundary case
            x1, x2 = poes[-2:]
            y1, y2 = loss_ratios[-2:]
        else:
            x1, x2 = poes[-interval_index - 1:-interval_index + 1]
            y1, y2 = loss_ratios[-interval_index - 1:-interval_index + 1]
        return (y2 - y1) / (x2 - x1) * (probability - x1) + y1
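
# Illustrative example (not part of the module): with loss_ratios
# [0.1, 0.2, 0.3] and poes [0.9, 0.5, 0.1], probability=0.3 falls in the
# interval (0.1, 0.5), so the corresponding loss ratios 0.2 and 0.3 are
# interpolated linearly:
#   >>> conditional_loss_ratio([.1, .2, .3], [.9, .5, .1], .3)  # ~0.25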


#
# Insured Losses
#
def insured_losses(losses, deductible, insurance_limit):
    """
    :param losses: array of ground-up losses
    :param deductible: array of deductible values
    :param insurance_limit: array of insurance limit values

    Compute insured losses for the given asset and losses, from the point
    of view of the insurance company. For instance:

    >>> insured_losses(numpy.array([3, 20, 101]),
    ...                numpy.array([5, 5, 5]), numpy.array([100, 100, 100]))
    array([ 0, 15, 95])

    - if the loss is 3 (< 5) the company does not pay anything
    - if the loss is 20 the company pays 20 - 5 = 15
    - if the loss is 101 the company pays 100 - 5 = 95
    """
    assert isinstance(losses, numpy.ndarray), losses
    if not isinstance(deductible, numpy.ndarray):
        deductible = numpy.full_like(losses, deductible)
    if not isinstance(insurance_limit, numpy.ndarray):
        insurance_limit = numpy.full_like(losses, insurance_limit)
    assert (deductible < insurance_limit).all()
    small = losses < deductible
    big = losses > insurance_limit
    out = losses - deductible
    out[small] = 0.
    out[big] = insurance_limit[big] - deductible[big]
    return out


def insurance_losses(asset_df, losses_by_lt, policy_df):
    """
    :param asset_df: DataFrame of assets
    :param losses_by_lt: loss_type -> DataFrame[eid, aid, variance, loss]
    :param policy_df: a DataFrame of policies
    """
    asset_policy_df = asset_df.join(
        policy_df.set_index('policy'), on='policy', how='inner')
    for lt in policy_df.loss_type.unique():
        if '+' in lt:
            # add a key like structural+nonstructural to losses_by_lt
            total_losses(asset_df, losses_by_lt, lt)
    for lt, out in list(losses_by_lt.items()):
        if len(out) == 0:
            continue
        adf = asset_policy_df[asset_policy_df.loss_type == lt]
        new = out[numpy.isin(out.aid, adf.index)]
        if len(new) == 0:
            continue
        new['variance'] = 0.
        j = new.join(adf, on='aid', how='inner')
        if '+' in lt:
            lst = [j['value-' + ltype].to_numpy()
                   for ltype in lt.split('+')]
            values = numpy.sum(lst, axis=0)  # shape num_values
        else:
            values = j['value-' + lt].to_numpy()
        aids = j.aid.to_numpy()
        losses = j.loss.to_numpy()
        deds = j.deductible.to_numpy() * values
        lims = j.insurance_limit.to_numpy() * values
        ids_of_invalid_assets = aids[deds > lims]
        if len(ids_of_invalid_assets):
            invalid_assets = set(ids_of_invalid_assets)
            raise ValueError(
                f"Please check deductible values. Values larger than the"
                f" insurance limit were found for asset(s) {invalid_assets}.")
        new['loss'] = insured_losses(losses, deds, lims)
        losses_by_lt[lt + '_ins'] = new


def total_losses(asset_df, losses_by_lt, kind, ideduc=False):
    """
    :param asset_df: DataFrame of assets
    :param losses_by_lt: lt -> DataFrame[eid, aid, variance, loss]
    :param kind: kind of total loss (i.e. "structural+nonstructural")
    :param ideduc: if True compute the insurance claim
    """
    ltypes = kind.split('+')
    losses_by_lt[kind] = df = _agg([losses_by_lt[lt] for lt in ltypes])
    # df is an event loss table with fields eid, aid, variance, loss
    if ideduc:
        loss = df.loss.to_numpy()
        ideductible = asset_df.ideductible[df.aid].to_numpy()
        df = df.copy()
        df['loss'] = numpy.maximum(loss - ideductible, 0)
        losses_by_lt['claim'] = df


def insurance_loss_curve(curve, deductible, insurance_limit):
    """
    Compute an insured loss ratio curve given a loss ratio curve

    :param curve: an array 2 x R (where R is the curve resolution)
    :param float deductible: the deductible limit in fraction form
    :param float insurance_limit: the insured limit in fraction form

    >>> losses = numpy.array([3, 20, 101])
    >>> poes = numpy.array([0.9, 0.5, 0.1])
    >>> insurance_loss_curve(numpy.array([losses, poes]), 5, 100)
    array([[ 3.        , 20.        ],
           [ 0.85294118,  0.5       ]])
    """
    losses, poes = curve[:, curve[0] <= insurance_limit]
    limit_poe = interpolate.interp1d(
        *curve, bounds_error=False, fill_value=1)(deductible)
    return numpy.array([
        losses,
        numpy.piecewise(poes, [poes > limit_poe], [limit_poe, lambda x: x])])


#
# Benefit Cost Ratio Analysis
#
def bcr(eal_original, eal_retrofitted, interest_rate,
        asset_life_expectancy, asset_value, retrofitting_cost):
    """
    Compute the Benefit-Cost Ratio.

    BCR = (EALo - EALr)(1-exp(-r*t))/(r*C)

    Where:

    * BCR -- Benefit cost ratio
    * EALo -- Expected annual loss for original asset
    * EALr -- Expected annual loss for retrofitted asset
    * r -- Interest rate
    * t -- Life expectancy of the asset
    * C -- Retrofitting cost
    """
    return ((eal_original - eal_retrofitted) * asset_value *
            (1 - numpy.exp(-interest_rate * asset_life_expectancy)) /
            (interest_rate * retrofitting_cost))
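
# Illustrative example with made-up numbers (not part of the module):
# an asset worth 1,000,000 with EALo=0.005, EALr=0.002, a 5% interest rate,
# a 40 year life expectancy and a retrofitting cost of 100,000 gives
#   >>> bcr(.005, .002, .05, 40, 1_000_000, 100_000)  # ~0.52
# i.e. with these numbers retrofitting would not be cost-effective (BCR < 1).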


def pla_factor(df):
    """
    Post-Loss-Amplification factor interpolator.
    To be instantiated with a DataFrame with columns
    return_period and pla_factor.
    """
    factors = df.pla_factor.to_numpy()  # ordered from 1 to maxvalue
    return interpolate.interp1d(
        df.return_period.to_numpy(), factors,
        bounds_error=False, fill_value=(1., factors[-1]))


def pairwise_mean(values):
    """
    Averages between a value and the next value in a sequence
    """
    return numpy.array([numpy.mean(pair) for pair in pairwise(values)])


def pairwise_diff(values, addlast=False):
    """
    Differences between a value and the next value in a sequence.
    If addlast is set, the last value is appended to the differences,
    i.e. N values are returned instead of N - 1.
    """
    diff = [x - y for x, y in pairwise(values)]
    if addlast:
        diff.append(values[-1])
    return numpy.array(diff)


def dds_to_poes(dmg_dists):
    """
    Convert an array of damage distributions into an array of PoEs

    >>> dds_to_poes([[.7, .2, .1], [0., 0., 1.]])
    array([[1. , 0.3, 0.1],
           [1. , 1. , 1. ]])
    """
    arr = numpy.fliplr(numpy.fliplr(dmg_dists).cumsum(axis=1))
    return arr


def compose_dds(dmg_dists):
    """
    Compose an array of N damage distributions:

    >>> compose_dds([[.6, .2, .1, .1], [.5, .3, .1, .1]])
    array([0.3 , 0.34, 0.17, 0.19])
    """
    poes_per_dmgstate = general.pprod(dds_to_poes(dmg_dists), axis=0)
    return pairwise_diff(poes_per_dmgstate, addlast=True)


def mean_std(fractions):
    """
    Given an N x M matrix, returns mean and std computed on the rows,
    i.e. two M-dimensional vectors.
    """
    n = fractions.shape[0]
    if n == 1:  # avoid warnings when computing the stddev
        return fractions[0], numpy.ones_like(fractions[0]) * numpy.nan
    return numpy.mean(fractions, axis=0), numpy.std(fractions, axis=0, ddof=1)


def loss_maps(curves, conditional_loss_poes):
    """
    :param curves: an array of loss curves
    :param conditional_loss_poes: a list of conditional loss poes
    :returns: a composite array of loss maps with the same shape
    """
    loss_maps_dt = numpy.dtype([('poe-%s' % poe, F32)
                                for poe in conditional_loss_poes])
    loss_maps = numpy.zeros(curves.shape, loss_maps_dt)
    for idx, curve in numpy.ndenumerate(curves):
        for poe in conditional_loss_poes:
            loss_maps['poe-%s' % poe][idx] = conditional_loss_ratio(
                curve['losses'], curve['poes'], poe)
    return loss_maps


def broadcast(func, composite_array, *args):
    """
    Broadcast an array function over a composite array
    """
    dic = {}
    dtypes = []
    for name in composite_array.dtype.names:
        dic[name] = func(composite_array[name], *args)
        dtypes.append((name, dic[name].dtype))
    res = numpy.zeros(dic[name].shape, numpy.dtype(dtypes))
    for name in dic:
        res[name] = dic[name]
    return res


def average_loss(lc):
    """
    Given a loss curve array with `poe` and `loss` fields,
    computes the average loss on a period of time.

    :note: As the loss curve is supposed to be piecewise linear as it
        is a result of a linear interpolation, we compute an exact
        integral by using the trapezoidal rule with the width given by
        the loss bin width.
    """
    losses, poes = (lc['loss'], lc['poe']) if lc.dtype.names else lc
    return -pairwise_diff(losses) @ pairwise_mean(poes)
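
# Illustrative example (not part of the module): for losses [0, 1, 2] and
# poes [0.9, 0.5, 0.1] the loss bin widths are [1, 1] and the mean poes per
# bin are [0.7, 0.3], so the trapezoidal integral is 1*0.7 + 1*0.3 = 1.0:
#   >>> average_loss(numpy.array([[0., 1., 2.], [.9, .5, .1]]))  # 1.0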


def normalize_curves_eb(curves):
    """
    A more sophisticated version of normalize_curves, used in the event
    based calculator.

    :param curves: a list of pairs (losses, poes)
    :returns: first losses, all_poes
    """
    # we assume non-decreasing losses, so losses[-1] is the maximum loss
    non_zero_curves = [(losses, poes)
                       for losses, poes in curves if losses[-1] > 0]
    if not non_zero_curves:  # no damage, all zero curves
        return curves[0][0], numpy.array(
            [poes for _losses, poes in curves])
    else:  # standard case
        max_losses = [losses[-1] for losses, _poes in non_zero_curves]
        reference_curve = non_zero_curves[numpy.argmax(max_losses)]
        loss_ratios = reference_curve[0]
        curves_poes = [interpolate.interp1d(
            losses, poes, bounds_error=False, fill_value=0)(loss_ratios)
            for losses, poes in curves]
        # fix degenerated case with flat curve
        for cp in curves_poes:
            if numpy.isnan(cp[0]):
                cp[0] = 0
        return loss_ratios, numpy.array(curves_poes)


def return_periods(eff_time, num_losses):
    """
    :param eff_time: ses_per_logic_tree_path * investigation_time
    :param num_losses: used to determine the minimum period
    :returns: an array of periods of dtype uint32

    Here are a few examples:

    >>> return_periods(1, 1)
    Traceback (most recent call last):
       ...
    ValueError: eff_time too small: 1

    >>> return_periods(2, 2)
    array([1, 2], dtype=uint32)

    >>> return_periods(2, 10)
    array([1, 2], dtype=uint32)

    >>> return_periods(100, 2)
    array([ 50, 100], dtype=uint32)

    >>> return_periods(1000, 1000)
    array([   1,    2,    5,   10,   20,   50,  100,  200,  500, 1000],
          dtype=uint32)
    """
    if eff_time < 2:
        raise ValueError('eff_time too small: %s' % eff_time)
    if num_losses < 2:
        raise ValueError('num_losses too small: %s' % num_losses)
    min_time = eff_time / num_losses
    period = 1
    periods = []
    loop = True
    while loop:
        for val in [1, 2, 5]:
            time = period * val
            if time >= min_time:
                if time > eff_time:
                    loop = False
                    break
                periods.append(time)
        period *= 10
    return U32(periods)


def maximum_probable_loss(losses, return_period, eff_time, sorting=True):
    """
    :returns: Maximum Probable Loss at the given return period

    >>> losses = [1000., 0., 2000., 1500., 780., 900., 1700., 0., 100., 200.]
    >>> maximum_probable_loss(losses, 2000, 10_000)
    900.0
    """
    return losses_by_period(losses, [return_period], len(losses),
                            eff_time, sorting)['curve'][0]


def fix_losses(orig_losses, num_events, eff_time=0, sorting=True):
    """
    Possibly add zeros and sort the passed losses.

    :param orig_losses: an array of size num_losses
    :param num_events: an integer >= num_losses
    :returns: three arrays of size num_events
    """
    if sorting:
        sorting_idxs = numpy.argsort(orig_losses)
    else:
        sorting_idxs = slice(None)
    sorted_losses = orig_losses[sorting_idxs]

    # add zeros on the left if there are fewer losses than events
    num_losses = len(sorted_losses)
    if num_events > num_losses:
        losses = numpy.zeros(num_events, sorted_losses.dtype)
        losses[num_events - num_losses:num_events] = sorted_losses
    elif num_losses == num_events:
        losses = sorted_losses
    elif num_events < num_losses:
        raise ValueError('More losses (%d) than events (%d) ??'
                         % (num_losses, num_events))
    eperiods = eff_time / numpy.arange(num_events, 0., -1)
    return losses, sorting_idxs, eperiods
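
# Illustrative example (not part of the module): three losses spread over
# five events with eff_time=100 are sorted, padded with two zeros on the
# left and associated to the empirical periods eff_time/[5, 4, 3, 2, 1]:
#   >>> losses, idxs, eperiods = fix_losses(numpy.array([2., 1., 3.]), 5, 100)
#   >>> losses    # array([0., 0., 1., 2., 3.])
#   >>> eperiods  # array([ 20., 25., 33.33..., 50., 100.])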


def losses_by_period(losses, return_periods, num_events, eff_time=None,
                     sorting=True, name='curve', pla_factor=None):
    # NB: sorting=False is used in test_claim
    """
    :param losses: simulated losses as an array, list or DataFrame column
    :param return_periods: return periods of interest
    :param num_events: the number of events (>= number of losses)
    :param eff_time: investigation_time * ses_per_logic_tree_path
    :returns: a dictionary with the interpolated losses for the return
        periods, possibly with NaNs, and possibly also a
        post-loss-amplified curve

    NB: the return periods must be ordered integers >= 1. The interpolated
    losses are defined inside the interval min_time < time < eff_time,
    where min_time = eff_time / num_events. On the right of the interval
    they have NaN values; on the left zero values. If num_events is not
    passed, it is inferred from the number of losses; if eff_time is not
    passed, it is inferred from the longest return period. Here is an
    example:

    >>> losses = [3, 2, 3.5, 4, 3, 23, 11, 2, 1, 4, 5, 7, 8, 9, 13]
    >>> losses_by_period(losses, [1, 2, 5, 10, 20, 50, 100], 20)
    {'curve': array([ 0. ,  0. ,  0. ,  3.5,  8. , 13. , 23. ])}
    """
    P = len(return_periods)
    assert len(losses)
    if isinstance(losses, list):
        losses = numpy.array(losses)
    elif hasattr(losses, 'to_numpy'):  # DataFrame
        losses = losses.to_numpy()
    if eff_time is None:
        eff_time = return_periods[-1]
    losses, _sorting_idxs, eperiods = fix_losses(
        losses, num_events, eff_time, sorting)
    num_left = sum(1 for rp in return_periods if rp < eperiods[0])
    num_right = sum(1 for rp in return_periods if rp > eperiods[-1])
    rperiods = [rp for rp in return_periods
                if eperiods[0] <= rp <= eperiods[-1]]
    logr, loge = numpy.log(rperiods), numpy.log(eperiods)
    curve = numpy.zeros(len(return_periods), losses.dtype)
    curve[num_left:P - num_right] = numpy.interp(logr, loge, losses)
    curve[P - num_right:] = numpy.nan
    res = {name: curve}
    if pla_factor:
        pla = numpy.zeros(len(return_periods), losses.dtype)
        pla[num_left:P - num_right] = numpy.interp(
            logr, loge, losses * pla_factor(eperiods))
        pla[P - num_right:] = numpy.nan
        res['pla_' + name] = pla
    return res


class LossCurvesMapsBuilder(object):
    """
    Build loss curves and maps for all loss types at the same time.

    :param conditional_loss_poes: a list of PoEs, possibly empty
    :param return_periods: ordered array of return periods
    :param loss_dt: composite dtype for the loss types
    :param weights: weights of the realizations
    :param num_events: number of events for each realization
    :param eff_time: ses_per_logic_tree_path * hazard investigation time
    """
    def __init__(self, conditional_loss_poes, return_periods, loss_dt,
                 weights, eff_time, risk_investigation_time,
                 pla_factor=None):
        if return_periods[-1] > eff_time:
            raise ValueError(
                'The return_period %s is longer than the eff_time per rlz %s'
                % (return_periods[-1], eff_time))
        self.conditional_loss_poes = conditional_loss_poes
        self.return_periods = return_periods
        self.loss_dt = loss_dt
        self.weights = weights
        self.eff_time = eff_time
        if return_periods.sum() == 0:
            self.poes = 1
        else:
            self.poes = 1. - numpy.exp(
                -risk_investigation_time / return_periods)
        self.pla_factor = pla_factor

    # used in post_risk, for normal loss curves and reinsurance curves
    def build_curve(self, years, col, losses, agg_types, loss_type, ne):
        """
        Compute the requested curves (AEP and OEP curves only if years
        is not None)
        """
        # NB: agg_types can be the string "ep, aep, oep"
        periods = self.return_periods
        dic = {}
        agg_types_list = agg_types.split(', ')
        if 'ep' in agg_types_list:
            res = losses_by_period(losses, periods, ne, self.eff_time,
                                   name=col, pla_factor=self.pla_factor)
            dic.update(res)
        if len(years):
            gby = pandas.DataFrame(
                dict(year=years, loss=losses)).groupby('year')
            # see specs in https://github.com/gem/oq-engine/issues/8971
            if 'aep' in agg_types_list:
                dic.update(losses_by_period(
                    gby.loss.sum(), periods, ne, self.eff_time,
                    name=col + '_aep', pla_factor=self.pla_factor))
            if 'oep' in agg_types_list:
                dic.update(losses_by_period(
                    gby.loss.max(), periods, ne, self.eff_time,
                    name=col + '_oep', pla_factor=self.pla_factor))
        return dic


def _agg(loss_dfs, weights=None):
    # average loss DataFrames with fields (eid, aid, variance, loss)
    # NB: if there are weights the DataFrames are changed!!
    if weights is not None:
        for loss_df, w in zip(loss_dfs, weights):
            loss_df['variance'] *= w
            loss_df['loss'] *= w
    return pandas.concat(loss_dfs).groupby(
        ['aid', 'eid']).sum().reset_index()
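
# Illustrative note (not part of the module): given two loss DataFrames for
# the same (aid, eid) pairs and weights [0.4, 0.6], _agg first scales the
# loss and variance columns in place and then sums per pair, so a loss of
# 10 in the first frame and 20 in the second becomes 0.4*10 + 0.6*20 = 16.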


class RiskComputer(dict):
    """
    A callable dictionary of risk models able to compute average losses
    according to the taxonomy mapping. It also computes secondary losses
    *after* the average (this is a hugely simplifying approximation).

    :param crm: a CompositeRiskModel
    :param taxidx: taxonomy index of the assets to consider
    :param country_str: country string used in the taxonomy mapping
    """
    def __init__(self, crm, taxidx, country_str='?'):
        oq = crm.oqparam
        self.D = len(crm.damage_states)
        self.P = len(crm.perils)
        self.calculation_mode = oq.calculation_mode
        self.loss_types = crm.loss_types
        self.minimum_asset_loss = oq.minimum_asset_loss  # lt -> float
        self.wdic = {}  # (riskid, peril) -> weight
        tm = crm.tmap_df[crm.tmap_df.taxi == taxidx]
        for country, peril, riskid, weight in zip(
                tm.country, tm.peril, tm.risk_id, tm.weight):
            if country == '?' or country_str in country:
                self[riskid] = crm._riskmodels[riskid]
                if peril == '*':
                    for per in crm.perils:
                        self.wdic[riskid, per] = weight
                else:
                    self.wdic[riskid, peril] = weight

    def output(self, asset_df, haz, sec_losses=(), rndgen=None):
        """
        Compute averages by using the taxonomy mapping

        :param asset_df: assets on the same site with the same taxonomy
        :param haz: a DataFrame of GMFs or an array of PoEs
        :param sec_losses: a list of functions updating the loss dict
        :param rndgen: None or MultiEventRNG instance
        :yields: dictionaries {loss_type: loss_output}
        """
        dic = collections.defaultdict(list)  # peril, lt -> outs
        weights = collections.defaultdict(list)  # peril, lt -> weights
        perils = {'groundshaking'}
        for riskid, rm in self.items():
            for (peril, lt), res in rm(asset_df, haz, rndgen).items():
                # res is an array of fractions of shape (A, E, D)
                weights[peril, lt].append(self.wdic[riskid, peril])
                dic[peril, lt].append(res)
                perils.add(peril)
        for peril in sorted(perils):
            out = {}
            for lt in self.minimum_asset_loss:
                outs = dic[peril, lt]
                if len(outs) == 0:  # can happen for nonstructural_ins
                    continue
                elif len(outs) > 1 and hasattr(outs[0], 'loss'):
                    # computing the average dataframe for
                    # event_based_risk/case_8
                    out[lt] = _agg(outs, weights[peril, lt])
                elif len(outs) > 1:
                    # for oq-risk-tests/test/event_based_damage/inputs/
                    # cali/job.ini
                    out[lt] = numpy.average(
                        outs, weights=weights[peril, lt], axis=0)
                else:
                    out[lt] = outs[0]
            if hasattr(haz, 'eid'):  # event based
                for update_losses in sec_losses:
                    update_losses(asset_df, out)
            yield out

    def get_dd5(self, adf, gmf_df, rng=None, C=0, crm=None):
        """
        :param adf: DataFrame of assets on the given site with the same
            taxonomy
        :param gmf_df: GMFs on the given site for E events
        :param rng: MultiEvent random generator or None
        :param C: number of consequences
        :returns: damage distribution of shape (P, A, E, L, D+C)
        """
        A = len(adf)
        E = len(gmf_df)
        L = len(self.loss_types)
        D = self.D
        assets = adf.to_records()
        if rng is None:
            number = assets['value-number']
        else:
            number = assets['value-number'] = U32(assets['value-number'])
        dd5 = numpy.zeros((self.P, A, E, L, D + C), F32)
        outs = self.output(adf, gmf_df)  # dicts loss_type -> array
        for p, out in enumerate(outs):
            for li, lt in enumerate(self.loss_types):
                fractions = out[lt]  # shape (A, E, Dc)
                if rng is None:
                    for a in range(A):
                        dd5[p, a, :, li, :D] = fractions[a] * number[a]
                else:
                    # this is a performance disaster; for instance
                    # the Messina test in oq-risk-tests becomes 12x
                    # slower even if it has only 25_736 assets
                    dd5[p, :, :, li, :D] = rng.discrete_dmg_dist(
                        gmf_df.eid, fractions, number)
        if crm:
            csqs = crm.get_consequences()
            df = crm.tmap_df[crm.tmap_df.taxi == assets[0]['taxonomy']]
            csq = crm.compute_csq(
                assets, dd5[:, :, :, :, :D], df, crm.oqparam)
            csqidx = {dc: i for i, dc in enumerate(csqs, D)}
            for (cons, li), values in csq.items():
                dd5[:, :, :, li, csqidx[cons]] = values  # shape (P, A, E)
        return dd5

    def todict(self):
        """
        :returns: a literal dict describing the RiskComputer
        """
        rfdic = {}
        for rm in self.values():
            for peril, rfdict in rm.risk_functions.items():
                for lt, rf in rfdict.items():
                    dic = ast.literal_eval(hdf5.obj_to_json(rf))
                    if getattr(rf, 'retro', False):
                        retro = ast.literal_eval(hdf5.obj_to_json(rf.retro))
                        dic['openquake.risklib.scientific.'
                            'VulnerabilityFunction']['retro'] = retro
                    rfdic['%s#%s#%s' % (rf.peril, lt, rf.id)] = dic
        dic = dict(risk_functions=rfdic,
                   calculation_mode=self.calculation_mode)
        if any(mal for mal in self.minimum_asset_loss.values()):
            dic['minimum_asset_loss'] = self.minimum_asset_loss
        if any(self.wdic[k] != 1 for k in self.wdic):
            dic['wdic'] = {'%s#%s' % k: v
                           for k, v in self.wdic.items()}
        return dic


def consequence(consequence, assets, coeff, loss_type, time_event):
    """
    :param consequence: kind of consequence
    :param assets: asset array (shape A)
    :param coeff: composite array of coefficients of shape (A, E)
    :param time_event: time event string
    :returns: array of shape (A, E)
    """
    if consequence not in KNOWN_CONSEQUENCES:
        raise NotImplementedError(consequence)
    if consequence.startswith('losses'):
        res = (assets['value-' + loss_type].reshape(-1, 1) * coeff /
               assets['value-number'].reshape(-1, 1))
        return res
    elif consequence in ['collapsed', 'non_operational']:
        return coeff
    elif consequence in ['injured', 'fatalities']:
        # NOTE: time_event default is 'avg'
        values = assets[f'occupants_{time_event}'] / assets['value-number']
        return values.reshape(-1, 1) * coeff
    elif consequence == 'homeless':
        values = assets['value-residents'] / assets['value-number']
        return values.reshape(-1, 1) * coeff
    else:
        raise NotImplementedError(consequence)


def get_agg_value(consequence, agg_values, agg_id, xltype, time_event):
    """
    :returns:
        sum of the values corresponding to agg_id for the given consequence
    """
    if consequence not in KNOWN_CONSEQUENCES:
        raise NotImplementedError(consequence)
    aval = agg_values[agg_id]
    if consequence in ['collapsed', 'non_operational']:
        return aval['number']
    elif consequence in ['injured', 'fatalities']:
        # NOTE: time_event default is 'avg'
        return aval[f'occupants_{time_event}']
    elif consequence == 'homeless':
        return aval['residents']
    elif 'loss' in consequence:
        if xltype.endswith('_ins'):
            xltype = xltype[:-4]
        if '+' in xltype:  # total loss type
            return sum(aval[lt] for lt in xltype.split('+'))
        try:
            return aval[xltype]
        except ValueError:  # liquefaction, landslide
            return 0
    else:
        raise NotImplementedError(consequence)


if __name__ == '__main__':
    # plots of the beta distribution in terms of mean and stddev
    # see https://en.wikipedia.org/wiki/Beta_distribution
    import matplotlib.pyplot as plt
    x = numpy.arange(0, 1, .01)

    def beta(mean, stddev):
        a, b = _alpha_beta(numpy.array([mean] * 100),
                           numpy.array([stddev] * 100))
        return stats.beta.pdf(x, a, b)

    rng = MultiEventRNG(42, [1])
    ones = numpy.ones(100)
    eids = numpy.ones(100, int)  # .beta expects an array of event IDs
    vals = rng.beta(eids, .5 * ones, .05 * ones)
    print(vals.mean(), vals.std())
    # print(vals)
    vals = rng.beta(eids, .5 * ones, .01 * ones)
    print(vals.mean(), vals.std())
    # print(vals)
    plt.plot(x, beta(.5, .05), label='.5[.05]')
    plt.plot(x, beta(.5, .01), label='.5[.01]')
    plt.legend()
    plt.show()