# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import time
import zlib
import pickle
import psutil
import logging
import operator
import numpy
import pandas
from PIL import Image
from openquake.baselib import (
    performance, parallel, hdf5, config, python3compat)
from openquake.baselib.general import (
    AccumDict, DictArray, block_splitter, groupby, humansize)
from openquake.hazardlib import valid, InvalidFile
from openquake.hazardlib.contexts import read_cmakers, get_maxsize
from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.map_array import MapArray, rates_dt
from openquake.commonlib import calc
from openquake.calculators import base, getters

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
I64 = numpy.int64
TWO24 = 2 ** 24
TWO32 = 2 ** 32
BUFFER = 1.5  # enlarge the pointsource_distance sphere to fix the weight;
# with BUFFER = 1 we would have lots of apparently light sources
# collected together in an extra-slow task, as it happens in SHARE
# with ps_grid_spacing=50
get_weight = operator.attrgetter('weight')
slice_dt = numpy.dtype([('idx', U32), ('start', int), ('stop', int)])


# NB: using 32 bit ratemaps
def get_pmaps_gb(dstore):
    """
    :returns: memory required on the master node to keep the pmaps
    """
    N = len(dstore['sitecol'])
    L = dstore['oqparam'].imtls.size
    full_lt = dstore['full_lt'].init()
    all_trt_smrs = dstore['trt_smrs'][:]
    trt_rlzs = full_lt.get_trt_rlzs(all_trt_smrs)
    gids = full_lt.get_gids(all_trt_smrs)
    return len(trt_rlzs) * N * L * 4 / 1024 ** 3, trt_rlzs, gids

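# For instance (hypothetical numbers), with 300 trt_rlzs, N=10_000 sites and
# L=200 intensity levels the master node would need about
# 300 * 10_000 * 200 * 4 / 1024**3 ~ 2.2 GB to keep the pmaps in memory.

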
def build_slices(idxs, offset=0):
    """
    Convert an array of site IDs (with repetitions) into an array slice_dt
    """
    arr = performance.idx_start_stop(idxs)
    sbs = numpy.zeros(len(arr), slice_dt)
    sbs['idx'] = arr[:, 0]
    sbs['start'] = arr[:, 1] + offset
    sbs['stop'] = arr[:, 2] + offset
    return sbs

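# Illustrative example (assuming performance.idx_start_stop returns one
# (value, start, stop) row per run of equal consecutive values):
# build_slices(numpy.array([0, 0, 0, 1, 1, 1])) produces the slice_dt rows
# (0, 0, 3) and (1, 3, 6); with offset=10 they become (0, 10, 13) and
# (1, 13, 16).

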
def store_ctxs(dstore, rupdata_list, grp_id):
    """
    Store contexts in the datastore
    """
    for rupdata in rupdata_list:
        nr = len(rupdata)
        known = set(rupdata.dtype.names)
        for par in dstore['rup']:
            if par == 'grp_id':
                hdf5.extend(dstore['rup/grp_id'], numpy.full(nr, grp_id))
            elif par == 'probs_occur':
                dstore.hdf5.save_vlen('rup/probs_occur', rupdata[par])
            elif par in known:
                hdf5.extend(dstore['rup/' + par], rupdata[par])
            else:
                hdf5.extend(dstore['rup/' + par], numpy.full(nr, numpy.nan))


def to_rates(pnemap, gid, tiling, disagg_by_src):
    """
    :returns: dictionary if tiling is True, else MapArray with rates
    """
    rates = pnemap.to_rates()
    if tiling:
        return rates.to_dict(gid)
    if disagg_by_src:
        return rates
    return rates.remove_zeros()


def classical(sources, sitecol, cmaker, dstore, monitor):
    """
    Call the classical calculator in hazardlib
    """
    # NB: removing the yield would cause terribly slow tasks
    cmaker.init_monitoring(monitor)
    tiling = not hasattr(sources, '__iter__')  # passed gid
    disagg_by_src = cmaker.disagg_by_src
    with dstore:
        if tiling:
            # tiling calculator, read the sources from the datastore
            gid = sources
            with monitor('reading sources'):  # fast, but uses a lot of RAM
                arr = dstore.getitem('_csm')[cmaker.grp_id]
                sources = pickle.loads(zlib.decompress(arr.tobytes()))
        else:
            # regular calculator
            gid = 0
            sitecol = dstore['sitecol']  # super-fast
    if disagg_by_src and not getattr(sources, 'atomic', False):
        # in case_27 (Japan) we do NOT enter here;
        # disagg_by_src still works since the atomic group contains a single
        # source 'case' (mutex combination of case:01, case:02)
        for srcs in groupby(sources, valid.basename).values():
            pmap = MapArray(
                sitecol.sids, cmaker.imtls.size, len(cmaker.gsims)).fill(
                    cmaker.rup_indep)
            result = hazclassical(srcs, sitecol, cmaker, pmap)
            result['pnemap'] = to_rates(~pmap, gid, tiling, disagg_by_src)
            yield result
    else:
        # size_mb is the maximum size of the pmap array in MB
        size_mb = (len(cmaker.gsims) * cmaker.imtls.size * len(sitecol)
                   * 8 / 1024 ** 2)
        if config.distribution.compress:
            size_mb /= 5  # produce 5x fewer tiles
        # NB: the parameter config.memory.pmap_max_mb avoids the hanging
        # of oq1 due to too large zmq packets
        itiles = int(numpy.ceil(size_mb / cmaker.pmap_max_mb))
        for sites in sitecol.split_in_tiles(itiles):
            pmap = MapArray(
                sites.sids, cmaker.imtls.size, len(cmaker.gsims)).fill(
                    cmaker.rup_indep)
            result = hazclassical(sources, sites, cmaker, pmap)
            result['pnemap'] = to_rates(~pmap, gid, tiling, disagg_by_src)
            yield result


# for instance for New Zealand G~1000 while R[full_enum]~1_000_000,
# i.e. passing the gweights reduces the data transfer by 1000 times
def fast_mean(pgetter, gweights, monitor):
    """
    :param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
    :param gweights: an array of G weights
    :returns: a dictionary kind -> MapArray
    """
    with monitor('reading rates', measuremem=True):
        pgetter.init()

    with monitor('compute stats', measuremem=True):
        hcurves = pgetter.get_fast_mean(gweights)

    pmap_by_kind = {'hcurves-stats': [hcurves]}
    if pgetter.poes:
        with monitor('make_hmaps', measuremem=False):
            pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
                pmap_by_kind['hcurves-stats'], pgetter.imtls, pgetter.poes)
    return pmap_by_kind


def postclassical(pgetter, weights, wget, hstats, individual_rlzs,
                  max_sites_disagg, amplifier, monitor):
    """
    :param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
    :param weights: a list of ImtWeights
    :param wget: function (weights[:, :], imt) -> weights[:]
    :param hstats: a list of pairs (statname, statfunc)
    :param individual_rlzs: if True, also build the individual curves
    :param max_sites_disagg: if there are fewer sites than this, store rup info
    :param amplifier: instance of Amplifier or None
    :param monitor: instance of Monitor
    :returns: a dictionary kind -> MapArray

    The "kind" is a string of the form 'rlz-XXX' or 'mean' or 'quantile-XXX'
    used to specify the kind of output.
    """
    with monitor('reading rates', measuremem=True):
        pgetter.init()

    if amplifier:
        with hdf5.File(pgetter.filename, 'r') as f:
            ampcode = f['sitecol'].ampcode
        imtls = DictArray({imt: amplifier.amplevels
                           for imt in pgetter.imtls})
    else:
        imtls = pgetter.imtls
    poes, sids = pgetter.poes, U32(pgetter.sids)
    M = len(imtls)
    L = imtls.size
    L1 = L // M
    R = pgetter.R
    S = len(hstats)
    pmap_by_kind = {}
    if R == 1 or individual_rlzs:
        pmap_by_kind['hcurves-rlzs'] = [
            MapArray(sids, M, L1).fill(0) for r in range(R)]
    if hstats:
        pmap_by_kind['hcurves-stats'] = [
            MapArray(sids, M, L1).fill(0) for r in range(S)]
    combine_mon = monitor('combine pmaps', measuremem=False)
    compute_mon = monitor('compute stats', measuremem=False)
    hmaps_mon = monitor('make_hmaps', measuremem=False)
    sidx = MapArray(sids, 1, 1).fill(0).sidx
    for sid in sids:
        idx = sidx[sid]
        with combine_mon:
            pc = pgetter.get_hcurve(sid)  # shape (L, R)
            if amplifier:
                pc = amplifier.amplify(ampcode[sid], pc)
                # NB: the hcurves have soil levels != IMT levels
        if pc.sum() == 0:  # no data
            continue
        with compute_mon:
            if R == 1 or individual_rlzs:
                for r in range(R):
                    pmap_by_kind['hcurves-rlzs'][r].array[idx] = (
                        pc[:, r].reshape(M, L1))
            if hstats:
                for s, (statname, stat) in enumerate(hstats.items()):
                    sc = getters.build_stat_curve(
                        pc, imtls, stat, weights, wget, pgetter.use_rates)
                    arr = sc.reshape(M, L1)
                    pmap_by_kind['hcurves-stats'][s].array[idx] = arr
    if poes and (R == 1 or individual_rlzs):
        with hmaps_mon:
            pmap_by_kind['hmaps-rlzs'] = calc.make_hmaps(
                pmap_by_kind['hcurves-rlzs'], imtls, poes)
    if poes and hstats:
        with hmaps_mon:
            pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
                pmap_by_kind['hcurves-stats'], imtls, poes)
    return pmap_by_kind


def make_hmap_png(hmap, lons, lats):
    """
    :param hmap: a dictionary with keys calc_id, m, p, imt, poe, inv_time,
        array
    :param lons: an array of longitudes
    :param lats: an array of latitudes
    :returns: an Image object containing the hazard map
    """
    import matplotlib.pyplot as plt
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.grid(True)
    ax.set_title('hmap for IMT=%(imt)s, poe=%(poe)s\ncalculation %(calc_id)d,'
                 ' inv_time=%(inv_time)dy' % hmap)
    ax.set_ylabel('Latitude')
    coll = ax.scatter(lons, lats, c=hmap['array'], cmap='jet')
    plt.colorbar(coll)
    bio = io.BytesIO()
    plt.savefig(bio, format='png')
    return dict(img=Image.open(bio), m=hmap['m'], p=hmap['p'])


class Hazard:
    """
    Helper class for storing the rates
    """
    def __init__(self, dstore, srcidx, gids):
        self.datastore = dstore
        oq = dstore['oqparam']
        self.itime = oq.investigation_time
        self.weig = dstore['_rates/weig'][:]
        self.imtls = oq.imtls
        self.sids = dstore['sitecol/sids'][:]
        self.srcidx = srcidx
        self.gids = gids
        self.N = len(dstore['sitecol/sids'])
        self.M = len(oq.imtls)
        self.L = oq.imtls.size
        self.L1 = self.L // self.M
        self.sites_per_task = int(
            numpy.ceil(self.N / (oq.concurrent_tasks or 1)))
        self.acc = AccumDict(accum={})
        self.offset = 0  # used in disagg_by_src

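    # For instance (illustrative numbers), with N=100 sites and
    # concurrent_tasks=30 we get sites_per_task = ceil(100 / 30) = 4,
    # i.e. the rates of at most 4 sites end up in the same postclassical task.
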
    def get_rates(self, pmap, grp_id):
        """
        :param pmap: a MapArray
        :returns: an array of rates of shape (N, M, L1)
        """
        gids = self.gids[grp_id]
        rates = pmap.array @ self.weig[gids] / self.itime
        return rates.reshape((self.N, self.M, self.L1))

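    # A note on the matrix product in get_rates (shapes are an assumption
    # based on how MapArray is used in this module): pmap.array has shape
    # (N, L, G'), self.weig[gids] has shape (G',), so the product has shape
    # (N, L), which is then reshaped to (N, M, L1) with L = M * L1.
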
    def store_rates(self, pnemap):
        """
        Store pnes inside the _rates dataset
        """
        if isinstance(pnemap, dict):  # already converted (tiling)
            rates = pnemap
        else:
            rates = pnemap.to_dict()
        if len(rates['sid']) == 0:  # happens in case_60
            return self.offset * 12
        hdf5.extend(self.datastore['_rates/sid'], rates['sid'])
        hdf5.extend(self.datastore['_rates/gid'], rates['gid'])
        hdf5.extend(self.datastore['_rates/lid'], rates['lid'])
        hdf5.extend(self.datastore['_rates/rate'], rates['rate'])

        # NB: there is a genius idea here, to split in tasks by using
        # the formula ``taskno = site_ids // sites_per_task`` and then
        # extracting a dictionary of slices for each taskno. This works
        # since by construction the site_ids are sequential and there are
        # at most G slices per task. For instance if there are 6 sites
        # disposed in 2 groups and we want to produce 2 tasks we can use
        # 012345012345 // 3 = 000111000111 and the slices are
        # {0: [(0, 3), (6, 9)], 1: [(3, 6), (9, 12)]}
        sbs = build_slices(rates['sid'] // self.sites_per_task, self.offset)
        hdf5.extend(self.datastore['_rates/slice_by_idx'], sbs)
        # slice_by_idx contains 3 slices in classical/case_22
        self.offset += len(rates['sid'])
        self.acc['nsites'] = self.offset
        return self.offset * 12  # 4 + 2 + 2 + 4 bytes

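    # Plain numpy illustration of the slicing comment in store_rates above
    # (not engine code): numpy.tile(numpy.arange(6), 2) // 3 gives
    # array([0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]), i.e. task 0 owns the
    # slices (0, 3) and (6, 9), while task 1 owns (3, 6) and (9, 12).
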
    def store_mean_rates_by_src(self, dic):
        """
        Store data inside mean_rates_by_src with shape (N, M, L1, Ns)
        """
        mean_rates_by_src = self.datastore['mean_rates_by_src/array'][()]
        for key, rates in dic.items():
            if isinstance(key, str):
                # in case of mean_rates_by_src key is a source ID
                idx = self.srcidx[valid.corename(key)]
                mean_rates_by_src[..., idx] += rates
        self.datastore['mean_rates_by_src/array'][:] = mean_rates_by_src
        return mean_rates_by_src

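
# A minimal reconstruction of the calculator class declaration that the
# methods below (agg_dicts, create_rup, execute, ...) belong to; the exact
# class attributes of the original module may differ.
@base.calculators.add('classical')
class ClassicalCalculator(base.HazardCalculator):
    """
    Classical PSHA calculator
    """
    core_task = classical
    SLOW_TASK_ERROR = False  # referenced in post_execute
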
    def agg_dicts(self, acc, dic):
        """
        Aggregate dictionaries of hazard curves by updating the accumulator.

        :param acc: accumulator dictionary
        :param dic: dict with keys pmap, source_data, rup_data
        """
        # NB: dic should be a dictionary, but when the calculation dies
        # for an OOM it can become None, thus giving a very confusing error
        if dic is None:
            raise MemoryError('You ran out of memory!')
        sdata = dic['source_data']
        self.source_data += sdata
        grp_id = dic.pop('grp_id')
        self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
        cfactor = dic.pop('cfactor')
        if cfactor[1] != cfactor[0]:
            print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format(
                cfactor[1] / cfactor[2], cfactor[1] / cfactor[0]))
        self.cfactor += cfactor

        # store rup_data if there are few sites
        if self.few_sites and len(dic['rup_data']):
            with self.monitor('saving rup_data'):
                store_ctxs(self.datastore, dic['rup_data'], grp_id)

        pnemap = dic['pnemap']  # probabilities of no exceedance
        source_id = dic.pop('basename', '')  # non-empty for disagg_by_src
        if source_id:
            # accumulate the rates for the given source
            acc[source_id] += self.haz.get_rates(pnemap, grp_id)
        G = pnemap.array.shape[2]
        rates = self.pmap.array
        sidx = self.pmap.sidx[pnemap.sids]
        for i, gid in enumerate(self.gids[grp_id]):
            rates[sidx, :, gid] += pnemap.array[:, :, i % G]
        return acc

    def create_rup(self):
        """
        Create the rup datasets *before* starting the calculation
        """
        params = {'grp_id', 'occurrence_rate', 'clon', 'clat', 'rrup',
                  'probs_occur', 'sids', 'src_id', 'rup_id', 'weight'}
        for cm in self.cmakers:
            params.update(cm.REQUIRES_RUPTURE_PARAMETERS)
            params.update(cm.REQUIRES_DISTANCES)
        if self.few_sites:
            descr = []  # (param, dt)
            for param in sorted(params):
                if param == 'sids':
                    dt = U16  # storing only for few sites
                elif param == 'probs_occur':
                    dt = hdf5.vfloat64
                elif param in ('src_id', 'rup_id'):
                    dt = U32
                elif param == 'grp_id':
                    dt = U16
                else:
                    dt = F32
                descr.append((param, dt))
            self.datastore.create_df('rup', descr, 'gzip')

    # NB: the relevant ruptures are fewer than the effective ruptures,
    # which are a preclassical concept

    def check_memory(self, N, L, maxw):
        """
        Log the memory required to receive the largest MapArray,
        assuming all sites are affected (upper limit)
        """
        num_gs = [len(cm.gsims) for cm in self.cmakers]
        max_gs = max(num_gs)
        maxsize = get_maxsize(len(self.oqparam.imtls), max_gs)
        logging.info('Considering {:_d} contexts at once'.format(maxsize))
        size = max_gs * N * L * 4
        avail = min(psutil.virtual_memory().available, config.memory.limit)
        if avail < size:
            raise MemoryError(
                'You have only %s of free RAM' % humansize(avail))

    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the sources according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        if oq.hazard_calculation_id:
            logging.info('Reading from parent calculation')
            parent = self.datastore.parent
            self.full_lt = parent['full_lt'].init()
            self.csm = parent['_csm']
            self.csm.init(self.full_lt)
            self.datastore['source_info'] = parent['source_info'][:]
            maxw = self.csm.get_max_weight(oq)
            oq.mags_by_trt = {
                trt: python3compat.decode(dset[:])
                for trt, dset in parent['source_mags'].items()}
            if '_rates' in parent:
                self.build_curves_maps()  # repeat post-processing
                return {}
        else:
            maxw = self.max_weight
        self.init_poes()
        if oq.fastmean:
            logging.info('Will use the fast_mean algorithm')
        req_gb, self.trt_rlzs, self.gids = get_pmaps_gb(self.datastore)
        self.datastore['_rates/weig'] = self.full_lt.g_weights(self.trt_rlzs)
        srcidx = {
            name: i for i, name in enumerate(self.csm.get_basenames())}
        self.haz = Hazard(self.datastore, srcidx, self.gids)
        rlzs = self.R == 1 or oq.individual_rlzs
        if not rlzs and not oq.hazard_stats():
            raise InvalidFile('%(job_ini)s: you disabled all statistics',
                              oq.inputs)
        self.source_data = AccumDict(accum=[])
        if not performance.numba:
            logging.warning(
                'numba is not installed: using the slow algorithm')

        t0 = time.time()
        max_gb = float(config.memory.pmap_max_gb)
        if (oq.disagg_by_src or self.N < oq.max_sites_disagg
                or req_gb < max_gb):
            self.check_memory(len(self.sitecol), oq.imtls.size, maxw)
            self.execute_reg(maxw)
        else:
            self.execute_big(maxw * .75)
        self.store_info()
        if self.cfactor[0] == 0:
            if self.N == 1:
                logging.warning('The site is far from all seismic sources'
                                ' included in the hazard model')
            else:
                raise RuntimeError(
                    'The sites are far from all seismic sources'
                    ' included in the hazard model')
        else:
            logging.info('cfactor = {:_d}/{:_d} = {:.1f}'.format(
                int(self.cfactor[1]), int(self.cfactor[0]),
                self.cfactor[1] / self.cfactor[0]))
        if '_rates' in self.datastore:
            self.build_curves_maps()
        if not oq.hazard_calculation_id:
            self.classical_time = time.time() - t0
        return True

    def execute_reg(self, maxw):
        """
        Regular case
        """
        self.create_rup()  # create the rup/ datasets BEFORE swmr_on()
        acc = AccumDict(accum=0.)  # src_id -> pmap
        oq = self.oqparam
        L = oq.imtls.size
        Gt = len(self.trt_rlzs)
        self.pmap = MapArray(self.sitecol.sids, L, Gt).fill(0, F32)
        allargs = []
        if 'sitecol' in self.datastore.parent:
            ds = self.datastore.parent
        else:
            ds = self.datastore
        for cm in self.cmakers:
            sg = self.csm.src_groups[cm.grp_id]
            cm.rup_indep = getattr(sg, 'rup_interdep', None) != 'mutex'
            cm.pmap_max_mb = float(config.memory.pmap_max_mb)
            if sg.atomic or sg.weight <= maxw:
                blks = [sg]
            else:
                blks = block_splitter(sg, maxw, get_weight, sort=True)
            for block in blks:
                logging.debug('Sending %d source(s) with weight %d',
                              len(block), sg.weight)
                allargs.append((block, None, cm, ds))

        self.datastore.swmr_on()  # must come before the Starmap
        smap = parallel.Starmap(classical, allargs, h5=self.datastore.hdf5)
        acc = smap.reduce(self.agg_dicts, acc)
        with self.monitor('storing rates', measuremem=True):
            self.haz.store_rates(self.pmap)
        del self.pmap
        if oq.disagg_by_src:
            mrs = self.haz.store_mean_rates_by_src(acc)
            if oq.use_rates and self.N == 1:  # sanity check
                self.check_mean_rates(mrs)

    def check_mean_rates(self, mean_rates_by_src):
        """
        The sum of the mean_rates_by_src must correspond to the mean_rates
        """
        try:
            exp = disagg.to_rates(self.datastore['hcurves-stats'][0, 0])
        except KeyError:  # if there are no ruptures close to the site
            return
        got = mean_rates_by_src[0].sum(axis=2)  # sum over the sources
        for m in range(len(got)):
            # skipping large rates which can be wrong due to numerics
            # (it happens in logictree/case_05 and in Japan)
            ok = got[m] < 10.
            numpy.testing.assert_allclose(got[m, ok], exp[m, ok], atol=1E-5)

    def execute_big(self, maxw):
        """
        Use parallel tiling
        """
        oq = self.oqparam
        assert not oq.disagg_by_src
        assert self.N > self.oqparam.max_sites_disagg, self.N
        allargs = []
        self.ntiles = []
        if '_csm' in self.datastore.parent:
            ds = self.datastore.parent
        else:
            ds = self.datastore
        for cm in self.cmakers:
            sg = self.csm.src_groups[cm.grp_id]
            cm.rup_indep = getattr(sg, 'rup_interdep', None) != 'mutex'
            cm.pmap_max_mb = float(config.memory.pmap_max_mb)
            gid = self.gids[cm.grp_id][0]
            if sg.atomic or sg.weight <= maxw:
                allargs.append((gid, self.sitecol, cm, ds))
            else:
                tiles = self.sitecol.split(numpy.ceil(sg.weight / maxw))
                logging.info('Group #%d, %d tiles', cm.grp_id, len(tiles))
                for tile in tiles:
                    allargs.append((gid, tile, cm, ds))
                self.ntiles.append(len(tiles))
        logging.warning('Generated at most %d tiles', max(self.ntiles))

        self.datastore.swmr_on()  # must come before the Starmap
        mon = self.monitor('storing rates')
        for dic in parallel.Starmap(
                classical, allargs, h5=self.datastore.hdf5):
            self.cfactor += dic['cfactor']
            with mon:
                self.haz.store_rates(dic['pnemap'])
        return {}

    def store_info(self):
        """
        Store full_lt, source_info and source_data
        """
        self.store_rlz_info(self.rel_ruptures)
        self.store_source_info(self.source_data)
        df = pandas.DataFrame(self.source_data)
        # NB: the impact factor is the number of effective ruptures;
        # consider for instance a point source producing 200 ruptures
        # for points within the pointsource_distance (n points) and
        # producing 20 effective ruptures for the N-n points outside;
        # then impact = (200 * n + 20 * (N-n)) / N; for n=1 and N=10
        # it gives impact = 38, i.e. there are 38 effective ruptures
        df['impact'] = df.nsites / self.N
        self.datastore.create_df('source_data', df)
        self.source_data.clear()  # save a bit of memory

    def collect_hazard(self, acc, pmap_by_kind):
        """
        Populate hcurves and hmaps in the .hazard dictionary

        :param acc: ignored
        :param pmap_by_kind: a dictionary of MapArrays
        """
        # this is practically instantaneous
        if pmap_by_kind is None:  # instead of a dict
            raise MemoryError('You ran out of memory!')
        for kind in pmap_by_kind:  # hmaps-XXX, hcurves-XXX
            pmaps = pmap_by_kind[kind]
            if kind in self.hazard:
                array = self.hazard[kind]
            else:
                dset = self.datastore.getitem(kind)
                array = self.hazard[kind] = numpy.zeros(
                    dset.shape, dset.dtype)
            for r, pmap in enumerate(pmaps):
                for idx, sid in enumerate(pmap.sids):
                    array[sid, r] = pmap.array[idx]  # shape (M, P)

    def post_execute(self, dummy):
        """
        Check for slow tasks
        """
        oq = self.oqparam
        task_info = self.datastore.read_df('task_info', 'taskname')
        try:
            dur = task_info.loc[b'classical'].duration
        except KeyError:  # no data
            pass
        else:
            slow_tasks = len(dur[dur > 3 * dur.mean()]) and dur.max() > 180
            msg = 'There were %d slow task(s)' % slow_tasks
            if slow_tasks and self.SLOW_TASK_ERROR and not oq.disagg_by_src:
                raise RuntimeError(
                    '%s in #%d' % (msg, self.datastore.calc_id))
            elif slow_tasks:
                logging.info(msg)

    def _create_hcurves_maps(self):
        oq = self.oqparam
        N = len(self.sitecol)
        R = len(self.realizations)
        if oq.individual_rlzs is None:  # not specified in the job.ini
            individual_rlzs = (N == 1) * (R > 1)
        else:
            individual_rlzs = oq.individual_rlzs
        hstats = oq.hazard_stats()
        # initialize datasets
        P = len(oq.poes)
        M = self.M = len(oq.imtls)
        imts = list(oq.imtls)
        if oq.soil_intensities is not None:
            L = M * len(oq.soil_intensities)
        else:
            L = oq.imtls.size
        L1 = self.L1 = L // M
        S = len(hstats)
        if R == 1 or individual_rlzs:
            self.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
            self.datastore.set_shape_descr(
                'hcurves-rlzs', site_id=N, rlz_id=R, imt=imts, lvl=L1)
            if oq.poes:
                self.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
                self.datastore.set_shape_descr(
                    'hmaps-rlzs', site_id=N, rlz_id=R,
                    imt=list(oq.imtls), poe=oq.poes)
        if hstats:
            self.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
            self.datastore.set_shape_descr(
                'hcurves-stats', site_id=N, stat=list(hstats),
                imt=imts, lvl=numpy.arange(L1))
            if oq.poes:
                self.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
                self.datastore.set_shape_descr(
                    'hmaps-stats', site_id=N, stat=list(hstats),
                    imt=list(oq.imtls), poe=oq.poes)
        return N, S, M, P, L1, individual_rlzs

    # called by execute before post_execute
    def build_curves_maps(self):
        """
        Compute and store hcurves-rlzs, hcurves-stats, hmaps-rlzs, hmaps-stats
        """
        oq = self.oqparam
        hstats = oq.hazard_stats()
        N, S, M, P, L1, individual = self._create_hcurves_maps()
        if '_rates' in set(self.datastore):
            dstore = self.datastore
        else:
            dstore = self.datastore.parent
        slicedic = AccumDict(accum=[])
        for idx, start, stop in dstore['_rates/slice_by_idx'][:]:
            slicedic[idx].append((start, stop))
        if not slicedic:
            # no hazard, nothing to do, happens in case_60
            return

        # using compactify improves the performance of `reading rates`;
        # I have measured a 3.5x in the AUS model with 1 rlz
        allslices = [calc.compactify(slices) for slices in slicedic.values()]
        nslices = sum(len(slices) for slices in allslices)
        logging.info('There are %.1f slices of rates per task',
                     nslices / len(slicedic))

        if 'trt_smrs' not in dstore:  # starting from hazard_curves.csv
            trt_rlzs = self.full_lt.get_trt_rlzs([[0]])
        else:
            trt_rlzs = self.full_lt.get_trt_rlzs(dstore['trt_smrs'][:])
        if oq.fastmean:
            ws = self.datastore['weights'][:]
            weights = numpy.array(
                [ws[trs % TWO24].sum() for trs in trt_rlzs])
            trt_rlzs = numpy.zeros(len(trt_rlzs))  # reduces the data transfer
        else:
            weights = self.full_lt.weights
        wget = self.full_lt.wget
        allargs = [
            (getters.MapGetter(dstore.filename, trt_rlzs, self.R, slices, oq),
             weights, wget, hstats, individual, oq.max_sites_disagg,
             self.amplifier)
            for slices in allslices]
        self.hazard = {}  # kind -> array
        hcbytes = 8 * N * S * M * L1
        hmbytes = 8 * N * S * M * P if oq.poes else 0
        if hcbytes:
            logging.info('Producing %s of hazard curves', humansize(hcbytes))
        if hmbytes:
            logging.info('Producing %s of hazard maps', humansize(hmbytes))
        if 'delta_rates' in oq.inputs:
            pass  # avoid an HDF5 error
        else:  # in all the other cases
            self.datastore.swmr_on()
        if oq.fastmean:
            parallel.Starmap(
                fast_mean, [args[0:2] for args in allargs],
                distribute='no' if self.few_sites else None,
                h5=self.datastore.hdf5,
            ).reduce(self.collect_hazard)
        else:
            parallel.Starmap(
                postclassical, allargs,
                distribute='no' if self.few_sites else None,
                h5=self.datastore.hdf5,
            ).reduce(self.collect_hazard)
        for kind in sorted(self.hazard):
            logging.info('Saving %s', kind)  # very fast
            self.datastore[kind][:] = self.hazard.pop(kind)

        fraction = os.environ.get('OQ_SAMPLE_SOURCES')
        if fraction and hasattr(self, 'classical_time'):
            total_time = time.time() - self.t0
            delta = total_time - self.classical_time
            est_time = self.classical_time / float(fraction) + delta
            logging.info('Estimated time: %.1f hours', est_time / 3600)
        if 'hmaps-stats' in self.datastore:
            self.plot_hmaps()

    def plot_hmaps(self):
        """
        Generate hazard map plots if there are more than 1000 sites
        """
        hmaps = self.datastore.sel('hmaps-stats', stat='mean')  # NSMP
        maxhaz = hmaps.max(axis=(0, 1, 3))
        mh = dict(zip(self.oqparam.imtls, maxhaz))
        logging.info('The maximum hazard map values are %s', mh)
        if Image is None or not self.from_engine:  # missing PIL
            return
        if self.N < 1000:  # few sites, don't plot
            return
        M, P = hmaps.shape[2:]
        logging.info('Saving %dx%d mean hazard maps', M, P)
        inv_time = self.oqparam.investigation_time
        allargs = []
        for m, imt in enumerate(self.oqparam.imtls):
            for p, poe in enumerate(self.oqparam.poes):
                dic = dict(m=m, p=p, imt=imt, poe=poe, inv_time=inv_time,
                           calc_id=self.datastore.calc_id,
                           array=hmaps[:, 0, m, p])
                allargs.append((dic, self.sitecol.lons, self.sitecol.lats))
        smap = parallel.Starmap(make_hmap_png, allargs)
        for dic in smap:
            self.datastore['png/hmap_%(m)d_%(p)d' % dic] = dic['img']