Source code for openquake.calculators.event_based_risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import operator
import numpy

from openquake.baselib.general import AccumDict, group_array, fast_agg
from openquake.baselib.python3compat import zip, encode
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import riskinput, riskmodels
from openquake.calculators import base
from openquake.calculators.export.loss_curves import get_loss_builder

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
getweight = operator.attrgetter('weight')


def build_loss_tables(dstore):
    """
    Compute the total losses by rupture and losses by rlzi.
    """
    oq = dstore['oqparam']
    R = dstore['csm_info'].get_num_rlzs()
    lbe = dstore['losses_by_event'][()]
    loss = lbe['loss']
    shp = (R,) + lbe.dtype['loss'].shape
    lbr = numpy.zeros(shp, F32)  # losses by rlz
    losses_by_rlz = fast_agg(lbe['rlzi'], loss)
    lbr[:len(losses_by_rlz)] = losses_by_rlz
    dstore['losses_by_rlzi'] = lbr
    rup_id = dstore['events']['rup_id']
    if len(shp) > 2:
        loss = loss.sum(axis=tuple(range(1, len(shp) - 1)))
    losses_by_rupid = fast_agg(rup_id[lbe['event_id']], loss)
    lst = [('rup_id', U32)] + [(name, F32) for name in oq.loss_names]
    tbl = numpy.zeros(len(losses_by_rupid), lst)
    tbl['rup_id'] = numpy.arange(len(tbl))
    for li, name in enumerate(oq.loss_names):
        tbl[name] = losses_by_rupid[:, li]
    tbl.sort(order=oq.loss_names[0])
    dstore['rup_loss_table'] = tbl
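
# A minimal sketch (not part of the module) of the grouped sum performed
# by fast_agg above, assuming it adds up the rows of ``loss`` grouped by
# the integer keys in its first argument; a pure-numpy equivalent:
#
# >>> rlzi = numpy.array([0, 0, 1])  # realization index per event
# >>> loss = numpy.array([[1., 2.], [3., 4.], [5., 6.]])  # shape (E, L)
# >>> out = numpy.zeros((rlzi.max() + 1, 2))
# >>> numpy.add.at(out, rlzi, loss)  # unbuffered in-place accumulation
# >>> out  # row r holds the total losses of realization r
# array([[4., 6.],
#        [5., 6.]])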


def event_based_risk(riskinputs, crmodel, param, monitor):
    """
    :param riskinputs:
        :class:`openquake.risklib.riskinput.RiskInput` objects
    :param crmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param param:
        a dictionary of parameters
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a dictionary of numpy arrays of shape (L, R)
    """
    L = len(crmodel.lti)
    epspath = param['epspath']
    for ri in riskinputs:
        with monitor('getting hazard'):
            ri.hazard_getter.init()
            hazard = ri.hazard_getter.get_hazard()
        mon = monitor('build risk curves', measuremem=False)
        A = len(ri.aids)
        R = ri.hazard_getter.num_rlzs
        try:
            avg = numpy.zeros((A, R, L), F32)
        except MemoryError:
            raise MemoryError(
                'Building array avg of shape (%d, %d, %d)' % (A, R, L))
        result = dict(aids=ri.aids, avglosses=avg)
        acc = AccumDict()  # accumulator eidx -> agglosses
        aid2idx = {aid: idx for idx, aid in enumerate(ri.aids)}
        if 'builder' in param:
            builder = param['builder']
            P = len(builder.return_periods)
            all_curves = numpy.zeros((A, R, P), builder.loss_dt)
        # update the result dictionary and the agg array with each output
        for out in ri.gen_outputs(crmodel, monitor, epspath, hazard):
            if len(out.eids) == 0:  # this happens for sites with no events
                continue
            r = out.rlzi
            agglosses = numpy.zeros((len(out.eids), L), F32)
            for l, loss_type in enumerate(crmodel.loss_types):
                loss_ratios = out[loss_type]
                if loss_ratios is None:  # GMFs below the minimum_intensity
                    continue
                avalues = riskmodels.get_values(loss_type, ri.assets)
                for a, asset in enumerate(ri.assets):
                    aval = avalues[a]
                    aid = asset['ordinal']
                    idx = aid2idx[aid]
                    ratios = loss_ratios[a]  # length E
                    # average losses
                    avg[idx, r, l] = (
                        ratios.sum(axis=0) * param['ses_ratio'] * aval)
                    # agglosses
                    agglosses[:, l] += ratios * aval
                    if 'builder' in param:
                        with mon:  # this is the heaviest part
                            all_curves[idx, r][loss_type] = (
                                builder.build_curve(aval, ratios, r))
            # NB: I could yield the agglosses per output, but then I would
            # have millions of small outputs with big data transfer and
            # slow saving time
            acc += dict(zip(out.eids, agglosses))
        if 'builder' in param:
            clp = param['conditional_loss_poes']
            result['curves-rlzs'], result['curves-stats'] = builder.pair(
                all_curves, param['stats'])
            if R > 1 and param['individual_curves'] is False:
                del result['curves-rlzs']
            if clp:
                result['loss_maps-rlzs'], result['loss_maps-stats'] = (
                    builder.build_maps(all_curves, clp, param['stats']))
                if R > 1 and param['individual_curves'] is False:
                    del result['loss_maps-rlzs']
        # store info about the GMFs, must be done at the end
        result['agglosses'] = (numpy.array(list(acc)),
                               numpy.array(list(acc.values())))
        yield result
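
# A worked sketch of the average-loss formula used above: the per-event
# loss ratios of a single asset are summed over the events, then scaled
# by ses_ratio and by the asset value (toy numbers below):
#
# >>> ratios = numpy.array([.25, .5, .25])  # length E = 3
# >>> float(ratios.sum(axis=0) * .5 * 100)  # ses_ratio=.5, aval=100
# 50.0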


@base.calculators.add('event_based_risk')
class EbrCalculator(base.RiskCalculator):
    """
    Event based PSHA calculator generating the total losses by taxonomy
    """
    core_task = event_based_risk
    is_stochastic = True
    precalc = 'event_based'
    accept_precalc = ['event_based', 'event_based_risk', 'ucerf_hazard',
                      'ebrisk']

    def pre_execute(self):
        oq = self.oqparam
        super().pre_execute()
        parent = self.datastore.parent
        if not oq.ground_motion_fields:
            return  # this happens in the reportwriter
        self.L = len(self.crmodel.lti)
        self.T = len(self.assetcol.tagcol)
        self.A = len(self.assetcol)
        if parent:
            self.datastore['csm_info'] = parent['csm_info']
            self.events = parent['events'][('id', 'rlz_id')]
        else:
            self.events = self.datastore['events'][('id', 'rlz_id')]
        if oq.return_periods != [0]:
            # setting return_periods = 0 disables loss curves and maps
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
            else:
                self.param['builder'] = get_loss_builder(
                    parent if parent else self.datastore,
                    oq.return_periods, oq.loss_dt())
        # sorting the eids is essential to get the epsilons in the right
        # order (i.e. consistent with the one used in ebr from ruptures)
        self.riskinputs = self.build_riskinputs('gmf')
        self.param['epspath'] = riskinput.cache_epsilons(
            self.datastore, oq, self.assetcol, self.crmodel, self.E)
        self.param['avg_losses'] = oq.avg_losses
        self.param['ses_ratio'] = oq.ses_ratio
        self.param['stats'] = list(oq.hazard_stats().items())
        self.param['conditional_loss_poes'] = oq.conditional_loss_poes
        self.taskno = 0
        self.start = 0
        if oq.avg_losses:
            self.dset = self.datastore.create_dset(
                'avg_losses-rlzs', F32, (self.A, self.R, self.L))
        self.agglosses = numpy.zeros((self.E, self.L), F32)
        if 'builder' in self.param:
            logging.warning(
                'Building the loss curves and maps for each asset is '
                'deprecated: consider building the aggregate curves and '
                'maps with the ebrisk calculator instead')
            self.build_datasets(self.param['builder'])
        if parent:
            parent.close()  # avoid concurrent reading issues
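
    # For example, with investigation_time=1 and ses_per_logic_tree_path=1
    # the effective time is 1 * 1 = 1 year: since eff_time < 2, the builder
    # is not instantiated and only average and aggregate losses are
    # computed, with a warning about the missing loss curves.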

    def build_datasets(self, builder):
        oq = self.oqparam
        R = len(builder.weights)
        stats = self.param['stats']
        A = self.A
        S = len(stats)
        P = len(builder.return_periods)
        C = len(oq.conditional_loss_poes)
        L = self.L
        self.loss_maps_dt = (F32, (C, L))
        if oq.individual_curves or R == 1:
            self.datastore.create_dset('curves-rlzs', F32, (A, R, P, L))
            self.datastore.set_attrs(
                'curves-rlzs', return_periods=builder.return_periods)
            if oq.conditional_loss_poes:
                self.datastore.create_dset(
                    'loss_maps-rlzs', self.loss_maps_dt, (A, R),
                    fillvalue=None)
        if R > 1:
            self.datastore.create_dset('curves-stats', F32, (A, S, P, L))
            self.datastore.set_attrs(
                'curves-stats', return_periods=builder.return_periods,
                stats=[encode(name) for (name, func) in stats])
            if oq.conditional_loss_poes:
                self.datastore.create_dset(
                    'loss_maps-stats', self.loss_maps_dt, (A, S),
                    fillvalue=None)
                self.datastore.set_attrs(
                    'loss_maps-stats',
                    stats=[encode(name) for (name, func) in stats])
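
    # A minimal sketch of the dataset layouts declared above, with toy
    # sizes for A assets, R realizations, P return periods, C conditional
    # loss PoEs and L loss types; the curves are plain float32 arrays,
    # while each loss-map element carries a (C, L) sub-array:
    #
    # >>> A, R, P, C, L = 2, 1, 3, 2, 2
    # >>> curves = numpy.zeros((A, R, P, L), F32)
    # >>> maps = numpy.zeros((A, R), (F32, (C, L)))
    # >>> maps.shape  # numpy appends the sub-dtype shape
    # (2, 1, 2, 2)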

    def save_losses(self, dic):
        """
        Save the event loss tables incrementally.

        :param dic: dictionary with agglosses, avglosses
        """
        idxs, agglosses = dic.pop('agglosses')
        if len(idxs):
            self.agglosses[idxs] += agglosses
        aids = dic.pop('aids')
        if self.oqparam.avg_losses:
            self.dset[aids, :, :] = dic.pop('avglosses')
        self._save_curves(dic, aids)
        self._save_maps(dic, aids)
        self.taskno += 1

    def _save_curves(self, dic, aids):
        for key in ('curves-rlzs', 'curves-stats'):
            array = dic.get(key)  # shape (A, S, P)
            if array is not None:
                shp = array.shape + (self.L,)
                self.datastore[key][aids, ...] = array.view(F32).reshape(shp)

    def _save_maps(self, dic, aids):
        for key in ('loss_maps-rlzs', 'loss_maps-stats'):
            array = dic.get(key)  # shape (A, S, C, LI)
            if array is not None:
                self.datastore[key][aids, ...] = array
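
    # A toy sketch of the view/reshape trick in _save_curves, assuming
    # the builder's loss_dt is a structured dtype with one float32 field
    # per loss type (the field names below are hypothetical):
    #
    # >>> loss_dt = numpy.dtype([('structural', F32), ('contents', F32)])
    # >>> arr = numpy.zeros((2, 3), loss_dt)  # e.g. shape (A, P), L=2
    # >>> arr.view(F32).reshape(2, 3, 2).shape  # fields -> last axis
    # (2, 3, 2)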

    def combine(self, dummy, res):
        """
        :param dummy: unused parameter
        :param res: a result dictionary
        """
        with self.monitor('saving losses', measuremem=True):
            self.save_losses(res)
        return 1

    def post_execute(self, result):
        """
        Save risk data and build the aggregate loss curves
        """
        oq = self.oqparam
        loss_types = oq.loss_dt().names
        logging.info('Saving event loss table')
        elt_dt = numpy.dtype(
            [('event_id', U32), ('rlzi', U16), ('loss', (F32, (self.L,)))])
        with self.monitor('saving event loss table', measuremem=True):
            agglosses = numpy.fromiter(
                ((eid, rlz, losses)
                 for (eid, rlz), losses in zip(self.events, self.agglosses)
                 if losses.any()), elt_dt)
            self.datastore['losses_by_event'] = agglosses
            self.datastore.set_attrs(
                'losses_by_event', loss_types=loss_types)
        self.postproc()
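
    # A minimal sketch of how numpy.fromiter fills a structured
    # event-loss record like elt_dt above (toy values, L=2 loss types):
    #
    # >>> dt = numpy.dtype([('event_id', U32), ('rlzi', U16),
    # ...                   ('loss', (F32, (2,)))])
    # >>> elt = numpy.fromiter([(0, 0, [1., 2.]), (3, 1, [0., 4.])], dt)
    # >>> elt['loss'].shape
    # (2, 2)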

    def postproc(self):
        """
        Build aggregate loss curves in process
        """
        dstore = self.datastore
        self.before_export()  # set 'realizations'
        oq = self.oqparam
        stats = self.param['stats']
        # store avg_losses-stats
        if oq.avg_losses:
            set_rlzs_stats(self.datastore, 'avg_losses')
        try:
            b = self.param['builder']
        except KeyError:  # don't build auxiliary tables
            return
        if dstore.parent:
            dstore.parent.open('r')  # to read the ruptures
        logging.info('Building loss tables')
        build_loss_tables(dstore)
        logging.info('Building aggregate loss curves')
        with self.monitor('building agg_curves', measuremem=True):
            lbr = group_array(dstore['losses_by_event'][()], 'rlzi')
            dic = {r: arr['loss'] for r, arr in lbr.items()}
            array, arr_stats = b.build(dic, stats)
        loss_types = oq.loss_dt().names
        units = self.datastore['cost_calculator'].get_units(loss_types)
        if oq.individual_curves or self.R == 1:
            self.datastore['agg_curves-rlzs'] = array  # shape (P, R, L)
            self.datastore.set_attrs(
                'agg_curves-rlzs',
                shape_descr=['return_periods', 'rlzs', 'loss_types'],
                return_periods=b.return_periods,
                rlzs=numpy.arange(self.R),
                loss_types=loss_types, units=units)
        if arr_stats is not None:
            self.datastore['agg_curves-stats'] = arr_stats  # (P, S, L)
            self.datastore.set_attrs(
                'agg_curves-stats',
                shape_descr=['return_periods', 'stats', 'loss_types'],
                return_periods=b.return_periods,
                stats=[encode(name) for (name, func) in stats],
                loss_types=loss_types, units=units)
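

# A toy sketch of the grouping performed by group_array in postproc,
# assuming it returns a dict mapping each distinct 'rlzi' value to the
# corresponding records; a pure-numpy equivalent:
#
# >>> lbe = numpy.zeros(3, [('rlzi', U16), ('loss', F32)])
# >>> lbe['rlzi'] = [0, 1, 0]
# >>> dic = {r: lbe[lbe['rlzi'] == r]['loss']
# ...        for r in numpy.unique(lbe['rlzi'])}  # rlzi -> loss arrays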