Source code for openquake.calculators.event_based

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import io
import math
import time
import os.path
import logging
import operator
import numpy
import pandas
import fiona
from shapely import geometry
from openquake.baselib import config, hdf5, parallel, python3compat
from openquake.baselib.general import (
    AccumDict, humansize, groupby, block_splitter)
from openquake.hazardlib.map_array import MapArray, get_mean_curve
from openquake.hazardlib.stats import geom_avg_std, compute_stats
from openquake.hazardlib.calc.stochastic import sample_ruptures
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.calc.filters import (
    nofilter, getdefault, get_distances, SourceFilter)
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.calc.conditioned_gmfs import ConditionedGmfComputer
from openquake.hazardlib import logictree, InvalidFile
from openquake.hazardlib.geo.utils import geolocate
from openquake.hazardlib.calc.stochastic import get_rup_array, rupture_dt
from openquake.hazardlib.source.rupture import (
    RuptureProxy, EBRupture, get_ruptures)
from openquake.commonlib import util, logs, readinput, datastore
from openquake.commonlib.calc import (
    gmvs_to_poes, make_hmaps, slice_dt, build_slice_by_event, RuptureImporter,
    SLICE_BY_EVENT_NSITES)
from openquake.risklib.riskinput import str2rsi, rsi2str
from openquake.calculators import base, views
from openquake.calculators.getters import get_rupture_getters, sig_eps_dt
from openquake.calculators.classical import ClassicalCalculator
from openquake.calculators.extract import Extractor
from openquake.calculators.postproc.plots import plot_avg_gmf
from openquake.engine import engine
from PIL import Image

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
I64 = numpy.int64
F32 = numpy.float32
F64 = numpy.float64
TWO24 = 2 ** 24
TWO32 = numpy.float64(2 ** 32)
rup_dt = numpy.dtype(
    [('rup_id', I64), ('rrup', F32), ('time', F32), ('task_no', U16)])
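# Illustrative example (editorial, not part of the original module): one
# rup_info record could be built as
#   numpy.array([(42, 12.5, 0.8, 3)], rup_dt)
# i.e. rupture 42, minimum rupture distance 12.5 km, 0.8 s spent computing
# its GMFs, produced by task 3 (see the `times` list in `event_based` below).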


def rup_weight(rup):
    # rup['nsites'] is 0 if the ruptures were generated without a sitecol
    return math.ceil((rup['nsites'] or 1) / 100)

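# Worked example (illustrative values, not part of the original module):
#   rup_weight({'nsites': 0})    # no sitecol -> math.ceil(1 / 100) == 1
#   rup_weight({'nsites': 250})  # 250 sites  -> math.ceil(250 / 100) == 3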
# ######################## hcurves_from_gmfs ############################ #
def build_hcurves(calc):
    """
    Build the hazard curves from each realization starting from
    the stored GMFs. Works only for few sites.
    """
    oq = calc.oqparam
    # compute and save statistics; this is done in process and can
    # be very slow if there are thousands of realizations
    weights = calc.full_lt.weights[:, -1]
    # NB: in the future we may want to save to individual hazard
    # curves if oq.individual_rlzs is set; for the moment we
    # save the statistical curves only
    hstats = oq.hazard_stats()
    S = len(hstats)
    R = len(weights)
    N = calc.N
    M = len(oq.imtls)
    L1 = oq.imtls.size // M
    gmf_df = calc.datastore.read_df('gmf_data', 'eid')
    ev_df = calc.datastore.read_df('events', 'id')[['rlz_id']]
    gmf_df = gmf_df.join(ev_df)
    hc_mon = calc._monitor('building hazard curves', measuremem=False)
    hcurves = {}
    for (sid, rlz), df in gmf_df.groupby(['sid', 'rlz_id']):
        with hc_mon:
            poes = gmvs_to_poes(df, oq.imtls, oq.ses_per_logic_tree_path)
            for m, imt in enumerate(oq.imtls):
                hcurves[rsi2str(rlz, sid, imt)] = poes[m]
    pmaps = {r: MapArray(calc.sitecol.sids, L1*M, 1).fill(0)
             for r in range(R)}
    for key, poes in hcurves.items():
        r, sid, imt = str2rsi(key)
        array = pmaps[r].array[sid, oq.imtls(imt), 0]
        array[:] = 1. - (1. - array) * (1. - poes)
    pmaps = [p.reshape(N, M, L1) for p in pmaps.values()]
    if oq.individual_rlzs:
        logging.info('Saving individual hazard curves')
        calc.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-rlzs', site_id=N, rlz_id=R,
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset(
                'hmaps-rlzs', F32, (N, R, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-rlzs', site_id=N, rlz_id=R,
                imt=list(oq.imtls), poe=oq.poes)
        for r in range(R):
            calc.datastore['hcurves-rlzs'][:, r] = pmaps[r].array
            if oq.poes:
                [hmap] = make_hmaps([pmaps[r]], oq.imtls, oq.poes)
                ds[:, r] = hmap.array
    if S:
        logging.info('Computing statistical hazard curves')
        calc.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-stats', site_id=N, stat=list(hstats),
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset(
                'hmaps-stats', F32, (N, S, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-stats', site_id=N, stat=list(hstats),
                imt=list(oq.imtls), poes=oq.poes)
        for s, stat in enumerate(hstats):
            smap = MapArray(calc.sitecol.sids, L1, M)
            [smap.array] = compute_stats(
                numpy.array([p.array for p in pmaps]),
                [hstats[stat]], weights)
            calc.datastore['hcurves-stats'][:, s] = smap.array
            if oq.poes:
                [hmap] = make_hmaps([smap], oq.imtls, oq.poes)
                ds[:, s] = hmap.array

# ######################## GMF calculator ############################ #
def count_ruptures(src):
    """
    Count the number of ruptures on a heavy source
    """
    return {src.source_id: src.count_ruptures()}

def get_computer(cmaker, proxy, rupgeoms, srcfilter,
                 station_data, station_sitecol):
    """
    :returns: GmfComputer or ConditionedGmfComputer
    """
    sids = srcfilter.close_sids(proxy, cmaker.trt)
    if len(sids) == 0:  # filtered away
        raise FarAwayRupture

    proxy.geom = rupgeoms[proxy['geom_id']]
    ebr = proxy.to_ebr(cmaker.trt)
    oq = cmaker.oq

    if station_sitecol:
        stations = numpy.isin(sids, station_sitecol.sids)
        if stations.any():
            station_sids = sids[stations]
            return ConditionedGmfComputer(
                ebr, srcfilter.sitecol.filtered(sids),
                srcfilter.sitecol.filtered(station_sids),
                station_data.loc[station_sids],
                oq.observed_imts,
                cmaker, oq.correl_model, oq.cross_correl,
                oq.ground_motion_correlation_params,
                oq.number_of_ground_motion_fields,
                oq._amplifier, oq._sec_perils)
        else:
            logging.warning('There are no stations!')

    return GmfComputer(
        ebr, srcfilter.sitecol.filtered(sids), cmaker,
        oq.correl_model, oq.cross_correl,
        oq._amplifier, oq._sec_perils)

def gen_event_based(allproxies, cmaker, stations, dstore, monitor):
    """
    Launcher of event_based tasks
    """
    t0 = time.time()
    n = 0
    for proxies in block_splitter(allproxies, 10_000, rup_weight):
        n += len(proxies)
        yield event_based(proxies, cmaker, stations, dstore, monitor)
        rem = allproxies[n:]  # remaining ruptures
        dt = time.time() - t0
        if dt > cmaker.oq.time_per_task and sum(
                rup_weight(r) for r in rem) > 12_000:
            half = len(rem) // 2
            yield gen_event_based, rem[:half], cmaker, stations, dstore
            yield gen_event_based, rem[half:], cmaker, stations, dstore
            return

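# Editorial note (based on the openquake.baselib.parallel conventions): when a
# task yields a tuple whose first element is a callable, e.g.
#   yield gen_event_based, rem[:half], cmaker, stations, dstore
# the Starmap presumably resubmits those arguments as fresh subtasks, so a
# slow task can split its remaining ruptures once time_per_task is exceeded.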
def event_based(proxies, cmaker, stations, dstore, monitor):
    """
    Compute GMFs and optionally hazard curves
    """
    shr = monitor.shared
    oq = cmaker.oq
    alldata = []
    se_dt = sig_eps_dt(oq.imtls)
    sig_eps = []
    times = []  # rup_id, rrup, dt
    fmon = monitor('instantiating GmfComputer', measuremem=False)
    mmon = monitor('computing mean_stds', measuremem=False)
    cmon = monitor('computing gmfs', measuremem=False)
    umon = monitor('updating gmfs', measuremem=False)
    max_iml = oq.get_max_iml()
    cmaker.scenario = 'scenario' in oq.calculation_mode
    with dstore:
        if dstore.parent:
            sitecol = dstore['sitecol']
            if 'complete' in dstore.parent:
                sitecol.complete = dstore.parent['complete']
        else:
            sitecol = dstore['sitecol']
            if 'complete' in dstore:
                sitecol.complete = dstore['complete']
        maxdist = oq.maximum_distance(cmaker.trt)
        srcfilter = SourceFilter(sitecol.complete, maxdist)
        rupgeoms = dstore['rupgeoms']
        for proxy in proxies:
            t0 = time.time()
            with fmon:
                if proxy['mag'] < cmaker.min_mag:
                    continue
                try:
                    computer = get_computer(
                        cmaker, proxy, rupgeoms, srcfilter, *stations)
                except FarAwayRupture:
                    # skip this rupture
                    continue
            if hasattr(computer, 'station_data'):  # conditioned GMFs
                assert cmaker.scenario
                with shr['mea'] as mea, shr['tau'] as tau, shr['phi'] as phi:
                    df = computer.compute_all([mea, tau, phi], cmon, umon)
            else:  # regular GMFs
                with mmon:
                    mean_stds = cmaker.get_mean_stds(
                        [computer.ctx], split_by_mag=False)
                    # avoid numba type error
                    computer.ctx.flags.writeable = True
                df = computer.compute_all(mean_stds, max_iml, cmon, umon)
            sig_eps.append(computer.build_sig_eps(se_dt))
            dt = time.time() - t0
            times.append((proxy['id'], computer.ctx.rrup.min(), dt))
            alldata.append(df)
    if sum(len(df) for df in alldata):
        gmfdata = pandas.concat(alldata)
    else:
        gmfdata = {}
    times = numpy.array([tup + (monitor.task_no,) for tup in times], rup_dt)
    times.sort(order='rup_id')
    if not oq.ground_motion_fields:
        gmfdata = {}
    if len(gmfdata) == 0:
        return dict(gmfdata={}, times=times, sig_eps=())
    return dict(gmfdata={k: gmfdata[k].to_numpy() for k in gmfdata.columns},
                times=times, sig_eps=numpy.concatenate(sig_eps, dtype=se_dt))

def filter_stations(station_df, complete, rup, maxdist):
    """
    :param station_df: DataFrame with the stations
    :param complete: complete SiteCollection
    :param rup: rupture
    :param maxdist: maximum distance
    :returns: filtered (station_df, station_sitecol)
    """
    ns = len(station_df)
    ok = (get_distances(rup, complete, 'rrup') <= maxdist) & numpy.isin(
        complete.sids, station_df.index)
    station_sites = complete.filter(ok)
    if station_sites is None:
        station_data = None
        logging.warning('Discarded %d/%d stations more distant than %d km, '
                        'switching to the unconditioned GMF computer',
                        ns, ns, maxdist)
    else:
        station_data = station_df[
            numpy.isin(station_df.index, station_sites.sids)]
        assert len(station_data) == len(station_sites), (
            len(station_data), len(station_sites))
        if len(station_data) < ns:
            logging.info('Discarded %d/%d stations more distant than %d km',
                         ns - len(station_data), ns, maxdist)
    return station_data, station_sites

# NB: save_tmp is passed in event_based_risk
def starmap_from_rups(func, oq, full_lt, sitecol, dstore, save_tmp=None):
    """
    Submit the ruptures and apply `func` (event_based or ebrisk)
    """
    try:
        vs30 = sitecol.vs30
    except ValueError:  # in scenario test_case_14
        pass
    else:
        if numpy.isnan(vs30).any():
            raise ValueError('The vs30 is NaN, missing site model '
                             'or site parameter')
    set_mags(oq, dstore)
    rups = dstore['ruptures'][:]
    logging.info('Reading {:_d} ruptures'.format(len(rups)))
    logging.info('Affected sites = %.1f per rupture', rups['nsites'].mean())
    allproxies = [RuptureProxy(rec) for rec in rups]
    if "station_data" in oq.inputs:
        rupgeoms = dstore['rupgeoms'][:]
        trt = full_lt.trts[0]
        proxy = allproxies[0]
        proxy.geom = rupgeoms[proxy['geom_id']]
        rup = proxy.to_ebr(trt).rupture
        station_df = dstore.read_df('station_data', 'site_id')
        maxdist = (oq.maximum_distance_stations or
                   oq.maximum_distance['default'][-1][1])
        station_data, station_sites = filter_stations(
            station_df, sitecol.complete, rup, maxdist)
    else:
        station_data, station_sites = None, None

    gb = groupby(allproxies, operator.itemgetter('trt_smr'))
    totw = sum(rup_weight(p) for p in allproxies) / (
        oq.concurrent_tasks or 1)
    logging.info('totw = {:_d}'.format(round(totw)))
    if station_data is not None:
        # assume scenario with a single true rupture
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(0)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
        cmaker.scenario = True
        maxdist = oq.maximum_distance(cmaker.trt)
        srcfilter = SourceFilter(sitecol.complete, maxdist)
        computer = get_computer(
            cmaker, proxy, rupgeoms, srcfilter, station_data, station_sites)
        G = len(cmaker.gsims)
        M = len(cmaker.imts)
        N = len(computer.sitecol)
        size = 2 * G * M * N * N * 8  # tau, phi
        msg = f'{G=} * {M=} * {humansize(N*N*8)} * 2'
        logging.info('Requiring %s for tau, phi [%s]', humansize(size), msg)
        if size > float(config.memory.conditioned_gmf_gb) * 1024**3:
            raise ValueError(
                f'The calculation is too large: {G=}, {M=}, {N=}. '
                'You must reduce the number of sites i.e. enlarge '
                'region_grid_spacing)')
        mea, tau, phi = computer.get_mea_tau_phi()
        del proxy.geom  # to reduce data transfer

    dstore.swmr_on()
    smap = parallel.Starmap(func, h5=dstore.hdf5)
    if save_tmp:
        save_tmp(smap.monitor)
    # NB: for conditioned scenarios we are looping on a single trt
    for trt_smr, proxies in gb.items():
        trt = full_lt.trts[trt_smr // TWO24]
        extra = sitecol.array.dtype.names
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(trt_smr)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq, extraparams=extra)
        cmaker.min_mag = getdefault(oq.minimum_magnitude, trt)
        if station_data is not None:
            if parallel.oq_distribute() == 'zmq':
                logging.error('Conditioned scenarios are not meant to be run'
                              ' on a cluster')
            smap.share(mea=mea, tau=tau, phi=phi)
        for block in block_splitter(proxies, totw, rup_weight):
            args = block, cmaker, (station_data, station_sites), dstore
            smap.submit(args)
    return smap

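# Worked example (illustrative figures, not from the original module): for a
# conditioned scenario with G=2 gsims, M=3 IMTs and N=1_000 sites, the tau and
# phi correlation matrices above require about
#   2 * 2 * 3 * 1_000 * 1_000 * 8 bytes ~= 96 MB
# which is checked against the config.memory.conditioned_gmf_gb limit before
# the computation is allowed to proceed.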
def set_mags(oq, dstore):
    """
    Set the attribute oq.mags_by_trt
    """
    if 'source_mags' in dstore:  # classical or event_based
        oq.mags_by_trt = {
            trt: python3compat.decode(dset[:])
            for trt, dset in dstore['source_mags'].items()}
    elif 'ruptures' in dstore:  # scenario
        trts = dstore['full_lt'].trts
        ruptures = dstore['ruptures'][:]
        dic = {}
        for trti, trt in enumerate(trts):
            rups = ruptures[ruptures['trt_smr'] == trti]
            mags = numpy.unique(numpy.round(rups['mag'], 2))
            dic[trt] = ['%.02f' % mag for mag in mags]
        oq.mags_by_trt = dic

def compute_avg_gmf(gmf_df, weights, min_iml):
    """
    :param gmf_df: a DataFrame with columns eid, sid, rlz, gmv...
    :param weights: E weights associated to the realizations
    :param min_iml: array of M minimum intensities
    :returns: a dictionary site_id -> array of shape (2, M)
    """
    dic = {}
    E = len(weights)
    M = len(min_iml)
    for sid, df in gmf_df.groupby(gmf_df.index):
        eid = df.pop('eid')
        gmvs = numpy.ones((E, M), F32) * min_iml
        gmvs[eid.to_numpy()] = df.to_numpy()
        dic[sid] = geom_avg_std(gmvs, weights)
    return dic

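# Illustrative sketch (assumed toy values, not part of the original module):
# with E=2 events, M=1 IMT and min_iml=[0.01], a site hit only by event 0
# with gmv=0.2 gives gmvs=[[0.2], [0.01]]; the returned dic[sid] then has
# shape (2, M), with the geometric average in the first row and the
# corresponding dispersion in the second.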
@base.calculators.add('event_based', 'scenario', 'ucerf_hazard')
class EventBasedCalculator(base.HazardCalculator):
    """
    Event based PSHA calculator generating the ground motion fields and
    the hazard curves from the ruptures, depending on the configuration
    parameters.
    """
    core_task = event_based
    is_stochastic = True
    accept_precalc = ['event_based', 'ebrisk', 'event_based_risk']
    def init(self):
        if self.oqparam.cross_correl.__class__.__name__ == 'GodaAtkinson2009':
            logging.warning(
                'The truncation_level param is ignored with GodaAtkinson2009')
        if hasattr(self, 'csm'):
            self.check_floating_spinning()
        if hasattr(self.oqparam, 'maximum_distance'):
            self.srcfilter = self.src_filter()
        else:
            self.srcfilter = nofilter
        if not self.datastore.parent:
            self.datastore.create_dset('ruptures', rupture_dt)
            self.datastore.create_dset('rupgeoms', hdf5.vfloat64)
            # NB: using vfloat32 for the geometries would make
            # debugging a lot more difficult

    def build_events_from_sources(self):
        """
        Prefilter the composite source model and store the source_info
        """
        oq = self.oqparam
        sources = self.csm.get_sources()
        logging.info('Counting the ruptures in the CompositeSourceModel')
        self.datastore.swmr_on()
        with self.monitor('counting ruptures', measuremem=True):
            nrups = parallel.Starmap(  # weighting the heavy sources
                count_ruptures,
                [(src,) for src in sources if src.code in b'AMSC'],
                h5=self.datastore.hdf5, progress=logging.debug).reduce()
            # NB: multifault sources must be considered light to avoid a large
            # data transfer, even if .count_ruptures can be slow
            for src in sources:
                try:
                    src.num_ruptures = nrups[src.source_id]
                except KeyError:  # light sources
                    src.num_ruptures = src.count_ruptures()
                src.weight = src.num_ruptures
            self.csm.fix_src_offset()  # NB: must be AFTER count_ruptures
        maxweight = sum(sg.weight for sg in self.csm.src_groups) / (
            self.oqparam.concurrent_tasks or 1)
        eff_ruptures = AccumDict(accum=0)  # grp_id => potential ruptures
        source_data = AccumDict(accum=[])
        allargs = []
        srcfilter = self.srcfilter
        if 'geometry' in oq.inputs:
            fname = oq.inputs['geometry']
            with fiona.open(fname) as f:
                model_geom = geometry.shape(f[0].geometry)
        elif oq.mosaic_model:  # 3-letter mosaic model
            mosaic_df = readinput.read_mosaic_df(buffer=.1).set_index('code')
            model_geom = mosaic_df.loc[oq.mosaic_model].geom
        logging.info('Building ruptures')
        for sg in self.csm.src_groups:
            if not sg.sources:
                continue
            rgb = self.full_lt.get_rlzs_by_gsim(sg.sources[0].trt_smr)
            cmaker = ContextMaker(sg.trt, rgb, oq)
            if oq.mosaic_model or 'geometry' in oq.inputs:
                cmaker.model_geom = model_geom
            for src_group in sg.split(maxweight):
                allargs.append((src_group, cmaker, srcfilter.sitecol))
        self.datastore.swmr_on()
        smap = parallel.Starmap(
            sample_ruptures, allargs, h5=self.datastore.hdf5)
        mon = self.monitor('saving ruptures')
        self.nruptures = 0  # estimated classical ruptures within maxdist
        t0 = time.time()
        tot_ruptures = 0
        filtered_ruptures = 0
        for dic in smap:
            # NB: dic should be a dictionary, but when the calculation dies
            # for an OOM it can become None, thus giving a very confusing error
            if dic is None:
                raise MemoryError('You ran out of memory!')
            rup_array = dic['rup_array']
            tot_ruptures += len(rup_array)
            if len(rup_array) == 0:
                continue
            geom = rup_array.geom
            filtered_ruptures += len(rup_array)
            if dic['source_data']:
                source_data += dic['source_data']
            if dic['eff_ruptures']:
                eff_ruptures += dic['eff_ruptures']
            with mon:
                self.nruptures += len(rup_array)
                # NB: the ruptures will be reordered and resaved later
                hdf5.extend(self.datastore['ruptures'], rup_array)
                hdf5.extend(self.datastore['rupgeoms'], geom)
        t1 = time.time()
        logging.info(f'Generated {filtered_ruptures}/{tot_ruptures} ruptures,'
                     f' stored in {t1 - t0} seconds')
        if len(self.datastore['ruptures']) == 0:
            raise RuntimeError('No ruptures were generated, perhaps the '
                               'effective investigation time is too short')

        # don't change the order of the 3 things below!
        self.store_source_info(source_data)
        self.store_rlz_info(eff_ruptures)
        imp = RuptureImporter(self.datastore)
        with self.monitor('saving ruptures and events'):
            imp.import_rups_events(
                self.datastore.getitem('ruptures')[()], get_rupture_getters)

    def agg_dicts(self, acc, result):
        """
        :param acc: accumulator dictionary
        :param result: an AccumDict with events, ruptures and gmfs
        """
        if result is None:  # instead of a dict
            raise MemoryError('You ran out of memory!')
        sav_mon = self.monitor('saving gmfs')
        primary = self.oqparam.get_primary_imtls()
        sec_imts = self.oqparam.sec_imts
        with sav_mon:
            gmfdata = result.pop('gmfdata')
            if len(gmfdata):
                df = pandas.DataFrame(gmfdata)
                dset = self.datastore['gmf_data/sid']
                times = result.pop('times')
                hdf5.extend(self.datastore['gmf_data/rup_info'], times)
                if self.N >= SLICE_BY_EVENT_NSITES:
                    sbe = build_slice_by_event(
                        df.eid.to_numpy(), self.offset)
                    hdf5.extend(
                        self.datastore['gmf_data/slice_by_event'], sbe)
                hdf5.extend(dset, df.sid.to_numpy())
                hdf5.extend(self.datastore['gmf_data/eid'], df.eid.to_numpy())
                for m in range(len(primary)):
                    hdf5.extend(self.datastore[f'gmf_data/gmv_{m}'],
                                df[f'gmv_{m}'])
                for sec_imt in sec_imts:
                    hdf5.extend(self.datastore[f'gmf_data/{sec_imt}'],
                                df[sec_imt])
                sig_eps = result.pop('sig_eps')
                hdf5.extend(self.datastore['gmf_data/sigma_epsilon'], sig_eps)
                self.offset += len(df)
        return acc

    def _read_scenario_ruptures(self):
        oq = self.oqparam
        gsim_lt = readinput.get_gsim_lt(oq)
        if oq.aristotle:
            # the gsim_lt is read from the exposure.hdf5 file
            mosaic_df = readinput.read_mosaic_df(buffer=1)
            if oq.rupture_dict:
                lonlat = [[oq.rupture_dict['lon'], oq.rupture_dict['lat']]]
            elif oq.rupture_xml:
                hypo = readinput.get_rupture(oq).hypocenter
                lonlat = [[hypo.x, hypo.y]]
            [oq.mosaic_model] = geolocate(F32(lonlat), mosaic_df)
            [expo_hdf5] = oq.inputs['exposure']
            if oq.mosaic_model == '???':
                raise ValueError(
                    '(%(lon)s, %(lat)s) is not covered by the mosaic!'
                    % oq.rupture_dict)
            if oq.gsim != '[FromFile]':
                raise ValueError(
                    'In Aristotle mode the gsim can not be specified in'
                    ' the job.ini: %s' % oq.gsim)
            if oq.tectonic_region_type == '*':
                raise ValueError(
                    'The tectonic_region_type parameter must be specified')
            gsim_lt = logictree.GsimLogicTree.from_hdf5(
                expo_hdf5, oq.mosaic_model,
                oq.tectonic_region_type.encode('utf8'))
        elif (str(gsim_lt.branches[0].gsim) == '[FromFile]' and
                'gmfs' not in oq.inputs):
            raise InvalidFile('%s: missing gsim or gsim_logic_tree_file'
                              % oq.inputs['job_ini'])
        G = gsim_lt.get_num_paths()
        if oq.calculation_mode.startswith('scenario'):
            ngmfs = oq.number_of_ground_motion_fields
        if oq.rupture_dict or oq.rupture_xml:
            # check the number of branchsets
            bsets = len(gsim_lt._ltnode)
            if bsets > 1:
                raise InvalidFile(
                    '%s for a scenario calculation must contain a single '
                    'branchset, found %d!' % (oq.inputs['job_ini'], bsets))
            [(trt, rlzs_by_gsim)] = gsim_lt.get_rlzs_by_gsim_trt().items()
            rup = readinput.get_rupture(oq)
            oq.mags_by_trt = {trt: ['%.2f' % rup.mag]}
            self.cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
            if self.N > oq.max_sites_disagg:  # many sites, split rupture
                ebrs = []
                for i in range(ngmfs):
                    ebr = EBRupture(rup, 0, 0, G, i, e0=i * G)
                    ebr.seed = oq.ses_seed + i
                    ebrs.append(ebr)
            else:  # keep a single rupture with a big occupation number
                ebrs = [EBRupture(rup, 0, 0, G * ngmfs, 0)]
                ebrs[0].seed = oq.ses_seed
            srcfilter = SourceFilter(self.sitecol, oq.maximum_distance(trt))
            aw = get_rup_array(ebrs, srcfilter)
            if len(aw) == 0:
                raise RuntimeError(
                    'The rupture is too far from the sites! Please check the '
                    'maximum_distance and the position of the rupture')
        elif oq.inputs['rupture_model'].endswith('.csv'):
            aw = get_ruptures(oq.inputs['rupture_model'])
            if len(gsim_lt.values) == 1:  # fix for scenario_damage/case_12
                aw['trt_smr'] = 0  # a single TRT
            if oq.calculation_mode.startswith('scenario'):
                # rescale n_occ by ngmfs and nrlzs
                aw['n_occ'] *= ngmfs * gsim_lt.get_num_paths()
        else:
            raise InvalidFile("Something wrong in %s" % oq.inputs['job_ini'])
        rup_array = aw.array
        hdf5.extend(self.datastore['rupgeoms'], aw.geom)

        if len(rup_array) == 0:
            raise RuntimeError(
                'There are no sites within the maximum_distance'
                ' of %s km from the rupture' % oq.maximum_distance(
                    rup.tectonic_region_type)(rup.mag))

        fake = logictree.FullLogicTree.fake(gsim_lt)
        self.datastore['full_lt'] = fake
        self.store_rlz_info({})  # store weights
        self.save_params()
        imp = RuptureImporter(self.datastore)
        imp.import_rups_events(rup_array, get_rupture_getters)

    def execute(self):
        oq = self.oqparam
        dstore = self.datastore
        if oq.ground_motion_fields and oq.min_iml.sum() == 0:
            logging.warning('The GMFs are not filtered: '
                            'you may want to set a minimum_intensity')
        elif oq.minimum_intensity:
            logging.info('minimum_intensity=%s', oq.minimum_intensity)
        else:
            logging.info('min_iml=%s', oq.min_iml)
        self.offset = 0
        if oq.hazard_calculation_id:  # from ruptures
            dstore.parent = datastore.read(oq.hazard_calculation_id)
            self.full_lt = dstore.parent['full_lt'].init()
            set_mags(oq, dstore)
        elif hasattr(self, 'csm'):  # from sources
            set_mags(oq, dstore)
            self.build_events_from_sources()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}
        elif not oq.rupture_dict and 'rupture_model' not in oq.inputs:
            logging.warning(
                'There is no rupture_model, the calculator will just '
                'import data without performing any calculation')
            fake = logictree.FullLogicTree.fake()
            dstore['full_lt'] = fake  # needed to expose the outputs
            dstore['weights'] = [1.]
            return {}
        else:  # scenario
            self._read_scenario_ruptures()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}

        if oq.ground_motion_fields:
            imts = oq.get_primary_imtls()
            base.create_gmf_data(dstore, imts, oq.sec_imts)
            dstore.create_dset('gmf_data/sigma_epsilon', sig_eps_dt(oq.imtls))
            dstore.create_dset('gmf_data/rup_info', rup_dt)
            if self.N >= SLICE_BY_EVENT_NSITES:
                dstore.create_dset('gmf_data/slice_by_event', slice_dt)

        # event_based in parallel
        eb = (event_based if ('station_data' in oq.inputs or
                              parallel.oq_distribute() == 'slurm')
              else gen_event_based)
        smap = starmap_from_rups(eb, oq, self.full_lt, self.sitecol, dstore)
        acc = smap.reduce(self.agg_dicts)
        if 'gmf_data' not in dstore:
            return acc
        if oq.ground_motion_fields:
            with self.monitor('saving avg_gmf', measuremem=True):
                self.save_avg_gmf()
        return acc

    def save_avg_gmf(self):
        """
        Compute and save avg_gmf, unless there are too many GMFs
        """
        size = self.datastore.getsize('gmf_data')
        maxsize = self.oqparam.gmf_max_gb * 1024 ** 3
        logging.info(f'Stored {humansize(size)} of GMFs')
        if size > maxsize:
            logging.warning(
                f'There are more than {humansize(maxsize)} of GMFs,'
                ' not computing avg_gmf')
            return

        rlzs = self.datastore['events']['rlz_id']
        self.weights = self.datastore['weights'][:][rlzs]
        gmf_df = self.datastore.read_df('gmf_data', 'sid')
        for sec_imt in self.oqparam.sec_imts:  # ignore secondary perils
            del gmf_df[sec_imt]
        rel_events = gmf_df.eid.unique()
        e = len(rel_events)
        if e == 0:
            raise RuntimeError(
                'No GMFs were generated, perhaps they were '
                'all below the minimum_intensity threshold')
        elif e < len(self.datastore['events']):
            self.datastore['relevant_events'] = rel_events
            logging.info('Stored {:_d} relevant event IDs'.format(e))

        # really compute and store the avg_gmf
        M = len(self.oqparam.min_iml)
        avg_gmf = numpy.zeros((2, len(self.sitecol.complete), M), F32)
        for sid, avgstd in compute_avg_gmf(
                gmf_df, self.weights, self.oqparam.min_iml).items():
            avg_gmf[:, sid] = avgstd
        self.datastore['avg_gmf'] = avg_gmf
        # make avg_gmf plots only if running via the webui
        if os.environ.get('OQ_APPLICATION_MODE') == 'ARISTOTLE':
            imts = list(self.oqparam.imtls)
            for imt in imts:
                plt = plot_avg_gmf(Extractor(self.datastore.calc_id), imt)
                bio = io.BytesIO()
                plt.savefig(bio, format='png', bbox_inches='tight')
                fig_path = f'png/avg_gmf-{imt}.png'
                logging.info(f'Saving {fig_path} into the datastore')
                self.datastore[fig_path] = Image.open(bio)

    def post_execute(self, dummy):
        oq = self.oqparam
        if not oq.ground_motion_fields or 'gmf_data' not in self.datastore:
            return
        # check seed dependency unless the number of GMFs is huge
        size = self.datastore.getsize('gmf_data/gmv_0')
        if 'gmf_data' in self.datastore and size < 4E9:
            logging.info('Checking stored GMFs')
            msg = views.view('extreme_gmvs', self.datastore)
            logging.info(msg)
        if self.datastore.parent:
            self.datastore.parent.open('r')
        if oq.hazard_curves_from_gmfs:
            if size > 4E6:
                msg = 'gmf_data has {:_d} rows'.format(size)
                raise RuntimeError(f'{msg}: too big to compute the hcurves')
            build_hcurves(self)
            if oq.compare_with_classical:  # compute classical curves
                export_dir = os.path.join(oq.export_dir, 'cl')
                if not os.path.exists(export_dir):
                    os.makedirs(export_dir)
                oq.export_dir = export_dir
                oq.calculation_mode = 'classical'
                with logs.init(vars(oq)) as log:
                    self.cl = ClassicalCalculator(oq, log.calc_id)
                    # TODO: perhaps it is possible to avoid reprocessing the
                    # source model, however usually this is quite fast and
                    # does not dominate the computation
                    self.cl.run()
                engine.expose_outputs(self.cl.datastore)
                all = slice(None)
                for imt in oq.imtls:
                    cl_mean_curves = get_mean_curve(
                        self.cl.datastore, imt, all)
                    eb_mean_curves = get_mean_curve(
                        self.datastore, imt, all)
                    self.rdiff, index = util.max_rel_diff_index(
                        cl_mean_curves, eb_mean_curves)
                    logging.warning(
                        'Relative difference with the classical '
                        'mean curves: %d%% at site index %d, imt=%s',
                        self.rdiff * 100, index, imt)
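# Editorial usage sketch (assumption, mirroring how post_execute drives the
# ClassicalCalculator above): given an already built OqParam instance `oq`
# with calculation_mode='event_based', the calculator could be run as
#
#   with logs.init(vars(oq)) as log:
#       calc = EventBasedCalculator(oq, log.calc_id)
#       calc.run()
#
# In practice most users invoke it through `oq engine --run job.ini` instead.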