# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import collections
import numpy
import h5py
from openquake.baselib.general import AccumDict
from openquake.baselib.python3compat import zip
from openquake.baselib import parallel
from openquake.hazardlib.calc import stochastic
from openquake.hazardlib.scalerel.wc1994 import WC1994
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.source.rupture import EBRupture
from openquake.risklib import riskinput
from openquake.commonlib import util, readinput
from openquake.calculators import base, event_based, getters
from openquake.calculators.ucerf_base import (
    DEFAULT_TRT, UcerfFilter, generate_background_ruptures)
from openquake.calculators.event_based_risk import EbrCalculator
from openquake.calculators.export.loss_curves import get_loss_builder
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
F32 = numpy.float32
F64 = numpy.float64
TWO16 = 2 ** 16
save_ruptures = event_based.EventBasedCalculator.save_ruptures


def ucerf_risk(riskinput, riskmodel, param, monitor):
    """
    :param riskinput:
        a :class:`openquake.risklib.riskinput.RiskInput` object
    :param riskmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param param:
        a dictionary of parameters
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a dictionary of numpy arrays of shape (L, R)
    """
    with monitor('%s.init' % riskinput.hazard_getter.__class__.__name__):
        riskinput.hazard_getter.init()
    eids = riskinput.hazard_getter.eids
    A = len(riskinput.aids)
    E = len(eids)
    assert not param['insured_losses']
    L = len(riskmodel.lti)
    R = riskinput.hazard_getter.num_rlzs
    param['lrs_dt'] = numpy.dtype([('rlzi', U16), ('ratios', (F32, L))])
    agg = numpy.zeros((E, R, L), F32)
    avg = AccumDict(accum={} if riskinput.by_site or not param['avg_losses']
                    else numpy.zeros(A, F64))
    result = dict(aids=riskinput.aids, avglosses=avg)
    # update the result dictionary and the agg array with each output
    for out in riskmodel.gen_outputs(riskinput, monitor):
        if len(out.eids) == 0:  # this happens for sites with no events
            continue
        r = out.rlzi
        idx = riskinput.hazard_getter.eid2idx
        for l, loss_ratios in enumerate(out):
            if loss_ratios is None:  # for GMFs below the minimum_intensity
                continue
            loss_type = riskmodel.loss_types[l]
            indices = numpy.array([idx[eid] for eid in out.eids])
            for a, asset in enumerate(out.assets):
                ratios = loss_ratios[a]  # shape (E, I)
                aid = asset.ordinal
                losses = ratios * asset.value(loss_type)
                # average losses
                if param['avg_losses']:
                    rat = ratios.sum(axis=0) * param['ses_ratio']
                    lba = avg[l, r]
                    try:
                        lba[aid] += rat
                    except KeyError:
                        lba[aid] = rat
                # this is the critical loop: it is important to keep it
                # vectorized in terms of the event indices
                agg[indices, r, l] += losses[:, 0]  # 0 == no insured
    it = ((eid, r, losses)
          for eid, all_losses in zip(eids, agg)
          for r, losses in enumerate(all_losses) if losses.sum())
    result['agglosses'] = numpy.fromiter(it, param['elt_dt'])
    # store info about the GMFs, must be done at the end
    result['gmdata'] = riskinput.gmdata
    return result
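

# A minimal, self-contained sketch (toy sizes and values, not part of the
# calculator) of how `ucerf_risk` flattens its (E, R, L) aggregate loss
# array into `elt_dt` records with `numpy.fromiter`, keeping only the
# (event, realization) pairs that produced a nonzero loss.
def _example_agg_to_elt():
    E, R, L = 3, 2, 2  # toy sizes: events, realizations, loss types
    elt_dt = numpy.dtype([('eid', U64), ('rlzi', U16), ('loss', (F32, L))])
    eids = numpy.array([10, 11, 12], U64)
    agg = numpy.zeros((E, R, L), F32)
    agg[0, 1] = [1.5, .5]  # event 10 produced losses under realization 1
    it = ((eid, r, losses)
          for eid, all_losses in zip(eids, agg)
          for r, losses in enumerate(all_losses) if losses.sum())
    return numpy.fromiter(it, elt_dt)  # here a single record (10, 1, ...)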


def generate_event_set(ucerf, background_sids, src_filter, seed):
    """
    Generates the event set corresponding to a particular branch
    """
    # get rates from file
    with h5py.File(ucerf.source_file, 'r') as hdf5:
        occurrences = ucerf.tom.sample_number_of_occurrences(
            ucerf.rate, seed)
        indices = numpy.where(occurrences)[0]
        logging.debug(
            'Considering "%s", %d ruptures', ucerf.source_id, len(indices))
        # get ruptures from the indices
        ruptures = []
        rupture_occ = []
        for iloc, n_occ in zip(indices, occurrences[indices]):
            ucerf_rup = ucerf.get_ucerf_rupture(iloc, src_filter)
            if ucerf_rup:
                ruptures.append(ucerf_rup)
                rupture_occ.append(n_occ)
        # sample background sources
        background_ruptures, background_n_occ = sample_background_model(
            hdf5, ucerf.idx_set["grid_key"], ucerf.tom, seed,
            background_sids, ucerf.min_mag, ucerf.npd, ucerf.hdd, ucerf.usd,
            ucerf.lsd, ucerf.msr, ucerf.aspect, ucerf.tectonic_region_type)
        ruptures.extend(background_ruptures)
        rupture_occ.extend(background_n_occ)
    return ruptures, rupture_occ
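

# A hedged sketch of what the occurrence sampling above amounts to for a
# Poisson temporal occurrence model: each rupture gets an occurrence count
# drawn as Poisson(rate * time_span). This is an illustration, not the
# actual `tom.sample_number_of_occurrences` implementation.
def _example_sample_occurrences(rates, time_span, seed):
    rng = numpy.random.RandomState(seed)  # reproducible for a given seed
    occurrences = rng.poisson(rates * time_span)  # one count per rupture
    indices = numpy.where(occurrences)[0]  # ruptures sampled at least once
    return indices, occurrences[indices]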


def sample_background_model(
        hdf5, branch_key, tom, seed, filter_idx, min_mag, npd, hdd,
        upper_seismogenic_depth, lower_seismogenic_depth, msr=WC1994(),
        aspect=1.5, trt=DEFAULT_TRT):
    """
    Generates a rupture set from a sample of the background model

    :param branch_key:
        Key to indicate the branch for selecting the background model
    :param tom:
        Temporal occurrence model as instance of
        :class:`openquake.hazardlib.tom.TOM`
    :param seed:
        Random seed to use in the call to tom.sample_number_of_occurrences
    :param filter_idx:
        Sites for consideration (can be None!)
    :param float min_mag:
        Minimum magnitude for consideration of background sources
    :param npd:
        Nodal plane distribution as instance of
        :class:`openquake.hazardlib.pmf.PMF`
    :param hdd:
        Hypocentral depth distribution as instance of
        :class:`openquake.hazardlib.pmf.PMF`
    :param float aspect:
        Aspect ratio
    :param float upper_seismogenic_depth:
        Upper seismogenic depth (km)
    :param float lower_seismogenic_depth:
        Lower seismogenic depth (km)
    :param msr:
        Magnitude scaling relation
    """
    bg_magnitudes = hdf5["/".join(["Grid", branch_key, "Magnitude"])].value
    # select the magnitudes above the minimum magnitude
    mag_idx = bg_magnitudes >= min_mag
    mags = bg_magnitudes[mag_idx]
    rates = hdf5["/".join(["Grid", branch_key, "RateArray"])][filter_idx, :]
    rates = rates[:, mag_idx]
    valid_locs = hdf5["Grid/Locations"][filter_idx, :]
    # sample the remaining rates
    sampler = tom.sample_number_of_occurrences(rates, seed)
    background_ruptures = []
    background_n_occ = []
    for i, mag in enumerate(mags):
        rate_idx = numpy.where(sampler[:, i])[0]
        rate_cnt = sampler[rate_idx, i]
        occurrence = rates[rate_idx, i]
        locations = valid_locs[rate_idx, :]
        ruptures = generate_background_ruptures(
            tom, locations, occurrence,
            mag, npd, hdd, upper_seismogenic_depth,
            lower_seismogenic_depth, msr, aspect, trt)
        background_ruptures.extend(ruptures)
        background_n_occ.extend(rate_cnt.tolist())
    return background_ruptures, background_n_occ
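

# Minimal sketch (toy values) of the boolean-mask selection performed above:
# keep only the magnitude bins at or above min_mag and slice the
# (site, magnitude) rate array consistently along the magnitude axis.
def _example_rate_slicing(min_mag=5.0):
    mags = numpy.array([4.5, 5.5, 6.5, 7.5])
    rates = numpy.full((3, 4), .01)  # 3 background sites x 4 magnitude bins
    mag_idx = mags >= min_mag  # boolean mask over the magnitude bins
    return mags[mag_idx], rates[:, mag_idx]  # shapes (3,) and (3, 3)
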
# #################################################################### #


@util.reader
def compute_hazard(sources, src_filter, rlzs_by_gsim, param, monitor):
    """
    :param sources: a list with a single UCERF source
    :param src_filter: a SourceFilter instance
    :param rlzs_by_gsim: a dictionary gsim -> rlzs
    :param param: extra parameters
    :param monitor: a Monitor instance
    :returns: an AccumDict grp_id -> EBRuptures
    """
    [src] = sources
    res = AccumDict()
    res.calc_times = []
    serial = 1
    sampl_mon = monitor('sampling ruptures', measuremem=True)
    filt_mon = monitor('filtering ruptures', measuremem=False)
    res.trt = DEFAULT_TRT
    ebruptures = []
    background_sids = src.get_background_sids(src_filter)
    sitecol = src_filter.sitecol
    cmaker = ContextMaker(rlzs_by_gsim, src_filter.integration_distance)
    for sample in range(param['samples']):
        for ses_idx, ses_seed in param['ses_seeds']:
            seed = sample * TWO16 + ses_seed
            with sampl_mon:
                rups, n_occs = generate_event_set(
                    src, background_sids, src_filter, seed)
            with filt_mon:
                for rup, n_occ in zip(rups, n_occs):
                    rup.serial = serial
                    rup.seed = seed
                    try:
                        rup.sctx, rup.dctx = cmaker.make_contexts(sitecol, rup)
                        indices = rup.sctx.sids
                    except FarAwayRupture:
                        continue
                    events = []
                    for _ in range(n_occ):
                        events.append((0, src.src_group_id, ses_idx, sample))
                    if events:
                        evs = numpy.array(events, stochastic.event_dt)
                        ebruptures.append(EBRupture(rup, indices, evs))
                        serial += 1
    res.num_events = len(stochastic.set_eids(ebruptures))
    res['ruptures'] = {src.src_group_id: ebruptures}
    if param['save_ruptures']:
        res.ruptures_by_grp = {src.src_group_id: ebruptures}
    else:
        res.events_by_grp = {
            src.src_group_id: event_based.get_events(ebruptures)}
    res.eff_ruptures = {src.src_group_id: src.num_ruptures}
    if param.get('gmf'):
        getter = getters.GmfGetter(
            rlzs_by_gsim, ebruptures, sitecol,
            param['oqparam'], param['min_iml'], param['samples'])
        res.update(getter.compute_gmfs_curves(monitor))
    return res
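

# The sampling seed above is built as `sample * TWO16 + ses_seed`, so
# distinct (sample, ses) pairs map to distinct seeds as long as the SES
# seeds stay below 2 ** 16. A quick illustrative check with assumed values:
def _example_seed_scheme(n_samples=10, base_seed=42, n_ses=10):
    seeds = {sample * TWO16 + base_seed + ses_idx
             for sample in range(n_samples) for ses_idx in range(n_ses)}
    assert len(seeds) == n_samples * n_ses  # no collisions in the toy grid
    return seeds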


@base.calculators.add('ucerf_hazard')
class UCERFHazardCalculator(event_based.EventBasedCalculator):
    """
    Event based PSHA calculator generating the ruptures only
    """
    core_task = compute_hazard

    def pre_execute(self):
        """
        Parse the logic tree and source model input
        """
        logging.warning('%s is still experimental', self.__class__.__name__)
        oq = self.oqparam
        self.read_inputs()  # read the site collection
        self.csm = readinput.get_composite_source_model(oq)
        logging.info('Found %d source model logic tree branches',
                     len(self.csm.source_models))
        self.datastore['sitecol'] = self.sitecol
        self.datastore['csm_info'] = self.csm_info = self.csm.info
        self.rlzs_assoc = self.csm_info.get_rlzs_assoc()
        self.infos = []
        self.eid = collections.Counter()  # sm_id -> event_id
        self.sm_by_grp = self.csm_info.get_sm_by_grp()
        if not self.oqparam.imtls:
            raise ValueError('Missing intensity_measure_types!')
        self.precomputed_gmfs = False

    def filter_csm(self):
        return UcerfFilter(
            self.sitecol, self.oqparam.maximum_distance), self.csm

    def gen_args(self, monitor):
        """
        Generate a task for each branch
        """
        oq = self.oqparam
        allargs = []  # it is better to return a list; if there is a single
        # branch then `parallel.Starmap` will run the task in core
        rlzs_by_gsim = self.csm.info.get_rlzs_by_gsim_grp()
        for sm_id in range(len(self.csm.source_models)):
            ssm = self.csm.get_model(sm_id)
            [sm] = ssm.source_models
            srcs = ssm.get_sources()
            for ses_idx in range(1, oq.ses_per_logic_tree_path + 1):
                ses_seeds = [(ses_idx, oq.ses_seed + ses_idx)]
                param = dict(ses_seeds=ses_seeds, samples=sm.samples,
                             oqparam=oq, save_ruptures=oq.save_ruptures,
                             filter_distance=oq.filter_distance,
                             gmf=oq.ground_motion_fields,
                             min_iml=self.get_min_iml(oq))
                allargs.append((srcs, self.src_filter, rlzs_by_gsim[sm_id],
                                param, monitor))
        return allargs


class List(list):
    """Trivial container returned by compute_losses"""


@util.reader
def compute_losses(ssm, src_filter, param, riskmodel, monitor):
    """
    Compute the losses for a single source model. Returns the events
    as an attribute `.events_by_grp` of the list of losses.

    :param ssm: CompositeSourceModel containing a single source model
    :param src_filter: a SourceFilter instance
    :param param: a dictionary of extra parameters
    :param riskmodel: a RiskModel instance
    :param monitor: a Monitor instance
    :returns: a List containing the losses by taxonomy and some attributes
    """
    [grp] = ssm.src_groups
    res = List()
    rlzs_assoc = ssm.info.get_rlzs_assoc()
    rlzs_by_gsim = rlzs_assoc.get_rlzs_by_gsim(DEFAULT_TRT)
    hazard = compute_hazard(grp, src_filter, rlzs_by_gsim, param, monitor)
    [(grp_id, ebruptures)] = hazard['ruptures'].items()
    samples = ssm.info.get_samples_by_grp()
    num_rlzs = len(rlzs_assoc.realizations)
    getter = getters.GmfGetter(
        rlzs_by_gsim, ebruptures, src_filter.sitecol,
        param['oqparam'], param['min_iml'], samples[grp_id])
    ri = riskinput.RiskInput(getter, param['assetcol'].assets_by_site())
    res.append(ucerf_risk(ri, riskmodel, param, monitor))
    res.sm_id = ssm.sm_id
    res.num_events = len(ri.hazard_getter.eids)
    start = res.sm_id * num_rlzs
    res.rlz_slice = slice(start, start + num_rlzs)
    res.events_by_grp = hazard.events_by_grp
    res.eff_ruptures = hazard.eff_ruptures
    return res
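

# Sketch of the realization-slice bookkeeping in `compute_losses`: assuming
# every source model owns a contiguous block of `num_rlzs` realization
# indices, the results of model `sm_id` land at offset sm_id * num_rlzs.
def _example_rlz_slice(sm_id, num_rlzs):
    start = sm_id * num_rlzs
    return slice(start, start + num_rlzs)  # e.g. (2, 4) -> slice(8, 12)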


@base.calculators.add('ucerf_risk')
class UCERFRiskCalculator(EbrCalculator):
    """
    Event based risk calculator for UCERF, parallelizing on the source models
    """
    pre_execute = UCERFHazardCalculator.pre_execute

    def gen_args(self):
        """
        Yield the arguments required by compute_losses, i.e. the
        source models, the asset collection, the riskmodel and others.
        """
        oq = self.oqparam
        self.L = len(self.riskmodel.lti)
        self.I = oq.insured_losses + 1
        min_iml = self.get_min_iml(oq)
        elt_dt = numpy.dtype([('eid', U64), ('rlzi', U16),
                              ('loss', (F32, (self.L, self.I)))])
        monitor = self.monitor('compute_losses')
        src_filter = UcerfFilter(self.sitecol.complete, oq.maximum_distance)
        for sm in self.csm.source_models:
            if sm.samples > 1:
                logging.warning('Sampling in ucerf_risk is untested')
            ssm = self.csm.get_model(sm.ordinal)
            for ses_idx in range(1, oq.ses_per_logic_tree_path + 1):
                param = dict(ses_seeds=[(ses_idx, oq.ses_seed + ses_idx)],
                             samples=sm.samples, assetcol=self.assetcol,
                             save_ruptures=False,
                             ses_ratio=oq.ses_ratio,
                             avg_losses=oq.avg_losses,
                             elt_dt=elt_dt,
                             min_iml=min_iml,
                             oqparam=oq,
                             insured_losses=oq.insured_losses)
                yield ssm, src_filter, param, self.riskmodel, monitor

    def execute(self):
        self.riskmodel.taxonomy = self.assetcol.tagcol.taxonomy
        num_rlzs = len(self.rlzs_assoc.realizations)
        self.grp_trt = self.csm_info.grp_by("trt")
        res = parallel.Starmap(
            compute_losses, self.gen_args(),
            self.monitor()).submit_all()
        self.vals = self.assetcol.values()
        self.eff_ruptures = AccumDict(accum=0)
        num_events = self.save_results(res, num_rlzs)
        self.csm.info.update_eff_ruptures(self.eff_ruptures)
        self.datastore['csm_info'] = self.csm.info
        return num_events

    def save_results(self, allres, num_rlzs):
        """
        :param allres: an iterable of result iterators
        :param num_rlzs: the total number of realizations
        :returns: the total number of events
        """
        oq = self.oqparam
        self.A = len(self.assetcol)
        if oq.avg_losses:
            self.dset = self.datastore.create_dset(
                'avg_losses-rlzs', F32, (self.A, num_rlzs, self.L * self.I))
        num_events = collections.Counter()
        self.gmdata = AccumDict(accum=numpy.zeros(len(oq.imtls) + 1, F32))
        self.taskno = 0
        self.start = 0
        for res in allres:
            start, stop = res.rlz_slice.start, res.rlz_slice.stop
            for dic in res:
                for r, arr in dic.pop('gmdata').items():
                    self.gmdata[start + r] += arr
                self.save_losses(dic, start)
            logging.debug(
                'Saving results for source model #%d, realizations %d:%d',
                res.sm_id + 1, start, stop)
            if hasattr(res, 'eff_ruptures'):
                self.eff_ruptures += res.eff_ruptures
            if hasattr(res, 'ruptures_by_grp'):
                for ruptures in res.ruptures_by_grp.values():
                    save_ruptures(self, ruptures)
            elif hasattr(res, 'events_by_grp'):
                for grp_id in res.events_by_grp:
                    events = res.events_by_grp[grp_id]
                    self.datastore.extend('events', events)
            num_events[res.sm_id] += res.num_events
        base.save_gmdata(self, num_rlzs)
        return num_events

    def save_losses(self, dic, offset=0):
        """
        Save the event loss tables incrementally.

        :param dic:
            dictionary with agglosses, assratios, avglosses, lrs_idx
        :param offset:
            realization offset
        """
        with self.monitor('saving event loss table', autoflush=True):
            agglosses = dic.pop('agglosses')
            agglosses['rlzi'] += offset
            self.datastore.extend('losses_by_event', agglosses)
        with self.monitor('saving avg_losses-rlzs'):
            avglosses = dic.pop('avglosses')
            for (l, r), ratios in avglosses.items():
                lt = self.riskmodel.loss_types[l]
                self.dset[:, r + offset, l] += ratios * self.vals[lt]
        self.taskno += 1

    def post_execute(self, result):
        """
        Call the EbrPostCalculator to compute the aggregate loss curves
        """
        if 'losses_by_event' not in self.datastore:
            logging.warning(
                'No losses were generated: most likely there is an error '
                'in your input files or the GMFs were below the minimum '
                'intensity')
        else:
            self.datastore.set_nbytes('losses_by_event')
            E = sum(result.values())
            agglt = self.datastore['losses_by_event']
            agglt.attrs['nonzero_fraction'] = len(agglt) / E
            self.param = dict(builder=get_loss_builder(self.datastore))
            self.postproc()
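

# A hedged sketch (toy sizes and values) of the average-loss update done in
# `save_losses` above: loss *ratios* per (loss type, realization) become
# absolute losses once multiplied by the exposed asset values.
def _example_avg_loss_update():
    A, R, L = 4, 2, 1  # toy sizes: assets, realizations, loss types
    dset = numpy.zeros((A, R, L), F32)  # stands in for avg_losses-rlzs
    ratios = numpy.array([.1, 0, .2, .05], F32)  # one ratio per asset
    values = numpy.array([100, 50, 10, 200], F32)  # exposed asset values
    dset[:, 0, 0] += ratios * values  # realization 0, loss type 0
    return dset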