# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2018-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportabcimportcopyimporttimeimportloggingimportwarningsimportitertoolsimportoperatorimportcollectionsimportnumpyimportshapelyfromscipy.interpolateimportinterp1dfromopenquake.baselibimportconfigfromopenquake.baselib.generalimport(AccumDict,DictArray,RecordBuilder,split_in_slices,block_splitter,sqrscale)fromopenquake.baselib.performanceimportMonitor,split_array,kround0,compilefromopenquake.baselib.python3compatimportdecodefromopenquake.hazardlibimportvalid,imtasimt_modulefromopenquake.hazardlib.constimportStdDev,OK_COMPONENTSfromopenquake.hazardlib.tomimportNegativeBinomialTOM,PoissonTOMfromopenquake.hazardlib.statsimportndtr,truncnorm_sffromopenquake.hazardlib.siteimportSiteCollection,site_param_dtfromopenquake.hazardlib.calc.filtersimport(SourceFilter,IntegrationDistance,magdepdist,get_dparam,get_distances,getdefault,MINMAG,MAXMAG)fromopenquake.hazardlib.map_arrayimportMapArrayfromopenquake.hazardlib.geoimportmultilinefromopenquake.hazardlib.geo.meshimportMeshfromopenquake.hazardlib.geo.surface.planarimport(project,project_back,get_distances_planar)U8=numpy.uint8I32=numpy.int32U32=numpy.uint32F16=numpy.float16F32=numpy.float32F64=numpy.float64TWO20=2**20TWO16=2**16TWO24=2**24TWO32=2**32STD_TYPES=(StdDev.TOTAL,StdDev.INTER_EVENT,StdDev.INTRA_EVENT)KNOWN_DISTANCES=frozenset('''rrup rx_ry0 rx ry0 rjb rhypo repi rcdpp azimuthazimuthcp rvolc clon_clat clon clat'''.split())NUM_BINS=256DIST_BINS=sqrscale(80,1000,NUM_BINS)MEA=0STD=1EPS=float(os.environ.get('OQ_SAMPLE_SITES',1))bymag=operator.attrgetter('mag')# These coordinates were provided by M Gerstenberger (personal# communication, 10 August 2018)cshm_polygon=shapely.geometry.Polygon([(171.6,-43.3),(173.2,-43.3),(173.2,-43.9),(171.6,-43.9)])def_get(surfaces,param,dparam,mask=slice(None)):arr=numpy.array([dparam[sec.idx,param][mask]forsecinsurfaces])returnarr# shape (S, N, ...)def_get_tu(rup,dparam,mask):tor=rup.surface.torarr=_get(rup.surface.surfaces,'tuw',dparam,mask)S,N=arr.shape[:2]# keep the flipped values and then reorder the surface indices# arr has shape (S, N, 2, 3) where 2 refer to the flippingtuw=numpy.zeros((S,N,3),F32)forsinrange(S):idx=tor.soidx[s]flip=int(tor.flipped[idx])tuw[s]=arr[idx,:,flip,:]# shape (N, 3)returnmultiline.get_tu(tor.shift,tuw)
[docs]defset_distances(ctx,rup,r_sites,param,dparam,mask,tu):""" Set the distance attributes on the context; also manages paired attributes like clon_lat and rx_ry0. """ifdparamisNone:# no multifaultdists=get_distances(rup,r_sites,param)if'_'inparam:p0,p1=param.split('_')# clon_clatsetattr(ctx,p0,dists[:,0])setattr(ctx,p1,dists[:,1])else:setattr(ctx,param,dists)else:# use the MultiLine objectu_max=rup.surface.msparam['u_max']ifparamin('rx','ry0'):tut,uut=tu''' # sanity check with the right parameters t, u t, u = rup.surface.tor.get_tu(r_sites) numpy.testing.assert_allclose(tut, t) numpy.testing.assert_allclose(uut, u) '''ifparam=='rx':ctx.rx=tutelifparam=='ry0':neg=uut<0ctx.ry0[neg]=numpy.abs(uut[neg])big=uut>u_maxctx.ry0[big]=uut[big]-u_maxelifparam=='rjb':rjbs=_get(rup.surface.surfaces,'rjb',dparam,mask)ctx['rjb']=numpy.min(rjbs,axis=0)''' # sanity check with the right rjb rjb = rup.surface.get_joyner_boore_distance(r_sites) numpy.testing.assert_allclose(ctx.rjb, rjb) '''elifparam=='clon_clat':coos=_get(rup.surface.surfaces,'clon_clat',dparam,mask)# shape (numsections, numsites, 3)m=Mesh(coos[:,:,0],coos[:,:,1]).get_closest_points(r_sites)# shape (numsites, 3)ctx['clon']=m.lonsctx['clat']=m.lats
[docs]defis_modifiable(gsim):""" :returns: True if it is a ModifiableGMPE """returnhasattr(gsim,'gmpe')andhasattr(gsim,'params')
[docs]defconcat(ctxs):""" Concatenate context arrays. :returns: [] or [poisson_ctx] or [nonpoisson_ctx, ...] """ifnotctxs:return[]ctx=ctxs[0]out=[]# if ctx has probs_occur, it is assumed to be non-poissonianifhasattr(ctx,'probs_occur')andctx.probs_occur.shape[1]>=1:# case 27, 29, 62, 65, 75, 78, 80forshpinset(ctx.probs_occur.shape[1]forctxinctxs):p_array=[pforpinctxsifp.probs_occur.shape[1]==shp]out.append(numpy.concatenate(p_array).view(numpy.recarray))else:out.append(numpy.concatenate(ctxs).view(numpy.recarray))returnout
[docs]defsize(imtls):""" :returns: size of the dictionary of arrays imtls """imls=imtls[next(iter(imtls))]returnlen(imls)*len(imtls)
[docs]deftrivial(ctx,name):""" :param ctx: a recarray :param name: name of a parameter :returns: True if the parameter is missing or single valued """ifnamenotinctx.dtype.names:returnTruereturnlen(numpy.unique(numpy.float32(ctx[name])))==1
[docs]classOq(object):""" A mock for OqParam """af=Noneimpact=Falsecross_correl=Nonemea_tau_phi=Falsesplit_sources=Trueuse_rates=Falsewith_betw_ratio=Noneinfer_occur_rates=Falseinputs=()def__init__(self,**hparams):vars(self).update(hparams)@propertydefmin_iml(self):try:imtls=self.imtlsexceptAttributeError:imtls=self.hazard_imtlsreturnnumpy.array([1E-10forimtinimtls])
[docs]classDeltaRatesGetter(object):""" Read the delta rates from an aftershock datastore """def__init__(self,dstore):self.dstore=dstoredef__call__(self,src_id):withself.dstore.open('r')asdstore:returndstore['delta_rates'][src_id]
# same speed as performance.kround, round more
[docs]defkround1(ctx,kfields):kdist=2.*ctx.mag**2# heuristic collapse distance from 32 to 200 kmclose=ctx.rrup<kdistfar=~closeout=numpy.zeros(len(ctx),[(k,ctx.dtype[k])forkinkfields])forkfieldinkfields:kval=ctx[kfield]ifkfield=='vs30':out[kfield][close]=numpy.round(kval[close])# round lessout[kfield][far]=numpy.round(kval[far],1)# round moreelifkval.dtype==F64andkfield!='mag':out[kfield][close]=F16(kval[close])# round lessout[kfield][far]=numpy.round(kval[far])# round moreelse:out[kfield]=ctx[kfield]returnout
[docs]defkround2(ctx,kfields):kdist=5.*ctx.mag**2# from 80 to 500 kmclose=ctx.rrup<kdistfar=~closeout=numpy.zeros(len(ctx),[(k,ctx.dtype[k])forkinkfields])forkfieldinkfields:kval=ctx[kfield]ifkfield=='rx':# can be negativeout[kfield]=numpy.round(kval)elifkfieldinKNOWN_DISTANCES:out[kfield][close]=numpy.ceil(kval[close])# round to 1 kmout[kfield][far]=round_dist(kval[far])# round moreelifkfield=='vs30':out[kfield][close]=numpy.round(kval[close])# round lessout[kfield][far]=numpy.round(kval[far],1)# round moreelifkval.dtype==F64andkfield!='mag':out[kfield][close]=F16(kval[close])# round lessout[kfield][far]=numpy.round(kval[far])# round moreelse:out[kfield]=ctx[kfield]returnout
kround={0:kround0,1:kround1,2:kround2}
[docs]classFarAwayRupture(Exception):"""Raised if the rupture is outside the maximum distance for all sites"""
[docs]defget_num_distances(gsims):""" :returns: the number of distances required for the given GSIMs """dists=set()forgsimingsims:dists.update(gsim.REQUIRES_DISTANCES)returnlen(dists)
# NB: minimum_magnitude is ignoreddef_interp(param,name,trt):try:mdd=param[name]exceptKeyError:returnmagdepdist([(MINMAG,1000),(MAXMAG,1000)])ifisinstance(mdd,IntegrationDistance):returnmdd(trt)elifisinstance(mdd,dict):ifmdd:magdist=getdefault(mdd,trt)else:magdist=[(MINMAG,1000),(MAXMAG,1000)]returnmagdepdist(magdist)returnmdd
[docs]defsimple_cmaker(gsims,imts,**params):""" :returns: a simplified ContextMaker for use in the tests """dic=dict(imtls={imt:[0]forimtinimts})dic.update(**params)returnContextMaker('*',gsims,dic)
# ############################ genctxs ################################## ## generator of quartets (rup_index, mag, planar_array, sites)def_quartets(cmaker,src,sitecol,cdist,magdist,planardict):minmag=cmaker.maximum_distance.x[0]maxmag=cmaker.maximum_distance.x[-1]# splitting by magnitudeifsrc.count_nphc()==1:# one rupture per magnitudeform,(mag,pla)inenumerate(planardict.items()):ifminmag<=mag<=maxmag:yieldm,mag,pla,sitecolelse:form,rupinenumerate(src.iruptures()):mag=rup.magifmag>maxmagormag<minmag:continuearr=[rup.surface.array.reshape(-1,3)]pla=planardict[mag]# NB: having a good psdist is essential for performance!psdist=src.get_psdist(m,mag,cmaker.pointsource_distance,magdist)close=sitecol.filter(cdist<=psdist)far=sitecol.filter(cdist>psdist)ifcmaker.fewsites:ifcloseisNone:# all is far, common for small magyieldm,mag,arr,sitecolelse:# something is closeyieldm,mag,pla,sitecolelse:# many sitesifcloseisNone:# all is faryieldm,mag,arr,fareliffarisNone:# all is closeyieldm,mag,pla,closeelse:# some sites are far, some are closeyieldm,mag,arr,faryieldm,mag,pla,close# helper used to populate contexts for planar rupturesdef_get_ctx_planar(cmaker,zeroctx,mag,planar,sites,src_id,tom):# computing distancesrrup,xx,yy=project(planar,sites.xyz)# (3, U, N)# get the closest points on the surfaceifcmaker.fewsitesor'clon'incmaker.REQUIRES_DISTANCES:closest=project_back(planar,xx,yy)# (3, U, N)# set distanceszeroctx['rrup']=rrupforparincmaker.REQUIRES_DISTANCES-{'rrup'}:zeroctx[par]=get_distances_planar(planar,sites,par)forparincmaker.REQUIRES_DISTANCES:dst=zeroctx[par]ifcmaker.minimum_distance:dst[dst<cmaker.minimum_distance]=cmaker.minimum_distance# ctx has shape (U, N), ctxt (N, U)ctxt=zeroctx.T# smart trick taking advantage of numpy magicctxt['src_id']=src_id# setting rupture parametersforparincmaker.ruptparams:ifpar=='mag':ctxt[par]=magelifpar=='occurrence_rate':ctxt[par]=planar.wlr[:,2]# shape U-> (N, U)elifpar=='width':ctxt[par]=planar.wlr[:,0]elifpar=='strike':ctxt[par]=planar.sdr[:,0]elifpar=='dip':ctxt[par]=planar.sdr[:,1]elifpar=='rake':ctxt[par]=planar.sdr[:,2]elifpar=='ztor':# top edge depthctxt[par]=planar.corners[:,2,0]elifpar=='zbot':# bottom edge depthctxt[par]=planar.corners[:,2,3]elifpar=='hypo_lon':ctxt[par]=planar.hypo[:,0]elifpar=='hypo_lat':ctxt[par]=planar.hypo[:,1]elifpar=='hypo_depth':ctxt[par]=planar.hypo[:,2]ifcmaker.fewsites:zeroctx['clon']=closest[0]zeroctx['clat']=closest[1]# setting site parametersforparincmaker.siteparams:zeroctx[par]=sites.array[par]# shape N-> (U, N)ifhasattr(tom,'get_pmf'):# NegativeBinomialTOM# read Probability Mass Function from model and reshape it# into predetermined shape of probs_occurpmf=tom.get_pmf(planar.wlr[:,2],n_max=zeroctx['probs_occur'].shape[2])zeroctx['probs_occur']=pmf[:,numpy.newaxis,:]returnzeroctx.flatten()# shape N*U
[docs]defgenctxs_Pp(src,sitecol,cmaker):""" Context generator for point sources and collapsed point sources """dd=cmaker.defaultdict.copy()tom=getattr(src,'temporal_occurrence_model',None)iftomandisinstance(tom,NegativeBinomialTOM):ifhasattr(src,'pointsources'):# CollapsedPointSourcemaxrate=max(max(ps.mfd.occurrence_rates)forpsinsrc.pointsources)else:# regular sourcemaxrate=max(src.mfd.occurrence_rates)p_size=tom.get_pmf(maxrate).shape[1]dd['probs_occur']=numpy.zeros(p_size)else:dd['probs_occur']=numpy.zeros(0)builder=RecordBuilder(**dd)cmaker.siteparams=[parforparinsitecol.array.dtype.namesifparindd]cmaker.ruptparams=cmaker.REQUIRES_RUPTURE_PARAMETERS|{'occurrence_rate'}withcmaker.ir_mon:# building planar geometriesplanardict=src.get_planar(cmaker.shift_hypo)magdist={mag:cmaker.maximum_distance(mag)formag,rateinsrc.get_annual_occurrence_rates()}# cmaker.maximum_distance(mag) can be 0 if outside the mag rangemaxmag=max(magformag,distinmagdist.items()ifdist>0)maxdist=magdist[maxmag]cdist=sitecol.get_cdist(src.location)# NB: having a decent max_radius is essential for performance!mask=cdist<=maxdist+src.max_radius(maxdist)sitecol=sitecol.filter(mask)ifsitecolisNone:return[]formagi,mag,planarlist,sitesin_quartets(cmaker,src,sitecol,cdist[mask],magdist,planardict):ifnotplanarlist:continueeliflen(planarlist)>1:# when using ps_grid_spacingpla=numpy.concatenate(planarlist).view(numpy.recarray)else:pla=planarlist[0]offset=src.offset+magi*len(pla)zctx=builder.zeros((len(pla),len(sites)))# shape (N, U)ifcmaker.fewsites:rup_ids=zctx['rup_id'].T# numpy trick, shape (U, N)rup_ids[:]=numpy.arange(offset,offset+len(pla))# building contextsctx=_get_ctx_planar(cmaker,zctx,mag,pla,sites,src.id,tom)ctxt=ctx[ctx.rrup<magdist[mag]]iflen(ctxt):yieldctxt
def_build_dparam(src,sitecol,cmaker):dparams={'rjb','tuw'}ifcmaker.fewsites:dparams|={'clon_clat'}sections=src.get_sections(src.get_unique_idxs())out={}forsecinsections:out[sec.idx,'rrup']=get_dparam(sec,sitecol,'rrup')forparamindparams:out[sec.idx,param]=get_dparam(sec,sitecol,param)# use multi_fault_test to debug this# from openquake.baselib.general import getsizeof# print(getsizeof(out))returnout# this is the critical function for the performance of the classical calculator# the performance is dominated by the CPU cache, i.e. large arrays are slow# the only way to speedup is to reduce the maximum_distance, then the array# will become shorter in the N dimension (number of affected sites), or to# collapse the ruptures, then truncnorm_sf will be called less times@compile("(float64[:,:,:], float64[:,:], float64, float32[:,:])")def_set_poes(mean_std,loglevels,phi_b,out):L1=loglevels.size//len(loglevels)form,levelsinenumerate(loglevels):mL1=m*L1mea,std=mean_std[:,m]# shape Nforlvl,imlinenumerate(levels):out[mL1+lvl]=truncnorm_sf(phi_b,(iml-mea)/std)# ############################ ContextMaker ############################### #def_fix(gsimdict,betw):ifbetw:out={}forgsim,uintsingsimdict.items():iflen(gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES)==1:out[valid.modified_gsim(gsim,add_between_within_stds=betw)] \
=uintselse:out[gsim]=uintsreturnoutreturngsimdict
[docs]classContextMaker(object):""" A class to manage the creation of contexts and to compute mean/stddevs and possibly PoEs. :param trt: tectonic region type string :param gsims: list of GSIMs or a dictionary gsim -> rlz indices :param oq: dictionary of parameters like the maximum_distance, the IMTLs, the investigation time, etc, or an OqParam instance :param extraparams: additional site parameters to consider, used only in the tests NB: the trt can be different from the tectonic region type for which the underlying GSIMs are defined. This is intentional. """REQUIRES=['DISTANCES','SITES_PARAMETERS','RUPTURE_PARAMETERS']scenario=Falsedeltagetter=Nonefewsites=Falsetom=Nonecluster=None# set in PmapMakerdef__init__(self,trt,gsims,oq,monitor=Monitor(),extraparams=()):self.trt=trtifisinstance(oq,dict):# this happens when instantiating RuptureData in extract.pyparam=oqoq=Oq(**param)self.mags=param.get('mags',())# list of strings %.2fself.cross_correl=param.get('cross_correl')# cond_spectra_testelse:# OqParamparam=vars(oq)param['reqv']=oq.get_reqv()param['af']=getattr(oq,'af',None)self.cross_correl=oq.cross_correlself.imtls=oq.imtlstry:self.mags=oq.mags_by_trt[trt]exceptAttributeError:self.mags=()exceptKeyError:# missing TRT but there is only one[(_,self.mags)]=oq.mags_by_trt.items()ifoq.with_betw_ratio:betw_ratio={'with_betw_ratio':oq.with_betw_ratio}elifoq.impact:betw_ratio={'with_betw_ratio':1.7}# same as in GEESEelse:betw_ratio={}ifisinstance(gsims,dict):self.gsims=_fix(gsims,betw_ratio)else:self.gsims=_fix({gsim:U32([i])fori,gsiminenumerate(gsims)},betw_ratio)# NB: the gid array can be overridden later onself.gid=numpy.arange(len(gsims),dtype=numpy.uint16)self.oq=oqself.monitor=monitorself._init1(param)self._init2(param,extraparams)self.set_imts_conv()self.init_monitoring(self.monitor)def_init1(self,param):if'poes'inparam:self.poes=param['poes']if'imtls'inparam:forimtinparam['imtls']:ifnotisinstance(imt,str):raiseTypeError('Expected string, got %s'%type(imt))self.imtls=param['imtls']elif'hazard_imtls'inparam:self.imtls=imt_module.dictarray(param['hazard_imtls'])elifnothasattr(self,'imtls'):raiseKeyError('Missing imtls in ContextMaker!')self.cache_distances=param.get('cache_distances',False)self.max_sites_disagg=param.get('max_sites_disagg',10)self.time_per_task=param.get('time_per_task',60)self.collapse_level=int(param.get('collapse_level',-1))self.disagg_by_src=param.get('disagg_by_src',False)self.horiz_comp=param.get('horiz_comp_to_geom_mean',False)self.maximum_distance=_interp(param,'maximum_distance',self.trt)if'pointsource_distance'notinparam:self.pointsource_distance=float(config.performance.pointsource_distance)else:self.pointsource_distance=getdefault(param['pointsource_distance'],self.trt)self.minimum_distance=param.get('minimum_distance',0)self.investigation_time=param.get('investigation_time')self.ses_seed=param.get('ses_seed',42)self.ses_per_logic_tree_path=param.get('ses_per_logic_tree_path',1)self.truncation_level=param.get('truncation_level',99.)self.phi_b=ndtr(self.truncation_level)self.num_epsilon_bins=param.get('num_epsilon_bins',1)self.disagg_bin_edges=param.get('disagg_bin_edges',{})self.ps_grid_spacing=param.get('ps_grid_spacing')self.split_sources=self.oq.split_sourcesforgsiminself.gsims:ifhasattr(gsim,'set_tables'):iflen(self.mags)==0andnotis_modifiable(gsim):raiseValueError('You must supply a list of magnitudes as 2-digit ''strings, like mags=["6.00", "6.10", "6.20"]')gsim.set_tables(self.mags,self.imtls)def_init2(self,param,extraparams):forreqinself.REQUIRES:reqset=set()forgsiminself.gsims:reqset.update(getattr(gsim,'REQUIRES_'+req))ifgetattr(self.oq,'af',None)andreq=='SITES_PARAMETERS':reqset.add('ampcode')ifis_modifiable(gsim)andreq=='SITES_PARAMETERS':reqset.add('vs30')# required by the ModifiableGMPEreqset.update(gsim.gmpe.REQUIRES_SITES_PARAMETERS)if'apply_swiss_amplification'ingsim.params:reqset.add('amplfactor')if('apply_swiss_amplification_sa'ingsim.params):reqset.add('ch_ampl03')reqset.add('ch_ampl06')reqset.add('ch_phis2s03')reqset.add('ch_phis2s06')reqset.add('ch_phiss03')reqset.add('ch_phiss06')setattr(self,'REQUIRES_'+req,reqset)self.min_iml=self.oq.min_imlself.reqv=param.get('reqv')ifself.reqvisnotNone:self.REQUIRES_DISTANCES.add('repi')# NB: REQUIRES_DISTANCES is empty when gsims = [FromFile]REQUIRES_DISTANCES=self.REQUIRES_DISTANCES|{'rrup'}reqs=(sorted(self.REQUIRES_RUPTURE_PARAMETERS)+sorted(self.REQUIRES_SITES_PARAMETERS|set(extraparams))+sorted(REQUIRES_DISTANCES))dic={}forreqinreqs:ifreqinsite_param_dt:dt=site_param_dt[req]ifisinstance(dt,tuple):# (string_, size)dic[req]=b'X'*dt[1]else:dic[req]=dt(0)else:dic[req]=0.dic['src_id']=I32(0)dic['rup_id']=U32(0)dic['sids']=U32(0)dic['rrup']=F64(0)dic['occurrence_rate']=F64(0)self.defaultdict=dicself.shift_hypo=param.get('shift_hypo')
[docs]definit_monitoring(self,monitor):# instantiating child monitors, may be called in the workersself.pla_mon=monitor('planar contexts',measuremem=False)self.ctx_mon=monitor('nonplanar contexts',measuremem=False)self.gmf_mon=monitor('computing mean_std',measuremem=False)self.poe_mon=monitor('get_poes',measuremem=False)self.ir_mon=monitor('iter_ruptures',measuremem=False)self.sec_mon=monitor('building dparam',measuremem=True)self.delta_mon=monitor('getting delta_rates',measuremem=False)self.task_no=getattr(monitor,'task_no',0)self.out_no=getattr(monitor,'out_no',self.task_no)self.cfactor=numpy.zeros(2)
[docs]defcopy(self,**kw):""" :returns: a copy of the ContextMaker with modified attributes """new=copy.copy(self)fork,vinkw.items():setattr(new,k,v)if'imtls'inkw:new.set_imts_conv()returnnew
[docs]defrestrict(self,imts):""" :param imts: a list of IMT strings subset of the full list :returns: a new ContextMaker involving less IMTs """new=copy.copy(self)new.imtls=DictArray({imt:self.imtls[imt]forimtinimts})new.set_imts_conv()returnnew
[docs]defset_imts_conv(self):""" Set the .imts list and .conv dictionary for the horizontal component conversion (if any). Also set the .loglevels. """self.loglevels=DictArray(self.imtls)ifself.imtlselse{}withwarnings.catch_warnings():# avoid RuntimeWarning: divide by zero encountered in logwarnings.simplefilter("ignore")forimt,imlsinself.imtls.items():ifimt!='MMI':self.loglevels[imt]=numpy.log(imls)self.imts=tuple(imt_module.from_string(im)foriminself.imtls)self.conv={}# gsim -> imt -> (conv_median, conv_sigma, rstd)ifnotself.horiz_comp:return# do not convertforgsiminself.gsims:self.conv[gsim]={}imc=gsim.DEFINED_FOR_INTENSITY_MEASURE_COMPONENTifnotimc:# for GMPETablescontinueelifimc.name=='GEOMETRIC_MEAN':pass# nothing to doelifimc.nameinOK_COMPONENTS:dic={imt:imc.apply_conversion(imt)forimtinself.imts}self.conv[gsim].update(dic)else:logging.info(f'Conversion from {imc.name} not applicable to'f' {gsim.__class__.__name__}')
[docs]defsplit(self,blocksize):""" Split the ContextMaker by blocks of GSIMs """forgid,wei,gsimsinzip(block_splitter(self.gid,blocksize),block_splitter(self.wei,blocksize),block_splitter(self.gsims,blocksize)):new=copy.copy(self)new.gsims=gsimsnew.gid=gidnew.wei=weiyieldnew
[docs]defhoriz_comp_to_geom_mean(self,mean_stds,gsim):""" This function converts ground-motion obtained for a given description of horizontal component into ground-motion values for geometric_mean. The conversion equations used are from: - Beyer and Bommer (2006): for arithmetic mean, GMRot and random - Boore and Kishida (2017): for RotD50 """ifnotself.conv[gsim]:returnform,imtinenumerate(self.imts):me,si,_ta,_ph=mean_stds[:,m]conv_median,conv_sigma,rstd=self.conv[gsim][imt]me[:]=numpy.log(numpy.exp(me)/conv_median)si[:]=((si**2-conv_sigma**2)/rstd**2)**0.5
@propertydefZ(self):""" :returns: the number of realizations associated to self """returnsum(len(rlzs)forrlzsinself.gsims.values())
[docs]defnew_ctx(self,size):""" :returns: a recarray of the given size full of zeros """returnRecordBuilder(**self.defaultdict).zeros(size)
[docs]defrecarray(self,ctxs):""" :params ctxs: a non-empty list of homogeneous contexts :returns: a recarray, possibly collapsed """assertctxsdd=self.defaultdict.copy()ifnothasattr(ctxs[0],'probs_occur'):forctxinctxs:ctx.probs_occur=numpy.zeros(0)np=0else:shps=[ctx.probs_occur.shapeforctxinctxs]np=max(i[1]iflen(i)>1elsei[0]foriinshps)dd['probs_occur']=numpy.zeros(np)C=sum(len(ctx)forctxinctxs)ra=RecordBuilder(**dd).zeros(C)start=0forctxinctxs:ifself.minimum_distance:fornameinself.REQUIRES_DISTANCES:array=ctx[name]small_distances=array<self.minimum_distanceifsmall_distances.any():array=numpy.array(array)# make a copy firstarray[small_distances]=self.minimum_distancectx[name]=arrayslc=slice(start,start+len(ctx))forparindd:ifpar=='rup_id':val=getattr(ctx,par)else:val=getattr(ctx,par,numpy.nan)ifpar=='clon_clat':ra['clon'][slc]=ctx.clonra['clat'][slc]=ctx.clatelse:getattr(ra,par)[slc]=valra.sids[slc]=ctx.sidsstart=slc.stopreturnra
[docs]defget_ctx_params(self):""" :returns: the interesting attributes of the context """params={'occurrence_rate','sids','src_id','probs_occur','clon','clat','rrup'}params.update(self.REQUIRES_RUPTURE_PARAMETERS)fordparaminself.REQUIRES_DISTANCES:params.add(dparam+'_')returnparams
[docs]deffrom_planar(self,rup,hdist,step,point='TC',toward_azimuth=90.,direction='positive'):""" :param rup: a BaseRupture instance with a PlanarSurface and site parameters :returns: a context array for the sites around the rupture """sitecol=SiteCollection.from_planar(rup,point='TC',toward_azimuth=toward_azimuth,direction=direction,hdist=hdist,step=step,req_site_params=self.REQUIRES_SITES_PARAMETERS)ctxs=list(self.genctxs([rup],sitecol,src_id=0))returnself.recarray(ctxs)
[docs]deffrom_srcs(self,srcs,sitecol):# used in disagg.disaggregation""" :param srcs: a list of Source objects :param sitecol: a SiteCollection instance :returns: a list of context arrays """ctxs=[]srcfilter=SourceFilter(sitecol,self.maximum_distance)fori,srcinenumerate(srcs):ifsrc.id==-1:# not set yetsrc.id=isites=srcfilter.get_close_sites(src)ifsitesisnotNone:ctxs.extend(self.get_ctx_iter(src,sites))returnconcat(ctxs)
[docs]defget_rparams(self,rup):""" :returns: a dictionary with the rupture parameters """dic={}ifhasattr(self,'dparam')andself.dparam:msparam=rup.surface.msparamelse:msparam=Noneforparaminself.REQUIRES_RUPTURE_PARAMETERS:ifparam=='mag':value=numpy.round(rup.mag,3)elifparam=='strike':ifmsparam:value=msparam['strike']else:value=rup.surface.get_strike()elifparam=='dip':ifmsparam:value=msparam['dip']else:value=rup.surface.get_dip()elifparam=='rake':value=rup.rakeelifparam=='ztor':ifmsparam:value=msparam['ztor']else:value=rup.surface.get_top_edge_depth()elifparam=='hypo_lon':value=rup.hypocenter.longitudeelifparam=='hypo_lat':value=rup.hypocenter.latitudeelifparam=='hypo_depth':value=rup.hypocenter.depthelifparam=='width':ifmsparam:value=msparam['width']else:value=rup.surface.get_width()elifparam=='in_cshm':# used in McVerry and Bradley GMPEsifrup.surface:# this is really expensivelons=rup.surface.mesh.lons.flatten()lats=rup.surface.mesh.lats.flatten()points_in_polygon=(shapely.geometry.Point(lon,lat).within(cshm_polygon)forlon,latinzip(lons,lats))value=any(points_in_polygon)else:value=Falseelifparam=='zbot':# needed for width estimation in CampbellBozorgnia2014ifmsparam:value=msparam['zbot']elifrup.surfaceandhasattr(rup,'surfaces'):value=rup.surface.zbotelifrup.surface:value=rup.surface.mesh.depths.max()else:value=rup.hypocenter.depthelse:raiseValueError('%s requires unknown rupture parameter %r'%(type(self).__name__,param))dic[param]=valuedic['occurrence_rate']=getattr(rup,'occurrence_rate',numpy.nan)ifhasattr(rup,'temporal_occurrence_model'):ifisinstance(rup.temporal_occurrence_model,NegativeBinomialTOM):dic['probs_occur']=rup.temporal_occurrence_model.get_pmf(rup.occurrence_rate)elifhasattr(rup,'probs_occur'):dic['probs_occur']=rup.probs_occurreturndic
[docs]defgenctxs(self,same_mag_rups,sites,src_id):""" :params same_mag_rups: a list of ruptures :param sites: a (filtered) site collection :param src_id: source index :yields: a context array for each rupture """magdist=self.maximum_distance(same_mag_rups[0].mag)dparam=getattr(self,'dparam',None)forrupinsame_mag_rups:ifdparam:rrups=_get(rup.surface.surfaces,'rrup',dparam)rrup=numpy.min(rrups,axis=0)else:rrup=get_distances(rup,sites,'rrup')mask=rrup<=magdistifnotmask.any():continuer_sites=sites.filter(mask)# to debug you can insert here# print(rup.surface.tor.get_tuw_df(r_sites))# import pdb; pdb.set_trace()''' # sanity check true_rrup = rup.surface.get_min_distance(r_sites) numpy.testing.assert_allclose(true_rrup, rrup[mask]) '''rparams=self.get_rparams(rup)dd=self.defaultdict.copy()try:po=rparams['probs_occur']exceptKeyError:dd['probs_occur']=numpy.zeros(0)else:L=len(po)iflen(po.shape)==1elsepo.shape[1]dd['probs_occur']=numpy.zeros(L)ctx=RecordBuilder(**dd).zeros(len(r_sites))forpar,valinrparams.items():ctx[par]=valctx.rrup=rrup[mask]ctx.sids=r_sites.sidsparams=self.REQUIRES_DISTANCES-{'rrup'}ifself.fewsitesor'clon'inparamsor'clat'inparams:params.add('clon_clat')# compute tu only onceifdparamand('rx'inparamsor'ry0'inparams):tu=_get_tu(rup,dparam,mask)else:tu=Noneforparaminparams-{'clon','clat'}:set_distances(ctx,rup,r_sites,param,dparam,mask,tu)# Equivalent distancesreqv_obj=(self.reqv.get(self.trt)ifself.reqvelseNone)ifreqv_objandnotrup.surface:# PointRuptures have no surfacereqv=reqv_obj.get(ctx.repi,rup.mag)if'rjb'inself.REQUIRES_DISTANCES:ctx.rjb=reqvif'rrup'inself.REQUIRES_DISTANCES:ctx.rrup=numpy.sqrt(reqv**2+rup.hypocenter.depth**2)fornameinr_sites.array.dtype.names:setattr(ctx,name,r_sites[name])ctx.src_id=src_idifsrc_id>=0:ctx.rup_id=rup.rup_idyieldctx
# this is called for non-point sources (or point sources in preclassical)
[docs]defgen_contexts(self,rups_sites,src_id):""" :yields: the old-style RuptureContexts generated by the source """forrups,sitesinrups_sites:# ruptures with the same magnitudeyield fromself.genctxs(rups,sites,src_id)
[docs]defget_ctx_iter(self,src,sitecol,src_id=0,step=1):""" :param src: a source object (already split) or a list of ruptures :param sitecol: a (filtered) SiteCollection :param src_id: integer source ID used where src is actually a list :param step: > 1 only in preclassical :returns: iterator over recarrays """self.fewsites=len(sitecol.complete)<=self.max_sites_disaggifself.fewsitesor'clon'inself.REQUIRES_DISTANCES:self.defaultdict['clon']=F64(0.)self.defaultdict['clat']=F64(0.)ifgetattr(src,'location',None)andstep==1:returnself.pla_mon.iter(genctxs_Pp(src,sitecol,self))elifhasattr(src,'source_id'):# other sourceifsrc.code==b'F'andstep==1:withself.sec_mon:self.dparam=_build_dparam(src,sitecol,self)else:self.dparam=Noneminmag=self.maximum_distance.x[0]maxmag=self.maximum_distance.x[-1]withself.ir_mon:allrups=list(src.iter_ruptures(shift_hypo=self.shift_hypo,step=step))fori,rupinenumerate(allrups):rup.rup_id=src.offset+iallrups=sorted([rupforrupinallrupsifminmag<=rup.mag<=maxmag],key=bymag)self.num_rups=len(allrups)or1ifnotallrups:returniter([])# sorted by mag by constructionu32mags=U32([rup.mag*100forrupinallrups])rups_sites=[(rups,sitecol)forrupsinsplit_array(numpy.array(allrups),u32mags)]src_id=src.idelse:# in event based we get a list with a single rupturerups_sites=[(src,sitecol)]self.dparam=Nonesrc_id=-1ctxs=self.gen_contexts(rups_sites,src_id)blocks=block_splitter(ctxs,10_000,weight=len)# the weight of 10_000 ensure less than 1MB per block (recarray)returnself.ctx_mon.iter(map(self.recarray,blocks))
[docs]defmax_intensity(self,sitecol1,mags,dists):""" :param sitecol1: a SiteCollection instance with a single site :param mags: a sequence of magnitudes :param dists: a sequence of distances :returns: an array of GMVs of shape (#mags, #dists) """assertlen(sitecol1)==1,sitecol1nmags,ndists=len(mags),len(dists)gmv=numpy.zeros((nmags,ndists))form,dinitertools.product(range(nmags),range(ndists)):mag,dist=mags[m],dists[d]ctx=RuptureContext()forparinself.REQUIRES_RUPTURE_PARAMETERS:setattr(ctx,par,0)fordstinself.REQUIRES_DISTANCES:setattr(ctx,dst,numpy.array([dist]))forparinself.REQUIRES_SITES_PARAMETERS:setattr(ctx,par,getattr(sitecol1,par))ctx.sids=sitecol1.sidsctx.mag=magctx.width=.01# 10 meters to avoid warnings in abrahamson_2014try:maxmean=self.get_mean_stds([ctx])[0].max()# shape NMexceptValueError:# magnitude outside of supported rangecontinueelse:gmv[m,d]=numpy.exp(maxmean)returngmv
[docs]defget_occ_rates(self,ctxt):""" :param ctxt: context array generated by this ContextMaker :returns: occurrence rates, possibly from probs_occur[0] """# thanks to split_by_tom we can assume ctx to be homogeneousifnumpy.isfinite(ctxt[0].occurrence_rate):returnctxt.occurrence_rateelse:probs=[rec.probs_occur[0]forrecinctxt]return-numpy.log(probs)/self.investigation_time
# not used by the engine, is is meant for notebooks
[docs]defget_poes(self,srcs,sitecol,tom=None,rup_mutex={},collapse_level=-1):""" :param srcs: a list of sources with the same TRT :param sitecol: a SiteCollection instance with N sites :returns: an array of PoEs of shape (N, L, G) """ctxs=self.from_srcs(srcs,sitecol)returnself.get_pmap(ctxs,tom,rup_mutex).array
def_gen_poes(self,ctx):fromopenquake.hazardlib.site_amplificationimportget_poes_site(M,L1),G=self.loglevels.array.shape,len(self.gsims)# split large context arrays to avoid filling the CPU cachewithself.gmf_mon:# split_by_mag=False because already contains a single magmean_stdt=self.get_mean_stds([ctx],split_by_mag=False)iflen(ctx)<1000:# do not split in slices to make debugging easierslices=[slice(0,len(ctx))]else:# making plenty of slices so that the array `poes` is smallslices=split_in_slices(len(ctx),2*L1)forslcinslices:withself.poe_mon:# this is allocating at most a few MB of RAMpoes=numpy.zeros((slc.stop-slc.start,M*L1,G),F32)# NB: using .empty would break the MixtureModelGMPETestCaseforg,gsiminenumerate(self.gsims):ms=mean_stdt[:2,g,:,slc]# builds poes of shape (n, L, G)ifself.oq.af:# amplification methodpoes[:,:,g]=get_poes_site(ms,self,ctx[slc])else:# regular caseset_poes(gsim,ms,self,ctx,poes[:,:,g],slc)yieldpoes,mean_stdt[0,:,:,slc],mean_stdt[1,:,:,slc],slc#cs,ms,ps = ctx.nbytes/TWO20, mean_stdt.nbytes/TWO20, poes.nbytes/TWO20#print('C=%.1fM, mean_stds=%.1fM, poes=%.1fM, G=%d' % (cs, ms, ps, G))
[docs]defgen_poes(self,ctx):""" :param ctx: a vectorized context (recarray) of size N :param rup_indep: rupture flag (false for mutex ruptures) :yields: poes, mea_sig, ctxt with poes of shape (N, L, G) """ctx.mag=numpy.round(ctx.mag,3)formaginnumpy.unique(ctx.mag):ctxt=ctx[ctx.mag==mag]self.cfactor+=[len(ctxt),1]forpoes,mea,sig,slcinself._gen_poes(ctxt):# NB: using directly 64 bit poes would be slower without reason# since with astype(F64) the numbers are identicalyieldpoes.astype(F64),mea,sig,ctxt[slc]
# documented but not used in the engine
[docs]defget_pmap(self,ctxs,tom=None,rup_mutex={}):""" :param ctxs: a list of context arrays (only one for poissonian ctxs) :param tom: temporal occurrence model (default PoissonTom) :param rup_mutex: dictionary of weights (default empty) :returns: a MapArray """rup_indep=notrup_mutexsids=numpy.unique(ctxs[0].sids)pmap=MapArray(sids,size(self.imtls),len(self.gsims)).fill(rup_indep)self.tom=tomorPoissonTOM(self.investigation_time)forctxinctxs:self.update(pmap,ctx,rup_mutex)return~pmapifrup_indepelsepmap
[docs]defratesNLG(self,srcgroup,sitecol):""" Used for debugging simple sources :param srcgroup: a group of sources :param sitecol: a SiteCollection instance :returns: an array of annual rates of shape (N, L, G) """pmap=self.get_pmap(self.from_srcs(srcgroup,sitecol))return(~pmap).to_rates()
[docs]defupdate(self,pmap,ctx,rup_mutex=None):""" :param pmap: probability map to update :param ctx: a context array :param rup_mutex: dictionary (src_id, rup_id) -> weight """forpoes,mea,sig,ctxtinself.gen_poes(ctx):ifrup_mutex:pmap.update_mutex(poes,ctxt,self.tom.time_span,rup_mutex)elifself.cluster:forpoe,sidxinzip(poes,pmap.sidx[ctxt.sids]):pmap.array[sidx]*=1.-poeelse:pmap.update_indep(poes,ctxt,self.tom.time_span)
# called by gen_poes and by the GmfComputer
[docs]defget_mean_stds(self,ctxs,split_by_mag=True):""" :param ctxs: a list of contexts with N=sum(len(ctx) for ctx in ctxs) :param split_by_mag: where to split by magnitude :returns: an array of shape (4, G, M, N) with mean and stddevs """N=sum(len(ctx)forctxinctxs)M=len(self.imts)G=len(self.gsims)ifall(isinstance(ctx,numpy.recarray)forctxinctxs):# contexts already vectorizedrecarrays=ctxselse:# vectorize the contextsrecarrays=[self.recarray(ctxs)]ifsplit_by_mag:recarr=numpy.concatenate(recarrays,dtype=recarrays[0].dtype).view(numpy.recarray)recarrays=split_array(recarr,U32(numpy.round(recarr.mag*100)))out=numpy.empty((4,G,M,N))forg,gsiminenumerate(self.gsims):out[:,g]=self.get_4MN(recarrays,gsim)returnout
[docs]defget_4MN(self,ctxs,gsim):""" Called by the GmfComputer """N=sum(len(ctx)forctxinctxs)M=len(self.imts)out=numpy.zeros((4,M,N))gsim.adj=[]# NSHM2014P adjustmentscompute=gsim.__class__.computestart=0forctxinctxs:slc=slice(start,start+len(ctx))adj=compute(gsim,ctx,self.imts,*out[:,:,slc])ifadjisnotNone:gsim.adj.append(adj)start=slc.stopifself.truncation_levelnotin(0,1E-9,99.)and(out[1]==0.).any():raiseValueError('Total StdDev is zero for %s'%gsim)ifgsim.adj:gsim.adj=numpy.concatenate(gsim.adj)ifself.conv:# apply horizontal component conversionself.horiz_comp_to_geom_mean(out,gsim)returnout
# not used right now
[docs]defget_att_curves(self,site,msr,mag,aratio=1.,strike=0.,dip=45.,rake=-90):""" :returns: 4 attenuation curves mea, sig, tau, phi (up to 500 km from the site at steps of 5 km) """fromopenquake.hazardlib.sourceimportrupturerup=rupture.get_planar(site,msr,mag,aratio,strike,dip,rake,self.trt)ctx=self.from_planar(rup,hdist=500,step=5)mea,sig,tau,phi=self.get_mean_stds([ctx])return(interp1d(ctx.rrup,mea),interp1d(ctx.rrup,sig),interp1d(ctx.rrup,tau),interp1d(ctx.rrup,phi))
# tested in test_collapse_small
[docs]defestimate_weight(self,src,srcfilter,multiplier=1):""" :param src: a source object :param srcfilter: a SourceFilter instance :returns: (weight, estimate_sites) """eps=.01*EPSifsrc.code=='S'elseEPS# needed for EURsrc.dt=0ifsrc.nsites==0:# was discarded by the prefilteringreturn(0,0)ifsrc.codeinb'pP'else(eps,0)sites=srcfilter.get_close_sites(src)ifsitesisNone:# may happen for CollapsedPointSourcesreturneps,0src.nsites=len(sites)t0=time.time()ctxs=list(self.get_ctx_iter(src,sites,step=5))# reducedsrc.dt=time.time()-t0ifnotctxs:returneps,0lenctx=sum(len(ctx)forctxinctxs)esites=lenctx*src.num_ruptures/self.num_rups*multiplier# NB: num_rups is set by get_ctx_iterweight=src.dt*src.num_ruptures/self.num_rups*src.nsites**.5ifsrc.codeinb'NX':# increase weightweight*=5.elifsrc.code==b'S':# increase for USA, decrease for EURweight*=3returnmax(weight,eps),int(esites)
[docs]defset_weight(self,sources,srcfilter,multiplier=1):""" Set the weight attribute on each prefiltered source """ifsrcfilter.sitecolisNone:forsrcinsources:src.weight=EPSelse:forsrcinsources:src.weight,src.esites=self.estimate_weight(src,srcfilter,multiplier)
# if src.code == b'S':# print(src, src.dt, src.num_ruptures / self.num_rups)
# see contexts_tests.py for examples of collapse# probs_occur = functools.reduce(combine_pmf, (r.probs_occur for r in rups))
[docs]defcombine_pmf(o1,o2):""" Combine probabilities of occurrence; used to collapse nonparametric ruptures. :param o1: probability distribution of length n1 :param o2: probability distribution of length n2 :returns: probability distribution of length n1 + n2 - 1 >>> combine_pmf([.99, .01], [.98, .02]) array([9.702e-01, 2.960e-02, 2.000e-04]) """n1=len(o1)n2=len(o2)o=numpy.zeros(n1+n2-1)foriinrange(n1):forjinrange(n2):o[i+j]+=o1[i]*o2[j]returno
[docs]defprint_finite_size(rups):""" Used to print the number of finite-size ruptures """c=collections.Counter()forrupinrups:ifrup.surface:c['%.2f'%rup.mag]+=1print(c)print('total finite size ruptures = ',sum(c.values()))
def_get_poes(mean_std,loglevels,phi_b):# returns a matrix of shape (N, L)N=mean_std.shape[2]# shape (2, M, N)out=numpy.empty((loglevels.size,N),F32)# shape (L, N)_set_poes(mean_std,loglevels,phi_b,out)returnout.T
[docs]defset_poes(gsim,mean_std,cmaker,ctx,out,slc):""" Calculate and return probabilities of exceedance (PoEs) of one or more intensity measure levels (IMLs) of one intensity measure type (IMT) for one or more pairs "site -- rupture". :param gsim: A GMPE instance :param mean_std: An array of shape (2, M, N) with mean and standard deviations for the sites and intensity measure types :param cmaker: A ContextMaker instance, used only in nhsm_2014 :param ctx: A context array used only in avg_poe_gmpe :param out: An array of PoEs of shape (N, L) to be filled :param slc: A slice object used only in avg_poe_gmpe :raises ValueError: If truncation level is not ``None`` and neither non-negative float number, and if ``imts`` dictionary contain wrong or unsupported IMTs (see :attr:`DEFINED_FOR_INTENSITY_MEASURE_TYPES`). """loglevels=cmaker.loglevels.arrayphi_b=cmaker.phi_b_M,L1=loglevels.shapeifhasattr(gsim,'weights_signs'):# for nshmp_2014, case_72adj=gsim.adj[slc]outs=[]weights,signs=zip(*gsim.weights_signs)forsinsigns:ms=numpy.array(mean_std)# make a copyforminrange(len(loglevels)):ms[0,m]+=s*adjouts.append(_get_poes(ms,loglevels,phi_b))out[:]=numpy.average(outs,weights=weights,axis=0)elifhasattr(gsim,'mixture_model'):forf,winzip(gsim.mixture_model["factors"],gsim.mixture_model["weights"]):mean_stdi=mean_std.copy()mean_stdi[1]*=f# multiply stddev by factorout[:]+=w*_get_poes(mean_stdi,loglevels,phi_b)elifhasattr(gsim,'weights'):# avg_poe_gmpecm=copy.copy(cmaker)cm.poe_mon=Monitor()# avoid double countscm.gsims=gsim.gsimsavgs=[]forpoes,_mea,_sig,_ctxincm.gen_poes(ctx[slc]):# poes has shape N, L, Gavgs.append(poes@gsim.weights)out[:]=numpy.concatenate(avgs)else:# regular case_set_poes(mean_std,loglevels,phi_b,out.T)imtweight=getattr(gsim,'weight',None)# ImtWeight or Noneform,imtinenumerate(cmaker.imtls):mL1=m*L1ifimtweightandimtweight.dic.get(imt)==0:# set by the engine when parsing the gsim logictree# when 0 ignore the contribution: see _build_branchesout[:,mL1:mL1+L1]=0
[docs]classPmapMaker(object):""" A class to compute the PoEs from a given source """def__init__(self,cmaker,srcfilter,group):vars(self).update(vars(cmaker))self.cmaker=cmakerself.srcfilter=srcfilterself.N=len(self.srcfilter.sitecol.complete)try:self.sources=group.sourcesexceptAttributeError:# already a list of sourcesself.sources=groupself.src_mutex=getattr(group,'src_interdep',None)=='mutex'self.rup_indep=getattr(group,'rup_interdep',None)!='mutex'ifself.rup_indep:self.rup_mutex={}else:self.rup_mutex={}# src_id, rup_id -> rup_weightforsrcingroup:fori,(rup,_)inenumerate(src.data):self.rup_mutex[src.id,i]=rup.weightself.fewsites=self.N<=cmaker.max_sites_disaggself.grp_probability=getattr(group,'grp_probability',1.)self.cluster=self.cmaker.cluster=getattr(group,'cluster',0)ifself.cluster:tom=group.temporal_occurrence_modelelse:tom=getattr(self.sources[0],'temporal_occurrence_model',PoissonTOM(self.cmaker.investigation_time))self.cmaker.tom=self.tom=tomM,G=len(self.cmaker.imtls),len(self.cmaker.gsims)self.maxsize=8*TWO20//(M*G)# crucial for a fast get_mean_stds
[docs]defcount_bytes(self,ctxs):# # usuful for debugging memory issuesrparams=len(self.cmaker.REQUIRES_RUPTURE_PARAMETERS)sparams=len(self.cmaker.REQUIRES_SITES_PARAMETERS)+1dparams=len(self.cmaker.REQUIRES_DISTANCES)nbytes=0forctxinctxs:nsites=len(ctx)nbytes+=8*rparamsnbytes+=8*sparams*nsitesnbytes+=8*dparams*nsitesreturnnbytes
[docs]defgen_ctxs(self,src):sites=self.srcfilter.get_close_sites(src)ifsitesisNone:returnforctxinself.cmaker.get_ctx_iter(src,sites):ifself.cmaker.deltagetter:# adjust occurrence rates in case of aftershockswithself.cmaker.delta_mon:delta=self.cmaker.deltagetter(src.id)ctx.occurrence_rate+=delta[ctx.rup_id]ifself.fewsites:# keep rupdata in memory (before collapse)ifself.src_mutex:# needed for Disaggregator.initctx.src_id=valid.fragmentno(src)self.rupdata.append(ctx)yieldctx
def_make_src_indep(self):# sources with the same IDcm=self.cmakerallctxs=[]ctxlen=0totlen=0t0=time.time()sids=self.srcfilter.sitecol.sids# using most memory here; limited by pmap_max_gbpnemap=MapArray(sids,self.cmaker.imtls.size,len(self.cmaker.gsims),notself.cluster).fill(self.cluster)forsrcinself.sources:src.nsites=0forctxinself.gen_ctxs(src):ctxlen+=len(ctx)src.nsites+=len(ctx)totlen+=len(ctx)allctxs.append(ctx)ifctxlen>self.maxsize:forctxinconcat(allctxs):cm.update(pnemap,ctx)allctxs.clear()ctxlen=0ifallctxs:# all sources have the same tom by constructionforctxinconcat(allctxs):cm.update(pnemap,ctx)allctxs.clear()dt=time.time()-t0nsrcs=len(self.sources)forsrcinself.sources:self.source_data['src_id'].append(src.source_id)self.source_data['grp_id'].append(src.grp_id)self.source_data['nsites'].append(src.nsites)self.source_data['esites'].append(src.esites)self.source_data['nrupts'].append(src.num_ruptures)self.source_data['weight'].append(src.weight)self.source_data['ctimes'].append(dt*src.nsites/totleniftotlenelsedt/nsrcs)self.source_data['taskno'].append(cm.task_no)returnpnemapdef_make_src_mutex(self):# used in Japan (case_27) and in New Madrid (case_80)cm=self.cmakert0=time.time()weight=0.nsites=0esites=0nctxs=0sids=self.srcfilter.sitecol.sidspmap=MapArray(sids,self.cmaker.imtls.size,len(self.cmaker.gsims)).fill(0)forsrcinself.sources:t0=time.time()pm=MapArray(pmap.sids,cm.imtls.size,len(cm.gsims)).fill(self.rup_indep)ctxs=list(self.gen_ctxs(src))n=sum(len(ctx)forctxinctxs)ifn==0:continuenctxs+=len(ctxs)nsites+=nesites+=src.esitesforctxinctxs:ifself.rup_mutex:cm.update(pm,ctx,self.rup_mutex)else:cm.update(pm,ctx)ifhasattr(src,'mutex_weight'):arr=1.-pm.arrayifself.rup_indepelsepm.arraypmap.array+=arr*src.mutex_weightelse:pmap.array=1.-(1-pmap.array)*(1-pm.array)weight+=src.weightpmap.array*=self.grp_probabilitydt=time.time()-t0self.source_data['src_id'].append(valid.basename(src))self.source_data['grp_id'].append(src.grp_id)self.source_data['nsites'].append(nsites)self.source_data['esites'].append(esites)self.source_data['nrupts'].append(nctxs)self.source_data['weight'].append(weight)self.source_data['ctimes'].append(dt)self.source_data['taskno'].append(cm.task_no)return~pmap
[docs]defmake(self):dic={}self.rupdata=[]self.source_data=AccumDict(accum=[])ifself.rup_indepandnotself.src_mutex:pnemap=self._make_src_indep()else:pnemap=self._make_src_mutex()ifself.cluster:fornoccinrange(0,50):prob_n_occ=self.tom.get_probability_n_occurrences(self.tom.occurrence_rate,nocc)ifnocc==0:pmapclu=pnemap.new(numpy.full(pnemap.shape,prob_n_occ))else:pmapclu.array+=pnemap.array**nocc*prob_n_occpnemap.array[:]=pmapclu.arraydic['rmap']=pnemap.to_rates()dic['rmap'].gid=self.cmaker.giddic['cfactor']=self.cmaker.cfactordic['rup_data']=concat(self.rupdata)dic['source_data']=self.source_datadic['task_no']=self.task_nodic['grp_id']=self.sources[0].grp_idifself.disagg_by_src:# all the sources in the group must have the same source_id because# of the groupby(group, corename) in classical.pycoreids=set(map(valid.corename,self.sources))iflen(coreids)>1:raiseNameError('Invalid source naming: %s'%coreids)# in oq-risk-tests test_phl there are multiple srcids# (mps-0!b1;0, mps-0!b1;1, ...); you can simply use the first,# since in `store_mean_rates_by_src` we use corenamedic['basename']=valid.basename(self.sources[0])returndic
[docs]classBaseContext(metaclass=abc.ABCMeta):""" Base class for context object. """def__eq__(self,other):""" Return True if ``other`` has same attributes with same values. """ifisinstance(other,self.__class__):ifself._slots_==other._slots_:oks=[]forsinself._slots_:a,b=getattr(self,s,None),getattr(other,s,None)ifaisNoneandbisNone:ok=TrueelifaisNoneandbisnotNone:ok=FalseelifaisnotNoneandbisNone:ok=Falseelifhasattr(a,'shape')andhasattr(b,'shape'):ifa.shape==b.shape:ok=numpy.allclose(a,b)else:ok=Falseelse:ok=a==boks.append(ok)returnnumpy.all(oks)returnFalse
# mock of a site collection used in the tests and in the SMT module of the OQ-MBTK
[docs]classSitesContext(BaseContext):""" Sites calculation context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant features of the sites collection. Every GSIM class is required to declare what :attr:`sites parameters <GroundShakingIntensityModel.REQUIRES_SITES_PARAMETERS>` does it need. Only those required parameters are made available in a result context object. """# _slots_ is used in hazardlib check_gsim and in the SMTdef__init__(self,slots='vs30 vs30measured z1pt0 z2pt5'.split(),sitecol=None):self._slots_=slotsifsitecolisnotNone:self.sids=sitecol.sidsforslotinslots:setattr(self,slot,getattr(sitecol,slot))# used in the SMTdef__len__(self):returnlen(self.sids)
[docs]classDistancesContext(BaseContext):""" Distances context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant distances between sites from the collection and the rupture. Every GSIM class is required to declare what :attr:`distance measures <GroundShakingIntensityModel.REQUIRES_DISTANCES>` does it need. Only those required values are calculated and made available in a result context object. """_slots_=('rrup','rx','rjb','rhypo','repi','ry0','rcdpp','azimuth','hanging_wall','rvolc')def__init__(self,param_dist_pairs=()):forparam,distinparam_dist_pairs:setattr(self,param,dist)
# used in boore_atkinson_2008
[docs]defget_dists(ctx):""" Extract the distance parameters from a context. :returns: a dictionary dist_name -> distances """return{par:distforpar,distinvars(ctx).items()ifparinKNOWN_DISTANCES}
# used to produce a RuptureContext suitable for legacy code, i.e. for calls# to .get_mean_and_stddevs, like for instance in the SMT module of the OQ-MBTK
[docs]deffull_context(sites,rup,dctx=None):""" :returns: a full RuptureContext with all the relevant attributes """self=RuptureContext()self.src_id=0forpar,valinvars(rup).items():setattr(self,par,val)ifnothasattr(self,'occurrence_rate'):self.occurrence_rate=numpy.nanifhasattr(sites,'array'):# is a SiteCollectionforparinsites.array.dtype.names:setattr(self,par,sites[par])else:# sites is a SitesContextforpar,valinvars(sites).items():setattr(self,par,val)ifdctx:forpar,valinvars(dctx).items():setattr(self,par,val)returnself
[docs]defget_mean_stds(gsim,ctx,imts,**kw):""" :param gsim: a single GSIM or a a list of GSIMs :param ctx: a RuptureContext or a recarray of size N with same magnitude :param imts: a list of M IMTs :param kw: additional keyword arguments :returns: an array of shape (4, M, N) obtained by applying the given GSIM, ctx amd imts, or an array of shape (G, 4, M, N) """single=hasattr(gsim,'compute')kw['imtls']={imt.string:[0]forimtinimts}cmaker=ContextMaker('*',[gsim]ifsingleelsegsim,kw)out=cmaker.get_mean_stds([ctx],split_by_mag=False)# (4, G, M, N)returnout[:,0]ifsingleelseout
# mock of a rupture used in the tests and in the module of the OQ-MBTK
[docs]classRuptureContext(BaseContext):""" Rupture calculation context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant features of a single rupture. Every GSIM class is required to declare what :attr:`rupture parameters <GroundShakingIntensityModel.REQUIRES_RUPTURE_PARAMETERS>` does it need. Only those required parameters are made available in a result context object. """src_id=0rup_id=0_slots_=('mag','strike','dip','rake','ztor','hypo_lon','hypo_lat','hypo_depth','width','hypo_loc','src_id','rup_id')def__init__(self,param_pairs=()):forparam,valueinparam_pairs:setattr(self,param,value)
[docs]defsize(self):""" If the context is a multi rupture context, i.e. it contains an array of magnitudes and it refers to a single site, returns the size of the array, otherwise returns 1. """nsites=len(self.sids)ifnsites==1andisinstance(self.mag,numpy.ndarray):returnlen(self.mag)returnnsites
# used in acme_2019def__len__(self):returnlen(self.sids)
[docs]classEffect(object):""" Compute the effect of a rupture of a given magnitude and distance. :param effect_by_mag: a dictionary magstring -> intensities :param dists: array of distances, one per each intensity :param cdist: collapse distance """def__init__(self,effect_by_mag,dists,collapse_dist=None):self.effect_by_mag=effect_by_magself.dists=distsself.nbins=len(dists)
[docs]defcollapse_value(self,collapse_dist):""" :returns: intensity at collapse distance """# get the maximum magnitude with a cutoff at 7formaginself.effect_by_mag:ifmag>'7.00':breakeffect=self.effect_by_mag[mag]idx=numpy.searchsorted(self.dists,collapse_dist)returneffect[idx-1ifidx==self.nbinselseidx]
def__call__(self,mag,dist):di=numpy.searchsorted(self.dists,dist)ifdi==self.nbins:di=self.nbinseff=self.effect_by_mag['%.2f'%mag][di]returneff# this is used to compute the magnitude-dependent pointsource_distance
[docs]defdist_by_mag(self,intensity):""" :returns: a dict magstring -> distance """dst={}# magnitude -> distanceformag,intensitiesinself.effect_by_mag.items():ifintensity<intensities.min():dst[mag]=self.dists[-1]# largest distanceelifintensity>intensities.max():dst[mag]=self.dists[0]# smallest distanceelse:dst[mag]=interp1d(intensities,self.dists)(intensity)returndst
[docs]defget_effect_by_mag(mags,sitecol1,gsims_by_trt,maximum_distance,imtls):""" :param mags: an ordered list of magnitude strings with format %.2f :param sitecol1: a SiteCollection with a single site :param gsims_by_trt: a dictionary trt -> gsims :param maximum_distance: an IntegrationDistance object :param imtls: a DictArray with intensity measure types and levels :returns: a dict magnitude-string -> array(#dists, #trts) """trts=list(gsims_by_trt)ndists=51gmv=numpy.zeros((len(mags),ndists,len(trts)))param=dict(maximum_distance=maximum_distance,imtls=imtls)fort,trtinenumerate(trts):dist_bins=maximum_distance.get_dist_bins(trt,ndists)cmaker=ContextMaker(trt,gsims_by_trt[trt],param)gmv[:,:,t]=cmaker.max_intensity(sitecol1,F64(mags),dist_bins)returndict(zip(mags,gmv))
[docs]defget_cmakers(src_groups,full_lt,oq):""" :params src_groups: a list of SourceGroups :param full_lt: a FullLogicTree instance :param oq: object containing the calculation parameters :returns: list of ContextMakers associated to the given src_groups """all_trt_smrs=[]forsginsrc_groups:src=sg.sources[0]all_trt_smrs.append(src.trt_smrs)trts=list(full_lt.gsim_lt.values)gweights=full_lt.g_weights(all_trt_smrs)[:,-1]# shape Gtcmakers=[]forgrp_id,trt_smrsinenumerate(all_trt_smrs):rlzs_by_gsim=full_lt.get_rlzs_by_gsim(trt_smrs)ifnotrlzs_by_gsim:# happens for gsim_lt.reduce() on empty TRTscontinuetrti=trt_smrs[0]//TWO24cm=ContextMaker(trts[trti],rlzs_by_gsim,oq)cm.trti=trticm.trt_smrs=trt_smrscm.grp_id=grp_idcmakers.append(cm)gids=full_lt.get_gids(cm.trt_smrsforcmincmakers)forcmincmakers:cm.gid=gids[cm.grp_id]cm.wei=gweights[cm.gid]returncmakers
[docs]defread_cmakers(dstore,csm=None):""" :param dstore: a DataStore-like object :param csm: a CompositeSourceModel instance, if given :returns: an array of ContextMaker instances, one per source group """fromopenquake.hazardlib.site_amplificationimportAmplFunctionoq=dstore['oqparam']oq.mags_by_trt={k:decode(v[:])fork,vindstore['source_mags'].items()}if'amplification'inoq.inputsandoq.amplification_method=='kernel':df=AmplFunction.read_df(oq.inputs['amplification'])oq.af=AmplFunction.from_dframe(df)else:oq.af=NoneifcsmisNone:csm=dstore['_csm']ifnothasattr(csm,'full_lt'):csm.full_lt=dstore['full_lt'].init()cmakers=get_cmakers(csm.src_groups,csm.full_lt,oq)if'delta_rates'indstore:# aftershockforcmakerincmakers:cmaker.deltagetter=DeltaRatesGetter(dstore)returnnumpy.array(cmakers)
# used in event_based
[docs]defread_cmaker(dstore,trt_smr):""" :param dstore: a DataStore-like object :returns: a ContextMaker instance """oq=dstore['oqparam']full_lt=dstore['full_lt']trts=list(full_lt.gsim_lt.values)trt=trts[trt_smr//len(full_lt.sm_rlzs)]rlzs_by_gsim=full_lt._rlzs_by_gsim(trt_smr)returnContextMaker(trt,rlzs_by_gsim,oq)
[docs]defget_src_mutex(srcs):""" :param srcs: a list of sources with weights and the same grp_id :returns: a dictionary grp_id -> {'src_id': [...], 'weight': [...]} """grp_ids=[src.grp_idforsrcinsrcs][grp_id]=set(grp_ids)ok=all(hasattr(src,'mutex_weight')forsrcinsrcs)ifnotok:return{grp_id:{}}dic=dict(src_ids=U32([src.idforsrcinsrcs]),weights=F64([src.mutex_weightforsrcinsrcs]))return{grp_id:dic}