# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import math
import os.path
import itertools
import logging
import collections
import numpy
from openquake.baselib import hdf5
from openquake.baselib.python3compat import zip
from openquake.baselib.general import (
AccumDict, block_splitter, humansize, split_in_slices)
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.calc.stochastic import sample_ruptures
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.stats import compute_pmap_stats
from openquake.risklib.riskinput import str2rsi, rsi2str, indices_dt
from openquake.baselib import parallel
from openquake.commonlib import calc, util, readinput
from openquake.calculators import base
from openquake.calculators.getters import GmfGetter, RuptureGetter
from openquake.calculators.classical import (
ClassicalCalculator, saving_sources_by_task)
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
F32 = numpy.float32
F64 = numpy.float64


def compute_ruptures(sources, src_filter, gsims, param, monitor):
"""
:param sources:
a sequence of sources of the same group
:param src_filter:
a source site filter
:param gsims:
a list of GSIMs for the current tectonic region model
:param param:
a dictionary of additional parameters
:param monitor:
monitor instance
:returns:
        a dictionary src_group_id -> [EBRupture instances]
"""
# NB: by construction each block is a non-empty list with
# sources of the same src_group_id
grp_id = sources[0].src_group_id
dic = sample_ruptures(sources, src_filter, gsims, param, monitor)
res = AccumDict({grp_id: dic['eb_ruptures']})
res.num_events = dic['num_events']
res.calc_times = dic['calc_times']
res.eff_ruptures = {grp_id: dic['num_ruptures']}
return res


def get_events(ebruptures):
"""
Extract an array of dtype stored_event_dt from a list of EBRuptures
"""
events = []
year = 0 # to be set later
for ebr in ebruptures:
for event in ebr.events:
rec = (event['eid'], ebr.serial, ebr.grp_id, year, event['ses'],
event['sample'])
events.append(rec)
return numpy.array(events, readinput.stored_event_dt)


@base.calculators.add('event_based_rupture')
class EventBasedRuptureCalculator(base.HazardCalculator):
"""
Event based PSHA calculator generating the ruptures only
"""
core_task = compute_ruptures
is_stochastic = True

    def init(self):
"""
        Set the minimum intensity levels from the `minimum_intensity`
        dictionary and create the rupture serializer.
"""
oq = self.oqparam
self.min_iml = self.get_min_iml(oq)
self.rupser = calc.RuptureSerializer(self.datastore)

    def zerodict(self):
"""
        Initial accumulator, a dictionary grp_id -> [EBRupture, ...]
"""
zd = AccumDict()
zd.calc_times = []
zd.eff_ruptures = AccumDict()
self.grp_trt = self.csm.info.grp_by("trt")
return zd

    def agg_dicts(self, acc, ruptures_by_grp_id):
"""
Accumulate dictionaries of ruptures and populate the `events`
dataset in the datastore.
:param acc: accumulator dictionary
:param ruptures_by_grp_id: a nested dictionary grp_id -> ruptures
"""
if hasattr(ruptures_by_grp_id, 'calc_times'):
acc.calc_times.extend(ruptures_by_grp_id.calc_times)
if hasattr(ruptures_by_grp_id, 'eff_ruptures'):
acc.eff_ruptures += ruptures_by_grp_id.eff_ruptures
acc += ruptures_by_grp_id
self.save_ruptures(ruptures_by_grp_id)
return acc

    def save_ruptures(self, ruptures_by_grp_id):
"""
Extend the 'events' dataset with the events from the given ruptures;
also, save the ruptures if the flag `save_ruptures` is on.
:param ruptures_by_grp_id: a dictionary grp_id -> list of EBRuptures
"""
with self.monitor('saving ruptures', autoflush=True):
for grp_id, ebrs in ruptures_by_grp_id.items():
if len(ebrs):
events = get_events(ebrs)
dset = self.datastore.extend('events', events)
if self.oqparam.save_ruptures:
self.rupser.save(ebrs, eidx=len(dset)-len(events))

    def gen_args(self, csm, monitor):
"""
Used in the case of large source model logic trees.
        :param csm: a reduced CompositeSourceModel
        :param monitor: a :class:`openquake.baselib.performance.Monitor`
        :yields: (sources, src_filter, gsims, param, monitor) tuples
"""
oq = self.oqparam
src_filter = SourceFilter(self.sitecol, oq.maximum_distance)
def weight(src):
return src.num_ruptures * src.RUPTURE_WEIGHT
csm = self.csm.filter(src_filter)
maxweight = csm.get_maxweight(weight, oq.concurrent_tasks or 1)
logging.info('Using maxweight=%d', maxweight)
param = dict(
truncation_level=oq.truncation_level,
imtls=oq.imtls, seed=oq.ses_seed,
maximum_distance=oq.maximum_distance,
ses_per_logic_tree_path=oq.ses_per_logic_tree_path)
num_tasks = 0
num_sources = 0
for sm in csm.source_models:
for sg in sm.src_groups:
gsims = csm.info.gsim_lt.get_gsims(sg.trt)
csm.add_infos(sg.sources)
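                # NB: mutually exclusive sources cannot be split across
                # tasks, so the whole group is dispatched as a single block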
if sg.src_interdep == 'mutex':
sg.samples = sm.samples
yield sg, src_filter, gsims, param, monitor
num_tasks += 1
num_sources += len(sg.sources)
continue
for block in block_splitter(sg.sources, maxweight, weight):
block.samples = sm.samples
yield block, src_filter, gsims, param, monitor
num_tasks += 1
num_sources += len(block)
logging.info('Sent %d sources in %d tasks', num_sources, num_tasks)

    def execute(self):
with self.monitor('managing sources', autoflush=True):
allargs = self.gen_args(self.csm, self.monitor('classical'))
iterargs = saving_sources_by_task(allargs, self.datastore)
if isinstance(allargs, list):
                # there is a trick here: if the arguments are known
                # (a list, not an iterator), keep them as a list
                # so that the Starmap can recognize the case of a single
                # argument tuple and run the task in-core
iterargs = list(iterargs)
acc = parallel.Starmap(self.core_task.__func__, iterargs).reduce(
self.agg_dicts, self.zerodict())
with self.monitor('store source_info', autoflush=True):
self.store_source_info(self.csm.infos, acc)
return acc

    def post_execute(self, result):
"""
Save the SES collection
"""
self.rupser.close()
num_events = sum(set_counts(self.datastore, 'events').values())
if num_events == 0:
raise RuntimeError(
                'No seismic events! Perhaps the investigation time is too '
                'short or the maximum_distance is too small')
num_ruptures = sum(len(ruptures) for ruptures in result.values())
logging.info('Setting %d event years on %d ruptures',
num_events, num_ruptures)
with self.monitor('setting event years', measuremem=True,
autoflush=True):
numpy.random.seed(self.oqparam.ses_seed)
set_random_years(self.datastore, 'events',
int(self.oqparam.investigation_time),
self.oqparam.ses_per_logic_tree_path)


def set_counts(dstore, dsetname):
"""
:param dstore: a DataStore instance
    :param dsetname: the name of a dataset with a field `grp_id`
    :returns: a dictionary grp_id -> counts
"""
groups = dstore[dsetname]['grp_id']
unique, counts = numpy.unique(groups, return_counts=True)
dic = dict(zip(unique, counts))
dstore.set_attrs(dsetname, by_grp=sorted(dic.items()))
return dic


def set_random_years(dstore, name, investigation_time, num_ses):
"""
Set on the `events` dataset year labels sensitive to the
SES ordinal and the investigation time.
"""
events = dstore[name].value
years = numpy.random.choice(investigation_time, len(events)) + 1
year_of = dict(zip(numpy.sort(events['eid']), years)) # eid -> year
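    # each (sample, ses) pair gets its own block of `investigation_time`
    # years: e.g. with investigation_time=50 and num_ses=2, sample 0 gets
    # years 1-100 (ses 1 -> 1-50, ses 2 -> 51-100), sample 1 years 101-200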
for event in events:
ses_idx = event['ses'] - 1 # starts from 0
offset = num_ses * event['sample'] * investigation_time
event['year'] = (offset + ses_idx * investigation_time +
year_of[event['eid']])
dstore[name] = events


# ######################## GMF calculator ############################ #

def compute_gmfs_and_curves(getters, oq, monitor):
"""
:param getters:
a list of GmfGetter instances
:param oq:
an OqParam instance
:param monitor:
a Monitor instance
:returns:
        a list of dictionaries with keys gmfdata, hcurves, gmdata and indices
"""
results = []
for getter in getters:
with monitor('GmfGetter.init', measuremem=True):
getter.init()
hcurves = {} # key -> poes
if oq.hazard_curves_from_gmfs:
hc_mon = monitor('building hazard curves', measuremem=False)
duration = oq.investigation_time * oq.ses_per_logic_tree_path
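            # the effective duration is investigation_time * ses_per_ltp:
            # the GMVs sampled over all SESs are converted into occurrence
            # rates over that duration and then into probabilities of
            # exceedance for the investigation time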
with monitor('building hazard', measuremem=True):
gmfdata = numpy.fromiter(getter.gen_gmv(), getter.gmf_data_dt)
hazard = getter.get_hazard(data=gmfdata)
for sid, hazardr in zip(getter.sids, hazard):
for rlzi, array in hazardr.items():
if len(array) == 0: # no data
continue
with hc_mon:
gmvs = array['gmv']
for imti, imt in enumerate(getter.imtls):
poes = calc._gmvs_to_haz_curve(
gmvs[:, imti], oq.imtls[imt],
oq.investigation_time, duration)
hcurves[rsi2str(rlzi, sid, imt)] = poes
else: # fast lane
with monitor('building hazard', measuremem=True):
gmfdata = numpy.fromiter(getter.gen_gmv(), getter.gmf_data_dt)
indices = []
if oq.ground_motion_fields:
gmfdata.sort(order=('sid', 'rlzi', 'eid'))
start = stop = 0
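            # record for each site the slice [start, stop) of its rows
            # in the sorted gmf_data array, so they can be retrieved later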
for sid, rows in itertools.groupby(gmfdata['sid']):
for row in rows:
stop += 1
indices.append((sid, start, stop))
start = stop
else:
gmfdata = None
res = dict(gmfdata=gmfdata, hcurves=hcurves, gmdata=getter.gmdata,
indices=numpy.array(indices, (U32, 3)))
if len(getter.gmdata):
results.append(res)
return results


def save_gmdata(calc, n_rlzs):
"""
Save a composite array `gmdata` in the datastore.
:param calc: a calculator with a dictionary .gmdata {rlz: data}
:param n_rlzs: the total number of realizations
"""
n_sites = len(calc.sitecol)
dtlist = ([(imt, F32) for imt in calc.oqparam.imtls] +
[('events', U32), ('nbytes', U32)])
array = numpy.zeros(n_rlzs, dtlist)
for rlzi in sorted(calc.gmdata):
data = calc.gmdata[rlzi] # (imts, events, nbytes)
events = data[-2]
nbytes = data[-1]
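        # data[:-2] presumably holds the accumulated GMV totals per IMT;
        # dividing by the number of events and sites gives the mean GMV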
gmv = data[:-2] / events / n_sites
array[rlzi] = tuple(gmv) + (events, nbytes)
calc.datastore['gmdata'] = array
logging.info('Generated %s of GMFs', humansize(array['nbytes'].sum()))


def update_nbytes(dstore, key, array):
nbytes = dstore.get_attr(key, 'nbytes', 0)
dstore.set_attrs(key, nbytes=nbytes + array.nbytes)


@base.calculators.add('event_based')
class EventBasedCalculator(base.HazardCalculator):
"""
Event based PSHA calculator generating the ground motion fields and
the hazard curves from the ruptures, depending on the configuration
parameters.
"""
pre_calculator = 'event_based_rupture'
core_task = compute_gmfs_and_curves
is_stochastic = True

    def combine_pmaps_and_save_gmfs(self, acc, results):
"""
Combine the hazard curves (if any) and save the gmfs (if any)
sequentially; notice that the gmfs may come from
different tasks in any order.
:param acc: an accumulator for the hazard curves
        :param results:
            a list of dictionaries with keys gmfdata, hcurves,
            gmdata and indices
:returns: a new accumulator
"""
sav_mon = self.monitor('saving gmfs')
agg_mon = self.monitor('aggregating hcurves')
hdf5path = self.datastore.hdf5path
for res in results:
self.gmdata += res['gmdata']
data = res['gmfdata']
if data is not None:
with sav_mon:
hdf5.extend3(hdf5path, 'gmf_data/data', data)
# it is important to save the number of bytes while the
# computation is going, to see the progress
update_nbytes(self.datastore, 'gmf_data/data', data)
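                    # the (start, stop) indices from each task are relative
                    # to its own block of rows; shift them by the number of
                    # rows already stored in gmf_data/data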
for sid, start, stop in res['indices']:
self.indices[sid].append(
(start + self.offset, stop + self.offset))
self.offset += len(data)
slicedic = self.oqparam.imtls.slicedic
with agg_mon:
for key, poes in res['hcurves'].items():
r, sid, imt = str2rsi(key)
array = acc[r].setdefault(sid, 0).array[slicedic[imt], 0]
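                    # combine the PoEs as independent contributions:
                    # 1 - (1 - poe_acc) * (1 - poes)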
array[:] = 1. - (1. - array) * (1. - poes)
sav_mon.flush()
agg_mon.flush()
self.datastore.flush()
if 'ruptures' in res:
vars(EventBasedRuptureCalculator)['save_ruptures'](
self, res['ruptures'])
return acc

    def gen_args(self):
"""
:yields: the arguments for compute_gmfs_and_curves
"""
oq = self.oqparam
sitecol = self.sitecol.complete
monitor = self.monitor(self.core_task.__name__)
imts = list(oq.imtls)
min_iml = self.get_min_iml(oq)
correl_model = oq.get_correl_model()
try:
csm_info = self.csm.info
except AttributeError: # no csm
csm_info = self.datastore['csm_info']
samples_by_grp = csm_info.get_samples_by_grp()
rlzs_by_gsim = {grp_id: self.rlzs_assoc.get_rlzs_by_gsim(grp_id)
for grp_id in samples_by_grp}
if self.precalc:
num_ruptures = sum(len(rs) for rs in self.precalc.result.values())
block_size = math.ceil(num_ruptures / (oq.concurrent_tasks or 1))
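            # distribute the in-memory ruptures evenly, so that each task
            # receives roughly num_ruptures / concurrent_tasks ruptures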
for grp_id, ruptures in self.precalc.result.items():
if not ruptures:
continue
for block in block_splitter(ruptures, block_size):
getter = GmfGetter(
rlzs_by_gsim[grp_id], block, sitecol,
imts, min_iml, oq.maximum_distance,
oq.truncation_level, correl_model,
samples_by_grp[grp_id])
yield [getter], oq, monitor
return
U = len(self.datastore['ruptures'])
logging.info('Found %d ruptures', U)
parent = self.can_read_parent() or self.datastore
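        # if the workers can read the parent datastore directly, pass lazy
        # RuptureGetters; otherwise the ruptures are materialized here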
for slc in split_in_slices(U, oq.concurrent_tasks or 1):
getters = []
for grp_id in rlzs_by_gsim:
ruptures = RuptureGetter(parent, slc, grp_id)
if parent is self.datastore: # not accessible parent
ruptures = list(ruptures)
if not ruptures:
continue
getters.append(GmfGetter(
rlzs_by_gsim[grp_id], ruptures, sitecol,
imts, min_iml, oq.maximum_distance, oq.truncation_level,
correl_model, samples_by_grp[grp_id]))
yield getters, oq, monitor

    def execute(self):
"""
        Run in parallel `core_task(getters, oq, monitor)`, by
parallelizing on the ruptures according to their weight and
tectonic region type.
"""
oq = self.oqparam
if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
return
if self.oqparam.ground_motion_fields:
calc.check_overflow(self)
self.csm_info = self.datastore['csm_info']
self.sm_id = {tuple(sm.path): sm.ordinal
for sm in self.csm_info.source_models}
L = len(oq.imtls.array)
R = self.datastore['csm_info'].get_num_rlzs()
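        # L = total number of intensity levels, R = number of realizations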
self.gmdata = {}
self.offset = 0
self.indices = collections.defaultdict(list) # sid -> indices
ires = parallel.Starmap(
self.core_task.__func__, self.gen_args()).submit_all()
if self.precalc and self.precalc.result:
# remove the ruptures in memory to save memory
self.precalc.result.clear()
acc = ires.reduce(self.combine_pmaps_and_save_gmfs, {
r: ProbabilityMap(L) for r in range(R)})
save_gmdata(self, R)
if self.indices:
logging.info('Saving gmf_data/indices')
with self.monitor('saving gmf_data/indices', measuremem=True,
autoflush=True):
self.datastore.save_vlen(
'gmf_data/indices',
[numpy.array(self.indices[sid], indices_dt)
for sid in self.sitecol.complete.sids])
return acc

    def save_gmf_bytes(self):
"""Save the attribute nbytes in the gmf_data datasets"""
ds = self.datastore
for sm_id in ds['gmf_data']:
ds.set_nbytes('gmf_data/' + sm_id)
ds.set_nbytes('gmf_data')

    def post_execute(self, result):
"""
:param result:
            a dictionary rlzi -> ProbabilityMap, or an empty
            dictionary if hazard_curves_from_gmfs is false
"""
oq = self.oqparam
if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
return
elif oq.hazard_curves_from_gmfs:
rlzs = self.rlzs_assoc.realizations
# save individual curves
for i in sorted(result):
key = 'hcurves/rlz-%03d' % i
if result[i]:
self.datastore[key] = result[i]
else:
self.datastore[key] = ProbabilityMap(oq.imtls.array.size)
logging.info('Zero curves for %s', key)
            # compute and save statistics; this is done in-process since
            # there is no need to parallelize: event based calculations
            # involve a "small" number of sites (<= 65,536)
weights = [rlz.weight for rlz in rlzs]
hstats = self.oqparam.hazard_stats()
if len(hstats) and len(rlzs) > 1:
for kind, stat in hstats:
pmap = compute_pmap_stats(result.values(), [stat], weights)
self.datastore['hcurves/' + kind] = pmap
if self.datastore.parent:
self.datastore.parent.open()
if 'gmf_data' in self.datastore:
self.save_gmf_bytes()
if oq.compare_with_classical: # compute classical curves
export_dir = os.path.join(oq.export_dir, 'cl')
if not os.path.exists(export_dir):
os.makedirs(export_dir)
oq.export_dir = export_dir
# one could also set oq.number_of_logic_tree_samples = 0
self.cl = ClassicalCalculator(oq, self.monitor('classical'))
            # TODO: perhaps it is possible to avoid reprocessing the source
            # model; however, this is usually quite fast and does not
            # dominate the computation
self.cl.run(close=False)
cl_mean_curves = get_mean_curves(self.cl.datastore)
eb_mean_curves = get_mean_curves(self.datastore)
for imt in eb_mean_curves.dtype.names:
rdiff, index = util.max_rel_diff_index(
cl_mean_curves[imt], eb_mean_curves[imt])
logging.warn('Relative difference with the classical '
'mean curves for IMT=%s: %d%% at site index %d',
imt, rdiff * 100, index)


def get_mean_curves(dstore):
"""
Extract the mean hazard curves from the datastore, as a composite
array of length nsites.
"""
imtls = dstore['oqparam'].imtls
nsites = len(dstore['sitecol'])
hcurves = dstore['hcurves']
if 'mean' in hcurves:
mean = dstore['hcurves/mean']
elif len(hcurves) == 1: # there is a single realization
mean = dstore['hcurves/rlz-0000']
return mean.convert(imtls, nsites)