# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import operator
import numpy
from openquake.baselib.parallel import Starmap
from openquake.baselib.python3compat import zip, encode
from openquake.baselib.general import AccumDict, humansize
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import riskinput
from openquake.calculators import base
from openquake.calculators.export.loss_curves import get_loss_builder
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
U64 = numpy.uint64
getweight = operator.attrgetter('weight')


def build_loss_tables(dstore):
    """
    Compute the total losses by rupture and the losses by rlzi.
    """
    oq = dstore['oqparam']
    L = len(oq.loss_dt().names)
    R = dstore['csm_info'].get_num_rlzs()
    events = dstore['events']
    serials = dstore['ruptures']['serial']
    rup_by_eid = dict(zip(events['eid'], events['rup_id']))
    idx_by_ser = dict(zip(serials, range(len(serials))))
    tbl = numpy.zeros((len(serials), L), F32)
    lbr = numpy.zeros((R, L), F32)  # losses by rlz
    for rec in dstore['losses_by_event'].value:  # call .value for speed
        rupid = rup_by_eid[rec['eid']]
        tbl[idx_by_ser[rupid]] += rec['loss']
        lbr[rec['rlzi']] += rec['loss']
    return tbl, lbr
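

# A minimal sketch of the eid -> rupture mapping used above, with toy
# arrays (the event ids, rupture ids and dtypes are made up, not read
# from a real datastore):
#
#   >>> events = numpy.array([(0, 10), (1, 10), (2, 11)],
#   ...                      [('eid', U64), ('rup_id', U32)])
#   >>> rup_by_eid = dict(zip(events['eid'], events['rup_id']))
#   >>> int(rup_by_eid[1])  # events 0 and 1 both come from rupture 10
#   10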


def event_based_risk(riskinputs, riskmodel, param, monitor):
    """
    :param riskinputs:
        :class:`openquake.risklib.riskinput.RiskInput` objects
    :param riskmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param param:
        a dictionary of parameters
    :param monitor:
        a :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a list of result dictionaries, one per riskinput
    """
    results = []
    mon = monitor('build risk curves', measuremem=False)
    I = param['insured_losses'] + 1
    L = len(riskmodel.lti)
    param['lrs_dt'] = numpy.dtype([('rlzi', U16), ('ratios', (F32, (L * I,)))])
    for ri in riskinputs:
        with monitor('%s.init' % ri.hazard_getter.__class__.__name__):
            ri.hazard_getter.init()
        eids = ri.hazard_getter.eids
        eid2idx = ri.hazard_getter.eid2idx
        A = len(ri.aids)
        E = len(eids)
        R = ri.hazard_getter.num_rlzs
        agg = numpy.zeros((E, R, L * I), F32)
        avg = AccumDict(
            accum={} if ri.by_site or not param['avg_losses']
            else numpy.zeros(A, F64))
        result = dict(aids=ri.aids, avglosses=avg)
        if 'builder' in param:
            builder = param['builder']
            P = len(builder.return_periods)
            all_curves = numpy.zeros((A, R, P), builder.loss_dt)
            aid2idx = {aid: idx for idx, aid in enumerate(ri.aids)}
        # update the result dictionary and the agg array with each output
        for out in riskmodel.gen_outputs(ri, monitor):
            if len(out.eids) == 0:  # this happens for sites with no events
                continue
            r = out.rlzi
            for l, loss_ratios in enumerate(out):
                if loss_ratios is None:  # for GMFs below the minimum_intensity
                    continue
                loss_type = riskmodel.loss_types[l]
                indices = numpy.array([eid2idx[eid] for eid in out.eids])
                for a, asset in enumerate(out.assets):
                    ratios = loss_ratios[a]  # shape (E, I)
                    aid = asset.ordinal
                    aval = asset.value(loss_type)
                    losses = aval * ratios
                    if 'builder' in param:
                        with mon:  # this is the heaviest part
                            idx = aid2idx[aid]
                            for i in range(I):
                                lt = loss_type + '_ins' * i
                                all_curves[idx, r][lt] = builder.build_curve(
                                    aval, ratios[:, i], r)
                    # average losses
                    if param['avg_losses']:
                        rat = ratios.sum(axis=0) * param['ses_ratio']
                        for i in range(I):
                            lba = avg[l + L * i, r]
                            try:
                                lba[aid] += rat[i]
                            except KeyError:
                                lba[aid] = rat[i]
                    # agglosses
                    for i in range(I):
                        li = l + L * i
                        # this is the critical loop: it is important to
                        # keep it vectorized in terms of the event indices
                        agg[indices, r, li] += losses[:, i]
        idx = agg.nonzero()  # return only the nonzero values
        result['agglosses'] = (idx, agg[idx])
        if 'builder' in param:
            clp = param['conditional_loss_poes']
            result['curves-rlzs'], result['curves-stats'] = builder.pair(
                all_curves, param['stats'])
            if R > 1 and param['individual_curves'] is False:
                del result['curves-rlzs']
            if clp:
                result['loss_maps-rlzs'], result['loss_maps-stats'] = (
                    builder.build_maps(all_curves, clp, param['stats']))
                if R > 1 and param['individual_curves'] is False:
                    del result['loss_maps-rlzs']
        # store info about the GMFs, must be done at the end
        result['gmdata'] = ri.gmdata
        results.append(result)
    return results
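

# A toy example of the sparse representation used for ``agglosses``: only
# the nonzero entries of the (E, R, L * I) array travel from the workers
# back to the master, as an (indices, values) pair (numbers are made up):
#
#   >>> agg = numpy.zeros((3, 2, 1), F32)
#   >>> agg[1, 0, 0] = 4.5
#   >>> idx = agg.nonzero()
#   >>> agg[idx]
#   array([4.5], dtype=float32)
#
# The master side then accumulates with ``self.agglosses[idx] += agg``
# in ``EbrCalculator.save_losses`` below.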


@base.calculators.add('event_based_risk')
class EbrCalculator(base.RiskCalculator):
    """
    Event based PSHA calculator generating the total losses by taxonomy
    """
    core_task = event_based_risk
    is_stochastic = True

    def pre_execute(self):
        oq = self.oqparam
        super().pre_execute('event_based')
        parent = self.datastore.parent
        if not self.oqparam.ground_motion_fields:
            return  # this happens in the reportwriter
        if not parent:
            # hazard + risk were done in the same calculation
            # save memory by resetting the processpool (if any)
            Starmap.shutdown()
            Starmap.init()
        self.L = len(self.riskmodel.lti)
        self.T = len(self.assetcol.tagcol)
        self.A = len(self.assetcol)
        self.I = oq.insured_losses + 1
        if parent:
            self.datastore['csm_info'] = parent['csm_info']
            self.eids = sorted(parent['events']['eid'])
        else:
            self.eids = sorted(self.datastore['events']['eid'])
        if oq.return_periods != [0]:
            # setting return_periods = 0 disables loss curves and maps
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
            else:
                self.param['builder'] = get_loss_builder(
                    parent if parent else self.datastore,
                    oq.return_periods, oq.loss_dt())
        # sorting the eids is essential to get the epsilons in the right
        # order (i.e. consistent with the one used in ebr from ruptures)
        self.E = len(self.eids)
        eps = self.epsilon_getter()()
        # FIXME: commented because it can be misleading
        # if not oq.ignore_covs:
        #     logging.info('Generating %s of epsilons',
        #                  humansize(self.A * self.E * 4))
        self.riskinputs = self.build_riskinputs('gmf', eps, self.E)
        self.param['insured_losses'] = oq.insured_losses
        self.param['avg_losses'] = oq.avg_losses
        self.param['ses_ratio'] = oq.ses_ratio
        self.param['stats'] = oq.risk_stats()
        self.param['conditional_loss_poes'] = oq.conditional_loss_poes
        self.taskno = 0
        self.start = 0
        avg_losses = self.oqparam.avg_losses
        if avg_losses:
            self.dset = self.datastore.create_dset(
                'avg_losses-rlzs', F32, (self.A, self.R, self.L * self.I))
        self.agglosses = numpy.zeros((self.E, self.R, self.L * self.I), F32)
        self.num_losses = numpy.zeros((self.A, self.R), U32)
        if 'builder' in self.param:
            self.build_datasets(self.param['builder'])
        if parent:
            parent.close()  # avoid fork issues

    def build_datasets(self, builder):
        oq = self.oqparam
        R = len(builder.weights)
        stats = oq.risk_stats()
        A = self.A
        S = len(stats)
        P = len(builder.return_periods)
        C = len(self.oqparam.conditional_loss_poes)
        self.loss_maps_dt = oq.loss_dt((F32, (C,)))
        if oq.individual_curves:
            self.datastore.create_dset(
                'curves-rlzs', builder.loss_dt, (A, R, P), fillvalue=None)
            self.datastore.set_attrs(
                'curves-rlzs', return_periods=builder.return_periods)
        if oq.conditional_loss_poes:
            self.datastore.create_dset(
                'loss_maps-rlzs', self.loss_maps_dt, (A, R), fillvalue=None)
        if R > 1:
            self.datastore.create_dset(
                'curves-stats', builder.loss_dt, (A, S, P), fillvalue=None)
            self.datastore.set_attrs(
                'curves-stats', return_periods=builder.return_periods,
                stats=[encode(name) for (name, func) in stats])
            if oq.conditional_loss_poes:
                self.datastore.create_dset(
                    'loss_maps-stats', self.loss_maps_dt, (A, S),
                    fillvalue=None)
                self.datastore.set_attrs(
                    'loss_maps-stats',
                    stats=[encode(name) for (name, func) in stats])
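
    # NOTE: ``create_dset`` pre-allocates the HDF5 datasets that
    # ``save_losses`` fills incrementally, asset by asset; conceptually it
    # is close to h5py's ``f.create_dataset(name, shape, dtype=...)``,
    # though the real datastore also manages metadata via ``set_attrs``
    # (this comparison is an editorial assumption, not part of the API).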

    def epsilon_getter(self):
        """
        :returns: a callable (start, stop) producing a slice of epsilons
        """
        return riskinput.make_epsilon_getter(
            len(self.assetcol), self.E,
            self.oqparam.asset_correlation,
            self.oqparam.master_seed,
            self.oqparam.ignore_covs or not self.riskmodel.covs)
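
    # Usage sketch: the getter is built once and then called to produce
    # the epsilon matrix, as in ``eps = self.epsilon_getter()()`` in
    # ``pre_execute`` above; calling it with no arguments presumably
    # yields the epsilons for the full event range (an assumption based
    # on the call site, not on the ``make_epsilon_getter`` signature).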

    def save_losses(self, dic, offset=0):
        """
        Save the event loss tables incrementally.

        :param dic:
            dictionary with agglosses, avglosses
        :param offset:
            realization offset
        """
        aids = dic.pop('aids')
        agglosses = dic.pop('agglosses')
        avglosses = dic.pop('avglosses')
        with self.monitor('saving event loss table', autoflush=True):
            idx, agg = agglosses
            self.agglosses[idx] += agg
        if not hasattr(self, 'vals'):
            self.vals = self.assetcol.values()
        with self.monitor('saving avg_losses-rlzs'):
            for (li, r), ratios in avglosses.items():
                l = li if li < self.L else li - self.L
                vs = self.vals[self.riskmodel.loss_types[l]]
                self.dset[aids, r, li] += numpy.array(
                    [ratios.get(aid, 0) * vs[aid] for aid in aids])
        self._save_curves(dic, aids)
        self._save_maps(dic, aids)
        self.taskno += 1

    def _save_curves(self, dic, aids):
        for key in ('curves-rlzs', 'curves-stats'):
            array = dic.get(key)  # shape (A, S, P)
            if array is not None:
                for aid, arr in zip(aids, array):
                    self.datastore[key][aid] = arr

    def _save_maps(self, dic, aids):
        for key in ('loss_maps-rlzs', 'loss_maps-stats'):
            array = dic.get(key)  # shape (A, S, C, LTI)
            if array is not None:
                loss_maps = numpy.zeros(array.shape[:2], self.loss_maps_dt)
                for lti, lt in enumerate(self.loss_maps_dt.names):
                    loss_maps[lt] = array[:, :, :, lti]
                for aid, arr in zip(aids, loss_maps):
                    self.datastore[key][aid] = arr
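
    # A toy sketch of the repacking in ``_save_maps``: a plain float array
    # of shape (A, S, C, LTI) becomes a structured array of shape (A, S)
    # with one (C,)-shaped field per loss type (names are illustrative):
    #
    #   >>> dt = numpy.dtype([('structural', (F32, (2,)))])
    #   >>> array = numpy.ones((1, 3, 2, 1), F32)
    #   >>> loss_maps = numpy.zeros(array.shape[:2], dt)
    #   >>> loss_maps['structural'] = array[:, :, :, 0]
    #   >>> loss_maps.shape
    #   (1, 3)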

    def combine(self, dummy, results):
        """
        :param dummy: unused parameter
        :param results: a list of result dictionaries
        """
        for res in results:
            self.save_losses(res, offset=0)
        return 1

    def post_execute(self, result):
        """
        Save risk data and build the aggregate loss curves
        """
        logging.info('Saving event loss table')
        elt_dt = numpy.dtype(
            [('eid', U64), ('rlzi', U16),
             ('loss', (F32, (self.L * self.I,)))])
        with self.monitor('saving event loss table', measuremem=True):
            # the all-zero rows are skipped via `if loss.sum()` to keep
            # the event loss table small
            agglosses = numpy.fromiter(
                ((e, r, loss)
                 for e, losses in zip(self.eids, self.agglosses)
                 for r, loss in enumerate(losses) if loss.sum()), elt_dt)
            self.datastore['losses_by_event'] = agglosses
        self.postproc()
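
    # A small example of the ``numpy.fromiter`` pattern above, building a
    # structured event loss table while skipping the all-zero rows (toy
    # data with a scalar loss field for simplicity; a numpy version whose
    # ``fromiter`` accepts structured dtypes is assumed):
    #
    #   >>> dt = numpy.dtype([('eid', U64), ('rlzi', U16), ('loss', F32)])
    #   >>> rows = [(0, 0, 1.5), (1, 0, 0.0), (2, 1, 2.0)]
    #   >>> len(numpy.fromiter((r for r in rows if r[2]), dt))
    #   2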

    def postproc(self):
        """
        Build aggregate loss curves in process
        """
        dstore = self.datastore
        self.before_export()  # set 'realizations'
        oq = self.oqparam
        stats = oq.risk_stats()
        # store avg_losses-stats
        if oq.avg_losses:
            set_rlzs_stats(self.datastore, 'avg_losses')
        try:
            b = self.param['builder']
        except KeyError:  # don't build auxiliary tables
            return
        if dstore.parent:
            dstore.parent.open('r')  # to read the ruptures
        if 'ruptures' in dstore:
            logging.info('Building loss tables')
            with self.monitor('building loss tables', measuremem=True):
                rlt, lbr = build_loss_tables(dstore)
                dstore['rup_loss_table'] = rlt
                dstore['losses_by_rlzi'] = lbr
                ridx = [rlt[:, lti].argmax() for lti in range(self.L)]
                dstore.set_attrs('rup_loss_table', ridx=ridx)
        logging.info('Building aggregate loss curves')
        with self.monitor('building agg_curves', measuremem=True):
            array, arr_stats = b.build(dstore['losses_by_event'].value, stats)
        units = self.assetcol.cost_calculator.get_units(
            loss_types=array.dtype.names)
        if oq.individual_curves:
            self.datastore['agg_curves-rlzs'] = array
            self.datastore.set_attrs(
                'agg_curves-rlzs',
                return_periods=b.return_periods, units=units)
        if arr_stats is not None:
            self.datastore['agg_curves-stats'] = arr_stats
            self.datastore.set_attrs(
                'agg_curves-stats', return_periods=b.return_periods,
                stats=[encode(name) for (name, func) in stats], units=units)