# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2012-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""Module :mod:`~openquake.hazardlib.calc.gmf` exports:func:`ground_motion_fields`."""importnumpyimportpandasfromopenquake.baselib.generalimportAccumDictfromopenquake.baselib.performanceimportMonitor,compilefromopenquake.hazardlib.constimportStdDevfromopenquake.hazardlib.source.ruptureimportEBRupture,get_eid_rlzfromopenquake.hazardlib.cross_correlationimportNoCrossCorrelationfromopenquake.hazardlib.contextsimportContextMaker,FarAwayRupturefromopenquake.hazardlib.imtimportfrom_stringU8=numpy.uint8U16=numpy.uint16U32=numpy.uint32I64=numpy.int64F32=numpy.float32
[docs]classCorrelationButNoInterIntraStdDevs(Exception):def__init__(self,corr,gsim):self.corr=corrself.gsim=gsimdef__str__(self):return'''\You cannot use the correlation model %s with the GSIM %s, \that defines only the total standard deviation. If you want to use a \correlation model you have to select a GMPE that provides the inter and \intra event standard deviations.'''%(self.corr.__class__.__name__,self.gsim.__class__.__name__)
[docs]@compile(["float64[:,:](float64[:,:], boolean)","float64[:](float64[:], boolean)","float64(float64, boolean)"])defexp(vals,notMMI):""" Exponentiate the values unless the IMT is MMI """ifnotMMI:returnnumpy.exp(vals)returnvals
[docs]@compile("(float32[:,:,:],float64[:,:],float64[:],float64[:],int64)")defset_max_min(array,mean,max_iml,min_iml,mmi_index):N,M,E=array.shape# manage max_imlforminrange(M):iml=max_iml[m]forninrange(N):# capping the gmv at the median value if val > max_iml[m]maxval=exp(mean[m,n],m!=mmi_index)foreinrange(E):val=array[n,m,e]ifval>iml:array[n,m,e]=maxval# manage min_imlforninrange(N):foreinrange(E):# set to zero only if all IMTs are below the thresholdsif(array[n,:,e]<min_iml).all():array[n,:,e]=0
[docs]defcalc_gmf_simplified(ebrupture,sitecol,cmaker):""" A simplified version of the GmfComputer for event based calculations. Used only for pedagogical purposes. Here is an example of usage: from unittest.mock import Mock import numpy from openquake.hazardlib import valid, contexts, site, geo from openquake.hazardlib.source.rupture import EBRupture, build_planar from openquake.hazardlib.calc.gmf import calc_gmf_simplified, GmfComputer imts = ['PGA'] rlzs = numpy.arange(3, dtype=numpy.uint32) rlzs_by_gsim = {valid.gsim('BooreAtkinson2008'): rlzs} lons = [0., 0.] lats = [0., 1.] siteparams = Mock(reference_vs30_value=760.) sitecol = site.SiteCollection.from_points(lons, lats, sitemodel=siteparams) hypo = geo.point.Point(0, .5, 20) rup = build_planar(hypo, mag=7., rake=0.) cmaker = contexts.simple_cmaker(rlzs_by_gsim, imts, truncation_level=3.) ebr = EBRupture(rup, 0, 0, n_occ=2, id=1) ebr.seed = 42 print(cmaker) print(sitecol.array) print(ebr) gmfa = calc_gmf_simplified(ebr, sitecol, cmaker) print(gmfa) # numbers considering the full site collection sites = site.SiteCollection.from_points([0], [1], sitemodel=siteparams) gmfa = calc_gmf_simplified(ebr, sites, cmaker) print(gmfa) # different numbers considering half of the site collection """N=len(sitecol)M=len(cmaker.imtls)[ctx]=cmaker.get_ctx_iter([ebrupture.rupture],sitecol)mean,_sig,tau,phi=cmaker.get_mean_stds([ctx])# shapes (G, M, N)rlzs=numpy.concatenate(list(cmaker.gsims.values()))_eid,rlz=get_eid_rlz(vars(ebrupture),rlzs,False)rng=numpy.random.default_rng(ebrupture.seed)cross_correl=NoCrossCorrelation(cmaker.truncation_level)ccdist=cross_correl.distributiongmfs=[]forg,(gs,rlzs)inenumerate(cmaker.gsims.items()):idxs,=numpy.where(numpy.isin(rlz,rlzs))E=len(idxs)# build arrays of random numbers of shape (M, N, E) and (M, E)intra_eps=[ccdist.rvs((N,E),rng)for_inrange(M)]eps=numpy.zeros((E,M),F32)eps[idxs]=cross_correl.get_inter_eps(cmaker.imtls,E,rng).Tgmf=numpy.zeros((M,N,E))form,imtinenumerate(cmaker.imtls):intra_res=phi[g,m,:,None]*intra_eps# shape (N, E)inter_res=tau[g,m,:,None]*eps[idxs,m]# shape (N, E)gmf[m]=numpy.exp(mean[g,m,:,None]+intra_res+inter_res)gmfs.append(gmf)returnnumpy.concatenate(gmfs)# shape (M, N, E)
[docs]classGmfComputer(object):""" Given an earthquake rupture, the GmfComputer computes ground shaking over a set of sites, by randomly sampling a ground shaking intensity model. :param rupture: EBRupture to calculate ground motion fields radiated from. :param :class:`openquake.hazardlib.site.SiteCollection` sitecol: a complete SiteCollection :param cmaker: a :class:`openquake.hazardlib.gsim.base.ContextMaker` instance :param correlation_model: Instance of a spatial correlation model object. See :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which case non-correlated ground motion fields are calculated. Correlation model is not used if ``truncation_level`` is zero. :param cross_correl: Instance of a cross correlation model object. See :mod:`openquake.hazardlib.cross_correlation`. Can be ``None``, in which case non-cross-correlated ground motion fields are calculated. :param amplifier: None or an instance of Amplifier :param sec_perils: Tuple of secondary perils. See :mod:`openquake.hazardlib.sep`. Can be ``None``, in which case no secondary perils need to be evaluated. """mtp_dt=numpy.dtype([('rup_id',I64),('site_id',U32),('gsim_id',U16),('imt_id',U8),('mea',F32),('tau',F32),('phi',F32)])# The GmfComputer is called from the OpenQuake Engine. In that case# the rupture is an EBRupture instance containing a# :class:`openquake.hazardlib.source.rupture.Rupture` instance as an# attribute. Then the `.compute(gsim, num_events, ms)` method is called and# a matrix of size (M, N, E) is returned, where M is the number of# IMTs, N the number of affected sites and E the number of events. The# seed is extracted from the underlying rupture.def__init__(self,rupture,sitecol,cmaker,correlation_model=None,cross_correl=None,amplifier=None,sec_perils=()):iflen(sitecol)==0:raiseValueError('No sites')eliflen(cmaker.imtls)==0:raiseValueError('No IMTs')eliflen(cmaker.gsims)==0:raiseValueError('No GSIMs')self.cmaker=cmakerself.imts=[from_string(imt)forimtincmaker.imtls]self.cmaker=cmakerself.gsims=sorted(cmaker.gsims)self.correlation_model=correlation_modelself.amplifier=amplifierself.sec_perils=sec_perilsself.ebrupture=ruptureself.rup_id=rupture.idself.seed=rupture.seedrupture=rupture.rupture# the underlying rupturectxs=list(cmaker.get_ctx_iter([rupture],sitecol))ifnotctxs:raiseFarAwayRupture[self.ctx]=ctxsself.N=len(self.ctx)ifcorrelation_model:# store the filtered sitecolself.sites=sitecol.complete.filtered(self.ctx.sids)self.cross_correl=cross_correlorNoCrossCorrelation(cmaker.truncation_level)self.mea_tau_phi=[]self.gmv_fields=[str(imt)forimtincmaker.imts]self.mmi_index=-1form,imtinenumerate(cmaker.imtls):ifimt=='MMI':self.mmi_index=m
[docs]definit_eid_rlz_sig_eps(self):""" Initialize the attributes eid, rlz, sig, eps with shapes E, E, EM, EM """self.rlzs=numpy.concatenate(list(self.cmaker.gsims.values()))self.eid,self.rlz=get_eid_rlz(vars(self.ebrupture),self.rlzs,self.cmaker.scenario)self.E=E=len(self.eid)self.M=M=len(self.gmv_fields)self.sig=numpy.zeros((E,M),F32)# same for all eventsself.eps=numpy.zeros((E,M),F32)# not the same
[docs]defbuild_sig_eps(self,se_dt):""" :returns: a structured array of size E with fields (eid, rlz_id, sig_inter_IMT, eps_inter_IMT) """sig_eps=numpy.zeros(self.E,se_dt)sig_eps['eid']=self.eidsig_eps['rlz_id']=self.rlzform,imtinenumerate(self.cmaker.imtls):sig_eps[f'sig_inter_{imt}']=self.sig[:,m]sig_eps[f'eps_inter_{imt}']=self.eps[:,m]returnsig_eps
[docs]defupdate(self,data,array,rlzs,mean,max_iml=None):""" Updates the data dictionary with the values coming from the array of GMVs. Also indirectly updates the arrays .sig and .eps. """min_iml=self.cmaker.min_imlmag=self.ebrupture.rupture.magiflen(mean.shape)==3:# shape (M, N, 1) for conditioned gmfsmean=mean[:,:,0]ifmax_imlisNone:max_iml=numpy.full(self.M,numpy.inf,float)set_max_min(array,mean,max_iml,min_iml,self.mmi_index)data['gmv'].append(array)ifself.sec_perils:n=0forrlzinrlzs:eids=self.eid[self.rlz==rlz]E=len(eids)fore,eidinenumerate(eids):gmfa=array[:,:,n+e].T# shape (M, N)forspinself.sec_perils:o=sp.compute(mag,zip(self.imts,gmfa),self.ctx)foroutkey,outarrinzip(sp.outputs,o):data[outkey].append(outarr)n+=E
[docs]defstrip_zeros(self,data):""" :returns: a DataFrame with the nonzero GMVs """# building an array of shape (3, NE)eid_sid_rlz=build_eid_sid_rlz(self.rlzs,self.ctx.sids,self.eid,self.rlz)forkey,valinsorted(data.items()):data[key]=numpy.concatenate(data[key],axis=-1,dtype=F32)gmv=data.pop('gmv')# shape (N, M, E)ok=gmv.sum(axis=1).T.reshape(-1)>0form,gmv_fieldinenumerate(self.gmv_fields):data[gmv_field]=gmv[:,m].T.reshape(-1)# build dataframedf=pandas.DataFrame(data)df['eid']=eid_sid_rlz[0]df['sid']=eid_sid_rlz[1]df['rlz']=eid_sid_rlz[2]# remove the rows with all zero valuesreturndf[ok]
[docs]defcompute_all(self,mean_stds=None,max_iml=None,mmon=Monitor(),cmon=Monitor(),umon=Monitor()):""" :returns: DataFrame with fields eid, rlz, sid, gmv_X, ... """conditioned=mean_stdsisnotNoneself.init_eid_rlz_sig_eps()rng=numpy.random.default_rng(self.seed)data=AccumDict(accum=[])forg,(gs,rlzs)inenumerate(self.cmaker.gsims.items()):gs.gid=self.cmaker.gid[g]idxs,=numpy.where(numpy.isin(self.rlz,rlzs))E=len(idxs)ifE==0:# crucial for performancecontinueifmean_stdsisNone:withmmon:ms=self.cmaker.get_4MN([self.ctx],gs)else:# conditionedms=(mean_stds[0][g],mean_stds[1][g],mean_stds[2][g])withcmon:E=len(idxs)result=numpy.zeros((len(self.imts),len(self.ctx.sids),E),F32)ccdist=self.cross_correl.distributionifconditioned:intra_eps=[None]*self.Melse:# arrays of random numbers of shape (M, N, E) and (M, E)intra_eps=[ccdist.rvs((self.N,E),rng)for_inrange(self.M)]self.eps[idxs]=self.cross_correl.get_inter_eps(self.imts,E,rng).Tform,imtinenumerate(self.imts):try:result[m]=self._compute([arr[m]forarrinms],m,imt,gs,intra_eps[m],idxs,rng)exceptExceptionasexc:raiseRuntimeError('(%s, %s, %s): %s'%(gs,imt,exc.__class__.__name__,exc)).with_traceback(exc.__traceback__)ifself.amplifier:self.amplifier.amplify_gmfs(self.ctx.ampcode,result,self.imts,self.seed)withumon:result=result.transpose(1,0,2)# shape (N, M, E)self.update(data,result,rlzs,ms[0],max_iml)withumon:returnself.strip_zeros(data)
def_compute(self,mean_stds,m,imt,gsim,intra_eps,idxs,rng=None):iflen(mean_stds)==3:# conditioned GMFs# mea, tau, phi with shapes (N,1), (N,N), (N,N)mu_Y,cov_WY_WY,cov_BY_BY=mean_stdsE=len(idxs)eps=self.cmaker.oq.correlation_cutoffifself.cmaker.truncation_level<=1E-9:gmf=exp(mu_Y,imt.string!="MMI")gmf=gmf.repeat(E,axis=1)else:# add a cutoff to remove negative eigenvaluescov_Y_Y=cov_WY_WY+cov_BY_BY+numpy.eye(len(cov_WY_WY))*epsarr=rng.multivariate_normal(mu_Y.flatten(),cov_Y_Y,size=E,check_valid="raise",tol=1e-5,method="cholesky")gmf=exp(arr,imt!="MMI").Treturngmf# shapes (N, E)# regular case, sets self.sig, returns gmfim=imt.stringmean,sig,tau,phi=mean_stds# shapes Nifself.cmaker.oq.mea_tau_phi:min_iml=self.cmaker.min_iml[m]gmv=numpy.exp(mean)fors,sidinenumerate(self.ctx.sids):ifgmv[s]>min_iml:self.mea_tau_phi.append((self.rup_id,sid,gsim.gid,m,mean[s],tau[s],phi[s]))ifself.cmaker.truncation_level<=1E-9:# for truncation_level = 0 there is only mean, no stdsifself.correlation_model:raiseValueError('truncation_level=0 requires ''no correlation model')gmf=exp(mean,im!='MMI')[:,None].repeat(len(idxs),axis=1)elifgsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES=={StdDev.TOTAL}:# If the GSIM provides only total standard deviation, we need# to compute mean and total standard deviation at the sites# of interest.# In this case, we also assume no correlation model is used.ifself.correlation_model:raiseCorrelationButNoInterIntraStdDevs(self.correlation_model,gsim)gmf=exp(mean[:,None]+sig[:,None]*intra_eps,im!='MMI')self.sig[idxs,m]=numpy.nanelse:# the [:, None] is used to implement multiplication by row;# for instance if a = [1 2], b = [[1 2] [3 4]] then# a[:, None] * b = [[1 2] [6 8]] which is the expected result;# otherwise one would get multiplication by column [[1 4] [3 8]]intra_res=phi[:,None]*intra_eps# shape (N, E)ifself.correlation_modelisnotNone:intra_res=self.correlation_model.apply_correlation(self.sites,imt,intra_res,phi)iflen(intra_res.shape)==1:# a vectorintra_res=intra_res[:,None]inter_res=tau[:,None]*self.eps[idxs,m]# shape (N, 1) * E => (N, E)gmf=exp(mean[:,None]+intra_res+inter_res,im!='MMI')self.sig[idxs,m]=tau.max()# from shape (N, 1) => scalarreturngmf# shapes (N, E)
# this is not used in the engine; it is still useful for usage in IPython# when demonstrating hazardlib capabilities
[docs]defground_motion_fields(rupture,sites,imts,gsim,truncation_level,realizations,correlation_model=None,seed=0):""" Given an earthquake rupture, the ground motion field calculator computes ground shaking over a set of sites, by randomly sampling a ground shaking intensity model. A ground motion field represents a possible 'realization' of the ground shaking due to an earthquake rupture. .. note:: This calculator is using random numbers. In order to reproduce the same results numpy random numbers generator needs to be seeded, see http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.seed.html :param openquake.hazardlib.source.rupture.Rupture rupture: Rupture to calculate ground motion fields radiated from. :param openquake.hazardlib.site.SiteCollection sites: Sites of interest to calculate GMFs. :param imts: List of intensity measure type objects (see :mod:`openquake.hazardlib.imt`). :param gsim: Ground-shaking intensity model, instance of subclass of either :class:`~openquake.hazardlib.gsim.base.GMPE` or :class:`~openquake.hazardlib.gsim.base.IPE`. :param truncation_level: Float, number of standard deviations for truncation of the intensity distribution :param realizations: Integer number of GMF simulations to compute. :param correlation_model: Instance of correlation model object. See :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which case non-correlated ground motion fields are calculated. Correlation model is not used if ``truncation_level`` is zero. :param int seed: The seed used in the numpy random number generator :returns: Dictionary mapping intensity measure type objects (same as in parameter ``imts``) to 2d numpy arrays of floats, representing different simulations of ground shaking intensity for all sites in the collection. First dimension represents sites and second one is for simulations. """cmaker=ContextMaker(rupture.tectonic_region_type,{gsim:U32([0])},dict(truncation_level=truncation_level,imtls={str(imt):numpy.array([0.])forimtinimts}))cmaker.scenario=Trueebr=EBRupture(rupture,source_id=0,trt_smr=0,n_occ=realizations,id=0,e0=0)ebr.seed=seedN,E=len(sites),realizationsgc=GmfComputer(ebr,sites,cmaker,correlation_model)df=gc.compute_all()res={}form,imtinenumerate(gc.imts):res[imt]=arr=numpy.zeros((N,E),F32)forsid,eid,gmvinzip(df.sid,df.eid,df[str(imt)]):arr[sid,eid]=gmvreturnres