# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2018-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import operator
import collections

import numpy

from openquake.baselib import general, hdf5
from openquake.hazardlib.map_array import MapArray
from openquake.hazardlib.calc.disagg import to_rates, to_probs
from openquake.hazardlib.source.rupture import (
    BaseRupture, RuptureProxy, get_ebr)
from openquake.commonlib import datastore

# short aliases for the numpy scalar types used in this module
U16 = numpy.uint16
U32 = numpy.uint32
I64 = numpy.int64
F32 = numpy.float32
TWO24 = 2 ** 24  # used to pack/unpack trt_smr <-> (trt index, rlz index)
by_taxonomy = operator.attrgetter('taxonomy')
code2cls = BaseRupture.init()
weight = operator.itemgetter('n_occ')
slice_dt = numpy.dtype([('idx', U32), ('start', int), ('stop', int)])
def build_stat_curve(hcurve, imtls, stat, weights, wget, use_rates=False):
    """
    Build statistics by taking into account IMT-dependent weights.

    :param hcurve: array of PoEs of shape (L, R)
    :param imtls: intensity measure types and levels (with a .size and
        callable slicing by IMT)
    :param stat: statistical function accepting (array, weights)
    :param weights: array of shape (R, 1) or (R, num_imts)
    :param wget: function extracting the weights for a given IMT
    :param use_rates: if True, compute the statistic in rate space
    :returns: an array of shape (L, 1)
    """
    poes = hcurve.T  # shape (R, L)
    assert len(poes) == len(weights), (len(poes), len(weights))
    L = imtls.size
    array = numpy.zeros((L, 1))
    if weights.shape[1] == 1:  # no IMT-dependent weights
        ws = weights[:, -1]
        if use_rates:
            array[:, 0] = to_probs(stat(to_rates(poes), ws))
        else:
            array[:, 0] = stat(poes, ws)
        return array
    # IMT-dependent weights: slower, since the arrays are shorter
    for imt in imtls:
        slc = imtls(imt)
        ws = wget(weights, imt)
        if not ws.sum():  # expect no data for this IMT
            continue
        if use_rates:
            array[slc, 0] = to_probs(stat(to_rates(poes[:, slc]), ws))
        else:
            array[slc, 0] = stat(poes[:, slc], ws)
    return array
def sig_eps_dt(imts):
    """
    :param imts: a sequence of intensity measure type strings
    :returns: a composite data type for the sig_eps output
    """
    # event id and realization id first, then one sig_inter_* field per IMT
    # followed by one eps_inter_* field per IMT
    descr = [('eid', U32), ('rlz_id', U16)]
    descr.extend(('sig_inter_' + imt, F32) for imt in imts)
    descr.extend(('eps_inter_' + imt, F32) for imt in imts)
    return numpy.dtype(descr)
class HcurvesGetter(object):
    """
    Read the contribution to the hazard curves coming from each source
    in a calculation with a source specific logic tree
    """
    def __init__(self, dstore):
        self.dstore = dstore
        self.imtls = dstore['oqparam'].imtls
        self.full_lt = dstore['full_lt'].init()
        self.sslt = self.full_lt.source_model_lt.decompose()
        self.source_info = dstore['source_info'][:]

    def get_hcurve(self, src_id, imt=None, site_id=0, gsim_idx=None):
        """
        Return the curve associated to the given src_id, imt and gsim_idx
        as an array of length L
        """
        assert ';' in src_id, src_id  # must be a realization specific src_id
        imt_slc = self.imtls(imt) if imt else slice(None)
        start, gsims, weights = self.bysrc[src_id]
        dset = self.dstore['_rates']
        if gsim_idx is None:
            curves = dset[start:start + len(gsims), site_id, imt_slc]
            return weights @ curves
        return to_probs(dset[start + gsim_idx, site_id, imt_slc])

    # NB: not used right now
    def get_hcurves(self, src, imt=None, site_id=0, gsim_idx=None):
        """
        Return the curves associated to the given src, imt and gsim_idx
        as an array of shape (R, L)
        """
        assert ';' not in src, src  # not a rlz specific source ID
        curves = [self.get_hcurve('%s;%d' % (src, rlz_no), imt,
                                  site_id, gsim_idx)
                  for rlz_no in range(self.sslt[src].num_paths)]
        return numpy.array(curves)

    def get_mean_hcurve(self, src=None, imt=None, site_id=0, gsim_idx=None):
        """
        Return the mean curve associated to the given src, imt and gsim_idx
        as an array of shape L
        """
        if src is None:  # aggregate over all sources
            hcurves = [self.get_mean_hcurve(s) for s in self.sslt]
            return general.agg_probs(*hcurves)
        weights = [rlz.weight for rlz in self.sslt[src]]
        curves = self.get_hcurves(src, imt, site_id, gsim_idx)
        return weights @ curves
