Source code for openquake.calculators.post_risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019-2020, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import os
import logging
import itertools
import numpy

from openquake.baselib import general, datastore, parallel, python3compat
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import scientific
from openquake.calculators import base, views

F32 = numpy.float32
U16 = numpy.uint16
U32 = numpy.uint32


def reagg_idxs(num_tags, tagnames):
    """
    :param num_tags: dictionary tagname -> number of tags with that tagname
    :param tagnames: subset of tagnames of interest
    :returns: T = T1 x ... x TN indices with repetitions

    Reaggregate indices. Consider for instance a case with 3 tagnames,
    taxonomy (4 tags), region (3 tags) and country (2 tags):

    >>> num_tags = dict(taxonomy=4, region=3, country=2)

    There are T = T1 x T2 x T3 = 4 x 3 x 2 = 24 combinations.
    The function will return 24 reaggregated indices with repetitions,
    depending on the selected subset of tagnames.

    For instance, reaggregating by taxonomy and region would give:

    >>> list(reagg_idxs(num_tags, ['taxonomy', 'region']))  # 4x3
    [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 11, 11]

    Reaggregating by taxonomy and country would give:

    >>> list(reagg_idxs(num_tags, ['taxonomy', 'country']))  # 4x2
    [0, 1, 0, 1, 0, 1, 2, 3, 2, 3, 2, 3, 4, 5, 4, 5, 4, 5, 6, 7, 6, 7, 6, 7]

    Reaggregating by region and country would give:

    >>> list(reagg_idxs(num_tags, ['region', 'country']))  # 3x2
    [0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5]

    Here is an example of single tag aggregation:

    >>> list(reagg_idxs(num_tags, ['taxonomy']))  # 4
    [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3]
    """
    shape = list(num_tags.values())
    T = numpy.prod(shape)
    arr = numpy.arange(T).reshape(shape)
    ranges = [numpy.arange(n) if t in tagnames else [slice(None)]
              for t, n in num_tags.items()]
    for i, idx in enumerate(itertools.product(*ranges)):
        arr[idx] = i
    return arr.flatten()

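
# Illustrative sketch, not part of the module: the indices returned by
# reagg_idxs act as a lookup table to remap fine-grained agg_id values onto
# the coarser aggregation, as done in PostRiskCalculator.execute below.
# With the num_tags of the docstring:
#
#     >>> idxs = reagg_idxs(num_tags, ['taxonomy'])
#     >>> idxs[[0, 5, 23]]  # fine indices 0, 5, 23 -> coarse taxonomy indices
#     array([0, 0, 3])
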
def get_loss_builder(dstore, return_periods=None, loss_dt=None):
    """
    :param dstore: datastore for an event based risk calculation
    :returns: a LossCurvesMapsBuilder instance
    """
    oq = dstore['oqparam']
    weights = dstore['weights'][()]
    eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
    num_events = numpy.bincount(dstore['events']['rlz_id'])
    periods = return_periods or oq.return_periods or scientific.return_periods(
        eff_time, num_events.max())
    return scientific.LossCurvesMapsBuilder(
        oq.conditional_loss_poes, numpy.array(periods),
        loss_dt or oq.loss_dt(), weights, dict(enumerate(num_events)),
        eff_time, oq.risk_investigation_time)

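
# Illustrative sketch, not part of the module: `execute` below uses the
# builder roughly as follows, where `losses` is an (L, E) array of event
# losses for a single aggregation key and realization `r`:
#
#     >>> builder = get_loss_builder(dstore)
#     >>> P = len(builder.return_periods)
#     >>> curves = builder.build_curves(losses, r)  # array of shape (L, P)
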
def get_src_loss_table(dstore, L):
    """
    :returns: (source_ids, array of losses of shape (Ns, L))
    """
    K = dstore['agg_loss_table'].attrs.get('K', 0)
    alt = dstore.read_df('agg_loss_table', 'agg_id', dict(agg_id=K))
    eids = alt.event_id.to_numpy()
    evs = dstore['events'][:][eids]
    rlz_ids = evs['rlz_id']
    rup_ids = evs['rup_id']
    source_id = python3compat.decode(dstore['ruptures']['source_id'][rup_ids])
    w = dstore['weights'][:]
    acc = general.AccumDict(accum=numpy.zeros(L, F32))
    del alt['event_id']
    all_losses = numpy.array(alt)
    for source_id, rlz_id, losses in zip(source_id, rlz_ids, all_losses):
        acc[source_id] += losses * w[rlz_id]
    return zip(*sorted(acc.items()))

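
# Illustrative sketch, not part of the module: the two parallel sequences
# returned by get_src_loss_table are unpacked in `execute` below, roughly as
#
#     >>> source_ids, losses = get_src_loss_table(dstore, L)
#     >>> len(source_ids) == len(losses)  # one (L,) loss array per source
#     True
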
def post_risk(builder, kr_losses, monitor):
    """
    :returns: dictionary kr -> L loss curves
    """
    res = {}
    for k, r, losses in kr_losses:
        res[k, r] = builder.build_curves(losses, r)
    return res

@base.calculators.add('post_risk')
class PostRiskCalculator(base.RiskCalculator):
    """
    Compute losses and loss curves starting from an event loss table.
    """
    def pre_execute(self):
        oq = self.oqparam
        ds = self.datastore
        self.reaggreate = False
        if oq.hazard_calculation_id and not ds.parent:
            ds.parent = datastore.read(oq.hazard_calculation_id)
            assetcol = ds['assetcol']
            self.aggkey = base.save_agg_values(
                ds, assetcol, oq.loss_names, oq.aggregate_by)
            aggby = ds.parent['oqparam'].aggregate_by
            self.reaggreate = aggby and oq.aggregate_by != aggby
            if self.reaggreate:
                self.num_tags = dict(
                    zip(aggby, assetcol.tagcol.agg_shape(aggby)))
        else:
            assetcol = ds['assetcol']
            self.aggkey = assetcol.tagcol.get_aggkey(oq.aggregate_by)
        self.L = len(oq.loss_names)
        size = general.humansize(ds.getsize('agg_loss_table'))
        logging.info('Stored %s in the agg_loss_table', size)

    def execute(self):
        oq = self.oqparam
        if oq.return_periods != [0]:
            # setting return_periods = 0 disables the loss curves
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
                return
        if 'source_info' in self.datastore:  # missing for gmf_ebrisk
            logging.info('Building the src_loss_table')
            with self.monitor('src_loss_table', measuremem=True):
                source_ids, losses = get_src_loss_table(self.datastore, self.L)
                self.datastore['src_loss_table'] = losses
                self.datastore.set_shape_descr('src_loss_table',
                                               source=source_ids,
                                               loss_type=oq.loss_names)
        builder = get_loss_builder(self.datastore)
        K = len(self.aggkey) if oq.aggregate_by else 0
        P = len(builder.return_periods)
        # do everything in process since it is really fast
        rlz_id = self.datastore['events']['rlz_id']
        alt_df = self.datastore.read_df('agg_loss_table')
        if self.reaggreate:
            idxs = numpy.concatenate([
                reagg_idxs(self.num_tags, oq.aggregate_by),
                numpy.array([K], int)])
            alt_df['agg_id'] = idxs[alt_df['agg_id'].to_numpy()]
            alt_df = alt_df.groupby(['event_id', 'agg_id']).sum().reset_index()
        alt_df['rlz_id'] = rlz_id[alt_df.event_id.to_numpy()]
        units = self.datastore['cost_calculator'].get_units(oq.loss_names)
        dist = ('no' if os.environ.get('OQ_DISTRIBUTE') == 'no'
                else 'processpool')  # use only the local cores
        smap = parallel.Starmap(post_risk, h5=self.datastore.hdf5,
                                distribute=dist)
        # producing concurrent_tasks/2 = num_cores tasks
        blocksize = int(numpy.ceil(
            (K + 1) * self.R / (oq.concurrent_tasks // 2 or 1)))
        kr_losses = []
        agg_losses = numpy.zeros((K + 1, self.R, self.L), F32)
        agg_curves = numpy.zeros((K + 1, self.R, self.L, P), F32)
        gb = alt_df.groupby([alt_df.agg_id, alt_df.rlz_id])
        # NB: in the future we may use multiprocessing.shared_memory
        for (k, r), df in gb:
            arr = numpy.zeros((self.L, len(df)), F32)
            for lni, ln in enumerate(oq.loss_names):
                arr[lni] = df[ln].to_numpy()
            agg_losses[k, r] = arr.sum(axis=1)
            kr_losses.append((k, r, arr))
            if len(kr_losses) >= blocksize:
                size = sum(ls.nbytes for k, r, ls in kr_losses)
                logging.info('Sending %s of losses', general.humansize(size))
                smap.submit((builder, kr_losses))
                kr_losses[:] = []
        if kr_losses:
            smap.submit((builder, kr_losses))
        for (k, r), curve in smap.reduce().items():
            agg_curves[k, r] = curve
        self.datastore['agg_losses-rlzs'] = agg_losses * oq.ses_ratio
        set_rlzs_stats(self.datastore, 'agg_losses',
                       agg_id=K + 1, loss_types=oq.loss_names, units=units)
        self.datastore['agg_curves-rlzs'] = agg_curves
        set_rlzs_stats(self.datastore, 'agg_curves',
                       agg_id=K + 1, lti=self.L,
                       return_period=builder.return_periods, units=units)
        return 1

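    # Illustrative note, not part of the module: after `execute`, assuming an
    # open datastore `dstore`, the stored outputs can be read back as
    #
    #     >>> dstore['agg_losses-rlzs'].shape  # (K + 1, R, L)
    #     >>> dstore['agg_curves-rlzs'].shape  # (K + 1, R, L, P)
    #
    # where the last agg_id (index K) holds the total over the whole portfolio.
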
    def post_execute(self, dummy):
        """
        Sanity check on tot_losses
        """
        logging.info('Total portfolio loss\n' +
                     views.view('portfolio_loss', self.datastore))
        if not self.aggkey:
            return
        logging.info('Sanity check on agg_losses')
        for kind in 'rlzs', 'stats':
            agg = 'agg_losses-' + kind
            if agg not in self.datastore:
                return
            if kind == 'rlzs':
                kinds = ['rlz-%d' % rlz for rlz in range(self.R)]
            else:
                kinds = self.oqparam.hazard_stats()
            for l in range(self.L):
                ln = self.oqparam.loss_names[l]
                for r, k in enumerate(kinds):
                    tot_losses = self.datastore[agg][-1, r, l]
                    agg_losses = self.datastore[agg][:-1, r, l].sum()
                    if kind == 'rlzs' or k == 'mean':
                        ok = numpy.allclose(agg_losses, tot_losses, rtol=.001)
                        if not ok:
                            logging.warning(
                                'Inconsistent total losses for %s, %s: '
                                '%s != %s', ln, k, agg_losses, tot_losses)
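
    # Illustrative note, not part of the module: the sanity check above
    # verifies, for every loss type and realization (or for the 'mean'
    # statistic), that the aggregated losses sum up to the total, i.e.
    #
    #     sum(agg_losses[k, r, l] for k < K)  ~=  agg_losses[K, r, l]
    #
    # within a relative tolerance of 0.1%.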