Source code for openquake.calculators.scenario_risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import copy
import logging
import numpy
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import scientific, riskinput
from openquake.calculators import base, post_risk, views

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64  # higher precision to avoid task order dependency


[docs]def run_sec_sims(out, loss_types, sec_sims, seed): """ :param out: a dictionary of losses :param loss_types: the loss types :param sec_sims: pair (probability field, number of simulations) :param seed: random seed to use Run secondary simulations and update the losses """ [(prob_field, num_sims)] = sec_sims numpy.random.seed(seed) probs = out.haz[prob_field].to_numpy() # LiqProb affected = numpy.random.random((num_sims, 1)) < probs # (N, E) # doing the mean on the secondary simulations for each event for lt in loss_types: out[lt][:] = numpy.mean(affected * out[lt], axis=0) # shape E
[docs]def event_based_risk(riskinputs, param, monitor): """ Core function for a scenario_risk/event_based_risk computation. :param riskinputs: a list of :class:`openquake.risklib.riskinput.RiskInput` objects :param param: dictionary of extra parameters :param monitor: :class:`openquake.baselib.performance.Monitor` instance :returns: a dictionary { 'alt': AggLoggTable instance 'losses_by_asset': list of tuples (lt_idx, rlz_idx, asset_ordinal, totloss)} """ crmodel = monitor.read('crmodel') alt = copy.copy(param['alt']) result = dict(losses_by_asset=[], alt=alt) sec_sims = param['secondary_simulations'].items() for ri in riskinputs: for out in ri.gen_outputs(crmodel, monitor, param['tempname']): if sec_sims: run_sec_sims(out, crmodel.loss_types, sec_sims, param['master_seed']) alt.aggregate( out, param['minimum_asset_loss'], param['aggregate_by']) for lti, loss_type in enumerate(crmodel.loss_types): losses = out[loss_type] for a, asset in enumerate(ri.assets): aid = asset['ordinal'] lba = losses[a].sum() if lba: result['losses_by_asset'].append( (lti, out.rlzi, aid, lba)) return result
[docs]@base.calculators.add('scenario_risk', 'event_based_risk') class EventBasedRiskCalculator(base.RiskCalculator): """ Run a scenario/event_based risk calculation """ core_task = event_based_risk is_stochastic = True precalc = 'event_based' accept_precalc = ['scenario', 'event_based', 'event_based_risk', 'ebrisk']
[docs] def pre_execute(self): """ Compute the GMFs, build the epsilons, the riskinputs, and a dictionary with the unit of measure, used in the export phase. """ oq = self.oqparam parent = self.datastore.parent if parent: self.datastore['full_lt'] = parent['full_lt'] ne = len(parent['events']) logging.info('There are %d ruptures and %d events', len(parent['ruptures']), ne) if oq.investigation_time and oq.return_periods != [0]: # setting return_periods = 0 disable loss curves eff_time = oq.investigation_time * oq.ses_per_logic_tree_path if eff_time < 2: logging.warning( 'eff_time=%s is too small to compute loss curves', eff_time) super().pre_execute() if not oq.ground_motion_fields: return # this happens in the reportwriter self.assetcol = self.datastore['assetcol'] self.riskinputs = self.build_riskinputs('gmf') self.param['tempname'] = riskinput.cache_epsilons( self.datastore, oq, self.assetcol, self.crmodel, self.E) self.param['aggregate_by'] = oq.aggregate_by self.param['secondary_simulations'] = oq.secondary_simulations self.param['master_seed'] = oq.master_seed self.rlzs = self.datastore['events']['rlz_id'] self.num_events = numpy.bincount(self.rlzs) # events by rlz aggkey = self.assetcol.tagcol.get_aggkey(oq.aggregate_by) self.param['alt'] = self.acc = scientific.AggLossTable.new( aggkey, oq.loss_names, sec_losses=[]) L = len(oq.loss_names) self.avglosses = numpy.zeros((len(self.assetcol), self.R, L), F32) if oq.investigation_time: # event_based self.avg_ratio = numpy.array([oq.ses_ratio] * self.R) else: # scenario self.avg_ratio = 1. / self.num_events
[docs] def combine(self, acc, res): if res is None: raise MemoryError('You ran out of memory!') with self.monitor('aggregating losses', measuremem=False): self.acc += res['alt'] for (l, r, aid, lba) in res['losses_by_asset']: self.avglosses[aid, r, l] = lba * self.avg_ratio[r] return acc
[docs] def post_execute(self, result): """ Compute stats for the aggregated distributions and save the results on the datastore. """ oq = self.oqparam L = len(oq.loss_names) # avg losses must be 32 bit otherwise export losses_by_asset will # break the QGIS test for ScenarioRisk self.datastore['avg_losses-rlzs'] = self.avglosses set_rlzs_stats(self.datastore, 'avg_losses', asset_id=self.assetcol['id'], loss_type=self.oqparam.loss_names) with self.monitor('saving agg_loss_table'): logging.info('Saving the agg_loss_table') K = len(result.aggkey) - 1 alt = result.to_dframe() self.datastore.create_dframe('agg_loss_table', alt, K=K, L=L) # save agg_losses units = self.datastore['cost_calculator'].get_units(oq.loss_names) if oq.investigation_time is None: # scenario, compute agg_losses alt['rlz_id'] = self.rlzs[alt.event_id.to_numpy()] agglosses = numpy.zeros((K+1, self.R, L), F32) for (agg_id, rlz_id), df in alt.groupby(['agg_id', 'rlz_id']): agglosses[agg_id, rlz_id] = numpy.array( [df[ln].sum() for ln in oq.loss_names] ) * self.avg_ratio[rlz_id] self.datastore['agg_losses-rlzs'] = agglosses set_rlzs_stats(self.datastore, 'agg_losses', agg_id=K, loss_types=oq.loss_names, units=units) logging.info('Total portfolio loss\n' + views.view('portfolio_loss', self.datastore)) else: # event_based_risk, run post_risk prc = post_risk.PostRiskCalculator(oq, self.datastore.calc_id) if hasattr(self, 'exported'): prc.exported = self.exported prc.run(exports='') try: agglosses = self.datastore['agg_losses-rlzs'][K] except KeyError: # not enough events in post_risk logging.error('No agg_losses-rlzs') return # sanity check on the agg_losses and sum_losses sumlosses = self.avglosses.sum(axis=0) if not numpy.allclose(agglosses, sumlosses, rtol=1E-6): url = ('https://docs.openquake.org/oq-engine/advanced/' 'addition-is-non-associative.html') logging.warning( 'Due to rounding errors inherent in floating-point arithmetic,' ' agg_losses != sum(avg_losses): %s != %s\nsee %s', agglosses.mean(), sumlosses.mean(), url)