Source code for openquake.calculators.event_based_risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import logging
import operator
import itertools
import functools
import collections

import numpy

from openquake.baselib import hdf5
from openquake.baselib.python3compat import zip
from openquake.baselib.general import AccumDict, humansize, block_splitter
from openquake.calculators import base, event_based
from openquake.commonlib import parallel, calc
from openquake.risklib import riskinput, scientific
from openquake.commonlib.parallel import starmap

U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
getweight = operator.attrgetter('weight')


def build_el_dtypes(insured_losses):
    """
    :param bool insured_losses:
        job.ini configuration parameter
    :returns:
        ela_dt and elt_dt i.e. the data types for event loss assets
        and event loss tables respectively
    """
    I = insured_losses + 1
    ela_list = [('rup_id', U32), ('ass_id', U32), ('loss', (F32, I))]
    elt_list = [('rup_id', U32), ('loss', (F32, I))]
    return numpy.dtype(ela_list), numpy.dtype(elt_list)
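
# A minimal usage sketch (illustrative only, not executed by the module):
# with insured losses enabled the 'loss' field carries two columns,
# ground losses and insured losses.
#     >>> ela_dt, elt_dt = build_el_dtypes(insured_losses=True)
#     >>> ela_dt.names
#     ('rup_id', 'ass_id', 'loss')
#     >>> ela_dt['loss'].shape  # (ground, insured)
#     (2,)
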
def build_agg_curve(lr_data, insured_losses, ses_ratio, curve_resolution, L,
                    monitor):
    """
    Build the aggregate loss curve in parallel for each loss type
    and realization pair.

    :param lr_data:
        a list of triples `(l, r, data)` where `l` is the loss type index,
        `r` is the realization index and `data` is an array of kind
        `(rupture_id, loss)` or `(rupture_id, loss, loss_ins)`
    :param bool insured_losses:
        job.ini configuration parameter
    :param ses_ratio:
        a ratio obtained from ses_per_logic_tree_path
    :param curve_resolution:
        the number of discretization steps for the loss curve
    :param L:
        the number of loss types
    :param monitor:
        a Monitor instance
    :returns:
        a dictionary with keys (l, r, name), where name is one of
        'losses', 'poes', 'avg' (plus the '_ins' variants when insured
        losses are enabled)
    """
    result = {}
    for l, r, data in lr_data:
        if len(data) == 0:  # realization with no losses
            continue
        if insured_losses:
            gloss = data['loss'][:, 0]
            iloss = data['loss'][:, 1]
        else:
            gloss = data['loss']
        losses, poes = scientific.event_based(
            gloss, ses_ratio, curve_resolution)
        avg = scientific.average_loss((losses, poes))
        result[l, r, 'losses'] = losses
        result[l, r, 'poes'] = poes
        result[l, r, 'avg'] = avg
        if insured_losses:
            losses_ins, poes_ins = scientific.event_based(
                iloss, ses_ratio, curve_resolution)
            avg_ins = scientific.average_loss((losses_ins, poes_ins))
            result[l, r, 'losses_ins'] = losses_ins
            result[l, r, 'poes_ins'] = poes_ins
            result[l, r, 'avg_ins'] = avg_ins
    return result
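
# Worked example of the ses_ratio parameter (hypothetical values): with
# investigation_time=50, ses_per_logic_tree_path=10 and no explicit
# risk_investigation_time, EventBasedRiskCalculator.execute below computes
# ses_ratio = 50 / (50 * 10) = 0.1, i.e. each simulated event counts as
# 0.1 occurrences over the 50-year risk window (one event per 500
# effective stochastic years).
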
def square(L, R, factory):
    """
    :param L: the number of loss types
    :param R: the number of realizations
    :param factory: thunk used to initialize the elements
    :returns: a numpy matrix of shape (L, R)
    """
    losses = numpy.zeros((L, R), object)
    for l in range(L):
        for r in range(R):
            losses[l, r] = factory()
    return losses
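
# A minimal sketch of how `square` behaves (illustrative only): every cell
# is initialized by a separate call to the factory, so the per-(l, r)
# accumulators are independent objects.
#     >>> m = square(2, 3, AccumDict)
#     >>> m.shape
#     (2, 3)
#     >>> m[0, 0] is m[0, 1]
#     False
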
def _old_loss_curves(asset_values, rcurves, ratios):
    # build loss curves in the old format (i.e. (losses, poes)) from
    # loss curves in the new format (i.e. poes).
    # shape (N, 2, C)
    return numpy.array([(avalue * ratios, poes)
                        for avalue, poes in zip(asset_values, rcurves)])


def _aggregate(outputs, compositemodel, agg, ass, idx, result, monitor):
    # update the result dictionary and the agg array with each output
    for out in outputs:
        l, r = out.lr
        asset_ids = [a.ordinal for a in out.assets]
        loss_type = compositemodel.loss_types[l]
        indices = numpy.array([idx[eid] for eid in out.eids])
        cb = compositemodel.curve_builders[l]
        if cb.user_provided:
            counts_matrix = cb.build_counts(out.loss_ratios[:, :, 0])
            result['RC'][l, r] += dict(zip(asset_ids, counts_matrix))
            if monitor.insured_losses:
                result['IC'][l, r] += dict(
                    zip(asset_ids,
                        cb.build_counts(out.loss_ratios[:, :, 1])))

        for i, asset in enumerate(out.assets):
            aid = asset.ordinal
            loss_ratios = out.loss_ratios[i]
            losses = loss_ratios * asset.value(loss_type)

            # average losses
            if monitor.avg_losses:
                result['AVGLOSS'][l, r][aid] += (
                    loss_ratios.sum(axis=0) * monitor.ses_ratio)

            # asset losses
            if monitor.asset_loss_table:
                data = [(eid, aid, loss)
                        for eid, loss in zip(out.eids, losses)
                        if loss.sum() > 0]
                if data:
                    ass[l, r].append(numpy.array(data, monitor.ela_dt))

            # agglosses
            agg[indices, l, r] += losses

def event_based_risk(riskinput, riskmodel, rlzs_assoc, assetcol, monitor):
    """
    :param riskinput:
        a :class:`openquake.risklib.riskinput.RiskInput` object
    :param riskmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param rlzs_assoc:
        a :class:`openquake.commonlib.source.RlzsAssoc` instance
    :param assetcol:
        AssetCollection instance
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a dictionary of numpy arrays of shape (L, R)
    """
    lti = riskmodel.lti  # loss type -> index
    L, R = len(lti), len(rlzs_assoc.realizations)
    I = monitor.insured_losses + 1
    eids = riskinput.eids
    E = len(eids)
    idx = dict(zip(eids, range(E)))
    agg = numpy.zeros((E, L, R, I), F32)
    ass = collections.defaultdict(list)

    def zeroN():
        return numpy.zeros((monitor.num_assets, I))
    result = dict(RC=square(L, R, AccumDict), IC=square(L, R, AccumDict),
                  AGGLOSS=AccumDict(), ASSLOSS=AccumDict())
    if monitor.avg_losses:
        result['AVGLOSS'] = square(L, R, zeroN)
    outputs = riskmodel.gen_outputs(riskinput, rlzs_assoc, monitor, assetcol)
    _aggregate(outputs, riskmodel, agg, ass, idx, result, monitor)
    for (l, r) in itertools.product(range(L), range(R)):
        records = [(eids[i], loss) for i, loss in enumerate(agg[:, l, r])
                   if loss.sum() > 0]
        if records:
            result['AGGLOSS'][l, r] = numpy.array(records, monitor.elt_dt)
    for lr in ass:
        if ass[lr]:
            result['ASSLOSS'][lr] = numpy.concatenate(ass[lr])
    # store the size of the GMFs
    result['gmfbytes'] = monitor.gmfbytes
    return result
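
# Shape conventions used above, for orientation: `agg` has shape
# (E, L, R, I) = (events, loss types, realizations, 1 + insured_losses);
# the 'AGGLOSS' values are structured arrays of dtype monitor.elt_dt with
# one record per event whose total loss is nonzero.
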
@base.calculators.add('event_based_risk')
class EventBasedRiskCalculator(base.RiskCalculator):
    """
    Event based PSHA calculator generating the event loss table and
    fixed ratios loss curves.
    """
    pre_calculator = 'event_based'
    core_task = event_based_risk
    is_stochastic = True

    def pre_execute(self):
        """
        Read the precomputed ruptures (or compute them on the fly)
        """
        super(EventBasedRiskCalculator, self).pre_execute()
        calc.check_overflow(self)
        if not self.riskmodel:  # there is no riskmodel, exit early
            self.execute = lambda: None
            self.post_execute = lambda result: None
            return

    def execute(self):
        """
        Run the event_based_risk calculator and aggregate the results
        """
        oq = self.oqparam
        correl_model = oq.get_correl_model()
        self.N = len(self.assetcol)
        self.E = sum(len(v) for v in self.datastore['events'].values())
        logging.info('Populating the risk inputs')
        all_ruptures = []
        preprecalc = getattr(self.precalc, 'precalc', None)
        if preprecalc:  # the ruptures are already in memory
            for grp_id, sesruptures in preprecalc.result.items():
                for sr in sesruptures:
                    all_ruptures.append(sr)
        else:  # read the ruptures from the datastore
            for serial in self.datastore['sescollection']:
                rup = self.datastore['sescollection/' + serial]
                all_ruptures.append(rup)
        all_ruptures.sort(key=operator.attrgetter('serial'))
        if not self.riskmodel.covs:
            # do not generate epsilons
            eps = None
        else:
            eps = riskinput.make_eps(
                self.assets_by_site, self.E, oq.master_seed,
                oq.asset_correlation)
            logging.info('Generated %s epsilons', eps.shape)

        # preparing empty datasets
        loss_types = self.riskmodel.loss_types
        self.C = self.oqparam.loss_curve_resolution
        self.L = L = len(loss_types)
        self.R = R = len(self.rlzs_assoc.realizations)
        self.I = self.oqparam.insured_losses

        # ugly: attaching attributes needed in the task function
        mon = self.monitor
        mon.num_assets = self.count_assets()
        mon.avg_losses = self.oqparam.avg_losses
        mon.asset_loss_table = self.oqparam.asset_loss_table
        mon.insured_losses = self.I
        mon.ses_ratio = (
            oq.risk_investigation_time or oq.investigation_time) / (
                oq.investigation_time * oq.ses_per_logic_tree_path)

        self.N = N = len(self.assetcol)
        self.E = sum(len(v) for v in self.datastore['events'].values())

        # average losses, stored in a composite array of shape N, R
        self.avg_losses = numpy.zeros((N, R), oq.loss_dt())

        self.ass_loss_table = square(L, R, lambda: None)
        self.agg_loss_table = square(L, R, lambda: None)

        self.ela_dt, self.elt_dt = mon.ela_dt, mon.elt_dt = build_el_dtypes(
            self.I)
        for (l, r) in itertools.product(range(L), range(R)):
            lt = loss_types[l]
            if self.oqparam.asset_loss_table:
                self.ass_loss_table[l, r] = self.datastore.create_dset(
                    'ass_loss_table/rlz-%03d/%s' % (r, lt), self.ela_dt)
            self.agg_loss_table[l, r] = self.datastore.create_dset(
                'agg_loss_table/rlz-%03d/%s' % (r, lt), self.elt_dt)

        self.saved = collections.Counter()  # nbytes per HDF5 key
        self.ass_bytes = 0
        self.agg_bytes = 0
        self.gmfbytes = 0
        rlz_ids = getattr(self.oqparam, 'rlz_ids', ())
        if rlz_ids:
            self.rlzs_assoc = self.rlzs_assoc.extract(rlz_ids)

        if not oq.minimum_intensity:
            # infer it from the risk models if not directly set in job.ini
            oq.minimum_intensity = self.riskmodel.get_min_iml()
        min_iml = calc.fix_minimum_intensity(
            oq.minimum_intensity, oq.imtls)
        if min_iml.sum() == 0:
            logging.warn('The GMFs are not filtered: '
                         'you may want to set a minimum_intensity')
        else:
            logging.info('minimum_intensity=%s', oq.minimum_intensity)
        csm_info = self.datastore['csm_info']
        grp_trt = {sg.id: sg.trt for sm in csm_info.source_models
                   for sg in sm.src_groups}
        with self.monitor('building riskinputs', autoflush=True):
            riskinputs = self.riskmodel.build_inputs_from_ruptures(
                grp_trt, list(oq.imtls), self.sitecol.complete,
                all_ruptures, oq.truncation_level, correl_model, min_iml,
                eps, oq.concurrent_tasks or 1)
            # NB: I am using generators so that the tasks are submitted
            # one at the time, without keeping all of the arguments in
            # memory
        res = starmap(
            self.core_task.__func__,
            ((riskinput, self.riskmodel, self.rlzs_assoc,
              self.assetcol, self.monitor.new('task'))
             for riskinput in riskinputs)).submit_all()
        acc = functools.reduce(self.agg, res, AccumDict())
        self.save_data_transfer(res)
        return acc

    def agg(self, acc, result):
        """
        Aggregate losses and store them in the datastore.

        :param acc: accumulator dictionary
        :param result: dictionary coming from event_based_risk
        """
        self.gmfbytes += result.pop('gmfbytes')
        with self.monitor('saving event loss tables', autoflush=True):
            if self.oqparam.asset_loss_table:
                for lr, array in sorted(result.pop('ASSLOSS').items()):
                    hdf5.extend(self.ass_loss_table[lr], array)
                    self.ass_bytes += array.nbytes
            for lr, array in sorted(result.pop('AGGLOSS').items()):
                hdf5.extend(self.agg_loss_table[lr], array)
                self.agg_bytes += array.nbytes
            self.datastore.hdf5.flush()
        return acc + result

    def post_execute(self, result):
        """
        Save the event loss table in the datastore.

        :param result: the dictionary returned by the .execute method
        """
        logging.info('Generated %s of GMFs', humansize(self.gmfbytes))
        self.datastore.save('job_info', {'gmfbytes': self.gmfbytes})
        if self.gmfbytes == 0:
            raise RuntimeError('No GMFs were generated, perhaps they were '
                               'all below the minimum_intensity threshold')
        if self.oqparam.asset_loss_table:
            asslt = self.datastore['ass_loss_table']
            asslt.attrs['nbytes'] = self.ass_bytes
            for rlz, dset in asslt.items():
                for ds in dset.values():
                    ds.attrs['nonzero_fraction'] = len(ds) / (
                        self.N * self.E)

        agglt = self.datastore['agg_loss_table']
        agglt.attrs['nbytes'] = self.agg_bytes
        for rlz, dset in agglt.items():
            for ds in dset.values():
                ds.attrs['nonzero_fraction'] = len(ds) / self.E

        insured_losses = self.oqparam.insured_losses
        ses_ratio = self.oqparam.ses_ratio
        saved = self.saved
        self.N = N = len(self.assetcol)
        self.R = R = len(self.rlzs_assoc.realizations)
        ltypes = self.riskmodel.loss_types

        self.loss_curve_dt, self.loss_maps_dt = (
            self.riskmodel.build_loss_dtypes(
                self.oqparam.conditional_loss_poes, self.I))

        self.vals = {}  # asset values by loss_type
        for ltype in ltypes:
            asset_values = []
            for assets in self.assets_by_site:
                for asset in assets:
                    asset_values.append(asset.value(
                        ltype, self.oqparam.time_event))
            self.vals[ltype] = numpy.array(asset_values)

        # loss curves
        multi_lr_dt = numpy.dtype(
            [(ltype, (F32, cbuilder.curve_resolution))
             for ltype, cbuilder in zip(
                ltypes, self.riskmodel.curve_builders)])
        rcurves = numpy.zeros((N, R, 2), multi_lr_dt)

        # AVGLOSS
        if self.oqparam.avg_losses:
            with self.monitor('building avg_losses-rlzs'):
                for (l, r), avgloss in numpy.ndenumerate(
                        result['AVGLOSS']):
                    lt = self.riskmodel.loss_types[l]
                    avg_losses_lt = self.avg_losses[lt]
                    for i, avalue in enumerate(self.vals[lt]):
                        avg_losses_lt[i, r] = avgloss[i, 0] * avalue
                        if self.oqparam.insured_losses:
                            self.avg_losses[lt + '_ins'][i, r] = (
                                avgloss[i, 1] * avalue)
                self.datastore['avg_losses-rlzs'] = self.avg_losses
                saved['avg_losses-rlzs'] = self.avg_losses.nbytes

        # RC, IC
        if self.oqparam.loss_ratios:
            with self.monitor('building rcurves-rlzs'):
                for (l, r), data in numpy.ndenumerate(result['RC']):
                    cb = self.riskmodel.curve_builders[l]
                    if data and cb.user_provided:
                        # data is a dict asset idx -> counts
                        lt = self.riskmodel.loss_types[l]
                        poes = cb.build_poes(N, [data], ses_ratio)
                        rcurves[lt][:, r, 0] = poes
                        saved['rcurves-rlzs'] += poes.nbytes
                for (l, r), data in numpy.ndenumerate(result['IC']):
                    cb = self.riskmodel.curve_builders[l]
                    if data and cb.user_provided and insured_losses:
                        # data is a dict asset idx -> counts
                        lt = self.riskmodel.loss_types[l]
                        poes = cb.build_poes(N, [data], ses_ratio)
                        rcurves[lt][:, r, 1] = poes
                        saved['rcurves-rlzs'] += poes.nbytes
                self.datastore['rcurves-rlzs'] = rcurves

        oq = self.oqparam
        builder = scientific.StatsBuilder(
            oq.quantile_loss_curves, oq.conditional_loss_poes, [],
            oq.loss_curve_resolution, scientific.normalize_curves_eb,
            oq.insured_losses)

        # build an aggregate loss curve per realization plus statistics
        with self.monitor('building agg_curve'):
            self.build_agg_curve_and_stats(builder)
        self.datastore.hdf5.flush()

        for out in sorted(saved):
            nbytes = saved[out]
            if nbytes:
                self.datastore[out].attrs['nbytes'] = nbytes
                logging.info('Saved %s in %s', humansize(nbytes), out)

        if self.oqparam.asset_loss_table:
            pass  # TODO: build specific loss curves

        rlzs = self.rlzs_assoc.realizations
        if self.loss_maps_dt:
            with self.monitor('building loss_maps-rlzs'):
                if (self.oqparam.conditional_loss_poes and
                        'rcurves-rlzs' in self.datastore):
                    loss_maps = numpy.zeros((N, R), self.loss_maps_dt)
                    rcurves = self.datastore['rcurves-rlzs']
                    for cb in self.riskmodel.curve_builders:
                        if cb.user_provided:
                            lm = loss_maps[cb.loss_type]
                            for r, lmaps in cb.build_loss_maps(
                                    self.assetcol.array, rcurves):
                                lm[:, r] = lmaps
                    self.datastore['loss_maps-rlzs'] = loss_maps

        if len(rlzs) > 1:
            self.Q1 = len(self.oqparam.quantile_loss_curves) + 1
            with self.monitor('computing stats'):
                if 'rcurves-rlzs' in self.datastore:
                    self.compute_store_stats(rlzs, builder)
                if oq.avg_losses:  # stats for avg_losses
                    stats = scientific.SimpleStats(
                        rlzs, oq.quantile_loss_curves)
                    stats.compute_and_store('avg_losses', self.datastore)

        self.datastore.hdf5.flush()

    def build_agg_curve_and_stats(self, builder):
        """
        Build a single loss curve per realization. It is NOT obtained
        by aggregating the loss curves; instead, it is obtained without
        generating the loss curves, directly from the aggregate losses.
        """
        oq = self.oqparam
        C = oq.loss_curve_resolution
        loss_curve_dt, _ = self.riskmodel.build_all_loss_dtypes(
            C, oq.conditional_loss_poes, oq.insured_losses)
        lts = self.riskmodel.loss_types
        lr_data = [(l, r, dset.value) for (l, r), dset
                   in numpy.ndenumerate(self.agg_loss_table)]
        ses_ratio = self.oqparam.ses_ratio
        result = parallel.apply(
            build_agg_curve, (lr_data, self.I, ses_ratio, C, self.L,
                              self.monitor('')),
            concurrent_tasks=self.oqparam.concurrent_tasks).reduce()
        agg_curve = numpy.zeros(self.R, loss_curve_dt)
        for l, r, name in result:
            agg_curve[lts[l]][name][r] = result[l, r, name]
        if oq.individual_curves:
            self.datastore['agg_curve-rlzs'] = agg_curve
            self.saved['agg_curve-rlzs'] = agg_curve.nbytes

        if self.R > 1:
            self.build_agg_curve_stats(builder, agg_curve, loss_curve_dt)

    # ################### methods to compute statistics ################### #

    def _collect_all_data(self):
        # called only if 'rcurves-rlzs' in dstore; return a list of outputs
        all_data = []
        assets = self.datastore['asset_refs'].value[
            self.assetcol.array['idx']]
        rlzs = self.rlzs_assoc.realizations
        insured = self.oqparam.insured_losses
        if self.oqparam.avg_losses:
            avg_losses = self.datastore['avg_losses-rlzs'].value
        else:
            avg_losses = self.avg_losses
        r_curves = self.datastore['rcurves-rlzs'].value
        for loss_type, cbuilder in zip(
                self.riskmodel.loss_types, self.riskmodel.curve_builders):
            rcurves = r_curves[loss_type]
            asset_values = self.vals[loss_type]
            data = []
            for rlz in rlzs:
                average_losses = avg_losses[loss_type][:, rlz.ordinal]
                average_insured_losses = (
                    avg_losses[loss_type + '_ins'][:, rlz.ordinal]
                    if insured else None)
                loss_curves = _old_loss_curves(
                    asset_values, rcurves[:, rlz.ordinal, 0],
                    cbuilder.ratios)
                insured_curves = _old_loss_curves(
                    asset_values, rcurves[:, rlz.ordinal, 1],
                    cbuilder.ratios) if insured else None
                out = scientific.Output(
                    assets, loss_type, rlz.ordinal, rlz.weight,
                    loss_curves=loss_curves,
                    insured_curves=insured_curves,
                    average_losses=average_losses,
                    average_insured_losses=average_insured_losses)
                data.append(out)
            all_data.append(data)
        return all_data

    # NB: the HDF5 structure is of kind <output>-stats/structural/mean, ...
    # and must be so for the loss curves, since different loss_types may
    # have a different discretization. This is not needed for the loss
    # maps, but it is done anyway for consistency, also because in the
    # future we could specify different conditional loss poes depending on
    # the loss type

    def compute_store_stats(self, rlzs, builder):
        """
        Compute and store the statistical outputs.

        :param rlzs: list of realizations
        :param builder:
            :class:`openquake.risklib.scientific.StatsBuilder` instance
        """
        oq = self.oqparam
        ltypes = self.riskmodel.loss_types
        all_stats = list(map(builder.build, self._collect_all_data()))
        if not all_stats:
            return
        loss_curves = numpy.zeros((self.N, self.Q1), self.loss_curve_dt)
        if oq.conditional_loss_poes:
            loss_maps = numpy.zeros((self.N, self.Q1), self.loss_maps_dt)
        for stats in all_stats:
            # there is one stat for each loss_type
            cb = self.riskmodel.curve_builders[
                ltypes.index(stats.loss_type)]
            if not cb.user_provided:
                continue
            sb = scientific.StatsBuilder(
                oq.quantile_loss_curves, oq.conditional_loss_poes, [],
                len(cb.ratios), scientific.normalize_curves_eb,
                oq.insured_losses)
            curves, maps = sb.get_curves_maps(stats)  # matrices (Q1, N)
            loss_curves[cb.loss_type] = curves.T
            if oq.conditional_loss_poes:
                loss_maps[cb.loss_type] = maps.T
        self.datastore['loss_curves-stats'] = loss_curves
        if oq.conditional_loss_poes:
            self.datastore['loss_maps-stats'] = loss_maps

    def build_agg_curve_stats(self, builder, agg_curve, loss_curve_dt):
        """
        Build and save `agg_curve-stats` in the HDF5 file.

        :param builder:
            :class:`openquake.risklib.scientific.StatsBuilder` instance
        :param agg_curve: array of aggregate curves, one per realization
        :param loss_curve_dt: numpy dtype for loss curves
        """
        rlzs = self.datastore['csm_info'].get_rlzs_assoc().realizations
        Q1 = len(builder.mean_quantiles)
        agg_curve_stats = numpy.zeros(Q1, loss_curve_dt)
        for l, loss_type in enumerate(self.riskmodel.loss_types):
            agg_curve_lt = agg_curve[loss_type]
            outputs = []
            for rlz in rlzs:
                curve = agg_curve_lt[rlz.ordinal]
                average_loss = curve['avg']
                loss_curve = (curve['losses'], curve['poes'])
                if self.oqparam.insured_losses:
                    average_insured_loss = curve['avg_ins']
                    insured_curves = [(curve['losses_ins'],
                                       curve['poes_ins'])]
                else:
                    average_insured_loss = None
                    insured_curves = None
                out = scientific.Output(
                    [None], loss_type, rlz.ordinal, rlz.weight,
                    loss_curves=[loss_curve],
                    insured_curves=insured_curves,
                    average_losses=[average_loss],
                    average_insured_losses=[average_insured_loss])
                outputs.append(out)
            stats = builder.build(outputs)
            curves, _maps = builder.get_curves_maps(stats)  # shape (Q1, 1)
            acs = agg_curve_stats[loss_type]
            for i, statname in enumerate(builder.mean_quantiles):
                for name in acs.dtype.names:
                    acs[name][i] = curves[name][i]

        # saving agg_curve_stats
        self.datastore['agg_curve-stats'] = agg_curve_stats
        self.datastore['agg_curve-stats'].attrs['nbytes'] = (
            agg_curve_stats.nbytes)


elt_dt = numpy.dtype([('rup_id', U32), ('loss', F32)])

def losses_by_taxonomy(riskinput, riskmodel, rlzs_assoc, assetcol, monitor):
    """
    :param riskinput:
        a :class:`openquake.risklib.riskinput.RiskInput` object
    :param riskmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param rlzs_assoc:
        a :class:`openquake.commonlib.source.RlzsAssoc` instance
    :param assetcol:
        AssetCollection instance
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        an AccumDict with the losses by taxonomy (a numpy array of shape
        (T, L, R)), the average losses, the aggregate losses and the
        number of GMF bytes
    """
    lti = riskmodel.lti  # loss type -> index
    L, R = len(lti), len(rlzs_assoc.realizations)
    T = len(assetcol.taxonomies)
    A = len(assetcol)
    taxonomy_id = {t: i for i, t in enumerate(sorted(assetcol.taxonomies))}
    losses = numpy.zeros((T, L, R), F64)
    avglosses = numpy.zeros((A, L, R), F64) if monitor.avg_losses else None
    agglosses = AccumDict(
        {lr: AccumDict() for lr in itertools.product(range(L), range(R))})
    for out in riskmodel.gen_outputs(riskinput, rlzs_assoc, monitor,
                                     assetcol):
        # NB: out.assets is a non-empty list of assets with the same
        # taxonomy
        t = taxonomy_id[out.assets[0].taxonomy]
        l, r = out.lr
        losses[t, l, r] += out.alosses.sum()
        if monitor.avg_losses:
            for i, loss in enumerate(out.alosses):
                if loss:
                    avglosses[i, l, r] += loss
        agglosses[l, r] += {eid: loss
                            for eid, loss in zip(out.eids, out.elosses)
                            if loss}
    # convert agglosses into arrays to reduce the data transfer
    agglosses = {lr: numpy.array(sorted(agglosses[lr].items()), elt_dt)
                 for lr in agglosses}
    return AccumDict(losses=losses, avglosses=avglosses,
                     agglosses=agglosses, gmfbytes=monitor.gmfbytes)
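
# Sketch of the taxonomy indexing used above (hypothetical taxonomies):
#     >>> taxonomies = ['RC', 'W', 'A']
#     >>> {t: i for i, t in enumerate(sorted(taxonomies))}
#     {'A': 0, 'RC': 1, 'W': 2}
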
save_ruptures = event_based.EventBasedRuptureCalculator.__dict__[
    'save_ruptures']


@base.calculators.add('ebrisk')
class EbriskCalculator(base.RiskCalculator):
    """
    Event based PSHA calculator generating the total losses by taxonomy
    """
    pre_calculator = None
    is_stochastic = True
    compute_ruptures = staticmethod(event_based.compute_ruptures)

    # TODO: if the number of source models is larger than concurrent_tasks
    # a different strategy should be used; the one used here is good when
    # there are few source models, so that we cannot parallelize on those

    def build_starmap(self, ssm, sitecol, assetcol, riskmodel, imts,
                      trunc_level, correl_model, min_iml, monitor):
        """
        :param ssm: CompositeSourceModel containing a single source model
        :param sitecol: a SiteCollection instance
        :param assetcol: an AssetCollection instance
        :param riskmodel: a RiskModel instance
        :param imts: a list of Intensity Measure Types
        :param trunc_level: truncation level
        :param correl_model: correlation model
        :param min_iml: vector of minimum intensities, one per IMT
        :param monitor: a Monitor instance
        :returns: a pair (starmap, dictionary)
        """
        ruptures_by_grp = AccumDict()
        num_ruptures = 0
        num_events = 0
        allargs = []
        grp_trt = {}
        # collect the sources
        maxweight = ssm.get_maxweight(self.oqparam.concurrent_tasks)
        logging.info('Using a maxweight of %d', maxweight)
        for src_group in ssm.src_groups:
            grp_trt[src_group.id] = trt = src_group.trt
            gsims = ssm.gsim_lt.values[trt]
            for block in block_splitter(src_group, maxweight, getweight):
                allargs.append((block, self.sitecol, gsims, monitor))
        # collect the ruptures
        for dic in parallel.starmap(self.compute_ruptures, allargs):
            ruptures_by_grp += dic
            [rupts] = dic.values()
            num_ruptures += len(rupts)
            num_events += dic.num_events
        ruptures_by_grp.num_events = num_events
        save_ruptures(self, ruptures_by_grp)

        # determine the realizations
        rlzs_assoc = ssm.info.get_rlzs_assoc(
            count_ruptures=lambda grp: len(
                ruptures_by_grp.get(grp.id, [])))
        allargs = []
        # prepare the risk inputs
        ruptures_per_block = self.oqparam.ruptures_per_block
        for src_group in ssm.src_groups:
            for rupts in block_splitter(
                    ruptures_by_grp[src_group.id], ruptures_per_block):
                trt = grp_trt[rupts[0].grp_id]
                ri = riskinput.RiskInputFromRuptures(
                    trt, imts, sitecol, rupts, trunc_level,
                    correl_model, min_iml)
                allargs.append((ri, riskmodel, rlzs_assoc, assetcol,
                                monitor))
        taskname = '%s#%d' % (losses_by_taxonomy.__name__, ssm.sm_id + 1)
        smap = starmap(losses_by_taxonomy, allargs, name=taskname)
        attrs = dict(num_ruptures={sg_id: len(rupts) for sg_id, rupts
                                   in ruptures_by_grp.items()},
                     num_events=num_events,
                     num_rlzs=len(rlzs_assoc.realizations),
                     sm_id=ssm.sm_id)
        return smap, attrs
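
    # Note on the blocking above: block_splitter(src_group, maxweight,
    # getweight) yields blocks of sources whose cumulative .weight stays
    # within maxweight, so each compute_ruptures task receives a roughly
    # even share of the total computational work.
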
    def gen_args(self):
        """
        Yield the arguments required by build_starmap, i.e. the source
        models, the asset collection, the riskmodel and others.
        """
        oq = self.oqparam
        correl_model = oq.get_correl_model()
        if not oq.minimum_intensity:
            # infer it from the risk models if not directly set in job.ini
            oq.minimum_intensity = self.riskmodel.get_min_iml()
        min_iml = calc.fix_minimum_intensity(
            oq.minimum_intensity, oq.imtls)
        if min_iml.sum() == 0:
            logging.warn('The GMFs are not filtered: '
                         'you may want to set a minimum_intensity')
        else:
            logging.info('minimum_intensity=%s', oq.minimum_intensity)
        self.csm.init_serials()
        imts = list(oq.imtls)
        for sm_id in range(len(self.csm.source_models)):
            ssm = self.csm.get_model(sm_id)
            monitor = self.monitor.new(
                avg_losses=oq.avg_losses,
                ses_per_logic_tree_path=oq.ses_per_logic_tree_path,
                maximum_distance=oq.maximum_distance,
                samples=ssm.source_models[0].samples,
                seed=ssm.source_model_lt.seed)
            yield (ssm, self.sitecol, self.assetcol, self.riskmodel,
                   imts, oq.truncation_level, correl_model, min_iml,
                   monitor)

    def execute(self):
        """
        Run the calculator and aggregate the results
        """
        num_rlzs = 0
        allres = []
        source_models = self.csm.info.source_models
        with self.monitor('sending riskinputs', autoflush=True):
            self.sm_by_grp = self.csm.info.get_sm_by_grp()
            self.eid = collections.Counter()  # sm_id -> event_id
            for i, args in enumerate(self.gen_args()):
                smap, attrs = self.build_starmap(*args)
                logging.info(
                    'Generated %d/%d ruptures/events for source model #%d',
                    sum(attrs['num_ruptures'].values()),
                    attrs['num_events'], attrs['sm_id'] + 1)
                res = smap.submit_all()
                vars(res).update(attrs)
                allres.append(res)
                res.rlz_slice = slice(num_rlzs, num_rlzs + res.num_rlzs)
                num_rlzs += res.num_rlzs
                for sg in source_models[i].src_groups:
                    sg.eff_ruptures = res.num_ruptures[sg.id]
        self.datastore['csm_info'] = self.csm.info
        num_events = self.save_results(allres, num_rlzs)
        self.save_data_transfer(parallel.IterResult.sum(allres))
        return num_events
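
    # Sketch of the realization bookkeeping above (hypothetical counts):
    # if the first source model yields 3 realizations and the second 2,
    # their results get rlz_slice = slice(0, 3) and slice(3, 5), so
    # save_results writes them into disjoint columns of the output arrays.
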
    def save_results(self, allres, num_rlzs):
        """
        :param allres: an iterable of result iterators
        :param num_rlzs: the total number of realizations
        :returns: the total number of events
        """
        self.L = len(self.riskmodel.lti)
        self.R = num_rlzs
        self.T = len(self.assetcol.taxonomies)
        self.A = len(self.assetcol)
        avg_losses = self.oqparam.avg_losses
        dset1 = self.datastore.create_dset(
            'losses_by_taxon', F64, (self.T, self.L, self.R))
        if avg_losses:
            dset2 = self.datastore.create_dset(
                'avglosses', F64, (self.A, self.L, self.R))
        num_events = 0
        self.gmfbytes = 0
        for res in allres:
            start, stop = res.rlz_slice.start, res.rlz_slice.stop
            r = stop - start
            taxlosses = numpy.zeros((self.T, self.L, r), F64)
            if avg_losses:
                avglosses = numpy.zeros((self.A, self.L, r), F64)
            for dic in res:
                if avg_losses:
                    avglosses += dic.pop('avglosses')
                taxlosses += dic.pop('losses')
                self.gmfbytes += dic.pop('gmfbytes')
                self.save_agglosses(dic.pop('agglosses'), start)
            logging.debug(
                'Saving results for source model #%d, realizations %d:%d',
                res.sm_id + 1, start, stop)
            dset1[:, :, start:stop] = taxlosses
            if avg_losses:
                dset2[:, :, start:stop] = avglosses
            if hasattr(res, 'ruptures_by_grp'):
                save_ruptures(self, res.ruptures_by_grp)
            num_events += res.num_events
        self.datastore['events'].attrs['num_events'] = num_events
        return num_events

    def save_agglosses(self, agglosses, offset):
        """
        Save the event loss tables incrementally.

        :param agglosses:
            a dictionary lr -> array of records (rup_id, loss)
        :param offset: realization offset
        """
        with self.monitor('saving event loss tables', autoflush=True):
            for l, r in agglosses:
                loss_type = self.riskmodel.loss_types[l]
                key = 'agg_loss_table/rlz-%03d/%s' % (
                    r + offset, loss_type)
                self.datastore.extend(key, agglosses[l, r])
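
    # Example of the resulting HDF5 layout (hypothetical indices): with
    # offset=2, l pointing at loss type 'structural' and r=1, the records
    # are appended to the dataset 'agg_loss_table/rlz-003/structural'.
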
    def post_execute(self, num_events):
        """
        Save an array of losses by taxonomy of shape (T, L, R).
        """
        if self.gmfbytes == 0:
            raise RuntimeError('No GMFs were generated, perhaps they were '
                               'all below the minimum_intensity threshold')
        logging.info('Generated %s of GMFs', humansize(self.gmfbytes))
        self.datastore.save('job_info', {'gmfbytes': self.gmfbytes})
        logging.info('Saved losses by taxonomy of shape %s',
                     (self.T, self.L, self.R))
        logging.info('Saved %d event losses', num_events)
        self.datastore.set_nbytes('agg_loss_table')
        self.datastore.set_nbytes('events')