# NB: using 32 bit ratemaps
def get_pmaps_gb(dstore, full_lt=None):
    """
    :param dstore: a DataStore instance
    :param full_lt: a FullLogicTree instance (read from dstore if missing)
    :returns: (memory required on the master node to keep the pmaps,
               trt_rlzs array, gids array)
    """
    N = len(dstore['sitecol/sids'])
    L = dstore['oqparam'].imtls.size
    full_lt = full_lt or dstore['full_lt'].init()
    if 'trt_smrs' not in dstore:  # starting from hazard_curves.csv
        trt_smrs = [[0]]
    else:
        trt_smrs = dstore['trt_smrs'][:]
    trt_rlzs = full_lt.get_trt_rlzs(trt_smrs)
    gids = full_lt.get_gids(trt_smrs)
    # 4 bytes per float32 rate, converted to gigabytes
    max_gb = len(trt_rlzs) * N * L * 4 / 1024 ** 3
    return max_gb, trt_rlzs, gids
def get_num_chunks(dstore):
    """
    :returns: the number of postclassical tasks to generate. It is 5 times
    the number of GB required to store the rates.
    """
    max_sites = dstore['oqparam'].max_sites_disagg
    try:
        req_gb = dstore['source_groups'].attrs['req_gb']
    except KeyError:  # no source_groups dataset: fall back to the minimum
        return max_sites
    return max(int(5 * req_gb), max_sites)
def map_getters(dstore, full_lt=None, disagg=False):
    """
    :param dstore: a DataStore instance
    :param full_lt: a FullLogicTree instance (read from dstore if missing)
    :param disagg: True for disaggregation calculations
    :returns: a list of MapGetter instances, one per chunk
    """
    oq = dstore['oqparam']
    # disaggregation is meant for few sites, i.e. no tiling
    N = len(dstore['sitecol/sids'])
    chunks = get_num_chunks(dstore)
    if disagg and N > chunks:
        raise ValueError('There are %d sites but only %d chunks' %
                         (N, chunks))
    full_lt = full_lt or dstore['full_lt'].init()
    R = full_lt.get_num_paths()
    _req_gb, trt_rlzs, _gids = get_pmaps_gb(dstore, full_lt)
    if oq.fastmean and not disagg:
        weights = dstore['gweights'][:]
        trt_rlzs = numpy.zeros(len(weights))  # reduces the data transfer
    else:
        weights = full_lt.weights
    fnames = [dstore.filename]
    try:
        scratch_dir = dstore.hdf5.attrs['scratch_dir']
    except KeyError:  # no tiling
        pass
    else:
        # collect the rate files produced by the tiles
        for fname in os.listdir(scratch_dir):
            if fname.endswith('.hdf5'):
                fnames.append(os.path.join(scratch_dir, fname))
    getters = []
    for chunk in range(chunks):
        getter = MapGetter(fnames, chunk, trt_rlzs, R, oq)
        getter.weights = weights
        getters.append(getter)
    return getters
class ZeroGetter(object):
    """
    Return an array of zeros of shape (L, R)
    """
    def __init__(self, L, R):
        # L: number of intensity measure levels; R: number of realizations
        self.L = L
        self.R = R
