# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import math
import time
import os.path
import logging
import numpy
import pandas
from shapely import geometry
from openquake.baselib import config, hdf5, parallel, python3compat
from openquake.baselib.general import AccumDict, humansize, block_splitter
from openquake.hazardlib.geo.packager import fiona
from openquake.hazardlib.map_array import MapArray, get_mean_curve
from openquake.hazardlib.stats import geom_avg_std, compute_stats
from openquake.hazardlib.calc.stochastic import sample_ruptures
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.calc.filters import (
    magstr, nofilter, getdefault, get_distances, SourceFilter)
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.calc.conditioned_gmfs import ConditionedGmfComputer
from openquake.hazardlib import logictree, InvalidFile
from openquake.hazardlib.calc.stochastic import get_rup_array, rupture_dt
from openquake.hazardlib.source.rupture import (
    RuptureProxy, EBRupture, get_ruptures)
from openquake.commonlib import util, logs, readinput, datastore
from openquake.commonlib.calc import (
    gmvs_to_poes, make_hmaps, slice_dt, build_slice_by_event, RuptureImporter,
    SLICE_BY_EVENT_NSITES, get_close_mosaic_models, get_proxies)
from openquake.risklib.riskinput import str2rsi, rsi2str
from openquake.calculators import base, views
from openquake.calculators.getters import sig_eps_dt
from openquake.calculators.classical import ClassicalCalculator
from openquake.calculators.extract import Extractor
from openquake.calculators.postproc.plots import plot_avg_gmf
from openquake.calculators.base import expose_outputs
from PIL import Image
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
I64 = numpy.int64
F32 = numpy.float32
F64 = numpy.float64
TWO24 = 2 ** 24
TWO32 = numpy.float64(2 ** 32)
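# NB: trt_smr values encode the tectonic region type index, which is
# recovered below as trt_smr // TWO24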
rup_dt = numpy.dtype(
    [('rup_id', I64), ('rrup', F32), ('time', F32), ('task_no', U16)])
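# rup_dt records, for each computed rupture, its id, the minimum
# rupture-site distance, the computation time and the task number
# (see the `times` array built in _event_based below)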
def rup_weight(rup):
    # rup['nsites'] is 0 if the ruptures were generated without a sitecol
    if isinstance(rup, numpy.ndarray):
        nsites = numpy.clip(rup['nsites'], 1., numpy.inf)
        return numpy.ceil(nsites / 100.)
    return math.ceil((rup['nsites'] or 1) / 100.)
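# e.g. a rupture affecting 250 sites gets weight ceil(250 / 100) = 3,
# while ruptures affecting up to 100 sites (or none) get weight 1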
# ######################## hcurves_from_gmfs ############################ #
def build_hcurves(calc):
    """
    Build the hazard curves from each realization starting from
    the stored GMFs. Works only for a few sites.
    """
    oq = calc.oqparam
    # compute and save statistics; this is done in-process and can
    # be very slow if there are thousands of realizations
    weights = calc.datastore['weights'][:]
    # NB: in the future we may want to save to individual hazard
    # curves if oq.individual_rlzs is set; for the moment we
    # save the statistical curves only
    hstats = oq.hazard_stats()
    S = len(hstats)
    R = len(weights)
    N = calc.N
    M = len(oq.imtls)
    L1 = oq.imtls.size // M
    gmf_df = calc.datastore.read_df('gmf_data', 'eid')
    ev_df = calc.datastore.read_df('events', 'id')[['rlz_id']]
    gmf_df = gmf_df.join(ev_df)
    hc_mon = calc._monitor('building hazard curves', measuremem=False)
    hcurves = {}
    for (sid, rlz), df in gmf_df.groupby(['sid', 'rlz_id']):
        with hc_mon:
            poes = gmvs_to_poes(df, oq.imtls, oq.ses_per_logic_tree_path)
            for m, imt in enumerate(oq.imtls):
                hcurves[rsi2str(rlz, sid, imt)] = poes[m]
    pmaps = {r: MapArray(calc.sitecol.sids, L1*M, 1).fill(0)
             for r in range(R)}
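    # fold the per-(rlz, site, imt) PoEs into one probability map per
    # realization, combining exceedance probabilities as 1 - (1-a) * (1-p)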
    for key, poes in hcurves.items():
        r, sid, imt = str2rsi(key)
        array = pmaps[r].array[sid, oq.imtls(imt), 0]
        array[:] = 1. - (1. - array) * (1. - poes)
    pmaps = [p.reshape(N, M, L1) for p in pmaps.values()]
    if oq.individual_rlzs:
        logging.info('Saving individual hazard curves')
        calc.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-rlzs', site_id=N, rlz_id=R,
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset(
                'hmaps-rlzs', F32, (N, R, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-rlzs', site_id=N, rlz_id=R,
                imt=list(oq.imtls), poe=oq.poes)
        for r in range(R):
            calc.datastore['hcurves-rlzs'][:, r] = pmaps[r].array
            if oq.poes:
                [hmap] = make_hmaps([pmaps[r]], oq.imtls, oq.poes)
                ds[:, r] = hmap.array
    if S:
        logging.info('Computing statistical hazard curves')
        calc.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
        calc.datastore.set_shape_descr(
            'hcurves-stats', site_id=N, stat=list(hstats),
            imt=list(oq.imtls), lvl=numpy.arange(L1))
        if oq.poes:
            P = len(oq.poes)
            M = len(oq.imtls)
            ds = calc.datastore.create_dset(
                'hmaps-stats', F32, (N, S, M, P))
            calc.datastore.set_shape_descr(
                'hmaps-stats', site_id=N, stat=list(hstats),
                imt=list(oq.imtls), poes=oq.poes)
        for s, stat in enumerate(hstats):
            smap = MapArray(calc.sitecol.sids, L1, M)
            [smap.array] = compute_stats(
                numpy.array([p.array for p in pmaps]),
                [hstats[stat]], weights)
            calc.datastore['hcurves-stats'][:, s] = smap.array
            if oq.poes:
                [hmap] = make_hmaps([smap], oq.imtls, oq.poes)
                ds[:, s] = hmap.array
# ######################## GMF calculator ############################ #
def count_ruptures(src):
    """
    Count the number of ruptures on a heavy source
    """
    return {src.source_id: src.count_ruptures()}
def get_computer(cmaker, proxy, srcfilter, station_data, station_sitecol):
    """
    :returns: GmfComputer or ConditionedGmfComputer
    """
    sids = srcfilter.close_sids(proxy, cmaker.trt)
    if len(sids) == 0:  # filtered away
        raise FarAwayRupture
    ebr = proxy.to_ebr(cmaker.trt)
    oq = cmaker.oq
    if station_sitecol:
        stations = numpy.isin(sids, station_sitecol.sids)
        if stations.any():
            station_sids = sids[stations]
            return ConditionedGmfComputer(
                ebr, srcfilter.sitecol.filtered(sids),
                srcfilter.sitecol.filtered(station_sids),
                station_data.loc[station_sids],
                oq.observed_imts,
                cmaker, oq.correl_model, oq.cross_correl,
                oq.ground_motion_correlation_params,
                oq.number_of_ground_motion_fields,
                oq._amplifier, oq._sec_perils)
        else:
            logging.warning('There are no stations!')
    return GmfComputer(
        ebr, srcfilter.sitecol.filtered(sids), cmaker,
        oq.correl_model, oq.cross_correl,
        oq._amplifier, oq._sec_perils)
def _event_based(proxies, cmaker, stations, srcfilter, shr,
                 fmon, cmon, umon, mmon):
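    """
    Core helper computing the GMFs for a block of rupture proxies.

    :returns: a dictionary with keys gmfdata, times, sig_eps and,
              if oq.mea_tau_phi is set, mea_tau_phi
    """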
    oq = cmaker.oq
    alldata = []
    sig_eps = []
    times = []
    max_iml = oq.get_max_iml()
    se_dt = sig_eps_dt(oq.imtls)
    mea_tau_phi = []
    for proxy in proxies:
        t0 = time.time()
        with fmon:
            if proxy['mag'] < cmaker.min_mag:
                continue
            try:
                computer = get_computer(cmaker, proxy, srcfilter, *stations)
            except FarAwayRupture:
                # skip this rupture
                continue
        if stations and stations[0] is not None:  # conditioned GMFs
            assert cmaker.scenario
            with shr['mea'] as mea, shr['tau'] as tau, shr['phi'] as phi:
                df = computer.compute_all(
                    [mea, tau, phi], max_iml, mmon, cmon, umon)
        else:  # regular GMFs
            df = computer.compute_all(None, max_iml, mmon, cmon, umon)
        if oq.mea_tau_phi:
            mtp = numpy.array(computer.mea_tau_phi, GmfComputer.mtp_dt)
            mea_tau_phi.append(mtp)
        sig_eps.append(computer.build_sig_eps(se_dt))
        dt = time.time() - t0
        times.append((proxy['id'], computer.ctx.rrup.min(), dt))
        alldata.append(df)
    times = numpy.array([tup + (fmon.task_no,) for tup in times], rup_dt)
    times.sort(order='rup_id')
    if sum(len(df) for df in alldata) == 0:
        return dict(gmfdata={}, times=times, sig_eps=())
    gmfdata = pandas.concat(alldata)  # ~40 MB
    dic = dict(gmfdata={k: gmfdata[k].to_numpy() for k in gmfdata.columns},
               times=times, sig_eps=numpy.concatenate(sig_eps, dtype=se_dt))
    if oq.mea_tau_phi:
        mtpdata = numpy.concatenate(mea_tau_phi, dtype=GmfComputer.mtp_dt)
        dic['mea_tau_phi'] = {col: mtpdata[col] for col in mtpdata.dtype.names}
    return dic
def event_based(proxies, cmaker, sitecol, stations, dstore, monitor):
    """
    Compute GMFs and optionally hazard curves
    """
    if isinstance(dstore, str):
        # when passing ruptures.hdf5
        dstore = hdf5.File(dstore)
    oq = cmaker.oq
    rmon = monitor('reading sites and ruptures', measuremem=True)
    fmon = monitor('instantiating GmfComputer', measuremem=False)
    mmon = monitor('computing mean_stds', measuremem=False)
    cmon = monitor('computing gmfs', measuremem=False)
    umon = monitor('updating gmfs', measuremem=False)
    cmaker.scenario = 'scenario' in oq.calculation_mode
    with dstore, rmon:
        srcfilter = SourceFilter(
            sitecol.complete, oq.maximum_distance(cmaker.trt))
        dset = dstore['rupgeoms']
        for proxy in proxies:
            proxy.geom = dset[proxy['geom_id']]
    for block in block_splitter(proxies, 20_000, rup_weight):
        yield _event_based(block, cmaker, stations, srcfilter,
                           monitor.shared, fmon, cmon, umon, mmon)
def filter_stations(station_df, complete, rup, maxdist):
    """
    :param station_df: DataFrame with the stations
    :param complete: complete SiteCollection
    :param rup: rupture
    :param maxdist: maximum distance
    :returns: filtered (station_df, station_sitecol)
    """
    ns = len(station_df)
    ok = (get_distances(rup, complete, 'rrup') <= maxdist) & numpy.isin(
        complete.sids, station_df.index)
    station_sites = complete.filter(ok)
    if station_sites is None:
        station_data = None
        logging.warning('Discarded %d/%d stations more distant than %d km, '
                        'switching to the unconditioned GMF computer',
                        ns, ns, maxdist)
    else:
        station_data = station_df[
            numpy.isin(station_df.index, station_sites.sids)]
        assert len(station_data) == len(station_sites), (
            len(station_data), len(station_sites))
        if len(station_data) < ns:
            logging.info('Discarded %d/%d stations more distant than %d km',
                         ns - len(station_data), ns, maxdist)
    return station_data, station_sites
def get_nsites(rups, trt_smr, trt, srcfilter):
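    """
    :returns: a dictionary (model, trt_smr, trt) -> number of sites
              affected by each rupture
    """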
    model = rups[0]['model'].decode('ascii')
    return {(model, trt_smr, trt): srcfilter.get_nsites(rups, trt)}
def get_rups_dic(ruptures_hdf5, sitecol, maximum_distance, trts):
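    """
    Read the ruptures from ruptures_hdf5 grouped by trt_smr and set their
    `nsites` field, computed in parallel with the `get_nsites` task.

    :returns: a dictionary (model, trt_smr, trt) -> array of rupture records
    """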
    dic = {}
    smap = parallel.Starmap(get_nsites)
    with hdf5.File(ruptures_hdf5) as f:
        for trt_smr, start, stop in f['trt_smr_start_stop']:
            slc = slice(start, stop)
            rups = f['ruptures'][slc]
            model = rups[0]['model'].decode('ascii')
            trt = trts[model][trt_smr // TWO24]
            srcfilter = SourceFilter(sitecol, maximum_distance(trt))
            dic[model, trt_smr, trt] = rups
            smap.submit((rups, trt_smr, trt, srcfilter))
    for key, nsites in smap.reduce().items():
        dic[key]['nsites'] = nsites
    return dic
def starmap_from_rups_hdf5(oq, sitecol, dstore):
    """
    :returns: a Starmap instance sending event_based tasks
    """
    ruptures_hdf5 = oq.inputs['rupture_model']
    trts = {}
    rlzs_by_gsim = {}
    with hdf5.File(ruptures_hdf5) as r:
        for model in r['full_lt']:
            full_lt = r[f'full_lt/{model}']
            trts[model] = full_lt.trts
            logging.info('Building rlzs_by_gsim for %s', model)
            for trt_smr, rbg in full_lt.get_rlzs_by_gsim_dic().items():
                rlzs_by_gsim[model, trt_smr] = rbg
        dstore['full_lt'] = full_lt  # saving the last lt (hackish)
        r.copy('events', dstore.hdf5)  # saving the events
    R = full_lt.num_samples
    dstore['weights'] = numpy.ones(R) / R
    rups_dic = get_rups_dic(ruptures_hdf5, sitecol, oq.maximum_distance, trts)
    totw = sum(rup_weight(rups).sum() for rups in rups_dic.values())
    maxw = totw / (oq.concurrent_tasks or 1)
    extra = sitecol.array.dtype.names
    dstore.swmr_on()
    smap = parallel.Starmap(event_based, h5=dstore.hdf5)
    for (model, trt_smr, trt), rups in rups_dic.items():
        proxies = get_proxies(ruptures_hdf5, rups)
        mags = numpy.unique(numpy.round(rups['mag'], 2))
        oq.mags_by_trt = {trt: [magstr(mag) for mag in mags]}
        cmaker = ContextMaker(trt, rlzs_by_gsim[model, trt_smr],
                              oq, extraparams=extra)
        cmaker.min_mag = getdefault(oq.minimum_magnitude, trt)
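        # producing slightly less than concurrent_tasks thanks to the 1.02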
        for block in block_splitter(proxies, maxw * 1.02, rup_weight):
            args = block, cmaker, sitecol, (None, None), ruptures_hdf5
            smap.submit(args)
    return smap
# NB: save_tmp is passed in event_based_risk
def starmap_from_rups(func, oq, full_lt, sitecol, dstore, save_tmp=None):
    """
    Submit the ruptures and apply `func` (event_based or ebrisk)
    """
    try:
        vs30 = sitecol.vs30
    except ValueError:  # in scenario test_case_14
        pass
    else:
        if numpy.isnan(vs30).any():
            raise ValueError('The vs30 is NaN, missing site model '
                             'or site parameter')
    set_mags(oq, dstore)
    rups = dstore['ruptures'][:]
    logging.info('Reading {:_d} ruptures'.format(len(rups)))
    logging.info('Affected sites ~%.0f per rupture, max=%.0f',
                 rups['nsites'].mean(), rups['nsites'].max())
    allproxies = [RuptureProxy(rec) for rec in rups]
    if "station_data" in oq.inputs:
        trt = full_lt.trts[0]
        proxy = allproxies[0]
        proxy.geom = dstore['rupgeoms'][proxy['geom_id']]
        rup = proxy.to_ebr(trt).rupture
        station_df = dstore.read_df('station_data', 'site_id')
        maxdist = (oq.maximum_distance_stations or
                   oq.maximum_distance['default'][-1][1])
        station_data, station_sites = filter_stations(
            station_df, sitecol.complete, rup, maxdist)
    else:
        station_data, station_sites = None, None
    maxw = sum(rup_weight(p) for p in allproxies) / (
        oq.concurrent_tasks or 1)
    logging.info('maxw = {:_d}'.format(round(maxw)))
    if station_data is not None:
        # assume scenario with a single true rupture
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(0)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
        cmaker.scenario = True
        maxdist = oq.maximum_distance(cmaker.trt)
        srcfilter = SourceFilter(sitecol.complete, maxdist)
        computer = get_computer(
            cmaker, proxy, srcfilter, station_data, station_sites)
        G = len(cmaker.gsims)
        M = len(cmaker.imts)
        N = len(computer.sitecol)
        size = 2 * G * M * N * N * 8  # tau, phi
        msg = f'{G=} * {M=} * {humansize(N*N*8)} * 2'
        logging.info('Requiring %s for tau, phi [%s]', humansize(size), msg)
        if size > float(config.memory.conditioned_gmf_gb) * 1024**3:
            raise ValueError(
                f'The calculation is too large: {G=}, {M=}, {N=}. '
                'You must reduce the number of sites i.e. maximum_distance')
        mea, tau, phi = computer.get_mea_tau_phi(dstore.hdf5)
        del proxy.geom  # to reduce data transfer
    dstore.swmr_on()
    smap = parallel.Starmap(func, h5=dstore.hdf5)
    if save_tmp:
        save_tmp(smap.monitor)
    # NB: for conditioned scenarios we are looping on a single trt
    toml_gsims = []
    for trt_smr, start, stop in dstore['trt_smr_start_stop']:
        proxies = allproxies[start:stop]
        trt = full_lt.trts[trt_smr // TWO24]
        extra = sitecol.array.dtype.names
        rlzs_by_gsim = full_lt.get_rlzs_by_gsim(trt_smr)
        cmaker = ContextMaker(trt, rlzs_by_gsim, oq, extraparams=extra)
        cmaker.min_mag = getdefault(oq.minimum_magnitude, trt)
        for gsim in rlzs_by_gsim:
            toml_gsims.append(gsim._toml)
        if station_data is not None:
            if parallel.oq_distribute() in ('zmq', 'slurm'):
                logging.error('Conditioned scenarios are not meant to be run'
                              ' on a cluster')
            smap.share(mea=mea, tau=tau, phi=phi)
        # producing slightly less than concurrent_tasks thanks to the 1.02
        for block in block_splitter(proxies, maxw * 1.02, rup_weight):
            args = block, cmaker, sitecol, (station_data, station_sites), dstore
            smap.submit(args)
    dstore['gsims'] = numpy.array(toml_gsims)
    return smap
def set_mags(oq, dstore):
    """
    Set the attribute oq.mags_by_trt
    """
    if 'source_mags' in dstore:
        # classical or event_based
        oq.mags_by_trt = {
            trt: python3compat.decode(dset[:])
            for trt, dset in dstore['source_mags'].items()}
    elif oq.ruptures_hdf5:
        pass
    elif 'ruptures' in dstore:
        # scenario
        trts = dstore['full_lt'].trts
        ruptures = dstore['ruptures'][:]
        dic = {}
        for trti, trt in enumerate(trts):
            rups = ruptures[ruptures['trt_smr'] == trti]
            mags = numpy.unique(numpy.round(rups['mag'], 2))
            dic[trt] = ['%.02f' % mag for mag in mags]
        oq.mags_by_trt = dic
def compute_avg_gmf(gmf_df, weights, min_iml):
    """
    :param gmf_df: a DataFrame with columns eid, sid, rlz, gmv...
    :param weights: E weights associated to the realizations
    :param min_iml: array of M minimum intensities
    :returns: a dictionary site_id -> array of shape (2, M)
    """
    dic = {}
    E = len(weights)
    M = len(min_iml)
    for sid, df in gmf_df.groupby(gmf_df.index):
        eid = df.pop('eid')
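        # events with no GMF at this site contribute the minimum intensities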
        gmvs = numpy.ones((E, M), F32) * min_iml
        gmvs[eid.to_numpy()] = df.to_numpy()
        dic[sid] = geom_avg_std(gmvs, weights)
    return dic
def read_gsim_lt(oq):
    # in aristotle mode the gsim_lt is read from the exposure.hdf5 file
    if oq.aristotle:
        if not oq.mosaic_model:
            if oq.rupture_dict:
                lon, lat = [oq.rupture_dict['lon'], oq.rupture_dict['lat']]
            elif oq.rupture_xml:
                hypo = readinput.get_rupture(oq).hypocenter
                lon, lat = [hypo.x, hypo.y]
            mosaic_models = get_close_mosaic_models(lon, lat, 5)
            # NOTE: using the first mosaic model
            oq.mosaic_model = mosaic_models[0]
            if len(mosaic_models) > 1:
                logging.info('Using the "%s" model' % oq.mosaic_model)
        [expo_hdf5] = oq.inputs['exposure']
        if oq.mosaic_model == '???':
            raise ValueError(
                '(%(lon)s, %(lat)s) is not covered by the mosaic!' %
                oq.rupture_dict)
        if oq.gsim != '[FromFile]':
            raise ValueError(
                'In Aristotle mode the gsim can not be specified in'
                ' the job.ini: %s' % oq.gsim)
        if oq.tectonic_region_type == '*':
            raise ValueError(
                'The tectonic_region_type parameter must be specified')
        gsim_lt = logictree.GsimLogicTree.from_hdf5(
            expo_hdf5, oq.mosaic_model,
            oq.tectonic_region_type.encode('utf8'))
    else:
        gsim_lt = readinput.get_gsim_lt(oq)
    return gsim_lt
@base.calculators.add('event_based', 'scenario', 'ucerf_hazard')
class EventBasedCalculator(base.HazardCalculator):
    """
    Event based PSHA calculator generating the ground motion fields and
    the hazard curves from the ruptures, depending on the configuration
    parameters.
    """
    core_task = event_based
    is_stochastic = True
    accept_precalc = ['event_based', 'ebrisk', 'event_based_risk']

    def init(self):
        if self.oqparam.cross_correl.__class__.__name__ == 'GodaAtkinson2009':
            logging.warning(
                'The truncation_level param is ignored with GodaAtkinson2009')
        if hasattr(self, 'csm'):
            self.check_floating_spinning()
        if hasattr(self.oqparam, 'maximum_distance'):
            self.srcfilter = self.src_filter()
        else:
            self.srcfilter = nofilter
        if not self.datastore.parent:
            self.datastore.create_dset('ruptures', rupture_dt)
            self.datastore.create_dset('rupgeoms', hdf5.vfloat32)
    def counting_ruptures(self):
        """
        Sets src.num_ruptures and src.offset
        """
        sources = self.csm.get_sources()
        logging.info('Counting the ruptures in the CompositeSourceModel')
        self.datastore.swmr_on()
        with self.monitor('counting ruptures', measuremem=True):
            nrups = parallel.Starmap(  # weighting the heavy sources
                count_ruptures, [(src,) for src in sources
                                 if src.code in b'AMSC'],
                h5=self.datastore.hdf5,
                progress=logging.debug).reduce()
        # NB: multifault sources must be considered light to avoid a large
        # data transfer, even if .count_ruptures can be slow
        for src in sources:
            try:
                src.num_ruptures = nrups[src.source_id]
            except KeyError:  # light sources
                src.num_ruptures = src.count_ruptures()
            src.weight = src.num_ruptures
        self.csm.fix_src_offset()  # NB: must be AFTER count_ruptures
        maxweight = sum(sg.weight for sg in self.csm.src_groups) / (
            self.oqparam.concurrent_tasks or 1)
        return maxweight
    def build_events_from_sources(self):
        """
        Prefilter the composite source model and store the source_info
        """
        oq = self.oqparam
        maxweight = self.counting_ruptures()
        eff_ruptures = AccumDict(accum=0)  # grp_id => potential ruptures
        source_data = AccumDict(accum=[])
        allargs = []
        srcfilter = self.srcfilter
        if 'geometry' in oq.inputs:
            fname = oq.inputs['geometry']
            with fiona.open(fname) as f:
                model_geom = geometry.shape(f[0].geometry)
        elif oq.mosaic_model:  # 3-letter mosaic model
            mosaic_df = readinput.read_mosaic_df(buffer=0).set_index('code')
            model_geom = mosaic_df.loc[oq.mosaic_model].geom
        logging.info('Building ruptures')
        g_index = 0
        for sg_id, sg in enumerate(self.csm.src_groups):
            if not sg.sources:
                continue
            rgb = self.full_lt.get_rlzs_by_gsim(sg.sources[0].trt_smr)
            cmaker = ContextMaker(sg.trt, rgb, oq)
            cmaker.gid = numpy.arange(g_index, g_index + len(rgb))
            g_index += len(rgb)
            cmaker.model = oq.mosaic_model or '???'
            if oq.mosaic_model or 'geometry' in oq.inputs:
                cmaker.model_geom = model_geom
            for src_group in sg.split(maxweight):
                allargs.append((src_group, cmaker, srcfilter.sitecol))
        self.datastore.swmr_on()
        smap = parallel.Starmap(
            sample_ruptures, allargs, h5=self.datastore.hdf5)
        mon = self.monitor('saving ruptures')
        self.nruptures = 0  # estimated classical ruptures within maxdist
        t0 = time.time()
        tot_ruptures = 0
        filtered_ruptures = 0
        for dic in smap:
            # NB: dic should be a dictionary, but when the calculation dies
            # for an OOM it can become None, thus giving a very confusing error
            if dic is None:
                raise MemoryError('You ran out of memory!')
            rup_array = dic['rup_array']
            tot_ruptures += len(rup_array)
            if len(rup_array) == 0:
                continue
            geom = rup_array.geom
            filtered_ruptures += len(rup_array)
            if dic['source_data']:
                source_data += dic['source_data']
            if dic['eff_ruptures']:
                eff_ruptures += dic['eff_ruptures']
            with mon:
                self.nruptures += len(rup_array)
                # NB: the ruptures will be reordered and resaved later
                hdf5.extend(self.datastore['ruptures'], rup_array)
                hdf5.extend(self.datastore['rupgeoms'], geom)
        t1 = time.time()
        logging.info(f'Generated {filtered_ruptures}/{tot_ruptures} ruptures,'
                     f' stored in {t1 - t0} seconds')
        if len(self.datastore['ruptures']) == 0:
            raise RuntimeError('No ruptures were generated, perhaps the '
                               'effective investigation time is too short')
        # don't change the order of the 3 things below!
        self.store_source_info(source_data)
        self.store_rlz_info(eff_ruptures)
        imp = RuptureImporter(self.datastore)
        with self.monitor('saving ruptures and events'):
            imp.import_rups_events(self.datastore.getitem('ruptures')[()])
    def agg_dicts(self, acc, result):
        """
        :param acc: accumulator dictionary
        :param result: an AccumDict with events, ruptures and gmfs
        """
        if result is None:  # instead of a dict
            raise MemoryError('You ran out of memory!')
        sav_mon = self.monitor('saving gmfs')
        primary = self.oqparam.get_primary_imtls()
        sec_imts = self.oqparam.sec_imts
        with sav_mon:
            gmfdata = result.pop('gmfdata')
            if len(gmfdata):
                df = pandas.DataFrame(gmfdata)
                dset = self.datastore['gmf_data/sid']
                times = result.pop('times')
                hdf5.extend(self.datastore['gmf_data/rup_info'], times)
                if self.N >= SLICE_BY_EVENT_NSITES:
                    sbe = build_slice_by_event(
                        df.eid.to_numpy(), self.offset)
                    hdf5.extend(self.datastore['gmf_data/slice_by_event'], sbe)
                hdf5.extend(dset, df.sid.to_numpy())
                hdf5.extend(self.datastore['gmf_data/eid'], df.eid.to_numpy())
                for m in range(len(primary)):
                    hdf5.extend(self.datastore[f'gmf_data/gmv_{m}'],
                                df[f'gmv_{m}'])
                for sec_imt in sec_imts:
                    hdf5.extend(self.datastore[f'gmf_data/{sec_imt}'],
                                df[sec_imt])
                sig_eps = result.pop('sig_eps')
                hdf5.extend(self.datastore['gmf_data/sigma_epsilon'], sig_eps)
                self.offset += len(df)
        # optionally save mea_tau_phi
        mtp = result.pop('mea_tau_phi', None)
        if mtp:
            for col, arr in mtp.items():
                hdf5.extend(self.datastore[f'mea_tau_phi/{col}'], arr)
        return acc
    def _read_scenario_ruptures(self):
        oq = self.oqparam
        gsim_lt = read_gsim_lt(oq)
        trts = list(gsim_lt.values)
        if (str(gsim_lt.branches[0].gsim) == '[FromFile]'
                and 'gmfs' not in oq.inputs):
            raise InvalidFile('%s: missing gsim or gsim_logic_tree_file' %
                              oq.inputs['job_ini'])
        G = gsim_lt.get_num_paths()
        if oq.calculation_mode.startswith('scenario'):
            ngmfs = oq.number_of_ground_motion_fields
        if oq.rupture_dict or oq.rupture_xml:
            # check the number of branchsets
            bsets = len(gsim_lt._ltnode)
            if bsets > 1:
                raise InvalidFile(
                    '%s for a scenario calculation must contain a single '
                    'branchset, found %d!' % (oq.inputs['job_ini'], bsets))
            [(trt_smr, rlzs_by_gsim)] = gsim_lt.get_rlzs_by_gsim_dic().items()
            trt = trts[trt_smr // TWO24]
            rup = readinput.get_rupture(oq)
            oq.mags_by_trt = {trt: [magstr(rup.mag)]}
            self.cmaker = ContextMaker(trt, rlzs_by_gsim, oq)
            if self.N > oq.max_sites_disagg:  # many sites, split rupture
                ebrs = []
                for i in range(ngmfs):
                    ebr = EBRupture(rup, 0, 0, G, i, e0=i * G)
                    ebr.seed = oq.ses_seed + i
                    ebrs.append(ebr)
            else:  # keep a single rupture with a big occupation number
                ebrs = [EBRupture(rup, 0, 0, G * ngmfs, 0)]
                ebrs[0].seed = oq.ses_seed
            srcfilter = SourceFilter(self.sitecol, oq.maximum_distance(trt))
            aw = get_rup_array(ebrs, srcfilter)
            if len(aw) == 0:
                raise RuntimeError(
                    'The rupture is too far from the sites! Please check the '
                    'maximum_distance and the position of the rupture')
        elif oq.inputs['rupture_model'].endswith('.csv'):
            aw = get_ruptures(oq.inputs['rupture_model'])
            if len(gsim_lt.values) == 1:  # fix for scenario_damage/case_12
                aw['trt_smr'] = 0  # a single TRT
            if oq.calculation_mode.startswith('scenario'):
                # rescale n_occ by ngmfs and nrlzs
                aw['n_occ'] *= ngmfs * gsim_lt.get_num_paths()
        else:
            raise InvalidFile("Something wrong in %s" % oq.inputs['job_ini'])
        rup_array = aw.array
        hdf5.extend(self.datastore['rupgeoms'], aw.geom)
        if len(rup_array) == 0:
            raise RuntimeError(
                'There are no sites within the maximum_distance'
                ' of %s km from the rupture' % oq.maximum_distance(
                    rup.tectonic_region_type)(rup.mag))
        fake = logictree.FullLogicTree.fake(gsim_lt)
        self.datastore['full_lt'] = fake
        self.store_rlz_info({})  # store weights
        self.save_params()
        imp = RuptureImporter(self.datastore)
        imp.import_rups_events(rup_array)
    def execute(self):
        oq = self.oqparam
        dstore = self.datastore
        E = None
        if oq.ground_motion_fields and oq.min_iml.sum() == 0:
            logging.warning('The GMFs are not filtered: '
                            'you may want to set a minimum_intensity')
        elif oq.minimum_intensity:
            logging.info('minimum_intensity=%s', oq.minimum_intensity)
        else:
            logging.info('min_iml=%s', oq.min_iml)
        self.offset = 0
        if oq.hazard_calculation_id:  # from ruptures
            dstore.parent = datastore.read(oq.hazard_calculation_id)
            self.full_lt = dstore.parent['full_lt'].init()
            set_mags(oq, dstore)
        elif hasattr(self, 'csm'):  # from sources
            set_mags(oq, dstore)
            self.build_events_from_sources()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}
        elif not oq.rupture_dict and 'rupture_model' not in oq.inputs:
            logging.warning(
                'There is no rupture_model, the calculator will just '
                'import data without performing any calculation')
            fake = logictree.FullLogicTree.fake()
            dstore['full_lt'] = fake  # needed to expose the outputs
            dstore['weights'] = [1.]
            return {}
        elif oq.ruptures_hdf5:
            with hdf5.File(oq.ruptures_hdf5) as r:
                E = len(r['events'])
        else:  # scenario
            self._read_scenario_ruptures()
            if (oq.ground_motion_fields is False and
                    oq.hazard_curves_from_gmfs is False):
                return {}
        if oq.ground_motion_fields:
            prim_imts = oq.get_primary_imtls()
            base.create_gmf_data(dstore, prim_imts, oq.sec_imts,
                                 E=E, R=oq.number_of_logic_tree_samples)
            dstore.create_dset('gmf_data/sigma_epsilon', sig_eps_dt(oq.imtls))
            dstore.create_dset('gmf_data/rup_info', rup_dt)
            if self.N >= SLICE_BY_EVENT_NSITES:
                dstore.create_dset('gmf_data/slice_by_event', slice_dt)
        # event_based in parallel
        if oq.ruptures_hdf5:
            smap = starmap_from_rups_hdf5(oq, self.sitecol, dstore)
        else:
            smap = starmap_from_rups(
                event_based, oq, self.full_lt, self.sitecol, dstore)
        acc = smap.reduce(self.agg_dicts)
        if 'gmf_data' not in dstore:
            return acc
        if oq.ground_motion_fields:
            with self.monitor('saving avg_gmf', measuremem=True):
                self.save_avg_gmf()
        return acc
    def save_avg_gmf(self):
        """
        Compute and save avg_gmf, unless there are too many GMFs
        """
        size = self.datastore.getsize('gmf_data')
        maxsize = self.oqparam.gmf_max_gb * 1024 ** 3
        logging.info(f'Stored {humansize(size)} of GMFs')
        if size > maxsize:
            logging.warning(
                f'There are more than {humansize(maxsize)} of GMFs,'
                ' not computing avg_gmf')
            return
        rlzs = self.datastore['events'][:]['rlz_id']
        self.weights = self.datastore['weights'][:][rlzs]
        gmf_df = self.datastore.read_df('gmf_data', 'sid')
        for sec_imt in self.oqparam.sec_imts:  # ignore secondary perils
            del gmf_df[sec_imt]
        rel_events = gmf_df.eid.unique()
        e = len(rel_events)
        if e == 0:
            raise RuntimeError(
                'No GMFs were generated, perhaps they were '
                'all below the minimum_intensity threshold')
        elif e < len(self.datastore['events']):
            self.datastore['relevant_events'] = rel_events
            logging.info('Stored {:_d} relevant event IDs'.format(e))
        # really compute and store the avg_gmf
        M = len(self.oqparam.min_iml)
        avg_gmf = numpy.zeros((2, len(self.sitecol.complete), M), F32)
        for sid, avgstd in compute_avg_gmf(
                gmf_df, self.weights, self.oqparam.min_iml).items():
            avg_gmf[:, sid] = avgstd
        self.datastore['avg_gmf'] = avg_gmf
        # make avg_gmf plots only if running via the webui
        if os.environ.get('OQ_APPLICATION_MODE') == 'ARISTOTLE':
            imts = list(self.oqparam.imtls)
            ex = Extractor(self.datastore.calc_id)
            for imt in imts:
                plt = plot_avg_gmf(ex, imt)
                bio = io.BytesIO()
                plt.savefig(bio, format='png', bbox_inches='tight')
                fig_path = f'png/avg_gmf-{imt}.png'
                logging.info(f'Saving {fig_path} into the datastore')
                self.datastore[fig_path] = Image.open(bio)
    def post_execute(self, dummy):
        oq = self.oqparam
        if not oq.ground_motion_fields or 'gmf_data' not in self.datastore:
            return
        # check seed dependency unless the number of GMFs is huge
        size = self.datastore.getsize('gmf_data/gmv_0')
        if 'gmf_data' in self.datastore and size < 4E9 and not oq.ruptures_hdf5:
            # TODO: check why there is an error for ruptures_hdf5
            logging.info('Checking stored GMFs')
            msg = views.view('extreme_gmvs', self.datastore)
            logging.info(msg)
        if self.datastore.parent:
            self.datastore.parent.open('r')
        if oq.hazard_curves_from_gmfs:
            if size > 4E6:
                msg = 'gmf_data has {:_d} rows'.format(size)
                raise RuntimeError(f'{msg}: too big to compute the hcurves')
            build_hcurves(self)
        if oq.compare_with_classical:  # compute classical curves
            export_dir = os.path.join(oq.export_dir, 'cl')
            if not os.path.exists(export_dir):
                os.makedirs(export_dir)
            oq.export_dir = export_dir
            oq.calculation_mode = 'classical'
            with logs.init(vars(oq)) as log:
                self.cl = ClassicalCalculator(oq, log.calc_id)
                # TODO: perhaps it is possible to avoid reprocessing the
                # source model, however usually this is quite fast and
                # does not dominate the computation
                self.cl.run()
            expose_outputs(self.cl.datastore)
            all = slice(None)
            for imt in oq.imtls:
                cl_mean_curves = get_mean_curve(
                    self.cl.datastore, imt, all)
                eb_mean_curves = get_mean_curve(
                    self.datastore, imt, all)
                self.rdiff, index = util.max_rel_diff_index(
                    cl_mean_curves, eb_mean_curves)
                logging.warning(
                    'Relative difference with the classical '
                    'mean curves: %d%% at site index %d, imt=%s',
                    self.rdiff * 100, index, imt)