Source code for openquake.calculators.event_based

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2015-2017 GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <>.

import time
import os.path
import operator
import logging
import functools
import collections

import numpy

from openquake.baselib.general import AccumDict, split_in_blocks
from openquake.hazardlib.probability_map import ProbabilityMap, PmapStats
from openquake.hazardlib.calc.filters import \
from openquake.risklib.riskinput import GmfGetter, str2rsi, rsi2str
from openquake.baselib import parallel
from openquake.commonlib import calc, util
from openquake.calculators import base
from openquake.calculators.classical import ClassicalCalculator, PSHACalculator

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64

# ######################## rupture calculator ############################ #

[docs]def compute_ruptures(sources, src_filter, gsims, monitor): """ :param sources: List of commonlib.source.Source tuples :param src_filter: a source site filter :param gsims: a list of GSIMs for the current tectonic region model :param monitor: monitor instance :returns: a dictionary src_group_id -> [Rupture instances] """ # NB: by construction each block is a non-empty list with # sources of the same src_group_id grp_id = sources[0].src_group_id trt = sources[0].tectonic_region_type eb_ruptures = [] calc_times = [] rup_mon = monitor('filtering ruptures', measuremem=False) num_samples = monitor.samples num_events = 0 # Compute and save stochastic event sets for src, s_sites in src_filter(sources): t0 = time.time() if s_sites is None: continue max_dist = src_filter.integration_distance[trt] rupture_filter = functools.partial( filter_sites_by_distance_to_rupture, integration_distance=max_dist, sites=s_sites) num_occ_by_rup = sample_ruptures( src, monitor.ses_per_logic_tree_path, num_samples, monitor.seed) # NB: the number of occurrences is very low, << 1, so it is # more efficient to filter only the ruptures that occur, i.e. # to call sample_ruptures *before* the filtering for ebr in build_eb_ruptures( src, num_occ_by_rup, rupture_filter, monitor.seed, rup_mon): eb_ruptures.append(ebr) num_events += ebr.multiplicity dt = time.time() - t0 calc_times.append((, dt)) res = AccumDict({grp_id: eb_ruptures}) res.num_events = num_events res.calc_times = calc_times res.rup_data = {grp_id: calc.RuptureData(trt, gsims).to_array(eb_ruptures)} return res
[docs]def sample_ruptures(src, num_ses, num_samples, seed): """ Sample the ruptures contained in the given source. :param src: a hazardlib source object :param num_ses: the number of Stochastic Event Sets to generate :param num_samples: how many samples for the given source :param seed: master seed from the job.ini file :returns: a dictionary of dictionaries rupture -> {ses_id: num_occurrences} """ # the dictionary `num_occ_by_rup` contains a dictionary # ses_id -> num_occurrences for each occurring rupture num_occ_by_rup = collections.defaultdict(AccumDict) # generating ruptures for the given source for rup_no, rup in enumerate(src.iter_ruptures()): rup.seed = src.serial[rup_no] + seed numpy.random.seed(rup.seed) for sampleid in range(num_samples): for ses_idx in range(1, num_ses + 1): num_occurrences = rup.sample_number_of_occurrences() if num_occurrences: num_occ_by_rup[rup] += { (sampleid, ses_idx): num_occurrences} rup.rup_no = rup_no + 1 return num_occ_by_rup
[docs]def build_eb_ruptures( src, num_occ_by_rup, rupture_filter, random_seed, rup_mon): """ Filter the ruptures stored in the dictionary num_occ_by_rup and yield pairs (rupture, <list of associated EBRuptures>) """ eid = 0 for rup in sorted(num_occ_by_rup, key=operator.attrgetter('rup_no')): with rup_mon: r_sites = rupture_filter(rup) if r_sites is None: # ignore ruptures which are far away del num_occ_by_rup[rup] # save memory continue # creating EBRuptures serial = rup.seed - random_seed + 1 events = [] for (sampleid, ses_idx), num_occ in sorted( num_occ_by_rup[rup].items()): for occ_no in range(1, num_occ + 1): # NB: the eid below is a placeholder; the right eid will be # set later, in EventBasedRuptureCalculator.post_execute events.append((eid, ses_idx, occ_no, sampleid)) eid += 1 if events: yield calc.EBRupture( rup, r_sites.indices, numpy.array(events, calc.event_dt), src.source_id, src.src_group_id, serial)
[docs]class EventBasedRuptureCalculator(PSHACalculator): """ Event based PSHA calculator generating the ruptures only """ core_task = compute_ruptures is_stochastic = True
[docs] def init(self): """ Set the random seed passed to the SourceManager and the minimum_intensity dictionary. """ oq = self.oqparam self.random_seed = oq.random_seed self.rlzs_assoc = self.datastore['csm_info'].get_rlzs_assoc() self.min_iml = calc.fix_minimum_intensity( oq.minimum_intensity, oq.imtls)
[docs] def count_eff_ruptures(self, ruptures_by_grp_id, src_group): """ Returns the number of ruptures sampled in the given src_group. :param ruptures_by_grp_id: a dictionary with key grp_id :param src_group: a SourceGroup instance """ nr = sum( len(ruptures) for grp_id, ruptures in ruptures_by_grp_id.items() if == grp_id) return nr
[docs] def zerodict(self): """ Initial accumulator, a dictionary (grp_id, gsim) -> curves """ zd = AccumDict() zd.calc_times = [] zd.eff_ruptures = AccumDict() self.eid = collections.Counter() # sm_id -> event_id self.sm_by_grp = self.grp_trt = return zd
[docs] def agg_dicts(self, acc, ruptures_by_grp_id): """ Accumulate dictionaries of ruptures and populate the `events` dataset in the datastore. :param acc: accumulator dictionary :param ruptures_by_grp_id: a nested dictionary grp_id -> ruptures """ if hasattr(ruptures_by_grp_id, 'calc_times'): acc.calc_times.extend(ruptures_by_grp_id.calc_times) if hasattr(ruptures_by_grp_id, 'eff_ruptures'): acc.eff_ruptures += ruptures_by_grp_id.eff_ruptures acc += ruptures_by_grp_id self.save_events(ruptures_by_grp_id) return acc
[docs] def save_events(self, ruptures_by_grp_id): """Extend the 'events' dataset with the given ruptures""" with self.monitor('saving ruptures', autoflush=True): for grp_id, ebrs in ruptures_by_grp_id.items(): events = [] i = 0 sm_id = self.sm_by_grp[grp_id] for ebr in ebrs: for event in event['eid'] = self.eid[sm_id] rec = (ebr.serial, 0, # year to be set event['ses'], event['occ'], event['sample'], grp_id, ebr.source_id) events.append(rec) self.eid[sm_id] += 1 i += 1 if self.oqparam.save_ruptures: key = 'ruptures/grp-%02d/%s' % (grp_id, ebr.serial) self.datastore[key] = ebr if events: ev = 'events/sm-%04d' % sm_id self.datastore.extend( ev, numpy.array(events, calc.stored_event_dt)) # save rup_data if hasattr(ruptures_by_grp_id, 'rup_data'): for grp_id, data in sorted( ruptures_by_grp_id.rup_data.items()): if len(data): key = 'rup_data/grp-%02d' % grp_id self.rup_data = self.datastore.extend(key, data)
[docs] def post_execute(self, result): """ Save the SES collection """ num_events = sum_dict(result) if num_events == 0: raise RuntimeError( 'No seismic events! Perhaps the investigation time is too ' 'small or the maximum_distance is too small')'Setting %d event years', num_events) with self.monitor('setting event years', measuremem=True, autoflush=True): inv_time = int(self.oqparam.investigation_time) numpy.random.seed(self.oqparam.random_seed) for sm in sorted(self.datastore['events']): set_random_years(self.datastore, 'events/' + sm, inv_time) if 'ruptures' in self.datastore: self.datastore.set_nbytes('ruptures') self.datastore.set_nbytes('events') for dset in self.datastore['rup_data'].values(): if len(dset): numsites = dset['numsites'] multiplicity = dset['multiplicity'] spr = numpy.average(numsites, weights=multiplicity) mul = numpy.average(multiplicity, weights=numsites) self.datastore.set_attrs(, sites_per_rupture=spr, multiplicity=mul) self.datastore.set_nbytes('rup_data')
[docs]def set_random_years(dstore, events_sm, investigation_time): """ Sort the `events` array and attach year labels sensitive to the SES ordinal and the investigation time. """ events = dstore[events_sm].value sorted_events = sorted(tuple(event) for event in events) years = numpy.random.choice(investigation_time, len(events)) + 1 year_of = dict(zip(sorted_events, years)) for event in events: idx = event['ses'] - 1 # starts from 0 event['year'] = idx * investigation_time + year_of[tuple(event)] dstore[events_sm] = events
[docs]def sum_dict(dic): """ Sum by key a dictionary of lists or numbers: >>> sum_dict({'a': 1}) 1 >>> sum_dict({'a': [None, None]}) 2 """ if isinstance(dic, int): return dic s = 0 for k, v in dic.items(): if hasattr(v, '__len__'): s += len(v) else: s += v return s
# ######################## GMF calculator ############################ # gmv_dt = numpy.dtype([('sid', U32), ('eid', U32), ('imti', U8), ('gmv', F32)])
[docs]def compute_gmfs_and_curves(getter, rlzs, monitor): """ :param eb_ruptures: a list of blocks of EBRuptures of the same SESCollection :param sitecol: a :class:`` instance :param imts: a list of intensity measure type strings :param rlzs_by_gsim: a dictionary gsim -> associated realizations :param monitor: a Monitor instance :returns: a dictionary with keys gmfcoll and hcurves """ oq = monitor.oqparam haz = {sid: {} for sid in getter.sids} gmfcoll = {} # rlz -> gmfa for rlz in rlzs: gmfcoll[rlz] = [] for sid, gmvdict in zip(getter.sids, getter(rlz)): if gmvdict: for imti, imt in enumerate(getter.imts): if oq.hazard_curves_from_gmfs: try: gmv = gmvdict[imt] except KeyError: # no gmv for the given imt, this may happen pass else: haz[sid][imt, rlz] = gmv for rec in gmvdict.get(imt, []): gmfcoll[rlz].append( (sid, rec['eid'], imti, rec['gmv'])) for rlz in gmfcoll: gmfcoll[rlz] = numpy.array(gmfcoll[rlz], gmv_dt) result = dict(gmfcoll=gmfcoll if oq.ground_motion_fields else None, hcurves={}) if oq.hazard_curves_from_gmfs: with monitor('building hazard curves', measuremem=False): duration = oq.investigation_time * oq.ses_per_logic_tree_path for sid, haz_by_imt_rlz in haz.items(): for imt, rlz in haz_by_imt_rlz: gmvs = haz_by_imt_rlz[imt, rlz]['gmv'] poes = calc._gmvs_to_haz_curve( gmvs, oq.imtls[imt], oq.investigation_time, duration) key = rsi2str(rlz.ordinal, sid, imt) result['hcurves'][key] = poes return result
[docs]def get_ruptures_by_grp(dstore): """ Extracts the dictionary `ruptures_by_grp` from the given calculator """ ruptures_by_grp = AccumDict(accum=[]) for grp in dstore['ruptures']: grp_id = int(grp[4:]) # strip 'grp-' for serial in dstore['ruptures/' + grp]: sr = dstore['ruptures/%s/%s' % (grp, serial)] ruptures_by_grp[grp_id].append(sr) return ruptures_by_grp
[docs]class EventBasedCalculator(ClassicalCalculator): """ Event based PSHA calculator generating the ground motion fields and the hazard curves from the ruptures, depending on the configuration parameters. """ pre_calculator = 'event_based_rupture' core_task = compute_gmfs_and_curves is_stochastic = True
[docs] def combine_pmaps_and_save_gmfs(self, acc, res): """ Combine the hazard curves (if any) and save the gmfs (if any) sequentially; notice that the gmfs may come from different tasks in any order. :param acc: an accumulator for the hazard curves :param res: a dictionary rlzi, imt -> [gmf_array, curves_by_imt] :returns: a new accumulator """ sav_mon = self.monitor('saving gmfs') agg_mon = self.monitor('aggregating hcurves') if res['gmfcoll'] is not None: with sav_mon: for rlz, array in res['gmfcoll'].items(): if len(array): sm_id = self.sm_id[rlz.sm_lt_path] key = 'gmf_data/sm-%04d/%04d' % (sm_id, rlz.ordinal) self.datastore.extend(key, array) slicedic = self.oqparam.imtls.slicedic with agg_mon: for key, poes in res['hcurves'].items(): rlzi, sid, imt = str2rsi(key) array = acc[rlzi].setdefault(sid, 0).array[slicedic[imt], 0] array[:] = 1. - (1. - array) * (1. - poes) sav_mon.flush() agg_mon.flush() self.datastore.flush() if 'ruptures' in res: vars(EventBasedRuptureCalculator)['save_events']( self, res['ruptures']) return acc
[docs] def gen_args(self, ruptures_by_grp): """ :param ruptures_by_grp: a dictionary of EBRupture objects :yields: the arguments for compute_gmfs_and_curves """ oq = self.oqparam monitor = self.monitor(self.core_task.__name__) monitor.oqparam = oq imts = list(oq.imtls) min_iml = calc.fix_minimum_intensity(oq.minimum_intensity, imts) self.grp_trt = rlzs_by_grp = self.rlzs_assoc.get_rlzs_by_grp_id() correl_model = oq.get_correl_model() for grp_id in ruptures_by_grp: ruptures = ruptures_by_grp[grp_id] if not ruptures: continue for block in split_in_blocks(ruptures, oq.concurrent_tasks or 1): trt = self.grp_trt[grp_id] gsims = [dic[trt] for dic in self.rlzs_assoc.gsim_by_trt] samples = self.rlzs_assoc.samples[grp_id] getter = GmfGetter(gsims, block, self.sitecol, imts, min_iml, oq.truncation_level, correl_model, samples) yield getter, rlzs_by_grp[grp_id], monitor
[docs] def execute(self): """ Run in parallel `core_task(sources, sitecol, monitor)`, by parallelizing on the ruptures according to their weight and tectonic region type. """ oq = self.oqparam if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields: return ruptures_by_grp = (self.precalc.result if self.precalc else get_ruptures_by_grp(self.datastore.parent)) if self.oqparam.ground_motion_fields: calc.check_overflow(self) self.sm_id = {sm.path: sm.ordinal for sm in} L = len(oq.imtls.array) res = parallel.Starmap( self.core_task.__func__, self.gen_args(ruptures_by_grp) ).submit_all() acc = functools.reduce(self.combine_pmaps_and_save_gmfs, res, { rlz.ordinal: ProbabilityMap(L, 1) for rlz in self.rlzs_assoc.realizations}) self.save_data_transfer(res) return acc
[docs] def post_execute(self, result): """ :param result: a dictionary (src_group_id, gsim) -> haz_curves or an empty dictionary if hazard_curves_from_gmfs is false """ oq = self.oqparam if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields: return elif oq.hazard_curves_from_gmfs: rlzs = self.rlzs_assoc.realizations # save individual curves if self.oqparam.individual_curves: for i in sorted(result): key = 'hcurves/rlz-%03d' % i if result[i]: self.datastore[key] = result[i] else:'Zero curves for %s', key) # compute and save statistics; this is done in process # we don't need to parallelize, since event based calculations # involves a "small" number of sites (<= 65,536) weights = (None if self.oqparam.number_of_logic_tree_samples else [rlz.weight for rlz in rlzs]) pstats = PmapStats(self.oqparam.quantile_hazard_curves, weights) for kind, stat in pstats.compute( self.sitecol.sids, list(result.values())): if kind == 'mean' and not self.oqparam.mean_hazard_curves: continue self.datastore['hcurves/' + kind] = stat if ('gmf_data' in self.datastore and 'nbytes' not in self.datastore['gmf_data'].attrs): self.datastore.set_nbytes('gmf_data') for sm_id in self.datastore['gmf_data']: for rlzno in self.datastore['gmf_data/' + sm_id]: self.datastore.set_nbytes( 'gmf_data/%s/%s' % (sm_id, rlzno)) if oq.compare_with_classical: # compute classical curves export_dir = os.path.join(oq.export_dir, 'cl') if not os.path.exists(export_dir): os.makedirs(export_dir) oq.export_dir = export_dir # one could also set oq.number_of_logic_tree_samples = 0 = ClassicalCalculator(oq, self.monitor) # TODO: perhaps it is possible to avoid reprocessing the source # model, however usually this is quite fast and do not dominate # the computation cl_mean_curves = get_mean_curves( eb_mean_curves = get_mean_curves(self.datastore) for imt in eb_mean_curves.dtype.names: rdiff, index = util.max_rel_diff_index( cl_mean_curves[imt], eb_mean_curves[imt]) logging.warn('Relative difference with the classical ' 'mean curves for IMT=%s: %d%% at site index %d', imt, rdiff * 100, index)
[docs]def get_mean_curves(dstore): """ Extract the mean hazard curves from the datastore, as a composite array of length nsites. """ imtls = dstore['oqparam'].imtls nsites = len(dstore['sitecol']) hcurves = dstore['hcurves'] if 'mean' in hcurves: mean = dstore['hcurves/mean'] elif len(hcurves) == 1: # there is a single realization mean = dstore['hcurves/rlz-0000'] return mean.convert(imtls, nsites)