class CurveGetter(object):
    """
    Hazard curve builder used in classical_risk/classical_damage.

    :param sid: site index
    :param rates: array of shape (L, G) for the given site
    """
class MapGetter(object):
    """
    Read hazard curves from the datastore for all realizations or for
    a specific realization.
    """
    def __init__(self, filenames, idx, trt_rlzs, R, oq):
        self.filenames = filenames
        self.idx = idx  # chunk index, matching _rates/slice_by_idx
        self.trt_rlzs = trt_rlzs
        self.R = R
        self.imtls = oq.imtls
        self.poes = oq.poes
        self.use_rates = oq.use_rates
        self.eids = None
        self._map = {}  # site id -> array of shape (L, G), built lazily

    @property
    def sids(self):
        self.init()
        return list(self._map)

    @property
    def imts(self):
        return list(self.imtls)

    @property
    def G(self):
        return len(self.trt_rlzs)

    @property
    def L(self):
        return self.imtls.size

    @property
    def N(self):
        self.init()
        return len(self._map)

    @property
    def M(self):
        return len(self.imtls)

    def init(self):
        """
        Build the _map from the underlying dataframes
        """
        if self._map:  # already initialized
            return self._map
        for fname in self.filenames:
            with hdf5.File(fname) as dstore:
                slices = dstore['_rates/slice_by_idx'][:]
                slices = slices[slices['idx'] == self.idx]
                for start, stop in zip(slices['start'], slices['stop']):
                    rates_df = dstore.read_df(
                        '_rates', slc=slice(start, stop))
                    # not using groupby to save memory
                    for sid in rates_df.sid.unique():
                        df = rates_df[rates_df.sid == sid]
                        try:
                            array = self._map[sid]
                        except KeyError:
                            array = numpy.zeros((self.L, self.G))
                            self._map[sid] = array
                        array[df.lid, df.gid] = df.rate
        return self._map

    def get_hcurve(self, sid):  # used in classical
        """
        :param sid: a site ID
        :returns: an array of shape (L, R) for the given site ID
        """
        pmap = self.init()
        res = numpy.zeros((self.L, self.R))
        if sid not in pmap:  # no hazard for sid
            return res
        for g, t_rlzs in enumerate(self.trt_rlzs):
            rlzs = t_rlzs % TWO24  # strip the trt index
            rates = pmap[sid][:, g]
            for rlz in rlzs:
                res[:, rlz] += rates
        return to_probs(res)

    def get_fast_mean(self, gweights):
        """
        :param gweights: weights of shape (G,)
        :returns: a MapArray of shape (N, M, L1) with the mean hcurves
        """
        M = self.M
        L1 = self.L // M
        means = MapArray(U32(self.sids), M, L1).fill(0)
        for sid in self.sids:
            idx = means.sidx[sid]
            rates = self._map[sid]  # shape (L, G)
            means.array[idx] = (rates @ gweights).reshape((M, L1))
        means.array[:] = to_probs(means.array)
        return means
def get_rupture_getters(dstore, ct=0, srcfilter=None, rupids=None):
    """
    :param dstore: a :class:`openquake.commonlib.datastore.DataStore`
    :param ct: number of concurrent tasks
    :param srcfilter: unused, kept for backward compatibility
    :param rupids: if given, restrict to the ruptures with those IDs
    :returns: a list of RuptureGetters
    """
    full_lt = dstore['full_lt'].init()
    rup_array = dstore['ruptures'][:]
    if rupids is not None:
        rup_array = rup_array[numpy.isin(rup_array['id'], rupids)]
    if len(rup_array) == 0:
        raise NotFound('There are no ruptures in %s' % dstore)
    proxies = [RuptureProxy(rec) for rec in rup_array]
    # aim for roughly ct/2 blocks weighted by number of occurrences
    maxweight = rup_array['n_occ'].sum() / (ct / 2 or 1)
    rgetters = []
    for block in general.block_splitter(
            proxies, maxweight, operator.itemgetter('n_occ'),
            key=operator.itemgetter('trt_smr')):
        trt_smr = block[0]['trt_smr']
        rbg = full_lt.get_rlzs_by_gsim(trt_smr)
        rgetters.append(RuptureGetter(
            block, dstore.filename, trt_smr, full_lt.trt_by(trt_smr), rbg))
    return rgetters
