# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import operator
import numpy
from openquake.baselib.general import AccumDict
from openquake.baselib.python3compat import zip, encode
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.hazardlib.calc.stochastic import TWO32
from openquake.risklib import riskinput
from openquake.calculators import base
from openquake.calculators.export.loss_curves import get_loss_builder
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
U64 = numpy.uint64
getweight = operator.attrgetter('weight')


def build_loss_tables(dstore):
"""
Compute the total losses by rupture and losses by rlzi.
"""
oq = dstore['oqparam']
L = len(oq.loss_dt().names)
R = dstore['csm_info'].get_num_rlzs()
serials = dstore['ruptures']['serial']
idx_by_ser = dict(zip(serials, range(len(serials))))
tbl = numpy.zeros((len(serials), L), F32)
lbr = numpy.zeros((R, L), F32) # losses by rlz
for rec in dstore['losses_by_event'].value: # call .value for speed
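        # the upper 32 bits of the event ID are assumed to hold the serial
        # of the originating rupture (see the TWO32 convention in
        # openquake.hazardlib.calc.stochastic), hence eid // TWO32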
        idx = idx_by_ser[rec['eid'] // TWO32]
        tbl[idx] += rec['loss']
        lbr[rec['rlzi']] += rec['loss']
    return tbl, lbr
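
# A minimal usage sketch for build_loss_tables (assuming ``dstore`` is the
# DataStore of a completed event based risk calculation):
#
#   tbl, lbr = build_loss_tables(dstore)
#   worst = tbl[:, 0].argmax()  # rupture index maximizing the first loss type
#   totals = lbr.sum(axis=1)  # total loss per realization, over all types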


def event_based_risk(riskinputs, riskmodel, param, monitor):
"""
:param riskinputs:
:class:`openquake.risklib.riskinput.RiskInput` objects
:param riskmodel:
a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
:param param:
a dictionary of parameters
:param monitor:
:class:`openquake.baselib.performance.Monitor` instance
:returns:
a dictionary of numpy arrays of shape (L, R)
"""
    I = param['insured_losses'] + 1
    L = len(riskmodel.lti)
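    # loss arrays below have L * I columns: column l + L * i refers to loss
    # type l, with i = 0 for ground losses and i = 1 for insured losses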
    for ri in riskinputs:
        with monitor('getting hazard'):
            ri.hazard_getter.init()
            hazard = ri.hazard_getter.get_hazard()
        mon = monitor('build risk curves', measuremem=False)
        A = len(ri.aids)
        R = ri.hazard_getter.num_rlzs
        try:
            avg = numpy.zeros((A, R, L * I), F32)
        except MemoryError:
            raise MemoryError(
                'Building array avg of shape (%d, %d, %d)' % (A, R, L * I))
        result = dict(aids=ri.aids, avglosses=avg)
        acc = AccumDict()  # accumulator eidx -> agglosses
        aid2idx = {aid: idx for idx, aid in enumerate(ri.aids)}
        if 'builder' in param:
            builder = param['builder']
            P = len(builder.return_periods)
            all_curves = numpy.zeros((A, R, P), builder.loss_dt)
        # update the result dictionary and the agg array with each output
        for out in riskmodel.gen_outputs(ri, monitor, hazard):
            if len(out.eids) == 0:  # this happens for sites with no events
                continue
            r = out.rlzi
            agglosses = numpy.zeros((len(out.eids), L * I), F32)
            for l, loss_ratios in enumerate(out):
                if loss_ratios is None:  # GMFs below the minimum_intensity
                    continue
                loss_type = riskmodel.loss_types[l]
                for a, asset in enumerate(out.assets):
                    ratios = loss_ratios[a]  # shape (E, I)
                    aid = asset.ordinal
                    idx = aid2idx[aid]
                    aval = asset.value(loss_type)
                    losses = aval * ratios
                    if 'builder' in param:
                        with mon:  # this is the heaviest part
                            for i in range(I):
                                lt = loss_type + '_ins' * i
                                all_curves[idx, r][lt] = builder.build_curve(
                                    aval, ratios[:, i], r)
                    # average losses
                    if param['avg_losses']:
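                        # average loss = sum of the event losses, rescaled
                        # by the ses_ratio (effective time -> investigation
                        # time) and by the asset value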
                        rat = ratios.sum(axis=0) * param['ses_ratio'] * aval
                        for i in range(I):
                            avg[idx, r, l + L * i] = rat[i]
                    # aggregate losses
                    for i in range(I):
                        agglosses[:, l + L * i] += losses[:, i]
            # NB: the agglosses could be yielded per output, but that would
            # produce millions of small outputs, with a big data transfer
            # and a slow saving time
            acc += dict(zip(out.eids, agglosses))
        if 'builder' in param:
            clp = param['conditional_loss_poes']
            result['curves-rlzs'], result['curves-stats'] = builder.pair(
                all_curves, param['stats'])
            if R > 1 and param['individual_curves'] is False:
                del result['curves-rlzs']
            if clp:
                result['loss_maps-rlzs'], result['loss_maps-stats'] = (
                    builder.build_maps(all_curves, clp, param['stats']))
                if R > 1 and param['individual_curves'] is False:
                    del result['loss_maps-rlzs']
        # store the aggregate losses by event; this must be done at the end
        result['agglosses'] = (numpy.array(list(acc)),
                               numpy.array(list(acc.values())))
        yield result


@base.calculators.add('event_based_risk')
class EbrCalculator(base.RiskCalculator):
    """
    Event based PSHA calculator generating the total losses by taxonomy
    """
    core_task = event_based_risk
    is_stochastic = True

    def pre_execute(self):
        oq = self.oqparam
        super().pre_execute('event_based')
        parent = self.datastore.parent
        if not self.oqparam.ground_motion_fields:
            return  # this happens in the reportwriter
        self.L = len(self.riskmodel.lti)
        self.T = len(self.assetcol.tagcol)
        self.A = len(self.assetcol)
        self.I = oq.insured_losses + 1
        if parent:
            self.datastore['csm_info'] = parent['csm_info']
            self.events = parent['events'].value[['eid', 'rlz']]
        else:
            self.events = self.datastore['events'].value[['eid', 'rlz']]
        if oq.return_periods != [0]:
            # setting return_periods = 0 disables loss curves and maps
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
            else:
                self.param['builder'] = get_loss_builder(
                    parent if parent else self.datastore,
                    oq.return_periods, oq.loss_dt())
        # sorting the eids is essential to get the epsilons in the right
        # order (i.e. consistent with the one used in ebr from ruptures)
        self.E = len(self.events)
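        # epsilon_getter() returns a callable; calling it with no arguments
        # produces the epsilons for all assets and events at once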
        eps = self.epsilon_getter()()
        self.riskinputs = self.build_riskinputs('gmf', eps, self.E)
        self.param['insured_losses'] = oq.insured_losses
        self.param['avg_losses'] = oq.avg_losses
        self.param['ses_ratio'] = oq.ses_ratio
        self.param['stats'] = oq.risk_stats()
        self.param['conditional_loss_poes'] = oq.conditional_loss_poes
        self.taskno = 0
        self.start = 0
        avg_losses = self.oqparam.avg_losses
        if avg_losses:
            self.dset = self.datastore.create_dset(
                'avg_losses-rlzs', F32, (self.A, self.R, self.L * self.I))
        self.agglosses = numpy.zeros((self.E, self.L * self.I), F32)
        if 'builder' in self.param:
            self.build_datasets(self.param['builder'])
        if parent:
            parent.close()  # avoid fork issues

    def build_datasets(self, builder):
        oq = self.oqparam
        R = len(builder.weights)
        stats = oq.risk_stats()
        A = self.A
        S = len(stats)
        P = len(builder.return_periods)
        C = len(self.oqparam.conditional_loss_poes)
        LI = self.L * self.I
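        # dataset shapes: A = assets, R = realizations, S = statistics,
        # P = return periods, C = conditional loss PoEs, LI = loss types
        # (doubled when insured losses are enabled)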
        self.loss_maps_dt = (F32, (C, LI))
        if oq.individual_curves or R == 1:
            self.datastore.create_dset('curves-rlzs', F32, (A, R, P, LI))
            self.datastore.set_attrs(
                'curves-rlzs', return_periods=builder.return_periods)
            if oq.conditional_loss_poes:
                self.datastore.create_dset(
                    'loss_maps-rlzs', self.loss_maps_dt, (A, R),
                    fillvalue=None)
        if R > 1:
            self.datastore.create_dset('curves-stats', F32, (A, S, P, LI))
            self.datastore.set_attrs(
                'curves-stats', return_periods=builder.return_periods,
                stats=[encode(name) for (name, func) in stats])
            if oq.conditional_loss_poes:
                self.datastore.create_dset(
                    'loss_maps-stats', self.loss_maps_dt, (A, S),
                    fillvalue=None)
                self.datastore.set_attrs(
                    'loss_maps-stats',
                    stats=[encode(name) for (name, func) in stats])

    def epsilon_getter(self):
"""
:returns: a callable (start, stop) producing a slice of epsilons
"""
return riskinput.make_epsilon_getter(
len(self.assetcol), self.E,
self.oqparam.asset_correlation,
self.oqparam.master_seed,
self.oqparam.ignore_covs or not self.riskmodel.covs)

    def save_losses(self, dic):
"""
Save the event loss tables incrementally.
:param dic:
dictionary with agglosses, avglosses
"""
idxs, agglosses = dic.pop('agglosses')
if len(idxs):
self.agglosses[idxs] += agglosses
aids = dic.pop('aids')
if self.oqparam.avg_losses:
self.dset[aids, :, :] = dic.pop('avglosses')
self._save_curves(dic, aids)
self._save_maps(dic, aids)
self.taskno += 1

    def _save_curves(self, dic, aids):
        for key in ('curves-rlzs', 'curves-stats'):
            array = dic.get(key)  # shape (A, R, P) or (A, S, P)
            if array is not None:
                shp = array.shape + (self.L * self.I,)
                self.datastore[key][aids, ...] = array.view(F32).reshape(shp)

    def _save_maps(self, dic, aids):
        for key in ('loss_maps-rlzs', 'loss_maps-stats'):
            array = dic.get(key)  # composite array of shape (A, R) or (A, S)
            if array is not None:
                self.datastore[key][aids, ...] = array

    def combine(self, dummy, res):
"""
:param dummy: unused parameter
:param res: a result dictionary
"""
with self.monitor('saving losses', measuremem=True):
self.save_losses(res)
return 1

    def post_execute(self, result):
"""
Save risk data and build the aggregate loss curves
"""
logging.info('Saving event loss table')
elt_dt = numpy.dtype(
[('eid', U64), ('rlzi', U16), ('loss', (F32, (self.L * self.I,)))])
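        # one row per event with at least one nonzero loss; the 'loss'
        # field is a vector of L * I values, one per (loss type, insurance)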
        with self.monitor('saving event loss table', measuremem=True):
            agglosses = numpy.fromiter(
                ((eid, rlz, losses)
                 for (eid, rlz), losses in zip(self.events, self.agglosses)
                 if losses.any()), elt_dt)
            self.datastore['losses_by_event'] = agglosses
            loss_types = ' '.join(self.oqparam.loss_dt().names)
            self.datastore.set_attrs('losses_by_event', loss_types=loss_types)
        self.postproc()

    def postproc(self):
"""
Build aggregate loss curves in process
"""
dstore = self.datastore
self.before_export() # set 'realizations'
oq = self.oqparam
stats = oq. risk_stats()
# store avg_losses-stats
if oq.avg_losses:
set_rlzs_stats(self.datastore, 'avg_losses')
try:
b = self.param['builder']
except KeyError: # don't build auxiliary tables
return
        if dstore.parent:
            dstore.parent.open('r')  # to read the ruptures
        if 'ruptures' in self.datastore and len(self.datastore['ruptures']):
            logging.info('Building loss tables')
            with self.monitor('building loss tables', measuremem=True):
                rlt, lbr = build_loss_tables(dstore)
                dstore['rup_loss_table'] = rlt
                dstore['losses_by_rlzi'] = lbr
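                # for each loss type, remember the index of the rupture
                # causing the biggest total loss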
                ridx = [rlt[:, lti].argmax() for lti in range(self.L)]
                dstore.set_attrs('rup_loss_table', ridx=ridx)
        logging.info('Building aggregate loss curves')
        with self.monitor('building agg_curves', measuremem=True):
            array, arr_stats = b.build(dstore['losses_by_event'].value, stats)
        units = self.assetcol.cost_calculator.get_units(
            loss_types=array.dtype.names)
        if oq.individual_curves or self.R == 1:
            self.datastore['agg_curves-rlzs'] = array
            self.datastore.set_attrs(
                'agg_curves-rlzs',
                return_periods=b.return_periods, units=units)
        if arr_stats is not None:
            self.datastore['agg_curves-stats'] = arr_stats
            self.datastore.set_attrs(
                'agg_curves-stats', return_periods=b.return_periods,
                stats=[encode(name) for (name, func) in stats], units=units)
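

# A minimal reading sketch for the datasets stored by this calculator
# (assuming a completed calculation with ID ``calc_id``; datastore.read is
# assumed to be the entry point in openquake.baselib for this engine
# version, so check yours):
#
#   from openquake.baselib import datastore
#   dstore = datastore.read(calc_id)
#   elt = dstore['losses_by_event'].value  # the event loss table
#   agg = dstore['agg_curves-rlzs'].value  # aggregate loss curves per rlz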