Source code for openquake.calculators.event_based_risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import operator
import numpy

from openquake.baselib.general import AccumDict, group_array
from openquake.baselib.python3compat import zip, encode
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.hazardlib.calc.stochastic import TWO32
from openquake.risklib import riskinput, riskmodels
from openquake.calculators import base
from openquake.calculators.export.loss_curves import get_loss_builder

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
U64 = numpy.uint64
getweight = operator.attrgetter('weight')


[docs]def build_loss_tables(dstore): """ Compute the total losses by rupture and losses by rlzi. """ oq = dstore['oqparam'] L = len(oq.loss_dt().names) R = dstore['csm_info'].get_num_rlzs() serials = dstore['ruptures']['serial'] idx_by_ser = dict(zip(serials, range(len(serials)))) tbl = numpy.zeros((len(serials), L), F32) lbr = numpy.zeros((R, L), F32) # losses by rlz for rec in dstore['losses_by_event'][()]: # call .value for speed idx = idx_by_ser[rec['eid'] // TWO32] tbl[idx] += rec['loss'] lbr[rec['rlzi']] += rec['loss'] return tbl, lbr
[docs]def event_based_risk(riskinputs, riskmodel, param, monitor): """ :param riskinputs: :class:`openquake.risklib.riskinput.RiskInput` objects :param riskmodel: a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance :param param: a dictionary of parameters :param monitor: :class:`openquake.baselib.performance.Monitor` instance :returns: a dictionary of numpy arrays of shape (L, R) """ L = len(riskmodel.lti) epspath = param['epspath'] for ri in riskinputs: with monitor('getting hazard'): ri.hazard_getter.init() hazard = ri.hazard_getter.get_hazard() mon = monitor('build risk curves', measuremem=False) A = len(ri.aids) R = ri.hazard_getter.num_rlzs try: avg = numpy.zeros((A, R, L), F32) except MemoryError: raise MemoryError( 'Building array avg of shape (%d, %d, %d)' % (A, R, L)) result = dict(aids=ri.aids, avglosses=avg) acc = AccumDict() # accumulator eidx -> agglosses aid2idx = {aid: idx for idx, aid in enumerate(ri.aids)} if 'builder' in param: builder = param['builder'] P = len(builder.return_periods) all_curves = numpy.zeros((A, R, P), builder.loss_dt) # update the result dictionary and the agg array with each output for out in riskmodel.gen_outputs(ri, monitor, epspath, hazard): if len(out.eids) == 0: # this happens for sites with no events continue r = out.rlzi agglosses = numpy.zeros((len(out.eids), L), F32) for l, loss_type in enumerate(riskmodel.loss_types): loss_ratios = out[loss_type] if loss_ratios is None: # for GMFs below the minimum_intensity continue avalues = riskmodels.get_values(loss_type, ri.assets) for a, asset in enumerate(ri.assets): aval = avalues[a] aid = asset['ordinal'] idx = aid2idx[aid] ratios = loss_ratios[a] # length E # average losses avg[idx, r, l] = ( ratios.sum(axis=0) * param['ses_ratio'] * aval) # agglosses agglosses[:, l] += ratios * aval if 'builder' in param: with mon: # this is the heaviest part all_curves[idx, r][loss_type] = ( builder.build_curve(aval, ratios, r)) # NB: I could yield the agglosses per output, but then I would # have millions of small outputs with big data transfer and slow # saving time acc += dict(zip(out.eids, agglosses)) if 'builder' in param: clp = param['conditional_loss_poes'] result['curves-rlzs'], result['curves-stats'] = builder.pair( all_curves, param['stats']) if R > 1 and param['individual_curves'] is False: del result['curves-rlzs'] if clp: result['loss_maps-rlzs'], result['loss_maps-stats'] = ( builder.build_maps(all_curves, clp, param['stats'])) if R > 1 and param['individual_curves'] is False: del result['loss_maps-rlzs'] # store info about the GMFs, must be done at the end result['agglosses'] = (numpy.array(list(acc)), numpy.array(list(acc.values()))) yield result
[docs]@base.calculators.add('event_based_risk') class EbrCalculator(base.RiskCalculator): """ Event based PSHA calculator generating the total losses by taxonomy """ core_task = event_based_risk is_stochastic = True precalc = 'event_based' accept_precalc = ['event_based', 'event_based_risk', 'ucerf_hazard', 'ebrisk']
[docs] def pre_execute(self): oq = self.oqparam super().pre_execute() parent = self.datastore.parent if not oq.ground_motion_fields: return # this happens in the reportwriter self.L = len(self.riskmodel.lti) self.T = len(self.assetcol.tagcol) self.A = len(self.assetcol) if parent: self.datastore['csm_info'] = parent['csm_info'] self.events = parent['events'][('id', 'rlz')] else: self.events = self.datastore['events'][('id', 'rlz')] if oq.return_periods != [0]: # setting return_periods = 0 disable loss curves and maps eff_time = oq.investigation_time * oq.ses_per_logic_tree_path if eff_time < 2: logging.warning( 'eff_time=%s is too small to compute loss curves', eff_time) else: self.param['builder'] = get_loss_builder( parent if parent else self.datastore, oq.return_periods, oq.loss_dt()) # sorting the eids is essential to get the epsilons in the right # order (i.e. consistent with the one used in ebr from ruptures) self.riskinputs = self.build_riskinputs('gmf') self.param['epspath'] = riskinput.cache_epsilons( self.datastore, oq, self.assetcol, self.riskmodel, self.E) self.param['avg_losses'] = oq.avg_losses self.param['ses_ratio'] = oq.ses_ratio self.param['stats'] = list(oq.hazard_stats().items()) self.param['conditional_loss_poes'] = oq.conditional_loss_poes self.taskno = 0 self.start = 0 avg_losses = oq.avg_losses if avg_losses: self.dset = self.datastore.create_dset( 'avg_losses-rlzs', F32, (self.A, self.R, self.L)) self.agglosses = numpy.zeros((self.E, self.L), F32) if 'builder' in self.param: logging.warning( 'Building the loss curves and maps for each asset is ' 'deprecated: consider building the aggregate curves and ' 'maps with the ebrisk calculator instead') self.build_datasets(self.param['builder']) if parent: parent.close() # avoid concurrent reading issues
[docs] def build_datasets(self, builder): oq = self.oqparam R = len(builder.weights) stats = self.param['stats'] A = self.A S = len(stats) P = len(builder.return_periods) C = len(oq.conditional_loss_poes) L = self.L self.loss_maps_dt = (F32, (C, L)) if oq.individual_curves or R == 1: self.datastore.create_dset('curves-rlzs', F32, (A, R, P, L)) self.datastore.set_attrs( 'curves-rlzs', return_periods=builder.return_periods) if oq.conditional_loss_poes: self.datastore.create_dset( 'loss_maps-rlzs', self.loss_maps_dt, (A, R), fillvalue=None) if R > 1: self.datastore.create_dset('curves-stats', F32, (A, S, P, L)) self.datastore.set_attrs( 'curves-stats', return_periods=builder.return_periods, stats=[encode(name) for (name, func) in stats]) if oq.conditional_loss_poes: self.datastore.create_dset( 'loss_maps-stats', self.loss_maps_dt, (A, S), fillvalue=None) self.datastore.set_attrs( 'loss_maps-stats', stats=[encode(name) for (name, func) in stats])
[docs] def save_losses(self, dic): """ Save the event loss tables incrementally. :param dic: dictionary with agglosses, avglosses """ idxs, agglosses = dic.pop('agglosses') if len(idxs): self.agglosses[idxs] += agglosses aids = dic.pop('aids') if self.oqparam.avg_losses: self.dset[aids, :, :] = dic.pop('avglosses') self._save_curves(dic, aids) self._save_maps(dic, aids) self.taskno += 1
def _save_curves(self, dic, aids): for key in ('curves-rlzs', 'curves-stats'): array = dic.get(key) # shape (A, S, P) if array is not None: shp = array.shape + (self.L,) self.datastore[key][aids, ...] = array.view(F32).reshape(shp) def _save_maps(self, dic, aids): for key in ('loss_maps-rlzs', 'loss_maps-stats'): array = dic.get(key) # shape (A, S, C, LI) if array is not None: self.datastore[key][aids, ...] = array
[docs] def combine(self, dummy, res): """ :param dummy: unused parameter :param res: a result dictionary """ with self.monitor('saving losses', measuremem=True): self.save_losses(res) return 1
[docs] def post_execute(self, result): """ Save risk data and build the aggregate loss curves """ logging.info('Saving event loss table') elt_dt = numpy.dtype( [('eid', U64), ('rlzi', U16), ('loss', (F32, (self.L,)))]) with self.monitor('saving event loss table', measuremem=True): agglosses = numpy.fromiter( ((eid, rlz, losses) for (eid, rlz), losses in zip(self.events, self.agglosses) if losses.any()), elt_dt) self.datastore['losses_by_event'] = agglosses loss_types = ' '.join(self.oqparam.loss_dt().names) self.datastore.set_attrs('losses_by_event', loss_types=loss_types) self.postproc()
[docs] def postproc(self): """ Build aggregate loss curves in process """ dstore = self.datastore self.before_export() # set 'realizations' oq = self.oqparam stats = self.param['stats'] # store avg_losses-stats if oq.avg_losses: set_rlzs_stats(self.datastore, 'avg_losses') try: b = self.param['builder'] except KeyError: # don't build auxiliary tables return if dstore.parent: dstore.parent.open('r') # to read the ruptures if 'ruptures' in self.datastore and len(self.datastore['ruptures']): logging.info('Building loss tables') with self.monitor('building loss tables', measuremem=True): rlt, lbr = build_loss_tables(dstore) dstore['rup_loss_table'] = rlt dstore['losses_by_rlzi'] = lbr ridx = [rlt[:, lti].argmax() for lti in range(self.L)] dstore.set_attrs('rup_loss_table', ridx=ridx) logging.info('Building aggregate loss curves') with self.monitor('building agg_curves', measuremem=True): lbr = group_array(dstore['losses_by_event'][()], 'rlzi') dic = {r: arr['loss'] for r, arr in lbr.items()} array, arr_stats = b.build(dic, stats) loss_types = ' '.join(oq.loss_dt().names) units = self.datastore['cost_calculator'].get_units(loss_types.split()) if oq.individual_curves or self.R == 1: self.datastore['agg_curves-rlzs'] = array # shape (P, R, L) self.datastore.set_attrs( 'agg_curves-rlzs', return_periods=b.return_periods, loss_types=loss_types, units=units) if arr_stats is not None: self.datastore['agg_curves-stats'] = arr_stats # shape (P, S, L) self.datastore.set_attrs( 'agg_curves-stats', return_periods=b.return_periods, stats=[encode(name) for (name, func) in stats], loss_types=loss_types, units=units)