def get_ebruptures(dstore):
    """
    Extract EBRuptures from the datastore.

    :param dstore: a DataStore instance containing 'ruptures'
    :returns: a list of EBRuptures, one per proxy
    """
    # nested comprehension instead of the loop-and-append idiom
    return [proxy.to_ebr(rgetter.trt)
            for rgetter in get_rupture_getters(dstore)
            for proxy in rgetter.get_proxies()]
def multiline(array3RC):
    """
    :param array3RC: array of shape (3, R, C)
    :returns: a MULTILINESTRING
    """
    D, R, _C = array3RC.shape
    assert D == 3, D
    # one LINESTRING per row r, joined into a single MULTILINESTRING
    pieces = (line(array3RC[:, r, :].T) for r in range(R))
    return 'MULTILINESTRING(%s)' % ', '.join(pieces)
def get_ebrupture(dstore, rup_id):  # used in show rupture
    """
    This is EXTREMELY inefficient, so it must be used only when you are
    interested in a single rupture.

    :param dstore: a DataStore instance with 'ruptures' and 'rupgeoms'
    :param rup_id: ID of the rupture to extract
    :raises ValueError: if there is no rupture with the given ID
    """
    rups = dstore['ruptures'][:]  # read everything in memory
    rupgeoms = dstore['rupgeoms']  # do not read everything in memory
    idxs, = numpy.where(rups['id'] == rup_id)
    if len(idxs) == 0:
        raise ValueError(f"Missing {rup_id=}")
    [rec] = rups[idxs]
    trts = dstore.getitem('full_lt').attrs['trts']
    trt = trts[rec['trt_smr'] // TWO24]  # trt index is the high part
    geom = rupgeoms[rec['geom_id']]
    return get_ebr(rec, geom, trt)
# this is never called directly; get_rupture_getters is used instead
class RuptureGetter(object):
    """
    :param proxies: a list of RuptureProxies
    :param filename: path to the HDF5 file containing a 'rupgeoms' dataset
    :param trt_smr: source group index
    :param trt: tectonic region type string
    :param rlzs_by_gsim: dictionary gsim -> rlzs for the group
    """
    def __init__(self, proxies, filename, trt_smr, trt, rlzs_by_gsim):
        self.proxies = proxies
        self.weight = sum(proxy['n_occ'] for proxy in proxies)
        self.filename = filename
        self.trt_smr = trt_smr
        self.trt = trt
        self.rlzs_by_gsim = rlzs_by_gsim
        self.num_events = sum(int(proxy['n_occ']) for proxy in proxies)

    @property
    def num_ruptures(self):
        return len(self.proxies)

    @property
    def seeds(self):
        return [proxy['seed'] for proxy in self.proxies]

    def get_proxies(self, min_mag=0):
        """
        :param min_mag: discard proxies below this magnitude
        :returns: a list of RuptureProxies with the geometry attached
        """
        out = []
        with datastore.read(self.filename) as dstore:
            rupgeoms = dstore['rupgeoms']
            for proxy in self.proxies:
                if proxy['mag'] < min_mag:  # discard small magnitudes
                    continue
                proxy.geom = rupgeoms[proxy['geom_id']]
                out.append(proxy)
        return out

    # called in ebrisk calculations
    def split(self, srcfilter, maxw):
        """
        :returns: RuptureGetters with weight < maxw
        """
        # keep only the proxies close to some site
        close = [proxy for proxy in self.proxies
                 if len(srcfilter.close_sids(proxy.rec, self.trt))]
        return [RuptureGetter(block, self.filename, self.trt_smr,
                              self.trt, self.rlzs_by_gsim)
                for block in general.block_splitter(close, maxw, weight)]