# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import math
import time
import os.path
import logging
import numpy
import pandas
from shapely import geometry
from openquake.baselib import config, hdf5, parallel, python3compat
from openquake.baselib.general import (
    AccumDict, humansize, block_splitter, group_array)
from openquake.hazardlib.geo.packager import fiona
from openquake.hazardlib.map_array import MapArray, get_mean_curve
from openquake.hazardlib.stats import geom_avg_std, compute_stats
from openquake.hazardlib.calc.stochastic import sample_ruptures
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.calc.filters import (
    close_ruptures, magstr, nofilter, getdefault, get_distances,
    SourceFilter)
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.calc.conditioned_gmfs import ConditionedGmfComputer
from openquake.hazardlib import logictree, InvalidFile
from openquake.hazardlib.calc.stochastic import get_rup_array, rupture_dt
from openquake.hazardlib.source.rupture import (
    RuptureProxy, EBRupture, get_ruptures)
from openquake.commonlib import util, logs, readinput, datastore
from openquake.commonlib.calc import (
    gmvs_to_poes, make_hmaps, slice_dt, build_slice_by_event,
    RuptureImporter, SLICE_BY_EVENT_NSITES, get_close_mosaic_models,
    get_proxies)
from openquake.risklib.riskinput import str2rsi, rsi2str
from openquake.calculators import base, views
from openquake.calculators.getters import sig_eps_dt
from openquake.calculators.classical import ClassicalCalculator
from openquake.calculators.extract import Extractor
from openquake.calculators.postproc.plots import plot_avg_gmf
from openquake.calculators.base import expose_outputs
from PIL import Image

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
I64 = numpy.int64
F32 = numpy.float32
F64 = numpy.float64
TWO24 = 2 ** 24
TWO32 = numpy.float64(2 ** 32)
rup_dt = numpy.dtype(
    [('rup_id', I64), ('rrup', F32), ('time', F32), ('task_no', U16)])

def rup_weight(rup):
    # rup['nsites'] is 0 if the ruptures were generated without a sitecol
    if isinstance(rup, numpy.ndarray):
        nsites = numpy.clip(rup['nsites'], 1., numpy.inf)
        return numpy.ceil(nsites / 100.)
    return math.ceil((rup['nsites'] or 1) / 100.)

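# A hedged usage sketch of rup_weight (not in the original module): a plain
# dict stands in for a rupture record, so the numbers are illustrative only.
#
#   >>> rup_weight({'nsites': 0})    # no sitecol -> minimal weight of 1
#   1
#   >>> rup_weight({'nsites': 250})  # one weight unit per 100 sites
#   3
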
def build_hcurves(calc):
    """
    Build the hazard curves from each realization starting from
    the stored GMFs. Works only for few sites.
    """
    oq = calc.oqparam
    # compute and save statistics; this is done in process and can
    # be very slow if there are thousands of realizations
    weights = calc.datastore['weights'][:]
    # NB: in the future we may want to save to individual hazard
    # curves if oq.individual_rlzs is set; for the moment we
    # save the statistical curves only
    hstats = oq.hazard_stats()
    S = len(hstats)
    R = len(weights)
    N = calc.N
    M = len(oq.imtls)
    L1 = oq.imtls.size // M
    gmf_df = calc.datastore.read_df('gmf_data', 'eid')
    ev_df = calc.datastore.read_df('events', 'id')[['rlz_id']]
    gmf_df = gmf_df.join(ev_df)
    hc_mon = calc._monitor('building hazard curves', measuremem=False)
    hcurves = {}
    for (sid, rlz), df in gmf_df.groupby(['sid', 'rlz_id']):
        with hc_mon:
            poes = gmvs_to_poes(df, oq.imtls, oq.ses_per_logic_tree_path)
            for m, imt in enumerate(oq.imtls):
                hcurves[rsi2str(rlz, sid, imt)] = poes[m]
    pmaps = {r: MapArray(calc.sitecol.sids, L1 * M, 1).fill(0)
             for r in range(R)}
    for key, poes in hcurves.items():
        r, sid, imt = str2rsi(key)
        array = pmaps[r].array[sid, oq.imtls(imt), 0]
        array[:] = 1. - (1. - array) * (1. - poes)
    pmaps = [p.reshape(N, M, L1) for p in pmaps.values()]
    if oq.individual_rlzs:
        logging.info('Saving individual hazard curves')
        calc.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-rlzs', site_id=N, rlz_id=R,
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-rlzs', site_id=N, rlz_id=R,
                imt=list(oq.imtls), poe=oq.poes)
        for r in range(R):
            calc.datastore['hcurves-rlzs'][:, r] = pmaps[r].array
            if oq.poes:
                [hmap] = make_hmaps([pmaps[r]], oq.imtls, oq.poes)
                ds[:, r] = hmap.array
    if S:
        logging.info('Computing statistical hazard curves')
        calc.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-stats', site_id=N, stat=list(hstats),
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-stats', site_id=N, stat=list(hstats),
                imt=list(oq.imtls), poes=oq.poes)
        for s, stat in enumerate(hstats):
            smap = MapArray(calc.sitecol.sids, L1, M)
            [smap.array] = compute_stats(
                numpy.array([p.array for p in pmaps]),
                [hstats[stat]], weights)
            calc.datastore['hcurves-stats'][:, s] = smap.array
            if oq.poes:
                [hmap] = make_hmaps([smap], oq.imtls, oq.poes)
                ds[:, s] = hmap.array

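# The in-place update in build_hcurves, array[:] = 1 - (1-array) * (1-poes),
# composes probabilities of exceedance from independent contributions; a
# minimal numeric sketch with made-up values:
#
#   >>> a, b = 0.1, 0.2
#   >>> round(1 - (1 - a) * (1 - b), 2)  # P(A or B) for independent A, B
#   0.28
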
def count_ruptures(src):
    """
    Count the number of ruptures on a heavy source
    """
    return {src.source_id: src.count_ruptures()}

def get_computer(cmaker, proxy, srcfilter, station_data, station_sitecol):
    """
    :returns: GmfComputer or ConditionedGmfComputer
    """
    sids = srcfilter.close_sids(proxy, cmaker.trt)
    if len(sids) == 0:  # filtered away
        raise FarAwayRupture
    ebr = proxy.to_ebr(cmaker.trt)
    oq = cmaker.oq
    if station_sitecol:
        stations = numpy.isin(sids, station_sitecol.sids)
        if stations.any():
            station_sids = sids[stations]
            return ConditionedGmfComputer(
                ebr, srcfilter.sitecol.filtered(sids),
                srcfilter.sitecol.filtered(station_sids),
                station_data.loc[station_sids],
                oq.observed_imts, cmaker, oq.correl_model, oq.cross_correl,
                oq.ground_motion_correlation_params,
                oq.number_of_ground_motion_fields,
                oq._amplifier, oq._sec_perils)
        else:
            logging.warning('There are no stations!')
    return GmfComputer(
        ebr, srcfilter.sitecol.filtered(sids), cmaker,
        oq.correl_model, oq.cross_correl, oq._amplifier, oq._sec_perils)

def _event_based(proxies, cmaker, stations, srcfilter, shr,
                 fmon, cmon, umon, mmon):
    oq = cmaker.oq
    alldata = []
    sig_eps = []
    times = []
    max_iml = oq.get_max_iml()
    se_dt = sig_eps_dt(oq.imtls)
    mea_tau_phi = []
    for proxy in proxies:
        t0 = time.time()
        with fmon:
            if proxy['mag'] < cmaker.min_mag:
                continue
            try:
                computer = get_computer(cmaker, proxy, srcfilter, *stations)
            except FarAwayRupture:
                # skip this rupture
                continue
        if stations and stations[0] is not None:  # conditioned GMFs
            assert cmaker.scenario
            with shr['mea'] as mea, shr['tau'] as tau, shr['phi'] as phi:
                df = computer.compute_all(
                    [mea, tau, phi], max_iml, mmon, cmon, umon)
        else:  # regular GMFs
            df = computer.compute_all(None, max_iml, mmon, cmon, umon)
        if oq.mea_tau_phi:
            mtp = numpy.array(computer.mea_tau_phi, GmfComputer.mtp_dt)
            mea_tau_phi.append(mtp)
        sig_eps.append(computer.build_sig_eps(se_dt))
        dt = time.time() - t0
        times.append((proxy['id'], computer.ctx.rrup.min(), dt))
        alldata.append(df)
    times = numpy.array([tup + (fmon.task_no,) for tup in times], rup_dt)
    times.sort(order='rup_id')
    if sum(len(df) for df in alldata) == 0:
        return dict(gmfdata={}, times=times, sig_eps=())
    gmfdata = pandas.concat(alldata)  # ~40 MB
    dic = dict(gmfdata={k: gmfdata[k].to_numpy() for k in gmfdata.columns},
               times=times,
               sig_eps=numpy.concatenate(sig_eps, dtype=se_dt))
    if oq.mea_tau_phi:
        mtpdata = numpy.concatenate(mea_tau_phi, dtype=GmfComputer.mtp_dt)
        dic['mea_tau_phi'] = {col: mtpdata[col]
                              for col in mtpdata.dtype.names}
    return dic

def event_based(proxies, cmaker, sitecol, stations, dstore, monitor):
    """
    Compute GMFs and optionally hazard curves
    """
    if isinstance(dstore, str):  # when passing ruptures.hdf5
        dstore = hdf5.File(dstore)
    oq = cmaker.oq
    rmon = monitor('reading sites and ruptures', measuremem=True)
    fmon = monitor('instantiating GmfComputer', measuremem=False)
    mmon = monitor('computing mean_stds', measuremem=False)
    cmon = monitor('computing gmfs', measuremem=False)
    umon = monitor('updating gmfs', measuremem=False)
    cmaker.scenario = 'scenario' in oq.calculation_mode
    with dstore, rmon:
        srcfilter = SourceFilter(
            sitecol.complete, oq.maximum_distance(cmaker.trt))
        dset = dstore['rupgeoms']
        for proxy in proxies:
            proxy.geom = dset[proxy['geom_id']]
    for block in block_splitter(proxies, 20_000, rup_weight):
        yield _event_based(block, cmaker, stations, srcfilter,
                           monitor.shared, fmon, cmon, umon, mmon)

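# block_splitter, used above with rup_weight, greedily fills blocks up to
# the given maximum weight; a sketch with the default unit weight (the
# exact block boundaries are an assumption of this example):
#
#   >>> [list(b) for b in block_splitter('abcde', 2)]
#   [['a', 'b'], ['c', 'd'], ['e']]
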
def filter_stations(station_df, complete, rup, maxdist):
    """
    :param station_df: DataFrame with the stations
    :param complete: complete SiteCollection
    :param rup: rupture
    :param maxdist: maximum distance
    :returns: filtered (station_df, station_sitecol)
    """
    ns = len(station_df)
    ok = (get_distances(rup, complete, 'rrup') <= maxdist) & numpy.isin(
        complete.sids, station_df.index)
    station_sites = complete.filter(ok)
    if station_sites is None:
        station_data = None
        logging.warning('Discarded %d/%d stations more distant than %d km, '
                        'switching to the unconditioned GMF computer',
                        ns, ns, maxdist)
    else:
        station_data = station_df[
            numpy.isin(station_df.index, station_sites.sids)]
        assert len(station_data) == len(station_sites), (
            len(station_data), len(station_sites))
        if len(station_data) < ns:
            logging.info('Discarded %d/%d stations more distant than %d km',
                         ns - len(station_data), ns, maxdist)
    return station_data, station_sites

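# A minimal sketch of the station mask built in filter_stations, with
# hypothetical arrays: a site is kept when it is both within maxdist and
# listed among the station ids.
#
#   >>> dists = numpy.array([10., 120., 40.])   # rrup distances in km
#   >>> sids = numpy.array([0, 1, 2])           # site ids
#   >>> (dists <= 100.) & numpy.isin(sids, [0, 2])
#   array([ True, False,  True])
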
def starmap_from_rups_hdf5(oq, sitecol, dstore):
    """
    :returns: a Starmap instance sending event_based tasks
    """
    ruptures_hdf5 = oq.inputs['rupture_model']
    trts = {}
    rlzs_by_gsim = {}
    with hdf5.File(ruptures_hdf5) as r:
        for model in r['full_lt']:
            full_lt = r[f'full_lt/{model}']
            trts[model] = full_lt.trts
            logging.info('Building rlzs_by_gsim for %s', model)
            for trt_smr, rbg in full_lt.get_rlzs_by_gsim_dic().items():
                rlzs_by_gsim[model, trt_smr] = rbg
        dstore['full_lt'] = full_lt  # saving the last lt (hackish)
        r.copy('events', dstore.hdf5)  # saving the events
        logging.info('Selecting the ruptures close to the sites')
        rups = close_ruptures(r['ruptures'][:], sitecol)
    dstore['ruptures'] = rups
    R = full_lt.num_samples
    dstore['weights'] = numpy.ones(R) / R
    rups_dic = group_array(rups, 'model', 'trt_smr')
    totw = sum(rup_weight(rups).sum() for rups in rups_dic.values())
    maxw = totw / (oq.concurrent_tasks or 1)
    extra = sitecol.array.dtype.names
    dstore.swmr_on()
    smap = parallel.Starmap(event_based, h5=dstore.hdf5)
    logging.info('Computing the GMFs')
    for (model, trt_smr), rups in rups_dic.items():
        model = model.decode('ascii')
        trt = trts[model][trt_smr // TWO24]
        proxies = get_proxies(ruptures_hdf5, rups)
        mags = numpy.unique(numpy.round(rups['mag'], 2))
        oq.mags_by_trt = {trt: [magstr(mag) for mag in mags]}
        cmaker = ContextMaker(trt, rlzs_by_gsim[model, trt_smr], oq,
                              extraparams=extra)
        cmaker.min_mag = getdefault(oq.minimum_magnitude, trt)
        for block in block_splitter(proxies, maxw * 1.02, rup_weight):
            args = block, cmaker, sitecol, (None, None), ruptures_hdf5
            smap.submit(args)
    return smap

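# NB: as used above, trt_smr packs the tectonic region type index into the
# high bits, i.e. trt index = trt_smr // TWO24 and (judging from the name)
# the source model realization index in the low bits; a sketch with
# made-up values:
#
#   >>> trt_smr = 2 * TWO24 + 5
#   >>> trt_smr // TWO24, trt_smr % TWO24
#   (2, 5)
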
# NB: save_tmp is passed in event_based_risk
def starmap_from_rups(func, oq, full_lt, sitecol, dstore, save_tmp=None):
    """
    Submit the ruptures and apply `func` (event_based or ebrisk)
    """
    try:
        vs30 = sitecol.vs30
    except ValueError:  # in scenario test_case_14
        pass
    else:
        if numpy.isnan(vs30).any():
            raise ValueError('The vs30 is NaN, missing site model '
                             'or site parameter')
    set_mags(oq, dstore)
    rups = dstore['ruptures'][:]
    logging.info('Reading {:_d} ruptures'.format(len(rups)))
    logging.info('Affected sites ~%.0f per rupture, max=%.0f',
                 rups['nsites'].mean(), rups['nsites'].max())
    allproxies = [RuptureProxy(rec) for rec in rups]
    if "station_data" in oq.inputs:
        trt = full_lt.trts[0]
        proxy = allproxies[0]
        proxy.geom = dstore['rupgeoms'][proxy['geom_id']]
        rup = proxy.to_ebr(trt).rupture
        station_df = dstore.read_df('station_data', 'site_id')
        maxdist = (oq.maximum_distance_stations or
                   oq.maximum_distance['default'][-1][1])
        station_data, station_sites = filter_stations(
            station_df, sitecol.complete, rup, maxdist)
    else:
        station_data, station_sites = None, None

    maxw = sum(rup_weight(p) for p in allproxies) / (
        oq.concurrent_tasks or 1)
    logging.info('maxw = {:_d}'.format(round(maxw)))
    if station_data is not None:
        # assume scenario with a single true rupture
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(0)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
        cmaker.scenario = True
        maxdist = oq.maximum_distance(cmaker.trt)
        srcfilter = SourceFilter(sitecol.complete, maxdist)
        computer = get_computer(
            cmaker, proxy, srcfilter, station_data, station_sites)
        G = len(cmaker.gsims)
        M = len(cmaker.imts)
        N = len(computer.sitecol)
        size = 2 * G * M * N * N * 8  # tau, phi
        msg = f'{G=} * {M=} * {humansize(N*N*8)} * 2'
        logging.info('Requiring %s for tau, phi [%s]', humansize(size), msg)
        if size > float(config.memory.conditioned_gmf_gb) * 1024**3:
            raise ValueError(
                f'The calculation is too large: {G=}, {M=}, {N=}. '
                'You must reduce the number of sites i.e. maximum_distance')
        mea, tau, phi = computer.get_mea_tau_phi(dstore.hdf5)
        del proxy.geom  # to reduce data transfer
    dstore.swmr_on()
    smap = parallel.Starmap(func, h5=dstore.hdf5)
    if save_tmp:
        save_tmp(smap.monitor)
    # NB: for conditioned scenarios we are looping on a single trt
    toml_gsims = []
    for trt_smr, start, stop in dstore['trt_smr_start_stop']:
        proxies = allproxies[start:stop]
        trt = full_lt.trts[trt_smr // TWO24]
        extra = sitecol.array.dtype.names
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(trt_smr)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq, extraparams=extra)
        cmaker.min_mag = getdefault(oq.minimum_magnitude, trt)
        for gsim in rlzs_by_gsim:
            toml_gsims.append(gsim._toml)
        if station_data is not None:
            if parallel.oq_distribute() in ('zmq', 'slurm'):
                logging.error('Conditioned scenarios are not meant to be run'
                              ' on a cluster')
            smap.share(mea=mea, tau=tau, phi=phi)
        # producing slightly less than concurrent_tasks thanks to the 1.02
        for block in block_splitter(proxies, maxw * 1.02, rup_weight):
            args = (block, cmaker, sitecol,
                    (station_data, station_sites), dstore)
            smap.submit(args)
    dstore['gsims'] = numpy.array(toml_gsims)
    return smap

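# A worked example of the memory check in starmap_from_rups (made-up
# sizes): with G=2 gsims, M=3 IMTs and N=1000 sites, the tau and phi
# matrices require size = 2 * 2 * 3 * 1000 * 1000 * 8 = 96_000_000 bytes
# (~92 MiB), to be compared with config.memory.conditioned_gmf_gb.
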
def set_mags(oq, dstore):
    """
    Set the attribute oq.mags_by_trt
    """
    if 'source_mags' in dstore:  # classical or event_based
        oq.mags_by_trt = {
            trt: python3compat.decode(dset[:])
            for trt, dset in dstore['source_mags'].items()}
    elif oq.ruptures_hdf5:
        pass
    elif 'ruptures' in dstore:  # scenario
        trts = dstore['full_lt'].trts
        ruptures = dstore['ruptures'][:]
        dic = {}
        for trti, trt in enumerate(trts):
            rups = ruptures[ruptures['trt_smr'] == trti]
            mags = numpy.unique(numpy.round(rups['mag'], 2))
            dic[trt] = ['%.02f' % mag for mag in mags]
        oq.mags_by_trt = dic

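# A sketch of the magnitude binning used in set_mags, with made-up values:
#
#   >>> numpy.unique(numpy.round([5.051, 5.049, 6.5], 2))
#   array([5.05, 6.5 ])
#   >>> ['%.02f' % mag for mag in [5.05, 6.5]]
#   ['5.05', '6.50']
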
def compute_avg_gmf(gmf_df, weights, min_iml):
    """
    :param gmf_df: a DataFrame with columns eid, sid, rlz, gmv...
    :param weights: E weights associated to the realizations
    :param min_iml: array of M minimum intensities
    :returns: a dictionary site_id -> array of shape (2, M)
    """
    dic = {}
    E = len(weights)
    M = len(min_iml)
    for sid, df in gmf_df.groupby(gmf_df.index):
        eid = df.pop('eid')
        gmvs = numpy.ones((E, M), F32) * min_iml
        gmvs[eid.to_numpy()] = df.to_numpy()
        dic[sid] = geom_avg_std(gmvs, weights)
    return dic

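# compute_avg_gmf delegates to geom_avg_std, which returns the weighted
# geometric mean and std of the GMVs per site; a conceptual sketch of the
# mean only (NOT the actual geom_avg_std implementation):
#
#   >>> gmvs = numpy.array([[0.1], [0.4]])  # 2 events, 1 IMT
#   >>> ws = numpy.array([0.5, 0.5])        # event weights
#   >>> numpy.exp(ws @ numpy.log(gmvs)).round(3)  # weighted geometric mean
#   array([0.2])
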
def read_gsim_lt(oq):
    # in impact mode the gsim_lt is read from the exposure.hdf5 file
    if oq.impact and not oq.shakemap_uri:
        if not oq.mosaic_model:
            if oq.rupture_dict:
                lon, lat = [oq.rupture_dict['lon'], oq.rupture_dict['lat']]
            elif oq.rupture_xml:
                hypo = readinput.get_rupture(oq).hypocenter
                lon, lat = [hypo.x, hypo.y]
            mosaic_models = get_close_mosaic_models(lon, lat, 5)
            # NOTE: using the first mosaic model
            oq.mosaic_model = mosaic_models[0]
            if len(mosaic_models) > 1:
                logging.info('Using the "%s" model' % oq.mosaic_model)
        [expo_hdf5] = oq.inputs['exposure']
        if oq.mosaic_model == '???':
            raise ValueError(
                '(%(lon)s, %(lat)s) is not covered by the mosaic!'
                % oq.rupture_dict)
        if oq.gsim != '[FromFile]':
            raise ValueError(
                'In Aristotle mode the gsim can not be specified in'
                ' the job.ini: %s' % oq.gsim)
        if oq.tectonic_region_type == '*':
            raise ValueError(
                'The tectonic_region_type parameter must be specified')
        gsim_lt = logictree.GsimLogicTree.from_hdf5(
            expo_hdf5, oq.mosaic_model,
            oq.tectonic_region_type.encode('utf8'))
    else:
        gsim_lt = readinput.get_gsim_lt(oq)
    return gsim_lt

@base.calculators.add('event_based', 'scenario', 'ucerf_hazard')
class EventBasedCalculator(base.HazardCalculator):
    """
    Event based PSHA calculator generating the ground motion fields and
    the hazard curves from the ruptures, depending on the configuration
    parameters.
    """
    core_task = event_based
    is_stochastic = True
    accept_precalc = ['event_based', 'ebrisk', 'event_based_risk']

    def init(self):
        if self.oqparam.cross_correl.__class__.__name__ == \
                'GodaAtkinson2009':
            logging.warning(
                'The truncation_level param is ignored with GodaAtkinson2009')
        if hasattr(self, 'csm'):
            self.check_floating_spinning()
        if hasattr(self.oqparam, 'maximum_distance'):
            self.srcfilter = self.src_filter()
        else:
            self.srcfilter = nofilter
        if not self.datastore.parent:
            self.datastore.create_dset('ruptures', rupture_dt)
            self.datastore.create_dset('rupgeoms', hdf5.vfloat32)

    def counting_ruptures(self):
        """
        Sets src.num_ruptures and src.offset
        """
        sources = self.csm.get_sources()
        logging.info('Counting the ruptures in the CompositeSourceModel')
        self.datastore.swmr_on()
        with self.monitor('counting ruptures', measuremem=True):
            nrups = parallel.Starmap(  # weighting the heavy sources
                count_ruptures,
                [(src,) for src in sources if src.code in b'AMSC'],
                h5=self.datastore.hdf5, progress=logging.debug).reduce()
        # NB: multifault sources must be considered light to avoid a large
        # data transfer, even if .count_ruptures can be slow
        for src in sources:
            try:
                src.num_ruptures = nrups[src.source_id]
            except KeyError:  # light sources
                src.num_ruptures = src.count_ruptures()
            src.weight = src.num_ruptures
        self.csm.fix_src_offset()  # NB: must be AFTER count_ruptures
        maxweight = sum(sg.weight for sg in self.csm.src_groups) / (
            self.oqparam.concurrent_tasks or 1)
        return maxweight

    def build_events_from_sources(self):
        """
        Prefilter the composite source model and store the source_info
        """
        oq = self.oqparam
        maxweight = self.counting_ruptures()
        eff_ruptures = AccumDict(accum=0)  # grp_id => potential ruptures
        source_data = AccumDict(accum=[])
        allargs = []
        srcfilter = self.srcfilter
        if 'geometry' in oq.inputs:
            fname = oq.inputs['geometry']
            with fiona.open(fname) as f:
                model_geom = geometry.shape(f[0].geometry)
        elif oq.mosaic_model:  # 3-letter mosaic model
            mosaic_df = readinput.read_mosaic_df(buffer=0).set_index('code')
            mmodel = 'CAN' if oq.mosaic_model == 'CND' else oq.mosaic_model
            model_geom = mosaic_df.loc[mmodel].geom
        logging.info('Building ruptures')
        g_index = 0
        for sg_id, sg in enumerate(self.csm.src_groups):
            if not sg.sources:
                continue
            rgb = self.full_lt.get_rlzs_by_gsim(sg.sources[0].trt_smr)
            cmaker = ContextMaker(sg.trt, rgb, oq)
            cmaker.gid = numpy.arange(g_index, g_index + len(rgb))
            g_index += len(rgb)
            cmaker.model = oq.mosaic_model or '???'
            if oq.mosaic_model or 'geometry' in oq.inputs:
                cmaker.model_geom = model_geom
            for src_group in sg.split(maxweight):
                allargs.append((src_group, cmaker, srcfilter.sitecol))
        self.datastore.swmr_on()
        smap = parallel.Starmap(
            sample_ruptures, allargs, h5=self.datastore.hdf5)
        mon = self.monitor('saving ruptures')
        self.nruptures = 0  # estimated classical ruptures within maxdist
        t0 = time.time()
        tot_ruptures = 0
        filtered_ruptures = 0
        for dic in smap:
            # NB: dic should be a dictionary, but when the calculation dies
            # for an OOM it can become None, thus giving a very confusing
            # error
            if dic is None:
                raise MemoryError('You ran out of memory!')
            rup_array = dic['rup_array']
            tot_ruptures += len(rup_array)
            if len(rup_array) == 0:
                continue
            geom = rup_array.geom
            filtered_ruptures += len(rup_array)
            if dic['source_data']:
                source_data += dic['source_data']
            if dic['eff_ruptures']:
                eff_ruptures += dic['eff_ruptures']
            with mon:
                self.nruptures += len(rup_array)
                # NB: the ruptures will be reordered and resaved later
                hdf5.extend(self.datastore['ruptures'], rup_array)
                hdf5.extend(self.datastore['rupgeoms'], geom)
        t1 = time.time()
        logging.info(f'Generated {filtered_ruptures}/{tot_ruptures} ruptures,'
                     f' stored in {t1-t0} seconds')
        if len(self.datastore['ruptures']) == 0:
            raise RuntimeError('No ruptures were generated, perhaps the '
                               'effective investigation time is too short')
        # don't change the order of the 3 things below!
        self.store_source_info(source_data)
        self.store_rlz_info(eff_ruptures)
        imp = RuptureImporter(self.datastore)
        with self.monitor('saving ruptures and events'):
            imp.import_rups_events(self.datastore.getitem('ruptures')[()])

    def agg_dicts(self, acc, result):
        """
        :param acc: accumulator dictionary
        :param result: an AccumDict with events, ruptures and gmfs
        """
        if result is None:  # instead of a dict
            raise MemoryError('You ran out of memory!')
        sav_mon = self.monitor('saving gmfs')
        primary = self.oqparam.get_primary_imtls()
        sec_imts = self.oqparam.sec_imts
        with sav_mon:
            gmfdata = result.pop('gmfdata')
            if len(gmfdata):
                df = pandas.DataFrame(gmfdata)
                dset = self.datastore['gmf_data/sid']
                times = result.pop('times')
                hdf5.extend(self.datastore['gmf_data/rup_info'], times)
                if self.N >= SLICE_BY_EVENT_NSITES:
                    sbe = build_slice_by_event(df.eid.to_numpy(), self.offset)
                    hdf5.extend(self.datastore['gmf_data/slice_by_event'],
                                sbe)
                hdf5.extend(dset, df.sid.to_numpy())
                hdf5.extend(self.datastore['gmf_data/eid'],
                            df.eid.to_numpy())
                for m in range(len(primary)):
                    hdf5.extend(self.datastore[f'gmf_data/gmv_{m}'],
                                df[f'gmv_{m}'])
                for sec_imt in sec_imts:
                    hdf5.extend(self.datastore[f'gmf_data/{sec_imt}'],
                                df[sec_imt])
                sig_eps = result.pop('sig_eps')
                hdf5.extend(self.datastore['gmf_data/sigma_epsilon'],
                            sig_eps)
                self.offset += len(df)
        # optionally save mea_tau_phi
        mtp = result.pop('mea_tau_phi', None)
        if mtp:
            for col, arr in mtp.items():
                hdf5.extend(self.datastore[f'mea_tau_phi/{col}'], arr)
        return acc

    def _read_scenario_ruptures(self):
        oq = self.oqparam
        gsim_lt = read_gsim_lt(oq)
        trts = list(gsim_lt.values)
        if (str(gsim_lt.branches[0].gsim) == '[FromFile]'
                and 'gmfs' not in oq.inputs):
            raise InvalidFile('%s: missing gsim or gsim_logic_tree_file'
                              % oq.inputs['job_ini'])
        G = gsim_lt.get_num_paths()
        if oq.calculation_mode.startswith('scenario'):
            ngmfs = oq.number_of_ground_motion_fields
        if oq.rupture_dict or oq.rupture_xml:
            # check the number of branchsets
            bsets = len(gsim_lt._ltnode)
            if bsets > 1:
                raise InvalidFile(
                    '%s for a scenario calculation must contain a single '
                    'branchset, found %d!' % (oq.inputs['job_ini'], bsets))
            [(trt_smr, rlzs_by_gsim)] = \
                gsim_lt.get_rlzs_by_gsim_dic().items()
            trt = trts[trt_smr // TWO24]
            rup = readinput.get_rupture(oq)
            oq.mags_by_trt = {trt: [magstr(rup.mag)]}
            self.cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
            if self.N > oq.max_sites_disagg:  # many sites, split rupture
                ebrs = []
                for i in range(ngmfs):
                    ebr = EBRupture(rup, 0, 0, G, i, e0=i * G)
                    ebr.seed = oq.ses_seed + i
                    ebrs.append(ebr)
            else:  # keep a single rupture with a big occupation number
                ebrs = [EBRupture(rup, 0, 0, G * ngmfs, 0)]
                ebrs[0].seed = oq.ses_seed
            srcfilter = SourceFilter(self.sitecol, oq.maximum_distance(trt))
            aw = get_rup_array(ebrs, srcfilter)
            if len(aw) == 0:
                raise RuntimeError(
                    'The rupture is too far from the sites! Please check the '
                    'maximum_distance and the position of the rupture')
        elif oq.inputs['rupture_model'].endswith('.csv'):
            aw = get_ruptures(oq.inputs['rupture_model'])
            if len(gsim_lt.values) == 1:  # fix for scenario_damage/case_12
                aw['trt_smr'] = 0  # a single TRT
            if oq.calculation_mode.startswith('scenario'):
                # rescale n_occ by ngmfs and nrlzs
                aw['n_occ'] *= ngmfs * gsim_lt.get_num_paths()
        else:
            raise InvalidFile("Something wrong in %s" % oq.inputs['job_ini'])
        rup_array = aw.array
        hdf5.extend(self.datastore['rupgeoms'], aw.geom)
        if len(rup_array) == 0:
            raise RuntimeError(
                'There are no sites within the maximum_distance'
                ' of %s km from the rupture' %
                oq.maximum_distance(rup.tectonic_region_type)(rup.mag))
        fake = logictree.FullLogicTree.fake(gsim_lt)
        self.datastore['full_lt'] = fake
        self.store_rlz_info({})  # store weights
        self.save_params()
        imp = RuptureImporter(self.datastore)
        imp.import_rups_events(rup_array)

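    # NB: in both branches of _read_scenario_ruptures the total number of
    # events is G * ngmfs: either ngmfs ruptures each with n_occ=G, or a
    # single rupture with n_occ = G * ngmfs. A made-up example: with G=2
    # gsim paths and number_of_ground_motion_fields=10, 20 events are
    # generated overall.
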
    def execute(self):
        oq = self.oqparam
        if oq.impact and oq.shakemap_uri:
            # this is creating gmf_data
            base.store_gmfs_from_shakemap(self, self.sitecol, self.assetcol)
            return {}
        dstore = self.datastore
        E = None
        if oq.ground_motion_fields and oq.min_iml.sum() == 0:
            logging.warning('The GMFs are not filtered: '
                            'you may want to set a minimum_intensity')
        elif oq.minimum_intensity:
            logging.info('minimum_intensity=%s', oq.minimum_intensity)
        else:
            logging.info('min_iml=%s', oq.min_iml)
        self.offset = 0
        if oq.hazard_calculation_id:  # from ruptures
            dstore.parent = datastore.read(oq.hazard_calculation_id)
            self.full_lt = dstore.parent['full_lt'].init()
            set_mags(oq, dstore)
        elif hasattr(self, 'csm'):  # from sources
            set_mags(oq, dstore)
            self.build_events_from_sources()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}
        elif not oq.rupture_dict and 'rupture_model' not in oq.inputs:
            logging.warning(
                'There is no rupture_model, the calculator will just '
                'import data without performing any calculation')
            fake = logictree.FullLogicTree.fake()
            dstore['full_lt'] = fake  # needed to expose the outputs
            dstore['weights'] = [1.]
            return {}
        elif oq.ruptures_hdf5:
            with hdf5.File(oq.ruptures_hdf5) as r:
                E = len(r['events'])
        else:  # scenario
            self._read_scenario_ruptures()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}
        if oq.ground_motion_fields:
            prim_imts = oq.get_primary_imtls()
            base.create_gmf_data(dstore, prim_imts, oq.sec_imts, E=E,
                                 R=oq.number_of_logic_tree_samples)
            dstore.create_dset('gmf_data/sigma_epsilon',
                               sig_eps_dt(oq.imtls))
            dstore.create_dset('gmf_data/rup_info', rup_dt)
            if self.N >= SLICE_BY_EVENT_NSITES:
                dstore.create_dset('gmf_data/slice_by_event', slice_dt)

        # event_based in parallel
        if oq.ruptures_hdf5:
            smap = starmap_from_rups_hdf5(oq, self.sitecol, dstore)
        else:
            smap = starmap_from_rups(
                event_based, oq, self.full_lt, self.sitecol, dstore)
        acc = smap.reduce(self.agg_dicts)
        if 'gmf_data' not in dstore:
            return acc
        if oq.ground_motion_fields:
            with self.monitor('saving avg_gmf', measuremem=True):
                self.save_avg_gmf()
        return acc

    def save_avg_gmf(self):
        """
        Compute and save avg_gmf, unless there are too many GMFs
        """
        size = self.datastore.getsize('gmf_data')
        maxsize = self.oqparam.gmf_max_gb * 1024 ** 3
        logging.info(f'Stored {humansize(size)} of GMFs')
        if size > maxsize:
            logging.warning(
                f'There are more than {humansize(maxsize)} of GMFs,'
                ' not computing avg_gmf')
            return
        rlzs = self.datastore['events'][:]['rlz_id']
        self.weights = self.datastore['weights'][:][rlzs]
        gmf_df = self.datastore.read_df('gmf_data', 'sid')
        for sec_imt in self.oqparam.sec_imts:  # ignore secondary perils
            del gmf_df[sec_imt]
        rel_events = gmf_df.eid.unique()
        e = len(rel_events)
        if e == 0:
            raise RuntimeError(
                'No GMFs were generated, perhaps they were '
                'all below the minimum_intensity threshold')
        elif e < len(self.datastore['events']):
            self.datastore['relevant_events'] = rel_events
            logging.info('Stored {:_d} relevant event IDs'.format(e))

        # really compute and store the avg_gmf
        M = len(self.oqparam.min_iml)
        avg_gmf = numpy.zeros((2, len(self.sitecol.complete), M), F32)
        for sid, avgstd in compute_avg_gmf(
                gmf_df, self.weights, self.oqparam.min_iml).items():
            avg_gmf[:, sid] = avgstd
        self.datastore['avg_gmf'] = avg_gmf
        # make avg_gmf plots only if running via the webui
        if os.environ.get('OQ_APPLICATION_MODE') == 'ARISTOTLE':
            imts = list(self.oqparam.imtls)
            ex = Extractor(self.datastore.calc_id)
            for imt in imts:
                plt = plot_avg_gmf(ex, imt)
                bio = io.BytesIO()
                plt.savefig(bio, format='png', bbox_inches='tight')
                fig_path = f'png/avg_gmf-{imt}.png'
                logging.info(f'Saving {fig_path} into the datastore')
                self.datastore[fig_path] = Image.open(bio)

    def post_execute(self, dummy):
        oq = self.oqparam
        if not oq.ground_motion_fields or 'gmf_data' not in self.datastore:
            return
        # check seed dependency unless the number of GMFs is huge
        size = self.datastore.getsize('gmf_data/gmv_0')
        if ('gmf_data' in self.datastore and size < 4E9
                and not oq.ruptures_hdf5):
            # TODO: check why there is an error for ruptures_hdf5
            logging.info('Checking stored GMFs')
            msg = views.view('extreme_gmvs', self.datastore)
            logging.info(msg)
        if self.datastore.parent:
            self.datastore.parent.open('r')
        if oq.hazard_curves_from_gmfs:
            if size > 4E6:
                msg = 'gmf_data has {:_d} rows'.format(size)
                raise RuntimeError(f'{msg}: too big to compute the hcurves')
            build_hcurves(self)
        if oq.compare_with_classical:  # compute classical curves
            export_dir = os.path.join(oq.export_dir, 'cl')
            if not os.path.exists(export_dir):
                os.makedirs(export_dir)
            oq.export_dir = export_dir
            oq.calculation_mode = 'classical'
            with logs.init(vars(oq)) as log:
                self.cl = ClassicalCalculator(oq, log.calc_id)
                # TODO: perhaps it is possible to avoid reprocessing the
                # source model, however usually this is quite fast and
                # does not dominate the computation
                self.cl.run()
            expose_outputs(self.cl.datastore)
            all = slice(None)
            for imt in oq.imtls:
                # NB: the classical curves must come from self.cl.datastore,
                # otherwise the comparison below would be trivially zero
                cl_mean_curves = get_mean_curve(self.cl.datastore, imt, all)
                eb_mean_curves = get_mean_curve(self.datastore, imt, all)
                self.rdiff, index = util.max_rel_diff_index(
                    cl_mean_curves, eb_mean_curves)
                logging.warning(
                    'Relative difference with the classical '
                    'mean curves: %d%% at site index %d, imt=%s',
                    self.rdiff * 100, index, imt)