# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import logging
import operator
import numpy
from openquake.baselib.python3compat import zip
from openquake.baselib.general import (
AccumDict, humansize, block_splitter, group_array)
from openquake.hazardlib.stats import compute_stats, compute_stats2
from openquake.calculators import base, event_based
from openquake.baselib import parallel
from openquake.risklib import riskinput, scientific
from openquake.baselib.parallel import Starmap
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
getweight = operator.attrgetter('weight')
def build_el_dtypes(insured_losses):
"""
:param bool insured_losses:
job.ini configuration parameter
:returns:
ela_dt and elt_dt i.e. the data types for event loss assets and
event loss table respectively
"""
I = insured_losses + 1
ela_list = [('eid', U32), ('aid', U32), ('loss', (F32, I))]
elt_list = [('eid', U32), ('loss', (F32, I))]
return numpy.dtype(ela_list), numpy.dtype(elt_list)
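# A hedged usage sketch for build_el_dtypes (doctest-style; the '<u4'/'<f4'
# reprs assume a little-endian platform). With insured losses each 'loss'
# field holds two floats, ground and insured:
#   >>> ela_dt, elt_dt = build_el_dtypes(True)
#   >>> ela_dt
#   dtype([('eid', '<u4'), ('aid', '<u4'), ('loss', '<f4', (2,))])
#   >>> elt_dt
#   dtype([('eid', '<u4'), ('loss', '<f4', (2,))])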
def build_agg_curve(cb_inputs, monitor):
"""
Build the aggregate loss curve in parallel for each loss type
    and realization pair.

    :param cb_inputs:
a list of triples `(cb, rlzname, data)` where `cb` is a curve builder,
`rlzname` is a string of kind `rlz-%03d` and `data` is an array of kind
`(rupture_id, loss)` or `(rupture_id, loss, loss_ins)`
:param monitor:
a Monitor instance
:returns:
        a dictionary (l, r, i) -> (losses, poes, avg)
"""
result = {}
for cb, rlzname, data in cb_inputs:
if len(data) == 0: # realization with no losses
continue
l = cb.index
r = int(rlzname[4:]) # strip rlz-
losses = data['loss']
if len(losses.shape) == 1: # no insured losses
result[l, r, 0] = cb.calc_agg_curve(losses)
else:
result[l, r, 0] = cb.calc_agg_curve(losses[:, 0])
result[l, r, 1] = cb.calc_agg_curve(losses[:, 1])
return result
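# For example, with two loss types, two realizations and insured losses on,
# build_agg_curve may return keys like (0, 0, 0), (0, 0, 1), (1, 1, 0), ...
# each mapped to the (losses, poes, avg) triple computed by the curve builder.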
def build_rcurves(cb_inputs, assets, monitor):
    """
    :param cb_inputs: triples `(cb, rlzname, data)`
    :param assets: full list of assets
    :param monitor: Monitor instance
    :returns: a dictionary (l, r) -> (aids, curves)
    """
result = {}
for cb, rlzname, data in cb_inputs:
aids, curves = cb(assets, group_array(data, 'aid'))
if len(aids):
# strip "rlz-" from rlzname below
result[cb.index, int(rlzname[4:])] = aids, curves
return result
def _aggregate(outputs, compositemodel, agg, ass, idx, result, monitor):
# update the result dictionary and the agg array with each output
lrs = set()
for outs in outputs:
r = outs.r
for l, out in enumerate(outs):
if out is None: # for GMFs below the minimum_intensity
continue
loss_ratios, eids = out
lrs.add((l, r))
loss_type = compositemodel.loss_types[l]
indices = numpy.array([idx[eid] for eid in eids])
agglr = agg[l, r]
for i, asset in enumerate(outs.assets):
ratios = loss_ratios[i]
aid = asset.ordinal
losses = ratios * asset.value(loss_type)
# average losses
if monitor.avg_losses:
result['avglosses'][l, r][aid] += (
ratios.sum(axis=0) * monitor.ses_ratio)
# asset losses
if monitor.loss_ratios:
data = [(eid, aid, loss)
for eid, loss in zip(eids, ratios)
if loss.sum() > 0]
if data:
ass[l, r].append(numpy.array(data, monitor.ela_dt))
# agglosses
agglr[indices] += losses
return sorted(lrs)
def event_based_risk(riskinput, riskmodel, assetcol, monitor):
"""
:param riskinput:
a :class:`openquake.risklib.riskinput.RiskInput` object
:param riskmodel:
a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
:param assetcol:
AssetCollection instance
:param monitor:
:class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a dictionary with keys 'agglosses', 'asslosses', 'gmfbytes' and,
        when monitor.avg_losses is set, 'avglosses'
"""
A = len(assetcol)
I = monitor.insured_losses + 1
eids = riskinput.eids
E = len(eids)
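    # map each event ID to its ordinal position within eids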
idx = dict(zip(eids, range(E)))
agg = AccumDict(accum=numpy.zeros((E, I), F32))
ass = AccumDict(accum=[])
result = dict(agglosses=AccumDict(), asslosses=AccumDict())
if monitor.avg_losses:
result['avglosses'] = AccumDict(accum=numpy.zeros((A, I), F64))
outputs = riskmodel.gen_outputs(riskinput, monitor, assetcol)
lrs = _aggregate(outputs, riskmodel, agg, ass, idx, result, monitor)
for lr in lrs:
records = [(eids[i], loss) for i, loss in enumerate(agg[lr])
if loss.sum() > 0]
if records:
result['agglosses'][lr] = numpy.array(records, monitor.elt_dt)
for lr in ass:
if ass[lr]:
result['asslosses'][lr] = numpy.concatenate(ass[lr])
# store the size of the GMFs
result['gmfbytes'] = monitor.gmfbytes
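    # a hedged sketch of the returned dictionary for one loss type, one
    # realization and insured_losses=True (so loss arrays have width 2):
    #   {'agglosses': {(0, 0): <array of elt_dt>},
    #    'asslosses': {(0, 0): <array of ela_dt>},
    #    'avglosses': {(0, 0): <array of shape (A, 2)>},
    #    'gmfbytes': <number of bytes>}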
return result
@base.calculators.add('event_based_risk')
class EbrPostCalculator(base.RiskCalculator):
pre_calculator = 'ebrisk'
    def cb_inputs(self, table):
loss_table = self.datastore[table]
inputs = []
for rlzstr in loss_table:
for lt in loss_table[rlzstr]:
cb = self.riskmodel.curve_builders[self.riskmodel.lti[lt]]
data = loss_table[rlzstr][lt].value
inputs.append((cb, rlzstr, data))
return inputs
    def execute(self):
A = len(self.assetcol)
ltypes = self.riskmodel.loss_types
I = self.oqparam.insured_losses + 1
R = len(self.rlzs_assoc.realizations)
self.vals = self.assetcol.values()
# loss curves
multi_lr_dt = numpy.dtype(
[(ltype, (F32, len(cbuilder.ratios)))
for ltype, cbuilder in zip(
ltypes, self.riskmodel.curve_builders)])
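        # multi_lr_dt is a composite dtype with one field per loss type;
        # each field stores a vector of loss ratios, one per curve point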
rcurves = numpy.zeros((A, R, I), multi_lr_dt)
# build rcurves-rlzs
if self.oqparam.loss_ratios:
assets = list(self.assetcol)
cb_inputs = self.cb_inputs('all_loss_ratios')
mon = self.monitor('build_rcurves')
res = parallel.Starmap.apply(
build_rcurves, (cb_inputs, assets, mon)).reduce()
for l, r in res:
aids, curves = res[l, r]
rcurves[ltypes[l]][aids, r] = curves
self.datastore['rcurves-rlzs'] = rcurves
# build rcurves-stats (sequentially)
# this is a fundamental output, being used to compute loss_maps-stats
if R > 1:
weights = self.datastore['realizations']['weight']
quantiles = self.oqparam.quantile_loss_curves
if 'avg_losses-rlzs' in self.datastore:
with self.monitor('computing avg_losses-stats'):
self.datastore['avg_losses-stats'] = compute_stats2(
self.datastore['avg_losses-rlzs'], quantiles, weights)
if self.oqparam.loss_ratios:
with self.monitor('computing rcurves-stats'):
self.datastore['rcurves-stats'] = compute_stats2(
rcurves, quantiles, weights)
# build an aggregate loss curve per realization
if 'agg_loss_table' in self.datastore:
with self.monitor('building agg_curve'):
self.build_agg_curve()
    def post_execute(self):
pass
    def build_agg_curve(self):
"""
        Build a single loss curve per realization. It is NOT obtained
        by aggregating the loss curves; instead, it is built directly
        from the aggregate losses, without generating the loss curves.
"""
oq = self.oqparam
cr = {cb.loss_type: cb.curve_resolution
for cb in self.riskmodel.curve_builders}
loss_curve_dt, _ = scientific.build_loss_dtypes(
cr, oq.conditional_loss_poes)
lts = self.riskmodel.loss_types
cb_inputs = self.cb_inputs('agg_loss_table')
I = oq.insured_losses + 1
R = len(self.rlzs_assoc.realizations)
result = parallel.Starmap.apply(
build_agg_curve, (cb_inputs, self.monitor('')),
concurrent_tasks=self.oqparam.concurrent_tasks).reduce()
agg_curve = numpy.zeros((I, R), loss_curve_dt)
for l, r, i in result:
agg_curve[lts[l]][i, r] = result[l, r, i]
self.datastore['agg_curve-rlzs'] = agg_curve
if R > 1: # save stats too
weights = self.datastore['realizations']['weight']
Q1 = len(oq.quantile_loss_curves) + 1
agg_curve_stats = numpy.zeros((I, Q1), agg_curve.dtype)
for l, loss_type in enumerate(agg_curve.dtype.names):
acs = agg_curve_stats[loss_type]
data = agg_curve[loss_type]
for i in range(I):
losses, all_poes = scientific.normalize_curves_eb(
[(c['losses'], c['poes']) for c in data[i]])
acs['losses'][i] = losses
acs['poes'][i] = compute_stats(
all_poes, oq.quantile_loss_curves, weights)
acs['avg'][i] = compute_stats(
data['avg'][i], oq.quantile_loss_curves, weights)
self.datastore['agg_curve-stats'] = agg_curve_stats
elt_dt = numpy.dtype([('eid', U32), ('loss', F32)])
save_events = event_based.EventBasedRuptureCalculator.__dict__['save_events']
class EpsilonMatrix0(object):
"""
Mock-up for a matrix of epsilons of size N x E,
    used when asset_correlation=0.

    :param num_assets: N assets
:param seeds: E seeds, set before calling numpy.random.normal
"""
def __init__(self, num_assets, seeds):
self.num_assets = num_assets
self.seeds = seeds
self.eps = None
    def make_eps(self):
"""
Builds a matrix of N x E epsilons
"""
eps = numpy.zeros((self.num_assets, len(self.seeds)), F32)
for i, seed in enumerate(self.seeds):
numpy.random.seed(seed)
eps[:, i] = numpy.random.normal(size=self.num_assets)
return eps
def __getitem__(self, item):
if self.eps is None:
self.eps = self.make_eps()
return self.eps[item]
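# A hedged sketch of the lazy behavior of EpsilonMatrix0 (hypothetical
# seeds): the full matrix is built on first access and then cached.
#   >>> em = EpsilonMatrix0(num_assets=2, seeds=[42, 43])
#   >>> em.eps is None
#   True
#   >>> _ = em[0, 0]  # first access triggers make_eps
#   >>> em.eps.shape
#   (2, 2)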
class EpsilonMatrix1(object):
"""
Mock-up for a matrix of epsilons of size N x E,
    used when asset_correlation=1.

    :param num_events: number of events
:param seed: seed used to generate E epsilons
"""
def __init__(self, num_events, seed):
self.num_events = num_events
self.seed = seed
numpy.random.seed(seed)
self.eps = numpy.random.normal(size=num_events)
def __getitem__(self, item):
# item[0] is the asset index, item[1] the event index
# the epsilons are equal for all assets since asset_correlation=1
return self.eps[item[1]]
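# A hedged sketch for EpsilonMatrix1 (hypothetical seed): with full
# correlation the asset index is ignored, so every asset receives the
# same epsilon for a given event.
#   >>> em = EpsilonMatrix1(num_events=3, seed=42)
#   >>> em[0, 1] == em[99, 1]  # different assets, same event
#   True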
@base.calculators.add('ebrisk')
class EbriskCalculator(base.RiskCalculator):
"""
Event based PSHA calculator generating the total losses by taxonomy
"""
pre_calculator = 'event_based_rupture'
is_stochastic = True
    # TODO: if the number of source models is larger than concurrent_tasks
    # a different strategy should be used; the one used here is good when
    # there are few source models, since we cannot parallelize over them
    def build_starmap(self, sm_id, ruptures_by_grp, sitecol,
assetcol, riskmodel, imts, trunc_level, correl_model,
min_iml, monitor):
"""
:param sm_id: source model ordinal
:param ruptures_by_grp: dictionary of ruptures by src_group_id
:param sitecol: a SiteCollection instance
:param assetcol: an AssetCollection instance
:param riskmodel: a RiskModel instance
:param imts: a list of Intensity Measure Types
:param trunc_level: truncation level
:param correl_model: correlation model
:param min_iml: vector of minimum intensities, one per IMT
:param monitor: a Monitor instance
:returns: a pair (Starmap, dictionary of attributes)
"""
csm_info = self.csm_info.get_info(sm_id)
grp_ids = sorted(csm_info.get_sm_by_grp())
rlzs_assoc = csm_info.get_rlzs_assoc(
count_ruptures=lambda grp: len(ruptures_by_grp.get(grp.id, [])))
num_events = sum(ebr.multiplicity for grp in ruptures_by_grp
for ebr in ruptures_by_grp[grp])
seeds = self.oqparam.random_seed + numpy.arange(num_events)
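        # one independent seed per event; each block of ruptures consumes a
        # consecutive slice of this array (see `start` below), so the
        # epsilons do not depend on how ruptures are split into blocks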
allargs = []
# prepare the risk inputs
ruptures_per_block = self.oqparam.ruptures_per_block
start = 0
grp_trt = csm_info.grp_trt()
ignore_covs = self.oqparam.ignore_covs
for grp_id in grp_ids:
for rupts in block_splitter(
ruptures_by_grp.get(grp_id, []), ruptures_per_block):
if ignore_covs or not self.riskmodel.covs:
eps = None
elif self.oqparam.asset_correlation:
eps = EpsilonMatrix1(num_events, self.oqparam.master_seed)
else:
n_events = sum(ebr.multiplicity for ebr in rupts)
eps = EpsilonMatrix0(
len(self.assetcol), seeds[start: start + n_events])
start += n_events
ri = riskinput.RiskInputFromRuptures(
grp_trt[grp_id], rlzs_assoc, imts, sitecol,
rupts, trunc_level, correl_model, min_iml, eps)
allargs.append((ri, riskmodel, assetcol, monitor))
self.vals = self.assetcol.values()
taskname = '%s#%d' % (event_based_risk.__name__, sm_id + 1)
smap = Starmap(event_based_risk, allargs, name=taskname)
attrs = dict(num_ruptures={
sg_id: len(rupts) for sg_id, rupts in ruptures_by_grp.items()},
num_events=num_events,
num_rlzs=len(rlzs_assoc.realizations),
sm_id=sm_id)
return smap, attrs
    def gen_args(self, ruptures_by_grp):
"""
        Yield the arguments required by build_starmap, i.e. the
source models, the asset collection, the riskmodel and others.
"""
oq = self.oqparam
correl_model = oq.get_correl_model()
min_iml = self.get_min_iml(oq)
imts = list(oq.imtls)
ela_dt, elt_dt = build_el_dtypes(oq.insured_losses)
csm_info = self.datastore['csm_info']
for sm in csm_info.source_models:
monitor = self.monitor.new(
ses_ratio=oq.ses_ratio,
ela_dt=ela_dt, elt_dt=elt_dt,
loss_ratios=oq.loss_ratios,
avg_losses=oq.avg_losses,
insured_losses=oq.insured_losses,
ses_per_logic_tree_path=oq.ses_per_logic_tree_path,
maximum_distance=oq.maximum_distance,
samples=sm.samples,
seed=self.oqparam.random_seed)
yield (sm.ordinal, ruptures_by_grp, self.sitecol.complete,
self.assetcol, self.riskmodel, imts, oq.truncation_level,
correl_model, min_iml, monitor)
    def execute(self):
"""
Run the calculator and aggregate the results
"""
        if self.oqparam.ground_motion_fields:
            logging.warning('To store the ground motion fields, set '
                            'calculation_mode = event_based')
        if self.oqparam.hazard_curves_from_gmfs:
            logging.warning('To compute the hazard curves, set '
                            'calculation_mode = event_based')
ruptures_by_grp = (
self.precalc.result if self.precalc
else event_based.get_ruptures_by_grp(self.datastore.parent))
        # the ordering of the ruptures is essential for repeatability
for grp in ruptures_by_grp:
ruptures_by_grp[grp].sort(key=operator.attrgetter('serial'))
num_rlzs = 0
allres = []
source_models = self.csm.info.source_models
self.sm_by_grp = self.csm.info.get_sm_by_grp()
for i, args in enumerate(self.gen_args(ruptures_by_grp)):
smap, attrs = self.build_starmap(*args)
res = smap.submit_all()
vars(res).update(attrs)
allres.append(res)
res.rlz_slice = slice(num_rlzs, num_rlzs + res.num_rlzs)
num_rlzs += res.num_rlzs
for sg in source_models[i].src_groups:
sg.eff_ruptures = res.num_ruptures.get(sg.id, 0)
self.datastore['csm_info'] = self.csm.info
        self.datastore.flush()  # when the computation was killed,
        # the csm_info arrays were stored but not the attributes;
        # adding the .flush() solved the issue
num_events = self.save_results(allres, num_rlzs)
self.save_data_transfer(parallel.IterResult.sum(allres))
return num_events
    def save_results(self, allres, num_rlzs):
"""
:param allres: an iterable of result iterators
:param num_rlzs: the total number of realizations
:returns: the total number of events
"""
self.L = len(self.riskmodel.lti)
self.R = num_rlzs
self.T = len(self.assetcol.taxonomies)
self.A = len(self.assetcol)
self.I = I = self.oqparam.insured_losses + 1
avg_losses = self.oqparam.avg_losses
if avg_losses:
# since we are using a composite array, we must use fillvalue=None
# and then set the array to 0 manually (to avoid bogus numbers)
zero = numpy.zeros(self.A, (F32, (I,)))
dset = self.datastore.create_dset(
'avg_losses-rlzs', (F32, (I,)), (self.A, self.R, self.L),
fillvalue=None)
for r in range(self.R):
for l in range(self.L):
dset[:, r, l] = zero
num_events = 0
self.gmfbytes = 0
for res in allres:
start, stop = res.rlz_slice.start, res.rlz_slice.stop
for dic in res:
if avg_losses:
self.save_avg_losses(dset, dic.pop('avglosses'), start)
self.gmfbytes += dic.pop('gmfbytes')
self.save_losses(
dic.pop('agglosses'), dic.pop('asslosses'), start)
logging.debug(
'Saving results for source model #%d, realizations %d:%d',
res.sm_id + 1, start, stop)
if hasattr(res, 'ruptures_by_grp'):
save_events(self, res.ruptures_by_grp)
num_events += res.num_events
self.datastore['events'].attrs['num_events'] = num_events
return num_events
    def save_avg_losses(self, dset, dic, start):
"""
        Save a dictionary (l, r) -> average losses into the given dataset
"""
with self.monitor('saving avg_losses-rlzs'):
for (l, r), losses in dic.items():
vs = self.vals[self.riskmodel.loss_types[l]]
new = numpy.array([losses[:, i] * vs for i in range(self.I)])
dset[:, r + start, l] += new.T # shape (A, I)
    def save_losses(self, agglosses, asslosses, offset):
"""
        Save the event loss tables incrementally.

        :param agglosses: a dictionary lr -> (eid, loss)
:param asslosses: a dictionary lr -> (eid, aid, loss)
:param offset: realization offset
"""
with self.monitor('saving event loss tables', autoflush=True):
for l, r in agglosses:
loss_type = self.riskmodel.loss_types[l]
key = 'agg_loss_table/rlz-%03d/%s' % (r + offset, loss_type)
self.datastore.extend(key, agglosses[l, r])
for l, r in asslosses:
loss_type = self.riskmodel.loss_types[l]
key = 'all_loss_ratios/rlz-%03d/%s' % (r + offset, loss_type)
self.datastore.extend(key, asslosses[l, r])
    def post_execute(self, num_events):
"""
Save an array of losses by taxonomy of shape (T, L, R).
"""
event_based.EventBasedRuptureCalculator.__dict__['post_execute'](
self, num_events)
if self.gmfbytes == 0:
raise RuntimeError('No GMFs were generated, perhaps they were '
'all below the minimum_intensity threshold')
logging.info('Generated %s of GMFs', humansize(self.gmfbytes))
self.datastore.save('job_info', {'gmfbytes': self.gmfbytes})
A, E = len(self.assetcol), num_events
if 'all_loss_ratios' in self.datastore:
for rlzname in self.datastore['all_loss_ratios']:
self.datastore.set_nbytes('all_loss_ratios/' + rlzname)
self.datastore.set_nbytes('all_loss_ratios')
asslt = self.datastore['all_loss_ratios']
for rlz, dset in asslt.items():
for ds in dset.values():
ds.attrs['nonzero_fraction'] = len(ds) / (A * E)
if 'agg_loss_table' not in self.datastore:
            logging.warning(
                'No losses were generated: most likely there is an error '
                'in your input files or the GMFs were below the minimum '
                'intensity')
else:
for rlzname in self.datastore['agg_loss_table']:
self.datastore.set_nbytes('agg_loss_table/' + rlzname)
self.datastore.set_nbytes('agg_loss_table')
agglt = self.datastore['agg_loss_table']
for rlz, dset in agglt.items():
for ds in dset.values():
ds.attrs['nonzero_fraction'] = len(ds) / E