Source code for openquake.calculators.disaggregation
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""Disaggregation calculator core functionality"""
import logging
import psutil
import numpy

from openquake.baselib import parallel
from openquake.baselib.general import (
    AccumDict, pprod, agg_probs, shortlist)
from openquake.baselib.python3compat import encode
from openquake.hazardlib import stats, map_array, valid
from openquake.hazardlib.calc import disagg, mean_rates
from openquake.hazardlib.contexts import read_cmakers, read_ctx_by_grp
from openquake.commonlib import util
from openquake.calculators import base, getters

POE_TOO_BIG = '''\
Site #%d: you are trying to disaggregate for poe=%s.
However the source model produces at most probabilities
of %.7f for rlz=#%d, IMT=%s.
The disaggregation PoE is too big or your model is wrong,
producing too small PoEs.'''

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
[docs]def compute_disagg(dstore, ctxt, sitecol, cmaker, bin_edges, src_mutex,
                   rwdic, monitor):
    """
    :param dstore: a DataStore instance
    :param ctxt: a context array
    :param sitecol: a site collection
    :param cmaker: a ContextMaker instance
    :param bin_edges: a tuple of bin edges (mag, dist, lon, lat, eps, trt)
    :param src_mutex: a dictionary src_id -> weight, usually empty
    :param rwdic: dictionary rlz -> weight, empty for individual realizations
    :param monitor: monitor of the currently running job
    :returns: a list of dictionaries containing matrices of rates
    """
    mon0 = monitor('disagg mean_std', measuremem=False)
    mon1 = monitor('disagg by eps', measuremem=False)
    mon2 = monitor('composing pnes', measuremem=False)
    mon3 = monitor('disagg matrix', measuremem=False)
    out = []
    for site in sitecol:
        try:
            dis = disagg.Disaggregator([ctxt], site, cmaker, bin_edges)
        except disagg.FarAwayRupture:
            continue
        with dstore:
            iml2 = dstore['hmap3'][dis.sid]
            if iml2.sum() == 0:  # zero hazard for this site
                continue
            imtls = {imt: iml2[m] for m, imt in enumerate(cmaker.imts)}
            rlzs = dstore['best_rlzs'][dis.sid]
        res = dis.disagg_by_magi(imtls, rlzs, rwdic, src_mutex,
                                 mon0, mon1, mon2, mon3)
        out.extend(res)
    return out
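# A minimal sketch (not part of the module) of the shape of each dictionary
# returned by compute_disagg, as consumed by agg_result further below: the
# keys 'sid', 'trti' and 'magi' locate the contribution, while the remaining
# rlz -> array items carry the rates; the array shape here is hypothetical.
#
# >>> res = {'sid': 0, 'trti': 1, 'magi': 3,
# ...        0: numpy.zeros((4, 4, 4, 5, 1, 1))}  # rlz 0, 6D slice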
[docs]def get_outputs_size(shapedic, disagg_outputs, Z):
    """
    :returns: the total size of the outputs
    """
    tot = AccumDict(accum=0)
    for out in disagg_outputs:
        tot[out] = 8
        for key in out.lower().split('_'):
            tot[out] *= shapedic[key]
    return tot * shapedic['N'] * shapedic['M'] * shapedic['P'] * Z
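# Usage sketch with a hypothetical shapedic: for the output 'Mag_Dist' the
# base size is 8 bytes times the 'mag' and 'dist' bins, then the total is
# scaled by N * M * P * Z.
#
# >>> shapedic = {'mag': 10, 'dist': 20, 'N': 2, 'M': 1, 'P': 1}
# >>> get_outputs_size(shapedic, ['Mag_Dist'], Z=1)
# {'Mag_Dist': 3200}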
[docs]def submit(smap, dstore, ctxt, sitecol, cmaker, bin_edges, src_mutex,
           rwdic):
    mags = list(numpy.unique(ctxt.mag))
    logging.debug('Sending %d/%d sites for grp_id=%d, mags=%s',
                  len(sitecol), len(sitecol.complete), ctxt.grp_id[0],
                  shortlist(mags))
    smap.submit((dstore, ctxt, sitecol, cmaker, bin_edges, src_mutex, rwdic))
[docs]def check_memory(N, Z, shape8D):
    """
    Raise an error if the calculation will require too much memory
    """
    avail_gb = psutil.virtual_memory().available / 1024**3
    req_gb = numpy.prod(shape8D) * N * Z * 8 / 1024**3
    if avail_gb < req_gb * 2:
        # req_gb*2 because when storing the outputs a lot more memory is used
        raise MemoryError(
            'You have %.1f GB available but %.1f GB are required. '
            'The solution is to reduce the number of bins' %
            (avail_gb, req_gb * 2))
    logging.info('The AccumDict will require %.1f GB', req_gb)
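# Worked example with illustrative numbers: a shape8D of
# (2, 10, 20, 20, 20, 5, 1, 1) means 800_000 float64 values per (site, rlz)
# pair; with N=10 sites and Z=2 realizations the AccumDict takes about
# 0.12 GB, and check_memory demands twice that amount to be available.
#
# >>> float(numpy.prod((2, 10, 20, 20, 20, 5, 1, 1)) * 10 * 2 * 8 / 1024**3)
# 0.11920928955078125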
@base.calculators.add('disaggregation')
class DisaggregationCalculator(base.HazardCalculator):
    """
    Classical PSHA disaggregation calculator
    """
    precalc = 'classical'
    accept_precalc = ['classical', 'disaggregation']

[docs]    def pre_checks(self):
        """
        Checks on the number of sites, atomic groups and size of the
        disaggregation matrix.
        """
        if self.N >= 32768:
            raise ValueError('You can disaggregate at max 32,768 sites')
        few = self.oqparam.max_sites_disagg
        if self.N > few:
            raise ValueError(
                'The number of sites to disaggregate is %d, but you have '
                'max_sites_disagg=%d' % (self.N, few))
        self.oqparam.mags_by_trt = self.datastore['source_mags']
        all_edges, _shapedic = disagg.get_edges_shapedic(
            self.oqparam, self.sitecol, self.R)
        *b, trts = all_edges
        T = len(trts)
        shape = [len(bin) - 1 for bin in
                 (b[0], b[1], b[2][0], b[3][0], b[4])] + [T]
        matrix_size = numpy.prod(shape)  # 6D
        if matrix_size > 1E6:
            raise ValueError(
                'The disaggregation matrix is too large '
                '(%d elements): fix the binning!' % matrix_size)
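    # Example of the 6D size check with a hypothetical binning: 10 mag x
    # 20 dist x 20 lon x 20 lat x 5 eps bins and 2 TRTs give 800_000
    # elements, below the 1E6 limit; doubling the distance bins would make
    # the check fail.
    #
    # >>> int(numpy.prod([10, 20, 20, 20, 5, 2]))
    # 800000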
[docs]    def execute(self):
        """Performs the disaggregation"""
        return self.full_disaggregation()
[docs]    def full_disaggregation(self):
        """
        Run the disaggregation phase.
        """
        oq = self.oqparam
        try:
            full_lt = self.full_lt
        except AttributeError:
            full_lt = self.datastore['full_lt'].init()
        if oq.rlz_index is None and oq.num_rlzs_disagg == 0:
            oq.num_rlzs_disagg = self.R  # 0 means all rlzs
        self.oqparam.mags_by_trt = self.datastore['source_mags']
        edges, self.shapedic = disagg.get_edges_shapedic(
            oq, self.sitecol, self.R)
        logging.info(self.shapedic)
        self.save_bin_edges(edges)
        self.poes_disagg = oq.poes_disagg or (None,)
        self.imts = list(oq.imtls)
        self.M = len(self.imts)
        dstore = (self.datastore.parent if self.datastore.parent
                  else self.datastore)
        self.mgetters = getters.map_getters(dstore, full_lt, disagg=True)

        # build array rlzs (N, Z)
        if oq.rlz_index is None:
            Z = oq.num_rlzs_disagg
            rlzs = numpy.zeros((self.N, Z), int)
            if self.R > 1:
                for sid in range(self.N):
                    hcurve = self.mgetters[sid].get_hcurve(sid)
                    mean = getters.build_stat_curve(
                        hcurve, oq.imtls, stats.mean_curve,
                        full_lt.weights, full_lt.wget)
                    # get the closest realization to the mean
                    rlzs[sid] = util.closest_to_ref(hcurve.T, mean)[:Z]
            self.datastore['best_rlzs'] = rlzs
        else:
            Z = len(oq.rlz_index)
            rlzs = numpy.zeros((self.N, Z), int)
            for z in range(Z):
                rlzs[:, z] = oq.rlz_index[z]
            self.datastore['best_rlzs'] = rlzs
        assert Z <= self.R, (Z, self.R)
        self.Z = Z
        self.rlzs = rlzs
        mean_curves = self.datastore.sel('hcurves-stats', stat='mean')[:, 0]
        s = self.shapedic
        if oq.iml_disagg:
            iml3 = numpy.zeros((s['N'], s['M'], 1))
            for m, imt in enumerate(oq.imtls):
                iml3[:, m] = oq.iml_disagg[imt]
        else:
            iml3 = map_array.compute_hmaps(mean_curves, oq.imtls, oq.poes)
        if iml3.sum() == 0:
            raise SystemExit('Cannot do any disaggregation: zero hazard')
        logging.info('Building N * M * P * Z = {:_d} intensities'.format(
            s['N'] * s['M'] * s['P'] * s['Z']))
        self.datastore['hmap3'] = iml3
        self.datastore['poe4'] = numpy.zeros(
            (s['N'], s['M'], s['P'], s['Z']))
        self.sr2z = {(s, r): z for s in self.sitecol.sids
                     for z, r in enumerate(rlzs[s])}
        return self.compute()
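    # A sketch of the "closest to the mean" selection above; this only
    # illustrates the idea behind util.closest_to_ref, not its actual
    # implementation: rank the realizations by the distance of their hazard
    # curve from the mean curve and keep the first Z of them.
    #
    # >>> curves = numpy.array([[.25, .125], [.75, .375], [.5, .25]])
    # >>> mean = curves.mean(axis=0)  # array([0.5, 0.25])
    # >>> dist = numpy.sqrt(((curves - mean) ** 2).sum(axis=1))
    # >>> dist.argsort()[:2]  # the 2 realizations closest to the mean
    # array([2, 0])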
[docs]    def compute(self):
        """
        Submit disaggregation tasks and return the results
        """
        oq = self.oqparam
        dstore = (self.datastore.parent if self.datastore.parent
                  else self.datastore)
        logging.info("Reading contexts")
        cmakers = read_cmakers(dstore)
        if 'src_mutex' in dstore:
            gb = dstore.read_df('src_mutex').groupby('grp_id')
            gp = dict(dstore['grp_probability'])  # grp_id -> probability
            src_mutex_by_grp = {
                grp_id: {'src_id': disagg.get_ints(df.src_id),
                         'weight': df.mutex_weight.to_numpy(),
                         'grp_probability': gp[grp_id]}
                for grp_id, df in gb}
        else:
            src_mutex_by_grp = {}
        ctx_by_grp = read_ctx_by_grp(dstore)  # little memory used here
        totctxs = sum(len(ctx) for ctx in ctx_by_grp.values())
        logging.info('Read {:_d} contexts'.format(totctxs))
        self.datastore.swmr_on()
        smap = parallel.Starmap(compute_disagg, h5=self.datastore.hdf5)
        # IMPORTANT!! we rely on the fact that the classical part
        # of the calculation stores the ruptures in chunks of constant
        # grp_id, therefore it is possible to build (start, stop) slices;
        # we are NOT grouping by operator.itemgetter('grp_id', 'magi'):
        # that would break the ordering of the indices causing an incredibly
        # worse performance, but visible only in extra-large calculations!

        # compute the total weight of the contexts and the maxsize
        totweight = sum(cmakers[grp_id].Z * len(ctx)
                        for grp_id, ctx in ctx_by_grp.items())
        maxsize = int(numpy.ceil(totweight / (oq.concurrent_tasks or 1)))
        logging.debug(f'{totweight=}, {maxsize=}')
        s = self.shapedic
        if self.Z > 1:
            weights = self.datastore['weights'][:]
        else:
            weights = None
        mutex_by_grp = self.datastore['mutex_by_grp'][:]
        for grp_id, ctxt in ctx_by_grp.items():
            cmaker = cmakers[grp_id]
            src_mutex, rup_mutex = mutex_by_grp[grp_id]
            src_mutex = src_mutex_by_grp.get(grp_id, {})
            # NB: in case_27 src_mutex for grp_id=1 has the form
            # {'src_id': array([1, 2]), 'weight': array([0.625, 0.375])}
            if rup_mutex:
                raise NotImplementedError(
                    'Disaggregation with mutex ruptures')
            # build rlz weight dictionary
            if weights is None:
                rwdic = {}  # don't compute means
            else:
                rwdic = {rlz: weights[rlz]
                         for rlzs in cmaker.gsims.values()
                         for rlz in rlzs}
            # submit single task
            ntasks = len(ctxt) * cmaker.Z / maxsize
            if ntasks < 1 or src_mutex or rup_mutex:
                # do not split (test case_11)
                submit(smap, self.datastore, ctxt, self.sitecol, cmaker,
                       self.bin_edges, src_mutex, rwdic)
                continue
            # split by tiles
            for tile_get in self.sitecol.split(ntasks):
                tile = tile_get(self.sitecol)
                ctx = ctxt[numpy.isin(ctxt.sids, tile.sids)]
                if len(ctx) * cmaker.Z > maxsize:  # split by magbin too
                    for c in disagg.split_by_magbin(
                            ctx, self.bin_edges[0]).values():
                        submit(smap, self.datastore, c, tile, cmaker,
                               self.bin_edges, src_mutex, rwdic)
                elif len(ctx):  # see case_multi in the oq-risk-tests
                    submit(smap, self.datastore, ctx, tile, cmaker,
                           self.bin_edges, src_mutex, rwdic)
        shape8D = (s['trt'], s['mag'], s['dist'], s['lon'], s['lat'],
                   s['eps'], s['M'], s['P'])
        check_memory(self.N, self.Z, shape8D)
        acc = AccumDict(accum=numpy.zeros(shape8D))
        # NB: a lot of memory can go in this AccumDict, please reduce the bins
        results = smap.reduce(self.agg_result, acc)
        return results  # s, r -> array 8D
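    # Illustration of the task-splitting arithmetic above (hypothetical
    # numbers): with totweight=1_000_000 and concurrent_tasks=100 the
    # maxsize is 10_000, so a group with 50_000 contexts and Z=1 is sent
    # as about 5 tile tasks.
    #
    # >>> int(numpy.ceil(1_000_000 / 100))  # maxsize
    # 10000
    # >>> 50_000 * 1 / 10_000  # ntasks = len(ctxt) * cmaker.Z / maxsize
    # 5.0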
[docs]    def agg_result(self, acc, results):
        """
        Collect the results coming from compute_disagg into the accumulator.

        :param acc: dictionary s, r -> array8D
        :param results: dictionary with the results coming from a task
        """
        with self.monitor('aggregating disagg matrices'):
            for res in results:
                trti = res.pop('trti')
                magi = res.pop('magi')
                sid = res.pop('sid')
                for rlz, arr in res.items():
                    acc[sid, rlz][trti, magi] += arr
        return acc
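    # Minimal sketch (toy 3D arrays instead of 8D) of the accumulation
    # performed by agg_result: each task result updates the (trti, magi)
    # slice of the array associated to its (sid, rlz) pair.
    #
    # >>> acc = AccumDict(accum=numpy.zeros((2, 3, 4)))
    # >>> res = {'sid': 0, 'trti': 1, 'magi': 2, 5: numpy.ones(4)}
    # >>> trti, magi, sid = res.pop('trti'), res.pop('magi'), res.pop('sid')
    # >>> for rlz, arr in res.items():
    # ...     acc[sid, rlz][trti, magi] += arr
    # >>> acc[0, 5][1, 2]
    # array([1., 1., 1., 1.])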
[docs]    def post_execute(self, results):
        """
        Save all the results of the disaggregation. NB: the number of results
        to save is #sites * #rlzs * #disagg_poes * #IMTs.

        :param results: a dictionary sid, rlz -> 8D disagg matrix
        """
        # the DEBUG dictionary is populated only for OQ_DISTRIBUTE=no
        for sid, pnes in disagg.DEBUG.items():
            print('site %d, mean pnes=%s' % (sid, pnes))
        mean = {}
        indv = {}
        for s, r in results:
            if r == 'mean':
                mean[s, 0] = results[s, r]
            else:
                indv[s, self.sr2z[s, r]] = results[s, r]
        with self.monitor('saving disagg results'):
            logging.info('Extracting and saving the PMFs')
            if indv:  # save individual realizations
                self.save_disagg_results(indv, 'disagg-rlzs')
            if mean:  # save mean PMFs
                self.save_disagg_results(mean, 'disagg-stats')
[docs]    def save_bin_edges(self, all_edges):
        """
        Save disagg-bins
        """
        *self.bin_edges, self.trts = all_edges
        b = self.bin_edges

        def ll_edges(bin_no):
            # lon/lat edges for the sites, bin_no can be 2 or 3
            num_edges = len(b[bin_no][0])
            arr = numpy.zeros((self.N, num_edges))
            for sid, edges in b[bin_no].items():
                arr[sid] = edges
            return arr

        self.datastore['disagg-bins/Mag'] = b[0]
        self.datastore['disagg-bins/Dist'] = b[1]
        self.datastore['disagg-bins/Lon'] = ll_edges(2)
        self.datastore['disagg-bins/Lat'] = ll_edges(3)
        self.datastore['disagg-bins/Eps'] = b[4]
        self.datastore['disagg-bins/TRT'] = encode(self.trts)
[docs]    def save_disagg_results(self, results, name):
        """
        Save the computed PMFs in the datastore.

        :param results:
            a dict s, z -> 8D-matrix of shape (T, Ma, D, Lo, La, E, M, P)
            containing individual realizations or statistics (only mean)
        :param name: the string "disagg-rlzs" or "disagg-stats"
        """
        oq = self.oqparam
        if name.endswith('rlzs'):
            Z = self.shapedic['Z']
        else:
            Z = 1  # only mean is supported
        out = output_dict(self.shapedic, oq.disagg_outputs, Z)
        _disagg_trt = numpy.zeros(
            self.N, [(trt, float) for trt in self.trts])
        best_rlzs = self.datastore['best_rlzs'][:]  # (shape N, Z)
        for (s, z), mat8 in sorted(results.items()):
            mat8 = disagg.to_probs(mat8)
            mat7 = agg_probs(*mat8)  # shape (Ma, D, Lo, La, E, M, P)
            for key in oq.disagg_outputs:
                if key == 'TRT':
                    out[key][s, ..., z] = valid.pmf_map[key](mat8)  # (T,M,P)
                elif key.startswith('TRT_'):
                    proj = valid.pmf_map[key[4:]]
                    out[key][s, ..., z] = [proj(m7) for m7 in mat8]
                else:
                    out[key][s, ..., z] = valid.pmf_map[key](mat7)

            # store poe4
            for m, imt in enumerate(self.imts):
                for p, poe in enumerate(self.poes_disagg):
                    mat6 = mat8[..., m, p]  # shape (T, Ma, D, Lo, La, E)
                    if m == 0 and poe == self.poes_disagg[-1]:
                        _disagg_trt[s] = tuple(
                            pprod(mat8[..., 0, 0], axis=(1, 2, 3, 4, 5)))
                    poe_agg = pprod(mat6, axis=(0, 1, 2, 3, 4, 5))
                    if name.endswith('-rlzs'):
                        self.datastore['poe4'][s, m, p, z] = max(
                            poe_agg, mean_rates.CUTOFF)

        self.datastore[name] = out
        # below a dataset useful for debugging, at minimum IMT and maximum RP
        self.datastore['_disagg_trt'] = _disagg_trt
        # check null realizations in the single site case, see disagg/case_2
        if name.endswith('-rlzs'):
            for (s, z), r in numpy.ndenumerate(best_rlzs):
                lst = []
                for key in out:
                    if out[key][s, ..., z].sum() == 0:
                        lst.append(key)
                if lst:
                    logging.warning(
                        'No %s contributions for site=%d, rlz=%d',
                        lst, s, r)
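    # For reference, the probability composition used above: both agg_probs
    # and pprod implement 1 - prod(1 - p), i.e. the probability of at least
    # one exceedance across the aggregated values (the TRT axis for mat7).
    #
    # >>> round(agg_probs(.1, .2), 10)  # 1 - (1 - .1) * (1 - .2)
    # 0.28
    # >>> round(float(pprod(numpy.array([.1, .2]), axis=0)), 10)
    # 0.28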