# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import logging
import operator
import itertools
import collections
import numpy
from openquake.baselib.python3compat import zip
from openquake.baselib.general import AccumDict, humansize
from openquake.calculators import base
from openquake.commonlib import readinput, parallel, calc
from openquake.risklib import riskinput, scientific
from openquake.commonlib.parallel import starmap
U32 = numpy.uint32
F32 = numpy.float32


def build_el_dtypes(insured_losses):
"""
:param bool insured_losses:
job.ini configuration parameter
:returns:
        ela_dt and elt_dt, i.e. the data types for the event loss per
        asset records and for the event loss table respectively
"""
I = insured_losses + 1
ela_list = [('rup_id', U32), ('ass_id', U32), ('loss', (F32, I))]
elt_list = [('rup_id', U32), ('loss', (F32, I))]
return numpy.dtype(ela_list), numpy.dtype(elt_list)
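
# Illustrative example (not part of the original module): with
# insured_losses=True each 'loss' field stores a pair (ground loss,
# insured loss); with insured_losses=False it stores a single float32:
#
#   >>> ela_dt, elt_dt = build_el_dtypes(True)
#   >>> ela_dt.names
#   ('rup_id', 'ass_id', 'loss')
#   >>> numpy.zeros(1, ela_dt)['loss'].shape
#   (1, 2)
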
@parallel.litetask
def build_agg_curve(lr_data, insured_losses, ses_ratio, curve_resolution, L,
monitor):
"""
Build the aggregate loss curve in parallel for each loss type
    and realization pair.

:param lr_data:
a list of triples `(l, r, data)` where `l` is the loss type index,
`r` is the realization index and `data` is an array of kind
`(rupture_id, loss)` or `(rupture_id, loss, loss_ins)`
:param bool insured_losses:
job.ini configuration parameter
:param ses_ratio:
        the ratio (risk_investigation_time or investigation_time) /
        (investigation_time * ses_per_logic_tree_path)
:param curve_resolution:
the number of discretization steps for the loss curve
:param L:
the number of loss types
:param monitor:
a Monitor instance
:returns:
        a dictionary (l, r, name) -> array, where `name` is one of
        'losses', 'poes', 'avg' (plus the '_ins' variants when
        insured_losses is set)
"""
result = {}
for l, r, data in lr_data:
if len(data) == 0: # realization with no losses
continue
if insured_losses:
gloss = data['loss'][:, 0]
iloss = data['loss'][:, 1]
else:
gloss = data['loss']
losses, poes = scientific.event_based(
gloss, ses_ratio, curve_resolution)
avg = scientific.average_loss((losses, poes))
result[l, r, 'losses'] = losses
result[l, r, 'poes'] = poes
result[l, r, 'avg'] = avg
if insured_losses:
losses_ins, poes_ins = scientific.event_based(
iloss, ses_ratio, curve_resolution)
avg_ins = scientific.average_loss((losses_ins, poes_ins))
result[l, r, 'losses_ins'] = losses_ins
result[l, r, 'poes_ins'] = poes_ins
result[l, r, 'avg_ins'] = avg_ins
return result
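
# Usage sketch (illustrative): the result dictionary returned by
# build_agg_curve is keyed by (loss type index, realization index, name),
# where name is 'losses', 'poes' or 'avg' (plus '_ins' variants when
# insured losses are enabled). Assuming `lr_data` and `mon` have been
# prepared by the calculator, it would be read as:
#
#   >>> res = build_agg_curve(lr_data, False, 0.1, 20, 1, mon)
#   >>> losses, poes = res[0, 0, 'losses'], res[0, 0, 'poes']
#   >>> average_loss = res[0, 0, 'avg']
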
def square(L, R, factory):
"""
:param L: the number of loss types
:param R: the number of realizations
:param factory: thunk used to initialize the elements
:returns: a numpy matrix of shape (L, R)
"""
losses = numpy.zeros((L, R), object)
for l in range(L):
for r in range(R):
losses[l, r] = factory()
return losses
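
# Example (illustrative): square(2, 3, AccumDict) builds a 2x3 object matrix
# whose cells are independent empty accumulators:
#
#   >>> m = square(2, 3, AccumDict)
#   >>> m.shape
#   (2, 3)
#   >>> m[0, 0] is m[0, 1]  # each cell comes from a separate factory() call
#   False
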
def _old_loss_curves(asset_values, rcurves, ratios):
# build loss curves in the old format (i.e. (losses, poes)) from
# loss curves in the new format (i.e. poes).
# shape (N, 2, C)
return numpy.array([(avalue * ratios, poes)
for avalue, poes in zip(asset_values, rcurves)])
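
# Worked example (illustrative): for a single asset of value 1000, loss
# ratios [0.0, 0.1, 0.5] and poes [0.9, 0.5, 0.1], the old-format curve is
# (losses, poes) = ([0., 100., 500.], [0.9, 0.5, 0.1]), i.e. the absolute
# losses are simply value * ratios.
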
def _aggregate_output(output, compositemodel, agg, ass, idx, result, monitor):
# update the result dictionary and the agg array with each output
asset_ids = [a.ordinal for a in output.assets]
for (l, r), out in sorted(output.items()):
loss_type = compositemodel.loss_types[l]
indices = numpy.array([idx[eid] for eid in out.eids])
cb = compositemodel.curve_builders[l]
if cb.user_provided:
counts_matrix = cb.build_counts(out.loss_ratios[:, :, 0])
result['RC'][l, r] += dict(zip(asset_ids, counts_matrix))
if monitor.insured_losses:
result['IC'][l, r] += dict(
zip(asset_ids, cb.build_counts(out.loss_ratios[:, :, 1])))
for i, asset in enumerate(output.assets):
aid = asset.ordinal
loss_ratios = out.loss_ratios[i]
losses = loss_ratios * asset.value(loss_type)
# average losses
if monitor.avg_losses:
result['AVGLOSS'][l, r][aid] += (
loss_ratios.sum(axis=0) * monitor.ses_ratio)
# asset losses
if monitor.asset_loss_table:
data = [(eid, aid, loss)
for eid, loss in zip(out.eids, losses)
if loss.sum() > 0]
if data:
ass[l, r].append(numpy.array(data, monitor.ela_dt))
# agglosses
agg[indices, l, r] += losses


@parallel.litetask
def event_based_risk(riskinput, riskmodel, rlzs_assoc, assetcol, monitor):
"""
:param riskinput:
a :class:`openquake.risklib.riskinput.RiskInput` object
:param riskmodel:
a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
:param rlzs_assoc:
        a :class:`openquake.commonlib.source.RlzsAssoc` instance
:param assetcol:
AssetCollection instance
:param monitor:
:class:`openquake.baselib.performance.Monitor` instance
:returns:
        a dictionary with keys 'RC', 'IC', 'AGGLOSS', 'ASSLOSS',
        'AVGLOSS' (only if avg_losses is set) and 'gmfbytes'
"""
lti = riskmodel.lti # loss type -> index
L, R = len(lti), len(rlzs_assoc.realizations)
I = monitor.insured_losses + 1
eids = riskinput.eids
E = len(eids)
idx = dict(zip(eids, range(E)))
agg = numpy.zeros((E, L, R, I), F32)
ass = collections.defaultdict(list)
def zeroN():
return numpy.zeros((monitor.num_assets, I))
result = dict(RC=square(L, R, AccumDict), IC=square(L, R, AccumDict),
AGGLOSS=AccumDict(), ASSLOSS=AccumDict())
if monitor.avg_losses:
result['AVGLOSS'] = square(L, R, zeroN)
agglosses_mon = monitor('aggregate losses', measuremem=False)
for output in riskmodel.gen_outputs(
riskinput, rlzs_assoc, monitor, assetcol):
with agglosses_mon:
_aggregate_output(
output, riskmodel, agg, ass, idx, result, monitor)
for (l, r) in itertools.product(range(L), range(R)):
records = [(eids[i], loss) for i, loss in enumerate(agg[:, l, r])
if loss.sum() > 0]
if records:
result['AGGLOSS'][l, r] = numpy.array(records, monitor.elt_dt)
for lr in ass:
if ass[lr]:
result['ASSLOSS'][lr] = numpy.concatenate(ass[lr])
# store the size of the GMFs
result['gmfbytes'] = monitor.gmfbytes
return result
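
# Sketch of the task output (illustrative): with L=1 loss type and R=2
# realizations the returned dictionary has the form
#
#   {'RC': <1x2 object matrix of AccumDicts>,
#    'IC': <1x2 object matrix of AccumDicts>,
#    'AGGLOSS': {(0, 0): <elt_dt array>, (0, 1): <elt_dt array>},
#    'ASSLOSS': {(0, 0): <ela_dt array>, ...},  # only if asset_loss_table
#    'AVGLOSS': <1x2 object matrix>,            # only if avg_losses
#    'gmfbytes': <int>}
#
# It is merged into the datastore by EventBasedRiskCalculator.agg below.
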
@base.calculators.add('event_based_risk')
class EventBasedRiskCalculator(base.RiskCalculator):
"""
    Event based PSHA calculator generating the event loss table and
    fixed-ratio loss curves.
"""
pre_calculator = 'event_based'
core_task = event_based_risk
is_stochastic = True

    def pre_execute(self):
"""
Read the precomputed ruptures (or compute them on the fly)
"""
super(EventBasedRiskCalculator, self).pre_execute()
calc.check_overflow(self)
if not self.riskmodel: # there is no riskmodel, exit early
self.execute = lambda: None
self.post_execute = lambda result: None
return

    def execute(self):
"""
Run the event_based_risk calculator and aggregate the results
"""
oq = self.oqparam
correl_model = readinput.get_correl_model(oq)
self.N = len(self.assetcol)
self.E = len(self.etags)
logging.info('Populating the risk inputs')
rlzs_by_tr_id = self.rlzs_assoc.get_rlzs_by_trt_id()
num_rlzs = {t: len(rlzs) for t, rlzs in rlzs_by_tr_id.items()}
num_assets = {sid: len(self.assets_by_site[sid])
for sid in self.sitecol.sids}
all_ruptures = []
for serial in self.datastore['sescollection']:
rup = self.datastore['sescollection/' + serial]
rup.set_weight(num_rlzs, num_assets)
all_ruptures.append(rup)
all_ruptures.sort(key=operator.attrgetter('serial'))
if not self.riskmodel.covs:
# do not generate epsilons
eps = None
else:
eps = riskinput.make_eps(
self.assets_by_site, self.E, oq.master_seed,
oq.asset_correlation)
logging.info('Generated %s epsilons', eps.shape)
# preparing empty datasets
loss_types = self.riskmodel.loss_types
self.C = self.oqparam.loss_curve_resolution
self.L = L = len(loss_types)
self.R = R = len(self.rlzs_assoc.realizations)
self.I = self.oqparam.insured_losses
# ugly: attaching attributes needed in the task function
mon = self.monitor
mon.num_assets = self.count_assets()
mon.avg_losses = self.oqparam.avg_losses
mon.asset_loss_table = self.oqparam.asset_loss_table
mon.insured_losses = self.I
mon.ses_ratio = (
oq.risk_investigation_time or oq.investigation_time) / (
oq.investigation_time * oq.ses_per_logic_tree_path)
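        # illustrative example: with investigation_time=50,
        # ses_per_logic_tree_path=10 and no risk_investigation_time,
        # ses_ratio = 50 / (50 * 10) = 0.1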
self.N = N = len(self.assetcol)
self.E = len(self.datastore['etags'])
# average losses, stored in a composite array of shape N, R
self.avg_losses = numpy.zeros((N, R), oq.loss_dt())
self.ass_loss_table = square(L, R, lambda: None)
self.agg_loss_table = square(L, R, lambda: None)
self.ela_dt, self.elt_dt = mon.ela_dt, mon.elt_dt = build_el_dtypes(
self.I)
for (l, r) in itertools.product(range(L), range(R)):
lt = loss_types[l]
if self.oqparam.asset_loss_table:
self.ass_loss_table[l, r] = self.datastore.create_dset(
'ass_loss_table/rlz-%03d/%s' % (r, lt), self.ela_dt)
self.agg_loss_table[l, r] = self.datastore.create_dset(
'agg_loss_table/rlz-%03d/%s' % (r, lt), self.elt_dt)
self.saved = collections.Counter() # nbytes per HDF5 key
self.ass_bytes = 0
self.agg_bytes = 0
self.gmfbytes = 0
rlz_ids = getattr(self.oqparam, 'rlz_ids', ())
if rlz_ids:
self.rlzs_assoc = self.rlzs_assoc.extract(rlz_ids)
if not oq.minimum_intensity:
# infer it from the risk models if not directly set in job.ini
oq.minimum_intensity = self.riskmodel.get_min_iml()
min_iml = calc.fix_minimum_intensity(
oq.minimum_intensity, oq.imtls)
if min_iml.sum() == 0:
logging.warn('The GMFs are not filtered: '
'you may want to set a minimum_intensity')
else:
logging.info('minimum_intensity=%s', oq.minimum_intensity)
with self.monitor('building riskinputs', autoflush=True):
riskinputs = self.riskmodel.build_inputs_from_ruptures(
self.sitecol.complete, all_ruptures, oq.truncation_level,
correl_model, min_iml, eps, oq.concurrent_tasks or 1)
        # NB: we use generators so that the tasks are submitted one at
        # a time, without keeping all of the arguments in memory
tm = starmap(
self.core_task.__func__,
((riskinput, self.riskmodel, self.rlzs_assoc,
self.assetcol, self.monitor.new('task'))
for riskinput in riskinputs))
res = tm.reduce(agg=self.agg)
self.save_data_transfer(tm)
return res

    def agg(self, acc, result):
"""
        Aggregate losses and store them in the datastore.

:param acc: accumulator dictionary
:param result: dictionary coming from event_based_risk
"""
self.gmfbytes += result.pop('gmfbytes')
with self.monitor('saving event loss tables', autoflush=True):
if self.oqparam.asset_loss_table:
for lr, array in sorted(result.pop('ASSLOSS').items()):
self.ass_loss_table[lr].extend(array)
self.ass_bytes += array.nbytes
for lr, array in sorted(result.pop('AGGLOSS').items()):
self.agg_loss_table[lr].extend(array)
self.agg_bytes += array.nbytes
self.datastore.hdf5.flush()
return acc + result

    def post_execute(self, result):
"""
        Save the event loss table in the datastore.

:param result:
the dictionary returned by the .execute method
"""
logging.info('Generated %s of GMFs', humansize(self.gmfbytes))
self.datastore.save('job_info', {'gmfbytes': self.gmfbytes})
if self.oqparam.asset_loss_table:
asslt = self.datastore['ass_loss_table']
asslt.attrs['nbytes'] = self.ass_bytes
for rlz, dset in asslt.items():
for ds in dset.values():
ds.attrs['nonzero_fraction'] = len(ds) / (self.N * self.E)
agglt = self.datastore['agg_loss_table']
agglt.attrs['nbytes'] = self.agg_bytes
for rlz, dset in agglt.items():
for ds in dset.values():
ds.attrs['nonzero_fraction'] = len(ds) / self.E
insured_losses = self.oqparam.insured_losses
ses_ratio = self.oqparam.ses_ratio
saved = self.saved
self.N = N = len(self.assetcol)
self.R = R = len(self.rlzs_assoc.realizations)
ltypes = self.riskmodel.loss_types
self.loss_curve_dt, self.loss_maps_dt = (
self.riskmodel.build_loss_dtypes(
self.oqparam.conditional_loss_poes, self.I))
self.vals = {} # asset values by loss_type
for ltype in ltypes:
asset_values = []
for assets in self.assets_by_site:
for asset in assets:
asset_values.append(asset.value(
ltype, self.oqparam.time_event))
self.vals[ltype] = numpy.array(asset_values)
# loss curves
multi_lr_dt = numpy.dtype(
[(ltype, (F32, cbuilder.curve_resolution))
for ltype, cbuilder in zip(
ltypes, self.riskmodel.curve_builders)])
rcurves = numpy.zeros((N, R, 2), multi_lr_dt)
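        # illustrative note: multi_lr_dt has one field per loss type, each a
        # vector of curve_resolution float32s, so rcurves[lt][n, r, 0] holds
        # the poes of the ground-loss curve of asset n under realization r
        # and rcurves[lt][n, r, 1] those of the insured one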
# AVGLOSS
if self.oqparam.avg_losses:
with self.monitor('building avg_losses-rlzs'):
for (l, r), avgloss in numpy.ndenumerate(
result['AVGLOSS']):
lt = self.riskmodel.loss_types[l]
avg_losses_lt = self.avg_losses[lt]
for i, avalue in enumerate(self.vals[lt]):
avg_losses_lt[i, r] = avgloss[i, 0] * avalue
if self.oqparam.insured_losses:
self.avg_losses[lt + '_ins'][i, r] = (
avgloss[i, 1] * avalue)
self.datastore['avg_losses-rlzs'] = self.avg_losses
saved['avg_losses-rlzs'] = self.avg_losses.nbytes
# RC, IC
if self.oqparam.loss_ratios:
with self.monitor('building rcurves-rlzs'):
for (l, r), data in numpy.ndenumerate(result['RC']):
cb = self.riskmodel.curve_builders[l]
if data and cb.user_provided:
# data is a dict asset idx -> counts
lt = self.riskmodel.loss_types[l]
poes = cb.build_poes(N, [data], ses_ratio)
rcurves[lt][:, r, 0] = poes
saved['rcurves-rlzs'] += poes.nbytes
for (l, r), data in numpy.ndenumerate(result['IC']):
cb = self.riskmodel.curve_builders[l]
if data and cb.user_provided and insured_losses:
# data is a dict asset idx -> counts
lt = self.riskmodel.loss_types[l]
poes = cb.build_poes(N, [data], ses_ratio)
rcurves[lt][:, r, 1] = poes
saved['rcurves-rlzs'] += poes.nbytes
self.datastore['rcurves-rlzs'] = rcurves
oq = self.oqparam
builder = scientific.StatsBuilder(
oq.quantile_loss_curves, oq.conditional_loss_poes, [],
oq.loss_curve_resolution, scientific.normalize_curves_eb,
oq.insured_losses)
# build an aggregate loss curve per realization plus statistics
with self.monitor('building agg_curve'):
self.build_agg_curve_and_stats(builder)
self.datastore.hdf5.flush()
for out in sorted(saved):
nbytes = saved[out]
if nbytes:
self.datastore[out].attrs['nbytes'] = nbytes
logging.info('Saved %s in %s', humansize(nbytes), out)
if self.oqparam.asset_loss_table:
pass # TODO: build specific loss curves
rlzs = self.rlzs_assoc.realizations
if self.loss_maps_dt:
with self.monitor('building loss_maps-rlzs'):
if (self.oqparam.conditional_loss_poes and
'rcurves-rlzs' in self.datastore):
loss_maps = numpy.zeros((N, R), self.loss_maps_dt)
rcurves = self.datastore['rcurves-rlzs']
for cb in self.riskmodel.curve_builders:
if cb.user_provided:
lm = loss_maps[cb.loss_type]
for r, lmaps in cb.build_loss_maps(
self.assetcol.array, rcurves):
lm[:, r] = lmaps
self.datastore['loss_maps-rlzs'] = loss_maps
if len(rlzs) > 1:
self.Q1 = len(self.oqparam.quantile_loss_curves) + 1
with self.monitor('computing stats'):
if 'rcurves-rlzs' in self.datastore:
self.compute_store_stats(rlzs, builder)
if oq.avg_losses: # stats for avg_losses
stats = scientific.SimpleStats(
rlzs, oq.quantile_loss_curves)
stats.compute_and_store('avg_losses', self.datastore)
self.datastore.hdf5.flush()

    def build_agg_curve_and_stats(self, builder):
"""
        Build a single loss curve per realization. It is NOT obtained
        by aggregating the per-asset loss curves; instead, it is computed
        directly from the aggregate losses, without generating the
        per-asset curves at all.
"""
oq = self.oqparam
C = oq.loss_curve_resolution
loss_curve_dt, _ = self.riskmodel.build_all_loss_dtypes(
C, oq.conditional_loss_poes, oq.insured_losses)
lts = self.riskmodel.loss_types
lr_data = [(l, r, dset.dset.value) for (l, r), dset in
numpy.ndenumerate(self.agg_loss_table)]
ses_ratio = self.oqparam.ses_ratio
result = parallel.apply_reduce(
build_agg_curve, (lr_data, self.I, ses_ratio, C, self.L,
self.monitor('')),
concurrent_tasks=self.oqparam.concurrent_tasks)
agg_curve = numpy.zeros(self.R, loss_curve_dt)
for l, r, name in result:
agg_curve[lts[l]][name][r] = result[l, r, name]
if oq.individual_curves:
self.datastore['agg_curve-rlzs'] = agg_curve
self.saved['agg_curve-rlzs'] = agg_curve.nbytes
if self.R > 1:
self.build_agg_curve_stats(builder, agg_curve, loss_curve_dt)
# ################### methods to compute statistics #################### #
def _collect_all_data(self):
# called only if 'rcurves-rlzs' in dstore; return a list of outputs
all_data = []
assets = self.datastore['asset_refs'].value[self.assetcol.array['idx']]
rlzs = self.rlzs_assoc.realizations
insured = self.oqparam.insured_losses
if self.oqparam.avg_losses:
avg_losses = self.datastore['avg_losses-rlzs'].value
else:
avg_losses = self.avg_losses
r_curves = self.datastore['rcurves-rlzs'].value
for loss_type, cbuilder in zip(
self.riskmodel.loss_types, self.riskmodel.curve_builders):
rcurves = r_curves[loss_type]
asset_values = self.vals[loss_type]
data = []
for rlz in rlzs:
average_losses = avg_losses[loss_type][:, rlz.ordinal]
average_insured_losses = (
avg_losses[loss_type + '_ins'][:, rlz.ordinal]
if insured else None)
loss_curves = _old_loss_curves(
asset_values, rcurves[:, rlz.ordinal, 0], cbuilder.ratios)
insured_curves = _old_loss_curves(
asset_values, rcurves[:, rlz.ordinal, 1],
cbuilder.ratios) if insured else None
out = scientific.Output(
assets, loss_type, rlz.ordinal, rlz.weight,
loss_curves=loss_curves,
insured_curves=insured_curves,
average_losses=average_losses,
average_insured_losses=average_insured_losses)
data.append(out)
all_data.append(data)
return all_data
# NB: the HDF5 structure is of kind <output>-stats/structural/mean, ...
# and must be so for the loss curves, since different loss_types may have
# a different discretization. This is not needed for the loss maps, but it
# is done anyway for consistency, also because in the future we could
# specify different conditional loss poes depending on the loss type
    def compute_store_stats(self, rlzs, builder):
"""
        Compute and store the statistical outputs.

:param rlzs: list of realizations
"""
oq = self.oqparam
ltypes = self.riskmodel.loss_types
all_stats = map(builder.build, self._collect_all_data())
if not all_stats:
return
loss_curves = numpy.zeros((self.N, self.Q1), self.loss_curve_dt)
if oq.conditional_loss_poes:
loss_maps = numpy.zeros((self.N, self.Q1), self.loss_maps_dt)
for stats in all_stats:
# there is one stat for each loss_type
cb = self.riskmodel.curve_builders[ltypes.index(stats.loss_type)]
if not cb.user_provided:
continue
sb = scientific.StatsBuilder(
oq.quantile_loss_curves, oq.conditional_loss_poes, [],
len(cb.ratios), scientific.normalize_curves_eb,
oq.insured_losses)
curves, maps = sb.get_curves_maps(stats) # matrices (Q1, N)
loss_curves[cb.loss_type] = curves.T
if oq.conditional_loss_poes:
loss_maps[cb.loss_type] = maps.T
self.datastore['loss_curves-stats'] = loss_curves
if oq.conditional_loss_poes:
self.datastore['loss_maps-stats'] = loss_maps

    def build_agg_curve_stats(self, builder, agg_curve, loss_curve_dt):
"""
        Build and save `agg_curve-stats` in the HDF5 file.

:param builder:
:class:`openquake.risklib.scientific.StatsBuilder` instance
:param agg_curve:
array of aggregate curves, one per realization
:param loss_curve_dt:
numpy dtype for loss curves
"""
rlzs = self.datastore['csm_info'].get_rlzs_assoc().realizations
Q1 = len(builder.mean_quantiles)
agg_curve_stats = numpy.zeros(Q1, loss_curve_dt)
for l, loss_type in enumerate(self.riskmodel.loss_types):
agg_curve_lt = agg_curve[loss_type]
outputs = []
for rlz in rlzs:
curve = agg_curve_lt[rlz.ordinal]
average_loss = curve['avg']
loss_curve = (curve['losses'], curve['poes'])
if self.oqparam.insured_losses:
average_insured_loss = curve['avg_ins']
insured_curves = [(curve['losses_ins'], curve['poes_ins'])]
else:
average_insured_loss = None
insured_curves = None
out = scientific.Output(
[None], loss_type, rlz.ordinal, rlz.weight,
loss_curves=[loss_curve],
insured_curves=insured_curves,
average_losses=[average_loss],
average_insured_losses=[average_insured_loss])
outputs.append(out)
stats = builder.build(outputs)
curves, _maps = builder.get_curves_maps(stats) # shape (Q1, 1)
acs = agg_curve_stats[loss_type]
for i, statname in enumerate(builder.mean_quantiles):
for name in acs.dtype.names:
acs[name][i] = curves[name][i]
# saving agg_curve_stats
self.datastore['agg_curve-stats'] = agg_curve_stats
self.datastore['agg_curve-stats'].attrs['nbytes'] = (
agg_curve_stats.nbytes)