# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2018-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportoperatorimportcollectionsimportnumpyfromopenquake.baselibimportgeneral,hdf5fromopenquake.hazardlib.map_arrayimportMapArrayfromopenquake.hazardlib.calc.disaggimportto_rates,to_probsfromopenquake.hazardlib.source.ruptureimportBaseRupture,get_ebrfromopenquake.commonlib.calcimportget_proxiesU16=numpy.uint16U32=numpy.uint32I64=numpy.int64F32=numpy.float32TWO24=2**24by_taxonomy=operator.attrgetter('taxonomy')code2cls=BaseRupture.init()weight=operator.itemgetter('n_occ')slice_dt=numpy.dtype([('idx',U32),('start',int),('stop',int)])
[docs]defbuild_stat_curve(hcurve,imtls,stat,weights,wget,use_rates=False):""" Build statistics by taking into account IMT-dependent weights """poes=hcurve.T# shape R, Lassertlen(poes)==len(weights),(len(poes),len(weights))L=imtls.sizearray=numpy.zeros((L,1))ifweights.shape[1]>1:# IMT-dependent weights# this is slower since the arrays are shorterforimtinimtls:slc=imtls(imt)ws=wget(weights,imt)ifnotws.sum():# expect no data for this IMTcontinueifuse_rates:array[slc,0]=to_probs(stat(to_rates(poes[:,slc]),ws))else:array[slc,0]=stat(poes[:,slc],ws)else:ifuse_rates:array[:,0]=to_probs(stat(to_rates(poes),weights[:,-1]))else:array[:,0]=stat(poes,weights[:,-1])returnarray
[docs]defsig_eps_dt(imts):""" :returns: a composite data type for the sig_eps output """lst=[('eid',U32),('rlz_id',U16)]forimtinimts:lst.append(('sig_inter_'+imt,F32))forimtinimts:lst.append(('eps_inter_'+imt,F32))returnnumpy.dtype(lst)
[docs]classHcurvesGetter(object):""" Read the contribution to the hazard curves coming from each source in a calculation with a source specific logic tree """def__init__(self,dstore):self.dstore=dstoreself.imtls=dstore['oqparam'].imtlsself.full_lt=dstore['full_lt'].init()self.sslt=self.full_lt.source_model_lt.decompose()self.source_info=dstore['source_info'][:]
[docs]defget_hcurve(self,src_id,imt=None,site_id=0,gsim_idx=None):""" Return the curve associated to the given src_id, imt and gsim_idx as an array of length L """assert';'insrc_id,src_id# must be a realization specific src_idimt_slc=self.imtls(imt)ifimtelseslice(None)start,gsims,weights=self.bysrc[src_id]dset=self.dstore['_rates']ifgsim_idxisNone:curves=dset[start:start+len(gsims),site_id,imt_slc]returnweights@curvesreturnto_probs(dset[start+gsim_idx,site_id,imt_slc])
# NB: not used right now
[docs]defget_hcurves(self,src,imt=None,site_id=0,gsim_idx=None):""" Return the curves associated to the given src, imt and gsim_idx as an array of shape (R, L) """assert';'notinsrc,src# not a rlz specific source IDcurves=[]foriinrange(self.sslt[src].num_paths):src_id='%s;%d'%(src,i)curves.append(self.get_hcurve(src_id,imt,site_id,gsim_idx))returnnumpy.array(curves)
[docs]defget_mean_hcurve(self,src=None,imt=None,site_id=0,gsim_idx=None):""" Return the mean curve associated to the given src, imt and gsim_idx as an array of shape L """ifsrcisNone:hcurves=[self.get_mean_hcurve(src)forsrcinself.sslt]returngeneral.agg_probs(*hcurves)weights=[rlz.weightforrlzinself.sslt[src]]curves=self.get_hcurves(src,imt,site_id,gsim_idx)returnweights@curves
# NB: using 32 bit ratemaps
[docs]defget_pmaps_gb(dstore,full_lt=None):""" :returns: memory required on the master node to keep the pmaps """N=len(dstore['sitecol/sids'])L=dstore['oqparam'].imtls.sizefull_lt=full_ltordstore['full_lt'].init()if'trt_smrs'notindstore:# starting from hazard_curves.csvtrt_smrs=[[0]]else:trt_smrs=dstore['trt_smrs'][:]trt_rlzs=full_lt.get_trt_rlzs(trt_smrs)gids=full_lt.get_gids(trt_smrs)max_gb=len(trt_rlzs)*N*L*4/1024**3returnmax_gb,trt_rlzs,gids
[docs]defget_num_chunks_sites(dstore):""" :returns: (number of postclassical tasks to generate, number of sites) It is 5 times the number of GB required to store the rates. """N=len(dstore['sitecol/sids'])max_chunks=min(dstore['oqparam'].max_sites_disagg,N)try:req_gb=dstore['source_groups'].attrs['req_gb']exceptKeyError:returnmax_chunks,Nchunks=max(int(5*req_gb),max_chunks)returnchunks,N
[docs]defmap_getters(dstore,full_lt=None,disagg=False):""" :returns: a list of pairs (MapGetter, weights) """oq=dstore['oqparam']# disaggregation is meant for few sites, i.e. no tilingchunks,N=get_num_chunks_sites(dstore)ifdisaggandN>chunks:raiseValueError('There are %d sites but only %d chunks'%(N,chunks))full_lt=full_ltordstore['full_lt'].init()R=full_lt.get_num_paths()_req_gb,trt_rlzs,_gids=get_pmaps_gb(dstore,full_lt)ifoq.fastmeanandnotdisagg:weights=dstore['gweights'][:]trt_rlzs=numpy.zeros(len(weights))# reduces the data transferelse:weights=full_lt.weightsfnames=[dstore.filename]try:scratch_dir=dstore.hdf5.attrs['scratch_dir']exceptKeyError:# no tilingpasselse:forfinos.listdir(scratch_dir):iff.endswith('.hdf5'):fnames.append(os.path.join(scratch_dir,f))out=[]forchunkinrange(chunks):getter=MapGetter(fnames,chunk,trt_rlzs,R,oq)getter.weights=weightsout.append(getter)returnout
[docs]classZeroGetter(object):""" Return an array of zeros of shape (L, R) """def__init__(self,L,R):self.L=Lself.R=R
[docs]classCurveGetter(object):""" Hazard curve builder used in classical_risk/classical_damage. :param sid: site index :param rates: array of shape (L, G) for the given site """
[docs]classMapGetter(object):""" Read hazard curves from the datastore for all realizations or for a specific realization. """def__init__(self,filenames,idx,trt_rlzs,R,oq):self.filenames=filenamesself.idx=idxself.trt_rlzs=trt_rlzsself.R=Rself.imtls=oq.imtlsself.poes=oq.poesself.use_rates=oq.use_ratesself.eids=Noneself._map={}@propertydefsids(self):self.init()returnlist(self._map)@propertydefimts(self):returnlist(self.imtls)@propertydefG(self):returnlen(self.trt_rlzs)@propertydefL(self):returnself.imtls.size@propertydefN(self):self.init()returnlen(self._map)@propertydefM(self):returnlen(self.imtls)
[docs]definit(self):""" Build the _map from the underlying dataframes """ifself._map:returnself._mapforfnameinself.filenames:withhdf5.File(fname)asdstore:slices=dstore['_rates/slice_by_idx'][:]slices=slices[slices['idx']==self.idx]forstart,stopinzip(slices['start'],slices['stop']):rates_df=dstore.read_df('_rates',slc=slice(start,stop))# not using groupby to save memoryforsidinrates_df.sid.unique():df=rates_df[rates_df.sid==sid]try:array=self._map[sid]exceptKeyError:array=numpy.zeros((self.L,self.G))self._map[sid]=arrayarray[df.lid,df.gid]=df.ratereturnself._map
[docs]defget_hcurve(self,sid):# used in classical""" :param sid: a site ID :returns: an array of shape (L, R) for the given site ID """pmap=self.init()r0=numpy.zeros((self.L,self.R))ifsidnotinpmap:# no hazard for sidreturnr0forg,t_rlzsinenumerate(self.trt_rlzs):rlzs=t_rlzs%TWO24rates=pmap[sid][:,g]forrlzinrlzs:r0[:,rlz]+=ratesreturnto_probs(r0)
[docs]defget_fast_mean(self,gweights):""" :returns: a MapArray of shape (N, M, L1) with the mean hcurves """M=self.ML1=self.L//Mmeans=MapArray(U32(self.sids),M,L1).fill(0)forsidinself.sids:idx=means.sidx[sid]rates=self._map[sid]# shape (L, G)means.array[idx]=(rates@gweights).reshape((M,L1))means.array[:]=to_probs(means.array)returnmeans
[docs]defget_ebruptures(dstore):""" Extract EBRuptures from the datastore """ebrs=[]trts=list(dstore['full_lt/gsim_lt'].values)fortrt_smr,start,stopindstore['trt_smr_start_stop']:trt=trts[trt_smr//TWO24]forproxyinget_proxies(dstore.filename,slice(start,stop)):ebrs.append(proxy.to_ebr(trt))returnebrs
[docs]defget_ebrupture(dstore,rup_id):# used in show rupture""" This is EXTREMELY inefficient, since it reads all ruptures. NB: it assumes rup_is is unique """rups=dstore['ruptures'][:]# read everything in memoryrupgeoms=dstore['rupgeoms']# do not read everything in memoryidxs,=numpy.where(rups['id']==rup_id)iflen(idxs)==0:raiseValueError(f"Missing {rup_id=}")[rec]=rups[idxs]trts=dstore.getitem('full_lt').attrs['trts']trt=trts[rec['trt_smr']//TWO24]geom=rupgeoms[rec['geom_id']]returnget_ebr(rec,geom,trt)
[docs]defmultiline(array3RC):""" :param array3RC: array of shape (3, R, C) :returns: a MULTILINESTRING """D,R,_C=array3RC.shapeassertD==3,Dlines='MULTILINESTRING(%s)'%', '.join(line(array3RC[:,r,:].T)forrinrange(R))returnlines