# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (c) 2016-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importcopyimportloggingimportwarningsimportnumpyimportpandasimportnumbafromopenquake.baselib.generalimportcached_property,humansizefromopenquake.baselib.performanceimportcompilefromopenquake.hazardlib.tomimportget_pnesU16=numpy.uint16U32=numpy.uint32F32=numpy.float32F64=numpy.float64BYTES_PER_FLOAT=8TWO20=2**20# 1 MBTWO24=2**24rates_dt=numpy.dtype([('sid',U32),('lid',U16),('gid',U16),('rate',F32)])
[docs]defget_mean_curve(dstore,imt,site_id=0):""" Extract the mean hazard curve from the datastore for the first site. """if'hcurves-stats'indstore:# shape (N, S, M, L1)arr=dstore.sel('hcurves-stats',stat='mean',imt=imt)else:# there is only 1 realizationarr=dstore.sel('hcurves-rlzs',rlz_id=0,imt=imt)returnarr[site_id,0,0]
[docs]defget_poe_from_mean_curve(dstore,imt,iml,site_id=0):""" Extract the poe corresponding to the given iml by looking at the mean curve for the given imt. `iml` can also be an array. """imls=dstore['oqparam'].imtls[imt]mean_curve=get_mean_curve(dstore,imt,site_id)returnnumpy.interp(imls,mean_curve)[iml]
# ######################### hazard maps ################################### ## cutoff value for the poeEPSILON=1E-30
[docs]defcompute_hazard_maps(curves,imls,poes):""" Given a set of hazard curve poes, interpolate hazard maps at the specified ``poes``. :param curves: Array of floats of shape N x L. Each row represents a curve, where the values in the row are the PoEs (Probabilities of Exceedance) corresponding to the ``imls``. Each curve corresponds to a geographical location. :param imls: Intensity Measure Levels associated with these hazard ``curves``. Type should be an array-like of floats. :param poes: Value(s) on which to interpolate a hazard map from the input ``curves``. :returns: An array of shape N x P, where N is the number of curves and P the number of poes. """P=len(poes)N,L=curves.shape# number of levelsifL!=len(imls):raiseValueError('The curves have %d levels, %d were passed'%(L,len(imls)))log_poes=numpy.log(poes)hmap=numpy.zeros((N,P))withwarnings.catch_warnings():warnings.simplefilter("ignore")# avoid RuntimeWarning: divide by zero for zero levelsimls=numpy.log(numpy.array(imls[::-1]))forn,curveinenumerate(curves):# the hazard curve, having replaced the too small poes with EPSILONlog_curve=numpy.log([max(poe,EPSILON)forpoeincurve[::-1]])forp,log_poeinenumerate(log_poes):iflog_poe>log_curve[-1]:# special case when the interpolation poe is bigger than the# maximum, i.e the iml must be smaller than the minimum;# extrapolate the iml to zero as per# https://bugs.launchpad.net/oq-engine/+bug/1292093;# then the hmap goes automatically to zeropasselse:# exp-log interpolation, to reduce numerical errors# see https://bugs.launchpad.net/oq-engine/+bug/1252770hmap[n,p]=numpy.exp(numpy.interp(log_poe,log_curve,imls))returnhmap
[docs]defcompute_hmaps(curvesNML,imtls,poes):""" :param curvesNML: an array of shape (N, M, L1) :param imlts: a DictArray with M keys :param poes: a sequence of P poes :returns: array of shape (N, M, P) with the hazard maps """N=len(curvesNML)M=len(imtls)P=len(poes)assertM==len(imtls)iml3=numpy.zeros((N,M,P))form,imlsinenumerate(imtls.values()):curves=curvesNML[:,m]iml3[:,m]=compute_hazard_maps(curves,imls,poes)returniml3
[docs]defcheck_hmaps(hcurves,imtls,poes):""" :param hcurves: hazard curves of shape (N, M, L1) :param imtls: a dictionary imt -> imls :param poes: a list of poes :param poes: P poes """N,M,_L1=hcurves.shapeassertM==len(imtls),(M,len(imtls))all_poes=[]forpoeinpoes:all_poes.extend([poe,poe*.99])form,(imt,imls)inenumerate(imtls.items()):hmaps=compute_hazard_maps(hcurves[:,m],imls,all_poes)# (N, 2*P)forsite_idinrange(N):forp,poeinenumerate(poes):iml=hmaps[site_id,p*2]iml99=hmaps[site_id,p*2+1]ifiml+iml99==0:# zero curvelogging.error(f'The {imt} hazard curve for {site_id=} cannot 'f'be inverted around {poe=}')continuerel_err=abs(iml-iml99)/abs(iml+iml99)ifrel_err>.05:logging.error(f'The {imt} hazard curve for {site_id=} cannot 'f'be inverted reliably around {poe=}')
# not used right now
[docs]defget_lvl(hcurve,imls,poe):""" :param hcurve: a hazard curve, i.e. array of L1 PoEs :param imls: L1 intensity measure levels :returns: index of the intensity measure level associated to the poe >>> imls = numpy.array([.1, .2, .3, .4]) >>> hcurve = numpy.array([1., .99, .90, .8]) >>> get_lvl(hcurve, imls, 1) 0 >>> get_lvl(hcurve, imls, .99) 1 >>> get_lvl(hcurve, imls, .91) 2 >>> get_lvl(hcurve, imls, .8) 3 """[[iml]]=compute_hazard_maps(hcurve.reshape(1,-1),imls,[poe])iml-=1E-10# small bufferreturnnumpy.searchsorted(imls,iml)
[docs]deffix_probs_occur(probs_occur):""" Try to convert object arrays into regular arrays """ifprobs_occur.dtype.name=='object':n=len(probs_occur)p=len(probs_occur[0])po=numpy.zeros((n,p))forp,probsinenumerate(probs_occur):po[p]=probs_occur[p]returnporeturnprobs_occur
[docs]classMapArray(object):""" Thin wrapper over a 3D-array of probabilities. """def__init__(self,sids,shape_y,shape_z,rates=False):self.sids=sidsself.shape=(len(sids),shape_y,shape_z)self.rates=rates@cached_propertydefsidx(self):""" :returns: an array of length N site_id -> index """idxs=numpy.zeros(self.sids.max()+1,numpy.uint32)foridx,sidinenumerate(self.sids):idxs[sid]=idxreturnidxs@propertydefsize_mb(self):returnself.array.nbytes/TWO20
[docs]defsplit(self):""" :yields: G MapArrays of shape (N, L, 1) """_N,L,G=self.shapeforginrange(G):new=self.__class__(self.sids,L,1).new(self.array[:,:,[g]])new.gids=[g]yieldnew
[docs]deffill(self,value):""" :param value: a scalar probability Fill the MapArray underlying array with the given scalar and build the .sidx array """assert0<=value<=1,valueself.array=numpy.empty(self.shape,F32)self.array.fill(value)returnself
[docs]defreshape(self,N,M,P):""" :returns: a new Pmap associated to a reshaped array """returnself.new(self.array.reshape(N,M,P))
# used in calc/disagg_test.py
[docs]defexpand(self,full_lt,trt_rlzs):""" Convert a MapArray with shape (N, L, Gt) into a MapArray with shape (N, L, R): works only for rates """N,L,Gt=self.array.shapeassertGt==len(trt_rlzs),(Gt,len(trt_rlzs))R=full_lt.get_num_paths()out=MapArray(range(N),L,R).fill(0.,F32)forg,trsinenumerate(trt_rlzs):forsidinrange(N):forrlzintrs%TWO24:out.array[sid,:,rlz]+=self.array[sid,:,g]# NB: for probabilities use# combine_probs(out.array[sid], self.array[sid, :, g], rlzs)returnout
# used in calc_hazard_curves
[docs]defconvert(self,imtls,nsites,idx=0):""" Convert a probability map into a composite array of length `nsites` and dtype `imtls.dt`. :param imtls: DictArray instance :param nsites: the total number of sites :param idx: index on the z-axis (default 0) """curves=numpy.zeros(nsites,imtls.dt)forimtincurves.dtype.names:curves[imt][self.sids]=self.array[:,imtls(imt),idx]returncurves
[docs]defto_rates(self,itime=1.):""" Convert into a map of rates unless the map already contains rates """ifself.rates:returnselfpnes=self.array# Physically, an extremely small intensity measure level can have an# extremely large probability of exceedence,however that probability# cannot be exactly 1 unless the level is exactly 0. Numerically,# the PoE can be 1 and this give issues when calculating the damage:# there is a log(0) in scientific.annual_frequency_of_exceedence.# Here we solve the issue by replacing the unphysical probabilities# 1 with .9999999999999999 (the float64 closest to 1).pnes[pnes==0.]=1.11E-16returnself.new(-numpy.log(pnes)/itime)
[docs]defto_array(self,gid):""" Assuming self contains an array of rates, returns a composite array with fields sid, lid, gid, rate """iflen(gid)==0:returnnumpy.array([],rates_dt)outs=[]fori,ginenumerate(gid):rates_g=self.array[:,:,i]outs.append(from_rates_g(rates_g,g,self.sids))iflen(outs)==1:returnouts[0]returnnumpy.concatenate(outs,dtype=rates_dt)
[docs]definterp4D(self,imtls,poes):""" :param imtls: a dictionary imt->imls with M items :param poes: a list of P PoEs :returns: an array of shape (N, M, P, Z) """poes3=self.arrayN,_L,Z=poes3.shapeM=len(imtls)P=len(poes)L1=len(imtls[next(iter(imtls))])hmap4=numpy.zeros((N,M,P,Z))form,imtinenumerate(imtls):slc=slice(m*L1,m*L1+L1)forzinrange(Z):hmap4[:,m,:,z]=compute_hazard_maps(poes3[:,slc,z],imtls[imt],poes)returnhmap4
# dangerous since it changes the shape by removing sites
[docs]defto_dframe(self):""" :returns: a DataFrame with fields sid, gid, lid, poe """G=self.array.shape[2]arr=self.to_rates().to_array(numpy.arange(G))returnpandas.DataFrame({name:arr[name]fornameinarr.dtype.names})
[docs]defupdate_indep(self,poes,ctxt,itime):""" Update probabilities for independent ruptures """rates=ctxt.occurrence_ratesidxs=self.sidx[ctxt.sids]ifself.rates:update_pmap_r(self.array,poes,rates,ctxt.probs_occur,sidxs,itime)else:update_pmap_i(self.array,poes,rates,ctxt.probs_occur,sidxs,itime)
[docs]defupdate_mutex(self,poes,ctxt,itime,mutex_weight):""" Update probabilities for mutex ruptures """rates=ctxt.occurrence_rateprobs_occur=fix_probs_occur(ctxt.probs_occur)sidxs=self.sidx[ctxt.sids]weights=numpy.array([mutex_weight[src_id,rup_id]forsrc_id,rup_idinzip(ctxt.src_id,ctxt.rup_id)])update_pmap_m(self.array,poes,rates,probs_occur,weights,sidxs,itime)
def__invert__(self):returnself.new(1.-self.array)def__pow__(self,n):returnself.new(self.array**n)def__iadd__(self,other):# used in calc.mean_ratessidx=self.sidx[other.sids]G=other.array.shape[2]# NLGfori,ginenumerate(other.gid):iadd(self.array[:,:,g],other.array[:,:,i%G],sidx)returnselfdef__repr__(self):tup=self.shape+(humansize(self.array.nbytes),)returnf'<{self.__class__.__name__}(%d, %d, %d)[%s]>'%tup
[docs]deffrom_rates_g(rates_g,g,sids):""" :param rates_g: an array of shape (N, L) :param g: an integer representing a GSIM index :param sids: an array of site IDs """outs=[]forlid,ratesinenumerate(rates_g.T):idxs,=rates.nonzero()iflen(idxs):out=numpy.zeros(len(idxs),rates_dt)out['sid']=sids[idxs]out['lid']=lidout['gid']=gout['rate']=rates[idxs]outs.append(out)ifnotouts:returnnumpy.array([],rates_dt)eliflen(outs)==1:returnouts[0]returnnumpy.concatenate(outs,dtype=rates_dt)
[docs]classRateMap:""" A kind of MapArray specifically for rates """sidx=MapArray.sidxsize_mb=MapArray.size_mb__repr__=MapArray.__repr__def__init__(self,sids,L,gids):self.sids=sidsself.shape=len(sids),L,len(gids)self.array=numpy.zeros(self.shape,F32)self.jid={g:jforj,ginenumerate(gids)}def__iadd__(self,other):G=self.shape[2]sidx=self.sidx[other.sids]fori,ginenumerate(other.gid):iadd(self.array[:,:,self.jid[g]],other.array[:,:,i%G],sidx)returnself
[docs]defto_array(self,g):""" Assuming self contains an array of rates, returns a composite array with fields sid, lid, gid, rate """rates_g=self.array[:,:,self.jid[g]]returnfrom_rates_g(rates_g,g,self.sids)