Source code for openquake.calculators.event_based

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import time
import os.path
import operator
import logging
import functools
import collections

import numpy

from openquake.baselib.python3compat import zip
from openquake.baselib.general import AccumDict, split_in_blocks
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib.probability_map import ProbabilityMap, PmapStats
from openquake.hazardlib.geo.surface import PlanarSurface
from openquake.risklib.riskinput import GmfGetter, str2rsi, rsi2str
from openquake.baselib import parallel
from openquake.commonlib import calc, util, datastore
from openquake.calculators import base
from openquake.calculators.classical import ClassicalCalculator, PSHACalculator

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
TWO16 = 2 ** 16

# ######################## rupture calculator ############################ #


def get_seq_ids(task_no, num_ids):
    """
    Get an array of sequential indices for the given task.

    :param task_no: the number of the task
    :param num_ids: the number of indices to return

    >>> list(get_seq_ids(1, 3))
    [65536, 65537, 65538]
    """
    assert 0 <= task_no < TWO16, task_no
    assert 0 <= num_ids < TWO16, num_ids
    start = task_no * TWO16
    return numpy.arange(start, start + num_ids, dtype=U32)

def compute_ruptures(sources, src_filter, gsims, monitor):
    """
    :param sources: List of commonlib.source.Source tuples
    :param src_filter: a source site filter
    :param gsims: a list of GSIMs for the current tectonic region model
    :param monitor: monitor instance
    :returns: a dictionary src_group_id -> [Rupture instances]
    """
    # NB: by construction each block is a non-empty list with
    # sources of the same src_group_id
    grp_id = sources[0].src_group_id
    trt = sources[0].tectonic_region_type
    eb_ruptures = []
    calc_times = []
    rup_mon = monitor('filtering ruptures', measuremem=False)
    num_samples = monitor.samples
    num_events = 0
    cmaker = ContextMaker(gsims, src_filter.integration_distance)

    # Compute and save stochastic event sets
    for src, s_sites in src_filter(sources):
        t0 = time.time()
        if s_sites is None:
            continue
        num_occ_by_rup = sample_ruptures(
            src, monitor.ses_per_logic_tree_path, num_samples, monitor.seed)
        # NB: the number of occurrences is very low, << 1, so it is
        # more efficient to filter only the ruptures that occur, i.e.
        # to call sample_ruptures *before* the filtering
        for ebr in build_eb_ruptures(
                src, num_occ_by_rup, cmaker, s_sites, monitor.seed, rup_mon):
            eb_ruptures.append(ebr)
            num_events += ebr.multiplicity
        dt = time.time() - t0
        calc_times.append((src.id, dt))

    eids = get_seq_ids(monitor.task_no, num_events)
    start = 0
    for ebr in eb_ruptures:
        m = ebr.multiplicity
        ebr.events['eid'] = eids[start: start + m]
        start += m

    res = AccumDict({grp_id: eb_ruptures})
    res.num_events = num_events
    res.calc_times = calc_times
    if gsims:  # we can pass an empty gsims list to disable saving of rup_data
        res.rup_data = {
            grp_id: calc.RuptureData(trt, gsims).to_array(eb_ruptures)}
    return res

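# Illustrative sketch (not part of the engine): how compute_ruptures slices
# the block of event IDs reserved for a task over its ruptures, according to
# their multiplicity. The function name and the multiplicities below are
# hypothetical; only get_seq_ids above is real.
def _demo_eid_slicing(task_no=1, multiplicities=(2, 3)):
    num_events = sum(multiplicities)
    eids = get_seq_ids(task_no, num_events)  # [65536, 65537, ..., 65540]
    start, blocks = 0, []
    for m in multiplicities:
        blocks.append(eids[start: start + m])  # the eids of one rupture
        start += m
    return blocks  # two blocks: eids 65536-65537 and 65538-65540
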
def sample_ruptures(src, num_ses, num_samples, seed):
    """
    Sample the ruptures contained in the given source.

    :param src: a hazardlib source object
    :param num_ses: the number of Stochastic Event Sets to generate
    :param num_samples: how many samples for the given source
    :param seed: master seed from the job.ini file
    :returns: a dictionary of dictionaries
              rupture -> {(sampleid, ses_id): num_occurrences}
    """
    # the dictionary `num_occ_by_rup` contains a dictionary
    # (sampleid, ses_id) -> num_occurrences for each occurring rupture
    num_occ_by_rup = collections.defaultdict(AccumDict)
    # generating ruptures for the given source
    for rup_no, rup in enumerate(src.iter_ruptures()):
        rup.seed = src.serial[rup_no] + seed
        numpy.random.seed(rup.seed)
        for sampleid in range(num_samples):
            for ses_idx in range(1, num_ses + 1):
                num_occurrences = rup.sample_number_of_occurrences()
                if num_occurrences:
                    num_occ_by_rup[rup] += {
                        (sampleid, ses_idx): num_occurrences}
        rup.rup_no = rup_no + 1
    return num_occ_by_rup

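# Illustrative sketch (not part of the engine): the shape of the dictionary
# returned by sample_ruptures, assuming a Poissonian occurrence model for the
# hypothetical per-SES rates below. The real code relies on
# rup.sample_number_of_occurrences() rather than numpy.random.poisson.
def _demo_sample_ruptures(rates=(0.01, 0.002), num_ses=10, num_samples=1,
                          seed=42):
    numpy.random.seed(seed)
    num_occ_by_rup = collections.defaultdict(dict)
    for rup_no, rate in enumerate(rates):  # expected occurrences per SES
        for sampleid in range(num_samples):
            for ses_idx in range(1, num_ses + 1):
                n = numpy.random.poisson(rate)  # occurrences in one SES
                if n:
                    num_occ_by_rup[rup_no][sampleid, ses_idx] = n
    # maps rupture index -> {(sampleid, ses_idx): num_occurrences}
    return num_occ_by_rup
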
def build_eb_ruptures(
        src, num_occ_by_rup, cmaker, s_sites, random_seed, rup_mon):
    """
    Filter the ruptures stored in the dictionary num_occ_by_rup and
    yield the corresponding EBRupture instances.
    """
    for rup in sorted(num_occ_by_rup, key=operator.attrgetter('rup_no')):
        with rup_mon:
            try:
                r_sites, dists = cmaker.get_closest(s_sites, rup)
            except FarAwayRupture:
                # ignore ruptures which are far away
                del num_occ_by_rup[rup]  # save memory
                continue

        # creating EBRuptures
        serial = rup.seed - random_seed + 1
        events = []
        for (sampleid, ses_idx), num_occ in sorted(
                num_occ_by_rup[rup].items()):
            for occ_no in range(1, num_occ + 1):
                # NB: the 0 below is a placeholder; the right eid will be
                # set a bit later, in compute_ruptures
                events.append((0, ses_idx, occ_no, sampleid))
        if events:
            yield calc.EBRupture(
                rup, r_sites.indices, numpy.array(events, calc.event_dt),
                src.source_id, src.src_group_id, serial)

def _count(ruptures):
    if isinstance(ruptures, int):  # passed the number of ruptures
        return ruptures
    return sum(ebr.multiplicity for ebr in ruptures)


@base.calculators.add('event_based_rupture')
class EventBasedRuptureCalculator(PSHACalculator):
    """
    Event based PSHA calculator generating the ruptures only
    """
    core_task = compute_ruptures
    is_stochastic = True

    def init(self):
        """
        Set the random seed passed to the SourceManager and the
        minimum_intensity dictionary.
        """
        oq = self.oqparam
        self.rlzs_assoc = self.datastore['csm_info'].get_rlzs_assoc()
        self.min_iml = calc.fix_minimum_intensity(
            oq.minimum_intensity, oq.imtls)

    def count_eff_ruptures(self, ruptures_by_grp_id, src_group):
        """
        Returns the number of ruptures sampled in the given src_group.

        :param ruptures_by_grp_id: a dictionary with key grp_id
        :param src_group: a SourceGroup instance
        """
        nr = sum(
            len(ruptures) for grp_id, ruptures in ruptures_by_grp_id.items()
            if src_group.id == grp_id)
        return nr

    def zerodict(self):
        """
        Initial accumulator, a dictionary (grp_id, gsim) -> curves
        """
        zd = AccumDict()
        zd.calc_times = []
        zd.eff_ruptures = AccumDict()
        self.sm_by_grp = self.csm.info.get_sm_by_grp()
        self.grp_trt = self.csm.info.grp_trt()
        return zd

    def agg_dicts(self, acc, ruptures_by_grp_id):
        """
        Accumulate dictionaries of ruptures and populate the `events`
        dataset in the datastore.

        :param acc: accumulator dictionary
        :param ruptures_by_grp_id: a nested dictionary grp_id -> ruptures
        """
        if hasattr(ruptures_by_grp_id, 'calc_times'):
            acc.calc_times.extend(ruptures_by_grp_id.calc_times)
        if hasattr(ruptures_by_grp_id, 'eff_ruptures'):
            acc.eff_ruptures += ruptures_by_grp_id.eff_ruptures
        acc += ruptures_by_grp_id
        self.save_events(ruptures_by_grp_id)
        return acc

    def save_events(self, ruptures_by_grp_id):
        """Extend the 'events' dataset with the given ruptures"""
        with self.monitor('saving ruptures', autoflush=True):
            for grp_id, ebrs in ruptures_by_grp_id.items():
                events = []
                sm_id = self.sm_by_grp[grp_id]
                for ebr in ebrs:
                    for event in ebr.events:
                        rec = (event['eid'], ebr.serial,
                               0,  # year to be set
                               event['ses'], event['occ'],
                               event['sample'], grp_id)
                        events.append(rec)
                    if self.oqparam.save_ruptures:
                        key = 'ruptures/grp-%02d/%s' % (grp_id, ebr.serial)
                        self.datastore[key] = ebr
                if events:
                    ev = 'events/sm-%04d' % sm_id
                    self.datastore.extend(
                        ev, numpy.array(events, calc.stored_event_dt))

            # save rup_data
            if hasattr(ruptures_by_grp_id, 'rup_data'):
                for grp_id, data in sorted(
                        ruptures_by_grp_id.rup_data.items()):
                    if len(data):
                        key = 'rup_data/grp-%02d' % grp_id
                        self.rup_data = self.datastore.extend(key, data)

    def post_execute(self, result):
        """
        Save the SES collection
        """
        num_events = sum(_count(ruptures) for ruptures in result.values())
        if num_events == 0:
            raise RuntimeError(
                'No seismic events! Perhaps the investigation time is too '
                'small or the maximum_distance is too small')
        logging.info('Setting %d event years', num_events)
        with self.monitor('setting event years', measuremem=True,
                          autoflush=True):
            inv_time = int(self.oqparam.investigation_time)
            numpy.random.seed(self.oqparam.random_seed)
            for sm in sorted(self.datastore['events']):
                set_random_years(self.datastore, 'events/' + sm, inv_time)

        if 'ruptures' in self.datastore:
            self.datastore.set_nbytes('ruptures')
        self.datastore.set_nbytes('events')
        if 'rup_data' not in self.datastore:
            return
        for dset in self.datastore['rup_data'].values():
            if len(dset):
                numsites = dset['numsites']
                multiplicity = dset['multiplicity']
                spr = numpy.average(numsites, weights=multiplicity)
                mul = numpy.average(multiplicity, weights=numsites)
                self.datastore.set_attrs(
                    dset.name, sites_per_rupture=spr, multiplicity=mul,
                    nbytes=datastore.get_nbytes(dset))
        self.datastore.set_nbytes('rup_data')


def set_random_years(dstore, events_sm, investigation_time):
    """
    Sort the `events` array and attach year labels sensitive to the
    SES ordinal and the investigation time.
    """
    events = dstore[events_sm].value
    sorted_events = sorted(tuple(event)[1:] for event in events)
    years = numpy.random.choice(investigation_time, len(events)) + 1
    year_of = dict(zip(sorted_events, years))
    for event in events:
        idx = event['ses'] - 1  # starts from 0
        event['year'] = idx * investigation_time + year_of[tuple(event)[1:]]
    dstore[events_sm] = events

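# Illustrative sketch (not part of the engine): the year arithmetic used in
# set_random_years. An event belonging to SES number `ses` gets a year in the
# window ((ses - 1) * T, ses * T], where T is the investigation time; the
# function name below is hypothetical.
def _demo_event_year(ses=3, investigation_time=50, seed=42):
    numpy.random.seed(seed)
    year_within_ses = numpy.random.choice(investigation_time) + 1  # in 1..T
    # e.g. with ses=3 and T=50 the result falls in the range 101..150
    return (ses - 1) * investigation_time + year_within_ses
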
# ######################## GMF calculator ############################ #
def compute_gmfs_and_curves(getter, rlzs, monitor):
    """
    :param getter: a GmfGetter instance
    :param rlzs: realizations for the current source group
    :param monitor: a Monitor instance
    :returns: a dictionary with keys gmfcoll and hcurves
    """
    oq = monitor.oqparam
    with monitor('making contexts', measuremem=True):
        getter.init()
    haz = {sid: {} for sid in getter.sids}
    gmfcoll = {}  # rlz -> gmfa
    for rlz in rlzs:
        gmfcoll[rlz] = []
        for i, gmvdict in enumerate(getter(rlz)):
            if gmvdict:
                sid = getter.sids[i]
                for imti, imt in enumerate(getter.imts):
                    if oq.hazard_curves_from_gmfs:
                        try:
                            gmv = gmvdict[imt]
                        except KeyError:
                            # no gmv for the given imt, this may happen
                            pass
                        else:
                            haz[sid][imt, rlz] = gmv
                    for rec in gmvdict.get(imt, []):
                        gmfcoll[rlz].append(
                            (sid, rec['eid'], imti, rec['gmv']))
    for rlz in gmfcoll:
        gmfcoll[rlz] = numpy.array(gmfcoll[rlz], calc.gmv_dt)
    result = dict(gmfcoll=gmfcoll if oq.ground_motion_fields else None,
                  hcurves={})
    if oq.hazard_curves_from_gmfs:
        with monitor('building hazard curves', measuremem=False):
            duration = oq.investigation_time * oq.ses_per_logic_tree_path
            for sid, haz_by_imt_rlz in haz.items():
                for imt, rlz in haz_by_imt_rlz:
                    gmvs = haz_by_imt_rlz[imt, rlz]['gmv']
                    poes = calc._gmvs_to_haz_curve(
                        gmvs, oq.imtls[imt], oq.investigation_time, duration)
                    key = rsi2str(rlz.ordinal, sid, imt)
                    result['hcurves'][key] = poes
    return result

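# Illustrative sketch (not part of the engine): how ground motion values can
# be turned into a hazard curve, in the spirit of the call to
# calc._gmvs_to_haz_curve above (whose exact implementation is not shown
# here). The exceedance rate of each intensity level is estimated over the
# effective duration and converted into a probability of exceedance with the
# Poisson formula; the function name below is hypothetical.
def _demo_gmvs_to_poes(gmvs, imls, investigation_time, duration):
    gmvs = numpy.array(gmvs)
    rates = numpy.array(
        [(gmvs >= iml).sum() / float(duration) for iml in imls])
    return 1. - numpy.exp(-rates * investigation_time)
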
def get_ruptures_by_grp(dstore):
    """
    Extract the dictionary `ruptures_by_grp` from the given datastore
    """
    n = 0
    for grp in dstore['ruptures']:
        n += len(dstore['ruptures/' + grp])
    logging.info('Reading %d ruptures from the datastore', n)
    # disable check on PlanarSurface to support UCERF ruptures
    PlanarSurface.IMPERFECT_RECTANGLE_TOLERANCE = numpy.inf
    ruptures_by_grp = AccumDict(accum=[])
    for grp in dstore['ruptures']:
        grp_id = int(grp[4:])  # strip 'grp-'
        for serial in dstore['ruptures/' + grp]:
            sr = dstore['ruptures/%s/%s' % (grp, serial)]
            ruptures_by_grp[grp_id].append(sr)
    return ruptures_by_grp

@base.calculators.add('event_based')
class EventBasedCalculator(ClassicalCalculator):
    """
    Event based PSHA calculator generating the ground motion fields and
    the hazard curves from the ruptures, depending on the configuration
    parameters.
    """
    pre_calculator = 'event_based_rupture'
    core_task = compute_gmfs_and_curves
    is_stochastic = True

    def combine_pmaps_and_save_gmfs(self, acc, res):
        """
        Combine the hazard curves (if any) and save the gmfs (if any)
        sequentially; notice that the gmfs may come from different tasks
        in any order.

        :param acc: an accumulator for the hazard curves
        :param res: a dictionary rlzi, imt -> [gmf_array, curves_by_imt]
        :returns: a new accumulator
        """
        sav_mon = self.monitor('saving gmfs')
        agg_mon = self.monitor('aggregating hcurves')
        if res['gmfcoll'] is not None:
            with sav_mon:
                for rlz, array in res['gmfcoll'].items():
                    if len(array):
                        sm_id = self.sm_id[rlz.sm_lt_path]
                        key = 'gmf_data/sm-%04d/%04d' % (sm_id, rlz.ordinal)
                        self.datastore.extend(key, array)
        slicedic = self.oqparam.imtls.slicedic
        with agg_mon:
            for key, poes in res['hcurves'].items():
                rlzi, sid, imt = str2rsi(key)
                array = acc[rlzi].setdefault(sid, 0).array[slicedic[imt], 0]
                # combine the PoEs coming from different tasks as independent
                # contributions (see the illustrative sketch after this class)
                array[:] = 1. - (1. - array) * (1. - poes)
        sav_mon.flush()
        agg_mon.flush()
        self.datastore.flush()
        if 'ruptures' in res:
            vars(EventBasedRuptureCalculator)['save_events'](
                self, res['ruptures'])
        return acc

    def gen_args(self, ruptures_by_grp):
        """
        :param ruptures_by_grp: a dictionary of EBRupture objects
        :yields: the arguments for compute_gmfs_and_curves
        """
        oq = self.oqparam
        monitor = self.monitor(self.core_task.__name__)
        monitor.oqparam = oq
        imts = list(oq.imtls)
        min_iml = calc.fix_minimum_intensity(oq.minimum_intensity, imts)
        self.grp_trt = self.csm.info.grp_trt()
        rlzs_by_grp = self.rlzs_assoc.get_rlzs_by_grp_id()
        correl_model = oq.get_correl_model()
        for grp_id in ruptures_by_grp:
            ruptures = ruptures_by_grp[grp_id]
            if not ruptures:
                continue
            for block in split_in_blocks(ruptures, oq.concurrent_tasks or 1):
                trt = self.grp_trt[grp_id]
                gsims = [dic[trt] for dic in self.rlzs_assoc.gsim_by_trt]
                samples = self.rlzs_assoc.samples[grp_id]
                getter = GmfGetter(gsims, block, self.sitecol, imts, min_iml,
                                   oq.truncation_level, correl_model, samples)
                yield getter, rlzs_by_grp[grp_id], monitor

    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the ruptures according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
            return
        ruptures_by_grp = (self.precalc.result if self.precalc
                           else get_ruptures_by_grp(self.datastore.parent))
        if self.oqparam.ground_motion_fields:
            calc.check_overflow(self)
        self.sm_id = {tuple(sm.path): sm.ordinal
                      for sm in self.csm.info.source_models}
        L = len(oq.imtls.array)
        res = parallel.Starmap(
            self.core_task.__func__, self.gen_args(ruptures_by_grp)
        ).submit_all()
        acc = functools.reduce(self.combine_pmaps_and_save_gmfs, res, {
            rlz.ordinal: ProbabilityMap(L, 1)
            for rlz in self.rlzs_assoc.realizations})
        self.save_data_transfer(res)
        return acc

    def post_execute(self, result):
        """
        :param result:
            a dictionary (src_group_id, gsim) -> haz_curves or an empty
            dictionary if hazard_curves_from_gmfs is false
        """
        oq = self.oqparam
        if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
            return
        elif oq.hazard_curves_from_gmfs:
            rlzs = self.rlzs_assoc.realizations
            # save individual curves
            if self.oqparam.individual_curves:
                for i in sorted(result):
                    key = 'hcurves/rlz-%03d' % i
                    if result[i]:
                        self.datastore[key] = result[i]
                    else:
                        logging.info('Zero curves for %s', key)
            # compute and save statistics; this is done in process since
            # there is no need to parallelize: event based calculations
            # involve a "small" number of sites (<= 65,536)
            weights = (None if self.oqparam.number_of_logic_tree_samples
                       else [rlz.weight for rlz in rlzs])
            pstats = PmapStats(self.oqparam.quantile_hazard_curves, weights)
            for kind, stat in pstats.compute(
                    self.sitecol.sids, list(result.values())):
                if kind == 'mean' and not self.oqparam.mean_hazard_curves:
                    continue
                self.datastore['hcurves/' + kind] = stat

        if ('gmf_data' in self.datastore and
                'nbytes' not in self.datastore['gmf_data'].attrs):
            self.datastore.set_nbytes('gmf_data')
            for sm_id in self.datastore['gmf_data']:
                for rlzno in self.datastore['gmf_data/' + sm_id]:
                    self.datastore.set_nbytes(
                        'gmf_data/%s/%s' % (sm_id, rlzno))

        if oq.compare_with_classical:  # compute classical curves
            export_dir = os.path.join(oq.export_dir, 'cl')
            if not os.path.exists(export_dir):
                os.makedirs(export_dir)
            oq.export_dir = export_dir
            # one could also set oq.number_of_logic_tree_samples = 0
            self.cl = ClassicalCalculator(oq, self.monitor)
            # TODO: perhaps it is possible to avoid reprocessing the source
            # model; however, usually this is quite fast and does not
            # dominate the computation
            self.cl.run(close=False)
            cl_mean_curves = get_mean_curves(self.cl.datastore)
            eb_mean_curves = get_mean_curves(self.datastore)
            for imt in eb_mean_curves.dtype.names:
                rdiff, index = util.max_rel_diff_index(
                    cl_mean_curves[imt], eb_mean_curves[imt])
                logging.warn('Relative difference with the classical '
                             'mean curves for IMT=%s: %d%% at site index %d',
                             imt, rdiff * 100, index)

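# Illustrative sketch (not part of the engine): the rule used in
# combine_pmaps_and_save_gmfs above to aggregate hazard curves coming from
# different tasks. Probabilities of exceedance are combined as independent
# contributions; the function name below is hypothetical.
def _demo_combine_poes(p1, p2):
    p1, p2 = numpy.array(p1), numpy.array(p2)
    # e.g. _demo_combine_poes([0.1, 0.2], [0.3, 0.0]) -> roughly [0.37, 0.2]
    return 1. - (1. - p1) * (1. - p2)
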
def get_mean_curves(dstore):
    """
    Extract the mean hazard curves from the datastore, as a composite
    array of length nsites.
    """
    imtls = dstore['oqparam'].imtls
    nsites = len(dstore['sitecol'])
    hcurves = dstore['hcurves']
    if 'mean' in hcurves:
        mean = dstore['hcurves/mean']
    elif len(hcurves) == 1:  # there is a single realization
        mean = dstore['hcurves/rlz-0000']
    return mean.convert(imtls, nsites)