# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import io
import os
import time
import zlib
import pickle
import psutil
import logging
import operator
import numpy
import pandas
from PIL import Image
from openquake.baselib import parallel, hdf5, config, python3compat
from openquake.baselib.general import (
    AccumDict, DictArray, groupby, humansize, block_splitter)
from openquake.hazardlib import valid, InvalidFile
from openquake.hazardlib.contexts import read_cmakers
from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.map_array import (
    RateMap, MapArray, rates_dt, check_hmaps)
from openquake.commonlib import calc
from openquake.calculators import base, getters, preclassical, views

get_weight = operator.attrgetter('weight')
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
I64 = numpy.int64
TWO24 = 2 ** 24
TWO30 = 2 ** 30
TWO32 = 2 ** 32
GZIP = 'gzip'
BUFFER = 1.5  # enlarge the pointsource_distance sphere to fix the weight;
# with BUFFER = 1 we would have lots of apparently light sources
# collected together in an extra-slow task, as it happens in SHARE
# with ps_grid_spacing=50


def _store(rates, num_chunks, h5, mon=None, gzip=GZIP):
    if len(rates) == 0:
        return
    newh5 = h5 is None
    if newh5:
        scratch = parallel.scratch_dir(mon.calc_id)
        h5 = hdf5.File(f'{scratch}/{mon.task_no}.hdf5', 'a')
    chunks = rates['sid'] % num_chunks
    idx_start_stop = []
    for chunk in numpy.unique(chunks):
        ch_rates = rates[chunks == chunk]
        try:
            h5.create_df(
                '_rates', [(n, rates_dt[n]) for n in rates_dt.names], gzip)
            hdf5.create(
                h5, '_rates/slice_by_idx', getters.slice_dt, fillvalue=None)
        except ValueError:  # already created
            offset = len(h5['_rates/sid'])
        else:
            offset = 0
        idx_start_stop.append((chunk, offset, offset + len(ch_rates)))
        hdf5.extend(h5['_rates/sid'], ch_rates['sid'])
        hdf5.extend(h5['_rates/gid'], ch_rates['gid'])
        hdf5.extend(h5['_rates/lid'], ch_rates['lid'])
        hdf5.extend(h5['_rates/rate'], ch_rates['rate'])
    iss = numpy.array(idx_start_stop, getters.slice_dt)
    hdf5.extend(h5['_rates/slice_by_idx'], iss)
    if newh5:
        fname = h5.filename
        h5.flush()
        h5.close()
        return fname
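
# Illustrative sketch (hypothetical values, not part of the engine API):
# _store partitions the rate records by site ID, using `sid % num_chunks`
# as the chunk key, and records the (start, stop) row offsets of each chunk
# in `_rates/slice_by_idx`, so a reader only needs to scan one chunk per
# site. For instance, with num_chunks=2 and sids [0, 1, 2, 3, 5]:
#
#   >>> import numpy
#   >>> sids = numpy.array([0, 1, 2, 3, 5])
#   >>> sids % 2  # chunk key of each record
#   array([0, 1, 0, 1, 1])
#   >>> # chunk 0 -> rows for sids 0, 2; chunk 1 -> rows for sids 1, 3, 5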
def get_heavy_gids(source_groups, cmakers):
    """
    :returns: the g-indices associated to the heavy groups
    """
    if source_groups.attrs['tiling']:
        return []
    elif cmakers[0].oq.disagg_by_src:
        grp_ids = source_groups['grp_id']  # all groups
    else:
        grp_ids = source_groups['grp_id'][source_groups['blocks'] > 1]
    gids = []
    for grp_id in grp_ids:
        gids.extend(cmakers[grp_id].gid)
    return gids
def store_ctxs(dstore, rupdata_list, grp_id):
    """
    Store contexts in the datastore
    """
    for rupdata in rupdata_list:
        nr = len(rupdata)
        known = set(rupdata.dtype.names)
        for par in dstore['rup']:
            if par == 'rup_id':
                rup_id = I64(rupdata['src_id']) * TWO30 + rupdata['rup_id']
                hdf5.extend(dstore['rup/rup_id'], rup_id)
            elif par == 'grp_id':
                hdf5.extend(dstore['rup/grp_id'], numpy.full(nr, grp_id))
            elif par == 'probs_occur':
                dstore.hdf5.save_vlen('rup/probs_occur', rupdata[par])
            elif par in known:
                hdf5.extend(dstore['rup/' + par], rupdata[par])
            else:
                hdf5.extend(dstore['rup/' + par], numpy.full(nr, numpy.nan))
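
# Illustrative sketch (hypothetical values): store_ctxs builds a unique
# rupture ID by packing the source index and the within-source rupture
# index into a single int64, assuming the latter is below 2**30:
#
#   >>> src_id, rup_id = 7, 12345
#   >>> packed = src_id * TWO30 + rup_id
#   >>> divmod(packed, TWO30)  # recover the two indices
#   (7, 12345)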
def save_rates(g, N, jid, num_chunks, mon):
    """
    Store the rates for the given g on a file scratch/calc_id/task_no.hdf5
    """
    with mon.shared['rates'] as rates:
        rates_g = rates[:, :, jid[g]]
        sids = numpy.arange(N)
        for chunk in range(num_chunks):
            ch = sids % num_chunks == chunk
            rmap = MapArray(sids[ch], rates.shape[1], 1)
            rmap.array = rates_g[ch, :, None]
            rats = rmap.to_array([g])
            _store(rats, num_chunks, None, mon)
def classical(sources, tilegetters, cmaker, dstore, monitor):
    """
    Call the classical calculator in hazardlib
    """
    # NB: removing the yield would cause terrible slow tasks
    cmaker.init_monitoring(monitor)
    with dstore:
        if sources is None:  # read the full group from the datastore
            arr = dstore.getitem('_csm')[cmaker.grp_id]
            sources = pickle.loads(zlib.decompress(arr.tobytes()))
        sitecol = dstore['sitecol'].complete  # super-fast
    if cmaker.disagg_by_src and not cmaker.atomic:
        # in case_27 (Japan) we do NOT enter here;
        # disagg_by_src still works since the atomic group contains a single
        # source 'case' (mutex combination of case:01, case:02)
        for srcs in groupby(sources, valid.basename).values():
            result = hazclassical(srcs, sitecol, cmaker)
            result['rmap'].gid = cmaker.gid
            yield result
        return
    for tileno, tileget in enumerate(tilegetters):
        result = hazclassical(sources, tileget(sitecol), cmaker)
        if tileno:
            # source_data has keys src_id, grp_id, nsites, esites, nrupts,
            # weight, ctimes, taskno
            for key, lst in result['source_data'].items():
                if key in ('weight', 'nrupts'):
                    # avoid bogus weights in `oq show task:classical`
                    lst[:] = [0. for _ in range(len(lst))]
        if cmaker.disagg_by_src:
            # do not remove zeros, otherwise AELO for JPN will break
            # since there are 4 sites out of 18 with zeros
            rmap = result.pop('rmap')
        else:
            rmap = result.pop('rmap').remove_zeros()
        # print(f"{monitor.task_no=} {rmap=}")
        if rmap.size_mb and cmaker.blocks == 1 and not cmaker.disagg_by_src:
            if config.directory.custom_tmp:
                rates = rmap.to_array(cmaker.gid)
                _store(rates, cmaker.num_chunks, None, monitor)
            else:
                result['rmap'] = rmap.to_array(cmaker.gid)
        elif rmap.size_mb:
            result['rmap'] = rmap
            result['rmap'].gid = cmaker.gid
        yield result
# for instance for New Zealand G~1000 while R[full_enum]~1_000_000
# i.e. passing the gweights reduces the data transfer by 1000 times
def fast_mean(pgetter, monitor):
    """
    :param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
    :param monitor: instance of Monitor
    :returns: a dictionary kind -> MapArray
    """
    with monitor('reading rates', measuremem=True):
        pgetter.init()
    if not pgetter.sids:  # can happen with tiling
        return {}
    with monitor('compute stats', measuremem=True):
        hcurves = pgetter.get_fast_mean(pgetter.weights)
    pmap_by_kind = {'hcurves-stats': [hcurves]}
    if pgetter.poes:
        with monitor('make_hmaps', measuremem=False):
            pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
                pmap_by_kind['hcurves-stats'], pgetter.imtls, pgetter.poes)
    return pmap_by_kind
def postclassical(pgetter, wget, hstats, individual_rlzs,
                  max_sites_disagg, amplifier, monitor):
    """
    :param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
    :param wget: function (weights[:, :], imt) -> weights[:]
    :param hstats: a list of pairs (statname, statfunc)
    :param individual_rlzs: if True, also build the individual curves
    :param max_sites_disagg: if there are fewer sites than this, store rup info
    :param amplifier: instance of Amplifier or None
    :param monitor: instance of Monitor
    :returns: a dictionary kind -> MapArray

    The "kind" is a string of the form 'rlz-XXX' or 'mean' or 'quantile-XXX'
    used to specify the kind of output.
    """
    with monitor('reading rates', measuremem=True):
        pgetter.init()
    if not pgetter.sids:  # can happen with tiling
        return {}

    if amplifier:
        # amplification is meant for few sites, i.e. no tiling
        with hdf5.File(pgetter.filenames[0], 'r') as f:
            ampcode = f['sitecol'].ampcode
        imtls = DictArray({imt: amplifier.amplevels for imt in pgetter.imtls})
    else:
        imtls = pgetter.imtls
    poes, sids = pgetter.poes, U32(pgetter.sids)
    M = len(imtls)
    L = imtls.size
    L1 = L // M
    R = pgetter.R
    S = len(hstats)
    pmap_by_kind = {}
    if R == 1 or individual_rlzs:
        pmap_by_kind['hcurves-rlzs'] = [
            MapArray(sids, M, L1).fill(0) for r in range(R)]
    if hstats:
        pmap_by_kind['hcurves-stats'] = [
            MapArray(sids, M, L1).fill(0) for r in range(S)]
    combine_mon = monitor('combine pmaps', measuremem=False)
    compute_mon = monitor('compute stats', measuremem=False)
    hmaps_mon = monitor('make_hmaps', measuremem=False)
    sidx = MapArray(sids, 1, 1).fill(0).sidx
    for sid in sids:
        idx = sidx[sid]
        with combine_mon:
            pc = pgetter.get_hcurve(sid)  # shape (L, R)
        if amplifier:
            pc = amplifier.amplify(ampcode[sid], pc)
            # NB: the hcurves have soil levels != IMT levels
        if pc.sum() == 0:  # no data
            continue
        with compute_mon:
            if R == 1 or individual_rlzs:
                for r in range(R):
                    pmap_by_kind['hcurves-rlzs'][r].array[idx] = (
                        pc[:, r].reshape(M, L1))
            if hstats:
                for s, (statname, stat) in enumerate(hstats.items()):
                    sc = getters.build_stat_curve(
                        pc, imtls, stat, pgetter.weights, wget,
                        pgetter.use_rates)
                    arr = sc.reshape(M, L1)
                    pmap_by_kind['hcurves-stats'][s].array[idx] = arr
    if poes and (R == 1 or individual_rlzs):
        with hmaps_mon:
            pmap_by_kind['hmaps-rlzs'] = calc.make_hmaps(
                pmap_by_kind['hcurves-rlzs'], imtls, poes)
    if poes and hstats:
        with hmaps_mon:
            pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
                pmap_by_kind['hcurves-stats'], imtls, poes)
    return pmap_by_kind
def make_hmap_png(hmap, lons, lats):
    """
    :param hmap:
        a dictionary with keys calc_id, m, p, imt, poe, inv_time, array
    :param lons: an array of longitudes
    :param lats: an array of latitudes
    :returns: an Image object containing the hazard map
    """
    import matplotlib.pyplot as plt
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.grid(True)
    ax.set_title('hmap for IMT=%(imt)s, poe=%(poe)s\ncalculation %(calc_id)d,'
                 'inv_time=%(inv_time)dy' % hmap)
    ax.set_ylabel('Longitude')
    coll = ax.scatter(lons, lats, c=hmap['array'], cmap='jet')
    plt.colorbar(coll)
    bio = io.BytesIO()
    plt.savefig(bio, format='png')
    return dict(img=Image.open(bio), m=hmap['m'], p=hmap['p'])
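
# Illustrative usage sketch (hypothetical values): make_hmap_png is invoked
# by plot_hmaps below with one dictionary per (imt, poe) pair; something like
#
#   >>> hmap = dict(calc_id=1, m=0, p=0, imt='PGA', poe=0.02,
#   ...             inv_time=50, array=numpy.zeros(3))
#   >>> lons = numpy.array([10., 10.5, 11.])
#   >>> lats = numpy.array([45., 45.5, 46.])
#   >>> out = make_hmap_png(hmap, lons, lats)
#   >>> sorted(out)  # a PIL image plus the indices used to name the output
#   ['img', 'm', 'p']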
class Hazard:
    """
    Helper class for storing the rates
    """
    def __init__(self, dstore, srcidx, gids):
        self.datastore = dstore
        oq = dstore['oqparam']
        self.itime = oq.investigation_time
        self.weig = dstore['gweights'][:]
        self.imtls = oq.imtls
        self.sids = dstore['sitecol/sids'][:]
        self.srcidx = srcidx
        self.gids = gids
        self.N = len(dstore['sitecol/sids'])
        self.M = len(oq.imtls)
        self.L = oq.imtls.size
        self.L1 = self.L // self.M
        self.acc = AccumDict(accum={})  # used in disagg_by_src
    def get_rates(self, pmap, grp_id):
        """
        :param pmap: a MapArray
        :returns: an array of rates of shape (N, M, L1)
        """
        gids = self.gids[grp_id]
        rates = pmap.array @ self.weig[gids] / self.itime
        return rates.reshape((self.N, self.M, self.L1))
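
    # Illustrative shape sketch (hypothetical sizes, assuming pmap.array has
    # shape (N, L, G)): the matrix product above collapses the gsim axis
    # against the gsim weights, e.g. with N=2 sites, L=3 levels, G=4 gsims:
    #
    #   >>> pmap_array = numpy.ones((2, 3, 4))  # (N, L, G)
    #   >>> weig = numpy.full(4, 0.25)          # gsim weights
    #   >>> (pmap_array @ weig).shape           # -> (N, L)
    #   (2, 3)
    #
    # and the result is reshaped to (N, M, L1) and divided by the
    # investigation time to obtain annual rates.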
    def store_mean_rates_by_src(self, dic):
        """
        Store data inside mean_rates_by_src with shape (N, M, L1, Ns)
        """
        mean_rates_by_src = self.datastore['mean_rates_by_src/array'][()]
        for key, rates in dic.items():
            if isinstance(key, str):
                # in case of mean_rates_by_src key is a source ID
                idx = self.srcidx[valid.corename(key)]
                mean_rates_by_src[..., idx] += rates
        self.datastore['mean_rates_by_src/array'][:] = mean_rates_by_src
        return mean_rates_by_src
    def agg_dicts(self, acc, dic):
        """
        Aggregate dictionaries of hazard curves by updating the accumulator.

        :param acc: accumulator dictionary
        :param dic: dict with keys pmap, source_data, rup_data
        """
        # NB: dic should be a dictionary, but when the calculation dies
        # for an OOM it can become None, thus giving a very confusing error
        if dic is None:
            raise MemoryError('You ran out of memory!')
        grp_id = dic.pop('grp_id')
        sdata = dic.pop('source_data', None)
        if sdata is not None:
            self.source_data += sdata
            self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
        self.cfactor += dic.pop('cfactor')

        # store rup_data if there are few sites
        if self.few_sites and len(dic['rup_data']):
            with self.monitor('saving rup_data'):
                store_ctxs(self.datastore, dic['rup_data'], grp_id)

        rmap = dic.pop('rmap', None)
        source_id = dic.pop('basename', '')  # non-empty for disagg_by_src
        if source_id:
            # accumulate the rates for the given source
            acc[source_id] += self.haz.get_rates(rmap, grp_id)
        if rmap is None:
            # already stored in the workers, case_22
            pass
        elif isinstance(rmap, numpy.ndarray):
            # store the rates directly, case_03 or tiling without custom_tmp
            with self.monitor('storing rates', measuremem=True):
                _store(rmap, self.num_chunks, self.datastore)
        else:
            # aggregating rates is ultra-fast compared to storing
            self.rmap += rmap
        return acc
    def create_rup(self):
        """
        Create the rup datasets *before* starting the calculation
        """
        params = {'grp_id', 'occurrence_rate', 'clon', 'clat', 'rrup',
                  'probs_occur', 'sids', 'src_id', 'rup_id', 'weight'}
        for cm in self.cmakers:
            params.update(cm.REQUIRES_RUPTURE_PARAMETERS)
            params.update(cm.REQUIRES_DISTANCES)
        if self.few_sites:
            descr = []  # (param, dt)
            for param in sorted(params):
                if param == 'sids':
                    dt = U16  # storing only for few sites
                elif param == 'probs_occur':
                    dt = hdf5.vfloat64
                elif param == 'src_id':
                    dt = U32
                elif param == 'rup_id':
                    dt = I64
                elif param == 'grp_id':
                    dt = U16
                else:
                    dt = F32
                descr.append((param, dt))
            self.datastore.create_df('rup', descr, 'gzip')
    # NB: the relevant ruptures are less than the effective ruptures,
    # which are a preclassical concept
    def check_memory(self, N, L, maxw):
        """
        Log the memory required to receive the largest MapArray,
        assuming all sites are affected (upper limit)
        """
        num_gs = [len(cm.gsims) for cm in self.cmakers]
        size = max(num_gs) * N * L * 4
        avail = min(psutil.virtual_memory().available, config.memory.limit)
        if avail < size:
            raise MemoryError(
                'You have only %s of free RAM' % humansize(avail))
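
    # Illustrative arithmetic (hypothetical numbers): the upper bound above
    # is 4 bytes (float32) per gsim, site and intensity level. For example,
    # with max(num_gs)=20 gsims, N=10_000 sites and L=200 levels:
    #
    #   >>> 20 * 10_000 * 200 * 4  # bytes, i.e. ~160 MB for the MapArray
    #   160000000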
    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the sources according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        if oq.hazard_calculation_id:
            logging.info('Reading from parent calculation')
            parent = self.datastore.parent
            self.full_lt = parent['full_lt'].init()
            self.csm = parent['_csm']
            self.csm.init(self.full_lt)
            self.datastore['source_info'] = parent['source_info'][:]
            oq.mags_by_trt = {
                trt: python3compat.decode(dset[:])
                for trt, dset in parent['source_mags'].items()}
            if '_rates' in parent:
                self.build_curves_maps()  # repeat post-processing
                return {}
        self.init_poes()
        if oq.fastmean:
            logging.info('Will use the fast_mean algorithm')
        if not hasattr(self, 'trt_rlzs'):
            self.max_gb, self.trt_rlzs, self.gids = getters.get_pmaps_gb(
                self.datastore, self.full_lt)
        srcidx = {name: i for i, name in enumerate(self.csm.get_basenames())}
        self.haz = Hazard(self.datastore, srcidx, self.gids)
        rlzs = self.R == 1 or oq.individual_rlzs
        if not rlzs and not oq.hazard_stats():
            raise InvalidFile('%(job_ini)s: you disabled all statistics',
                              oq.inputs)
        self.source_data = AccumDict(accum=[])
        sgs, ds = self._pre_execute()
        if self.tiling:
            self._execute_tiling(sgs, ds)
        else:
            self._execute_regular(sgs, ds)
        if self.cfactor[0] == 0:
            if self.N == 1:
                logging.error('The site is far from all seismic sources'
                              ' included in the hazard model')
            else:
                raise RuntimeError('The sites are far from all seismic '
                                   'sources included in the hazard model')
        else:
            logging.info('cfactor = {:_d}'.format(int(self.cfactor[0])))
        self.store_info()
        self.build_curves_maps()
        return True
    def _pre_execute(self):
        oq = self.oqparam
        sgs = self.datastore['source_groups']
        self.tiling = sgs.attrs['tiling']
        if 'sitecol' in self.datastore.parent:
            ds = self.datastore.parent
        else:
            ds = self.datastore
        if config.directory.custom_tmp:
            scratch = parallel.scratch_dir(self.datastore.calc_id)
            logging.info('Storing the rates in %s', scratch)
            self.datastore.hdf5.attrs['scratch_dir'] = scratch
        if self.tiling:
            assert not oq.disagg_by_src
            assert self.N > self.oqparam.max_sites_disagg, self.N
        else:  # regular calculator
            self.create_rup()  # create the rup/ datasets BEFORE swmr_on()
        return sgs, ds

    def _execute_regular(self, sgs, ds):
        allargs = []
        n_out = []
        splits = []
        for cmaker, tilegetters, blocks, nsplits in self.csm.split(
                self.cmakers, self.sitecol, self.max_weight, self.num_chunks):
            for block in blocks:
                for tgetters in block_splitter(tilegetters, nsplits):
                    allargs.append((block, tgetters, cmaker, ds))
                    n_out.append(len(tgetters))
            splits.append(nsplits)
        logging.warning('This is a regular calculation with %d outputs, '
                        '%d tasks, min_tiles=%d, max_tiles=%d',
                        sum(n_out), len(allargs), min(n_out), max(n_out))

        # log info about the heavy sources
        srcs = self.csm.get_sources()
        maxsrc = max(srcs, key=lambda s: s.weight / splits[s.grp_id])
        logging.info('Heaviest: %s', maxsrc)

        L = self.oqparam.imtls.size
        gids = get_heavy_gids(sgs, self.cmakers)
        self.rmap = RateMap(self.sitecol.sids, L, gids)
        self.datastore.swmr_on()  # must come before the Starmap
        smap = parallel.Starmap(classical, allargs, h5=self.datastore.hdf5)
        if not self.oqparam.disagg_by_src:
            smap.expected_outputs = sum(n_out)
        acc = smap.reduce(self.agg_dicts, AccumDict(accum=0.))
        self._post_regular(acc)

    def _execute_tiling(self, sgs, ds):
        allargs = []
        n_out = []
        for cmaker, tilegetters, blocks, splits in self.csm.split(
                self.cmakers, self.sitecol, self.max_weight,
                self.num_chunks, True):
            for block in blocks:
                for tgetter in tilegetters:
                    allargs.append((tgetter, cmaker, ds))
            n_out.append(len(tilegetters))
        logging.warning('This is a tiling calculation with '
                        '%d tasks, min_tiles=%d, max_tiles=%d',
                        len(allargs), min(n_out), max(n_out))
        t0 = time.time()
        self.datastore.swmr_on()  # must come before the Starmap
        smap = parallel.Starmap(tiling, allargs, h5=self.datastore.hdf5)
        smap.reduce(self.agg_dicts, AccumDict(accum=0.))
        fraction = os.environ.get('OQ_SAMPLE_SOURCES')
        if fraction:
            est_time = (time.time() - t0) / float(fraction)
            logging.info('Estimated time for the classical part: %.1f hours '
                         '(upper limit)', est_time / 3600)

    def _post_regular(self, acc):
        # save the rates and perform some checks
        oq = self.oqparam
        if self.rmap.size_mb:
            logging.info('Processing %s', self.rmap)

        def genargs():
            for g, j in self.rmap.jid.items():
                yield g, self.N, self.rmap.jid, self.num_chunks

        if (self.rmap.size_mb > 200 and config.directory.custom_tmp
                and parallel.oq_distribute() != 'no'):
            # tested in the oq-risk-tests
            self.datastore.swmr_on()  # must come before the Starmap
            savemap = parallel.Starmap(save_rates, genargs(),
                                       h5=self.datastore,
                                       distribute='processpool')
            savemap.share(rates=self.rmap.array)
            savemap.reduce()
        elif self.rmap.size_mb:
            for g, N, jid, num_chunks in genargs():
                rates = self.rmap.to_array(g)
                _store(rates, self.num_chunks, self.datastore)
        del self.rmap
        if oq.disagg_by_src:
            mrs = self.haz.store_mean_rates_by_src(acc)
            if oq.use_rates and self.N == 1:  # sanity check
                self.check_mean_rates(mrs)

    # NB: the largest mean_rates_by_src is SUPER-SENSITIVE to numerics!
    # in particular disaggregation/case_15 is sensitive to num_cores
    # with very different values between 2 and 16 cores(!)
    def check_mean_rates(self, mean_rates_by_src):
        """
        The sum of the mean_rates_by_src must correspond to the mean_rates
        """
        try:
            exp = disagg.to_rates(self.datastore['hcurves-stats'][0, 0])
        except KeyError:  # if there are no ruptures close to the site
            return
        got = mean_rates_by_src[0].sum(axis=2)  # sum over the sources
        for m in range(len(got)):
            # skipping large rates which can be wrong due to numerics
            # (it happens in logictree/case_05 and in Japan)
            ok = got[m] < 2.
            numpy.testing.assert_allclose(got[m, ok], exp[m, ok], atol=1E-5)
    def store_info(self):
        """
        Store full_lt, source_info and source_data
        """
        self.store_rlz_info(self.rel_ruptures)
        self.store_source_info(self.source_data)
        df = pandas.DataFrame(self.source_data)
        # NB: the impact factor is the number of effective ruptures;
        # consider for instance a point source producing 200 ruptures
        # for points within the pointsource_distance (n points) and
        # producing 20 effective ruptures for the N-n points outside;
        # then impact = (200 * n + 20 * (N-n)) / N; for n=1 and N=10
        # it gives impact = 38, i.e. there are 38 effective ruptures
        df['impact'] = df.nsites / self.N
        self.datastore.create_df('source_data', df)
        self.source_data.clear()  # save a bit of memory
    def collect_hazard(self, acc, pmap_by_kind):
        """
        Populate hcurves and hmaps in the .hazard dictionary

        :param acc: ignored
        :param pmap_by_kind: a dictionary of MapArrays
        """
        # this is practically instantaneous
        if pmap_by_kind is None:  # instead of a dict
            raise MemoryError('You ran out of memory!')
        for kind in pmap_by_kind:  # hmaps-XXX, hcurves-XXX
            pmaps = pmap_by_kind[kind]
            if kind in self.hazard:
                array = self.hazard[kind]
            else:
                dset = self.datastore.getitem(kind)
                array = self.hazard[kind] = numpy.zeros(
                    dset.shape, dset.dtype)
            for r, pmap in enumerate(pmaps):
                for idx, sid in enumerate(pmap.sids):
                    array[sid, r] = pmap.array[idx]  # shape (M, P)
    def post_execute(self, dummy):
        """
        Check for slow tasks
        """
        oq = self.oqparam
        task_info = self.datastore.read_df('task_info', 'taskname')
        try:
            dur = views.discard_small(task_info.loc[b'classical'].duration)
        except KeyError:  # no data
            pass
        else:
            slow_tasks = len(dur[dur > 3 * dur.mean()]) and dur.max() > 180
            msg = 'There were %d slow task(s)' % slow_tasks
            if slow_tasks and self.SLOW_TASK_ERROR and not oq.disagg_by_src:
                raise RuntimeError('%s in #%d' %
                                   (msg, self.datastore.calc_id))
            elif slow_tasks:
                logging.info(msg)
    def _create_hcurves_maps(self):
        oq = self.oqparam
        N = len(self.sitecol)
        R = len(self.datastore['weights'])
        if oq.individual_rlzs is None:  # not specified in the job.ini
            individual_rlzs = (N == 1) * (R > 1)
        else:
            individual_rlzs = oq.individual_rlzs
        hstats = oq.hazard_stats()
        # initialize datasets
        P = len(oq.poes)
        M = self.M = len(oq.imtls)
        imts = list(oq.imtls)
        if oq.soil_intensities is not None:
            L = M * len(oq.soil_intensities)
        else:
            L = oq.imtls.size
        L1 = self.L1 = L // M
        S = len(hstats)
        if R == 1 or individual_rlzs:
            self.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
            self.datastore.set_shape_descr(
                'hcurves-rlzs', site_id=N, rlz_id=R, imt=imts, lvl=L1)
            if oq.poes:
                self.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
                self.datastore.set_shape_descr(
                    'hmaps-rlzs', site_id=N, rlz_id=R,
                    imt=list(oq.imtls), poe=oq.poes)
        if hstats:
            self.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
            self.datastore.set_shape_descr(
                'hcurves-stats', site_id=N, stat=list(hstats),
                imt=imts, lvl=numpy.arange(L1))
            if oq.poes:
                self.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
                self.datastore.set_shape_descr(
                    'hmaps-stats', site_id=N, stat=list(hstats),
                    imt=list(oq.imtls), poe=oq.poes)
        return N, S, M, P, L1, individual_rlzs

    # called by execute before post_execute
    def build_curves_maps(self):
        """
        Compute and store hcurves-rlzs, hcurves-stats, hmaps-rlzs, hmaps-stats
        """
        oq = self.oqparam
        hstats = oq.hazard_stats()
        N, S, M, P, L1, individual = self._create_hcurves_maps()
        if '_rates' in set(self.datastore) or not self.datastore.parent:
            dstore = self.datastore
        else:
            dstore = self.datastore.parent
        wget = self.full_lt.wget
        allargs = [(getter, wget, hstats, individual, oq.max_sites_disagg,
                    self.amplifier)
                   for getter in getters.map_getters(dstore, self.full_lt)]
        if not config.directory.custom_tmp and not allargs:  # case_60
            logging.warning('No rates were generated')
            return
        self.hazard = {}  # kind -> array
        hcbytes = 8 * N * S * M * L1
        hmbytes = 8 * N * S * M * P if oq.poes else 0
        if hcbytes:
            logging.info('Producing %s of hazard curves', humansize(hcbytes))
        if hmbytes:
            logging.info('Producing %s of hazard maps', humansize(hmbytes))
        if 'delta_rates' in oq.inputs:
            pass  # avoid an HDF5 error
        else:  # in all the other cases
            self.datastore.swmr_on()
        if oq.fastmean:
            parallel.Starmap(
                fast_mean, [args[0:1] for args in allargs],
                distribute='no' if self.few_sites else None,
                h5=self.datastore.hdf5,
            ).reduce(self.collect_hazard)
        else:
            parallel.Starmap(
                postclassical, allargs,
                distribute='no' if self.few_sites else None,
                h5=self.datastore.hdf5,
            ).reduce(self.collect_hazard)
        for kind in sorted(self.hazard):
            logging.info('Saving %s', kind)  # very fast
            self.datastore[kind][:] = self.hazard.pop(kind)
        if 'hmaps-stats' in self.datastore and not oq.tile_spec:
            self.plot_hmaps()
        # check numerical stability of the hmaps around the poes
        if self.N <= oq.max_sites_disagg and not self.amplifier:
            mean_hcurves = self.datastore.sel(
                'hcurves-stats', stat='mean')[:, 0]
            check_hmaps(mean_hcurves, oq.imtls, oq.poes)
    def plot_hmaps(self):
        """
        Generate hazard map plots if there are more than 1000 sites
        """
        hmaps = self.datastore.sel('hmaps-stats', stat='mean')  # NSMP
        maxhaz = hmaps.max(axis=(0, 1, 3))
        mh = dict(zip(self.oqparam.imtls, maxhaz))
        logging.info('The maximum hazard map values are %s', mh)
        if Image is None or not self.from_engine:  # missing PIL
            return
        if self.N < 1000:  # few sites, don't plot
            return
        M, P = hmaps.shape[2:]
        logging.info('Saving %dx%d mean hazard maps', M, P)
        inv_time = self.oqparam.investigation_time
        allargs = []
        for m, imt in enumerate(self.oqparam.imtls):
            for p, poe in enumerate(self.oqparam.poes):
                dic = dict(m=m, p=p, imt=imt, poe=poe, inv_time=inv_time,
                           calc_id=self.datastore.calc_id,
                           array=hmaps[:, 0, m, p])
                allargs.append((dic, self.sitecol.lons, self.sitecol.lats))
        smap = parallel.Starmap(make_hmap_png, allargs, h5=self.datastore)
        for dic in smap:
            self.datastore['png/hmap_%(m)d_%(p)d' % dic] = dic['img']