Source code for openquake.calculators.event_based

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import time
import os.path
import operator
import logging
import functools
import collections

import numpy

from openquake.baselib import hdf5
from openquake.baselib.general import AccumDict, group_array
from openquake.hazardlib.calc.filters import \
    filter_sites_by_distance_to_rupture
from openquake.hazardlib.calc.hazard_curve import (
    array_of_curves, ProbabilityMap)
from openquake.hazardlib import geo
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.commonlib import readinput, parallel, datastore, calc
from openquake.commonlib.util import max_rel_diff_index, Rupture
from openquake.risklib.riskinput import create
from openquake.calculators import base
from openquake.calculators.classical import ClassicalCalculator

# ######################## rupture calculator ############################ #

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
POEMAP = 1

event_dt = numpy.dtype([('eid', U32), ('ses', U32), ('occ', U32),
                        ('sample', U32)])
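
# Example (illustrative, not executed at import time): since `event_dt` is a
# numpy structured dtype, events can be stored in a record array and the
# columns accessed by field name:
#
# >>> events = numpy.array([(0, 1, 1, 0), (0, 1, 2, 0)], event_dt)
# >>> events['ses']
# array([1, 1], dtype=uint32)
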


def get_geom(surface, is_from_fault_source, is_multi_surface):
    """
    The following fields can be interpreted in different ways, depending
    on the value of `is_from_fault_source`. If `is_from_fault_source` is
    True, each of these fields should contain a 2D numpy array (all of
    the same shape). Each triple of (lon, lat, depth) for a given index
    represents the node of a rectangular mesh. If `is_from_fault_source`
    is False, each of these fields should contain a sequence (tuple,
    list, or numpy array, for example) of 4 values. In order, the
    triples of (lon, lat, depth) represent top left, top right, bottom
    left, and bottom right corners of the rupture's planar surface.

    Update: there is now a third case. If the rupture originated from a
    characteristic fault source with a multi-planar-surface geometry,
    `lons`, `lats`, and `depths` will contain one or more sets of 4
    points, similar to how planar surface geometry is stored (see above).

    :param surface: the surface of a
        :class:`openquake.hazardlib.source.rupture.BaseProbabilisticRupture`
    :param is_from_fault_source: a boolean
    :param is_multi_surface: a boolean
    """
    if is_from_fault_source:
        # for simple and complex fault sources,
        # rupture surface geometry is represented by a mesh
        surf_mesh = surface.get_mesh()
        lons = surf_mesh.lons
        lats = surf_mesh.lats
        depths = surf_mesh.depths
    else:
        if is_multi_surface:
            # `list` of
            # openquake.hazardlib.geo.surface.planar.PlanarSurface
            # objects:
            surfaces = surface.surfaces

            # lons, lats, and depths are arrays with len == 4*N,
            # where N is the number of surfaces in the multisurface;
            # for each `corner_*`, the ordering is:
            #   - top left
            #   - top right
            #   - bottom left
            #   - bottom right
            lons = numpy.concatenate([x.corner_lons for x in surfaces])
            lats = numpy.concatenate([x.corner_lats for x in surfaces])
            depths = numpy.concatenate([x.corner_depths for x in surfaces])
        else:
            # for area and point sources,
            # rupture geometry is represented by a planar surface,
            # defined by 3D corner points
            lons = numpy.zeros(4)
            lats = numpy.zeros(4)
            depths = numpy.zeros(4)

            # NOTE: it is important to maintain the order of these
            # corner points. TODO: check the ordering
            for i, corner in enumerate((surface.top_left,
                                        surface.top_right,
                                        surface.bottom_left,
                                        surface.bottom_right)):
                lons[i] = corner.longitude
                lats[i] = corner.latitude
                depths[i] = corner.depth
    return lons, lats, depths
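
# A minimal usage sketch of the planar branch of get_geom, with a
# hypothetical mock surface (Corner and FakeSurface are illustrative names,
# not part of openquake):
#
# >>> from collections import namedtuple
# >>> Corner = namedtuple('Corner', 'longitude latitude depth')
# >>> FakeSurface = namedtuple(
# ...     'FakeSurface', 'top_left top_right bottom_left bottom_right')
# >>> surf = FakeSurface(Corner(0.0, 0.0, 5.0), Corner(0.1, 0.0, 5.0),
# ...                    Corner(0.0, -0.1, 10.0), Corner(0.1, -0.1, 10.0))
# >>> lons, lats, depths = get_geom(surf, False, False)
# >>> list(lons)  # top left, top right, bottom left, bottom right
# [0.0, 0.1, 0.0, 0.1]
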
class EBRupture(object):
    """
    An event based rupture. It is a wrapper over a hazardlib rupture
    object, containing an array of site indices affected by the rupture,
    as well as the tags of the corresponding seismic events.
    """
    def __init__(self, rupture, indices, events, source_id, trt_id, serial):
        self.rupture = rupture
        self.indices = indices
        self.events = events
        self.source_id = source_id
        self.trt_id = trt_id
        self.serial = serial
        self.weight = len(indices) * len(events)  # changed in set_weight

    @property
    def etags(self):
        """
        An array of tags for the underlying seismic events
        """
        tags = []
        for (eid, ses, occ, sampleid) in self.events:
            tag = 'trt=%02d~ses=%04d~src=%s~rup=%d-%02d' % (
                self.trt_id, ses, self.source_id, self.serial, occ)
            if sampleid > 0:
                tag += '~sample=%d' % sampleid
            tags.append(tag)
        return numpy.array(tags)

    @property
    def eids(self):
        """
        An array with the underlying event IDs
        """
        return self.events['eid']

    @property
    def multiplicity(self):
        """
        How many times the underlying rupture occurs.
        """
        return len(self.events)

    def set_weight(self, num_rlzs_by_trt_id, num_assets_by_site_id):
        """
        Set the weight attribute of each rupture with the formula
        weight = multiplicity * affected_sites * realizations

        :param num_rlzs_by_trt_id: dictionary, possibly empty
        :param num_assets_by_site_id: dictionary, possibly empty
        """
        num_assets = sum(num_assets_by_site_id.get(sid, 1)
                         for sid in self.indices)
        self.weight = (len(self.events) * num_assets *
                       num_rlzs_by_trt_id.get(self.trt_id, 1))

    def export(self, mesh):
        """
        Yield :class:`openquake.commonlib.util.Rupture` objects, with all
        the attributes set, suitable for export in XML format.
        """
        rupture = self.rupture
        for etag in self.etags:
            new = Rupture(etag, self.indices)
            new.mesh = mesh[self.indices]
            new.etag = etag
            new.rupture = new
            new.is_from_fault_source = iffs = isinstance(
                rupture.surface, (geo.ComplexFaultSurface,
                                  geo.SimpleFaultSurface))
            new.is_multi_surface = ims = isinstance(
                rupture.surface, geo.MultiSurface)
            new.lons, new.lats, new.depths = get_geom(
                rupture.surface, iffs, ims)
            new.surface = rupture.surface
            new.strike = rupture.surface.get_strike()
            new.dip = rupture.surface.get_dip()
            new.rake = rupture.rake
            new.hypocenter = rupture.hypocenter
            new.tectonic_region_type = rupture.tectonic_region_type
            new.magnitude = new.mag = rupture.mag
            new.top_left_corner = None if iffs or ims else (
                new.lons[0], new.lats[0], new.depths[0])
            new.top_right_corner = None if iffs or ims else (
                new.lons[1], new.lats[1], new.depths[1])
            new.bottom_left_corner = None if iffs or ims else (
                new.lons[2], new.lats[2], new.depths[2])
            new.bottom_right_corner = None if iffs or ims else (
                new.lons[3], new.lats[3], new.depths[3])
            yield new

    def __lt__(self, other):
        return self.serial < other.serial

    def __repr__(self):
        return '<%s #%d, trt_id=%d>' % (self.__class__.__name__,
                                        self.serial, self.trt_id)
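
# Example of the etag format built in EBRupture.etags, with hypothetical
# values trt_id=1, ses=2, source_id='src_a', serial=42, occ=1 and
# sampleid=0 (so no ~sample suffix):
#
# >>> 'trt=%02d~ses=%04d~src=%s~rup=%d-%02d' % (1, 2, 'src_a', 42, 1)
# 'trt=01~ses=0002~src=src_a~rup=42-01'
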
@parallel.litetask
def compute_ruptures(sources, sitecol, siteidx, rlzs_assoc, monitor):
    """
    :param sources: a list of commonlib.source.Source tuples
    :param sitecol: a :class:`openquake.hazardlib.site.SiteCollection` instance
    :param siteidx: always equal to 0
    :param rlzs_assoc: a :class:`openquake.commonlib.source.RlzsAssoc` instance
    :param monitor: monitor instance
    :returns: a dictionary trt_model_id -> [EBRupture instances]
    """
    assert siteidx == 0, (
        'siteidx can be nonzero only for the classical_tiling calculations: '
        'tiling with the EventBasedRuptureCalculator is an error')
    # NB: by construction each block is a non-empty list with
    # sources of the same trt_model_id
    trt_model_id = sources[0].trt_model_id
    oq = monitor.oqparam
    trt = sources[0].tectonic_region_type
    max_dist = oq.maximum_distance[trt]
    cmaker = ContextMaker(rlzs_assoc.gsims_by_trt_id[trt_model_id])
    params = cmaker.REQUIRES_RUPTURE_PARAMETERS
    rup_data_dt = numpy.dtype(
        [('rupserial', U32), ('multiplicity', U16),
         ('numsites', U32), ('occurrence_rate', F32)] + [
            (param, F32) for param in params])
    eb_ruptures = []
    rup_data = []
    calc_times = []
    rup_mon = monitor('filtering ruptures', measuremem=False)
    num_samples = rlzs_assoc.samples[trt_model_id]

    # compute and save stochastic event sets
    for src in sources:
        t0 = time.time()
        s_sites = src.filter_sites_by_distance_to_source(max_dist, sitecol)
        if s_sites is None:
            continue
        rupture_filter = functools.partial(
            filter_sites_by_distance_to_rupture,
            integration_distance=max_dist, sites=s_sites)
        num_occ_by_rup = sample_ruptures(
            src, oq.ses_per_logic_tree_path, num_samples, rlzs_assoc.seed)
        # NB: the number of occurrences is very low, << 1, so it is
        # more efficient to filter only the ruptures that occur, i.e.
        # to call sample_ruptures *before* the filtering
        for ebr in build_eb_ruptures(
                src, num_occ_by_rup, rupture_filter, oq.random_seed, rup_mon):
            nsites = len(ebr.indices)
            try:
                rate = ebr.rupture.occurrence_rate
            except AttributeError:  # for nonparametric sources
                rate = numpy.nan
            rc = cmaker.make_rupture_context(ebr.rupture)
            ruptparams = tuple(getattr(rc, param) for param in params)
            rup_data.append((ebr.serial, len(ebr.etags), nsites, rate) +
                            ruptparams)
            eb_ruptures.append(ebr)
        dt = time.time() - t0
        calc_times.append((src.id, dt))
    res = AccumDict({trt_model_id: eb_ruptures})
    res.calc_times = calc_times
    res.rup_data = numpy.array(rup_data, rup_data_dt)
    res.trt = trt
    return res
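
# Example of the dynamic `rup_data_dt` built in compute_ruptures above: one
# float32 field is appended per rupture parameter required by the GSIMs
# (here ['mag', 'rake'], hypothetical values):
#
# >>> dt = numpy.dtype(
# ...     [('rupserial', U32), ('multiplicity', U16), ('numsites', U32),
# ...      ('occurrence_rate', F32)] + [(p, F32) for p in ['mag', 'rake']])
# >>> dt.names
# ('rupserial', 'multiplicity', 'numsites', 'occurrence_rate', 'mag', 'rake')
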
def sample_ruptures(src, num_ses, num_samples, seed):
    """
    Sample the ruptures contained in the given source.

    :param src: a hazardlib source object
    :param num_ses: the number of Stochastic Event Sets to generate
    :param num_samples: how many samples for the given source
    :param seed: master seed from the job.ini file
    :returns: a dictionary of dictionaries rupture ->
              {(sampleid, ses_id): num_occurrences}
    """
    # the dictionary `num_occ_by_rup` contains a dictionary
    # (sampleid, ses_id) -> num_occurrences for each occurring rupture
    num_occ_by_rup = collections.defaultdict(AccumDict)
    # generating ruptures for the given source
    for rup_no, rup in enumerate(src.iter_ruptures()):
        rup.seed = src.serial[rup_no] + seed
        numpy.random.seed(rup.seed)
        for sampleid in range(num_samples):
            for ses_idx in range(1, num_ses + 1):
                num_occurrences = rup.sample_number_of_occurrences()
                if num_occurrences:
                    num_occ_by_rup[rup] += {
                        (sampleid, ses_idx): num_occurrences}
        rup.rup_no = rup_no + 1
    return num_occ_by_rup
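
# Sketch of why sampling before filtering pays off (hedged: for a
# Poissonian rupture, sample_number_of_occurrences draws from a Poisson
# distribution whose mean is rate * investigation_time; the exact hazardlib
# call chain may differ). With typical rates the mean is << 1 and most
# draws are zero, so few ruptures survive the sampling:
#
# >>> numpy.random.seed(42)
# >>> occ = numpy.random.poisson(0.001 * 50, size=10000)
# >>> (occ > 0).sum()  # only a few hundred of the 10000 samples occur
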
def build_eb_ruptures(
        src, num_occ_by_rup, rupture_filter, random_seed, rup_mon):
    """
    Filter the ruptures stored in the dictionary num_occ_by_rup and
    yield the corresponding EBRuptures.
    """
    for rup in sorted(num_occ_by_rup, key=operator.attrgetter('rup_no')):
        with rup_mon:
            r_sites = rupture_filter(rup)
        if r_sites is None:
            # ignore ruptures which are far away
            del num_occ_by_rup[rup]  # save memory
            continue

        # creating EBRuptures
        serial = rup.seed - random_seed + 1
        events = []
        for (sampleid, ses_idx), num_occ in sorted(
                num_occ_by_rup[rup].items()):
            for occ_no in range(1, num_occ + 1):
                # NB: the 0 below is a placeholder; the right eid will be
                # set later, in EventBasedRuptureCalculator.post_execute
                events.append((0, ses_idx, occ_no, sampleid))
        if events:
            yield EBRupture(rup, r_sites.indices,
                            numpy.array(events, event_dt),
                            src.source_id, src.trt_model_id, serial)
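
# Example of the events array built above: eid is 0 as a placeholder
# (filled in post_execute), followed by the ses, occ and sample fields;
# here a rupture occurring twice in ses 3 of sample 0 (hypothetical values):
#
# >>> numpy.array([(0, 3, 1, 0), (0, 3, 2, 0)], event_dt)['occ']
# array([1, 2], dtype=uint32)
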
@base.calculators.add('event_based_rupture')
class EventBasedRuptureCalculator(ClassicalCalculator):
    """
    Event based PSHA calculator generating the ruptures only
    """
    core_task = compute_ruptures
    etags = datastore.persistent_attribute('etags')
    is_stochastic = True

    def init(self):
        """
        Set the random seed passed to the SourceManager and the
        minimum_intensity dictionary.
        """
        oq = self.oqparam
        self.random_seed = oq.random_seed
        self.rlzs_assoc = self.datastore['csm_info'].get_rlzs_assoc()
        self.min_iml = calc.fix_minimum_intensity(
            oq.minimum_intensity, oq.imtls)
        self.rup_data = {}

    def count_eff_ruptures(self, ruptures_by_trt_id, trt_model):
        """
        Return the number of ruptures sampled in the given trt_model.

        :param ruptures_by_trt_id: a dictionary with key trt_id
        :param trt_model: a TrtModel instance
        """
        return sum(
            len(ruptures) for trt_id, ruptures in ruptures_by_trt_id.items()
            if trt_model.id == trt_id)

    def zerodict(self):
        """
        Initial accumulator, an AccumDict trt_id -> list of EBRuptures,
        with .calc_times and .eff_ruptures attributes
        """
        zd = AccumDict()
        zd.calc_times = []
        zd.eff_ruptures = AccumDict()
        return zd

    def agg_dicts(self, acc, ruptures_by_trt_id):
        """
        Aggregate dictionaries of ruptures by updating the accumulator.

        :param acc: accumulator dictionary
        :param ruptures_by_trt_id: a dictionary trt_id -> list of EBRuptures
        """
        with self.monitor('aggregate curves', autoflush=True):
            if hasattr(ruptures_by_trt_id, 'calc_times'):
                acc.calc_times.extend(ruptures_by_trt_id.calc_times)
            if hasattr(ruptures_by_trt_id, 'eff_ruptures'):
                acc.eff_ruptures += ruptures_by_trt_id.eff_ruptures
            acc += ruptures_by_trt_id
            if len(ruptures_by_trt_id):
                trt = ruptures_by_trt_id.trt
                try:
                    dset = self.rup_data[trt]
                except KeyError:
                    dset = self.rup_data[trt] = self.datastore.create_dset(
                        'rup_data/' + trt, ruptures_by_trt_id.rup_data.dtype)
                dset.extend(ruptures_by_trt_id.rup_data)
            self.datastore.flush()
        return acc
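
    # Sketch of the `acc += ruptures_by_trt_id` semantics above (hedged:
    # AccumDict sums values key by key, so the rupture lists concatenate):
    #
    # >>> acc = AccumDict({1: ['r1']})
    # >>> acc += {1: ['r2'], 2: ['r3']}
    # >>> sorted(acc.items())
    # [(1, ['r1', 'r2']), (2, ['r3'])]
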

    def post_execute(self, result):
        """
        Save the SES collection
        """
        with self.monitor('saving ruptures', autoflush=True):
            # ordering ruptures
            sescollection = []
            for trt_id in result:
                for ebr in result[trt_id]:
                    sescollection.append(ebr)
            sescollection.sort(key=operator.attrgetter('serial'))
            etags = numpy.concatenate([ebr.etags for ebr in sescollection])
            self.etags = numpy.array(etags, (bytes, 100))
            nr = len(sescollection)
            logging.info('Saving SES collection with %d ruptures, %d events',
                         nr, len(etags))
            eid = 0
            for ebr in sescollection:
                eids = []
                for event in ebr.events:
                    event['eid'] = eid
                    eids.append(eid)
                    eid += 1
                self.datastore['sescollection/%s' % ebr.serial] = ebr
            self.datastore.set_nbytes('sescollection')
        for dset in self.rup_data.values():
            if len(dset.dset):
                numsites = dset.dset['numsites']
                multiplicity = dset.dset['multiplicity']
                spr = numpy.average(numsites, weights=multiplicity)
                mul = numpy.average(multiplicity, weights=numsites)
                self.datastore.set_attrs(dset.name, sites_per_rupture=spr,
                                         multiplicity=mul)
        if self.rup_data:
            self.datastore.set_nbytes('rup_data')
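
# Example of the weighted averages stored as dataset attributes above, with
# hypothetical per-rupture columns numsites=[10, 2] and multiplicity=[1, 5]:
#
# >>> float(numpy.average([10, 2], weights=[1, 5]))  # sites_per_rupture
# 3.3333333333333335
# >>> float(numpy.average([1, 5], weights=[10, 2]))  # multiplicity
# 1.6666666666666667
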

# ######################## GMF calculator ############################ #

@parallel.litetask
def compute_gmfs_and_curves(eb_ruptures, sitecol, imts, rlzs_assoc,
                            min_iml, monitor):
    """
    :param eb_ruptures: a list of blocks of EBRuptures of the same
                        SESCollection
    :param sitecol: a :class:`openquake.hazardlib.site.SiteCollection`
                    instance
    :param imts: a list of IMT strings
    :param rlzs_assoc: a RlzsAssoc instance
    :param min_iml: the minimum intensities, as returned by
                    calc.fix_minimum_intensity
    :param monitor: a Monitor instance
    :returns: a dictionary rlzi -> [gmf_array, haz_curves]
    """
    oq = monitor.oqparam
    # NB: by construction each block is a non-empty list with
    # ruptures of the same trt_model_id
    trunc_level = oq.truncation_level
    correl_model = readinput.get_correl_model(oq)
    gmfadict = create(
        calc.GmfColl, eb_ruptures, sitecol, imts, rlzs_assoc,
        trunc_level, correl_model, min_iml, monitor).by_rlzi()
    result = {rlzi: [gmfadict[rlzi], None]
              if oq.ground_motion_fields else [None, None]
              for rlzi in gmfadict}
    if oq.hazard_curves_from_gmfs:
        with monitor('building hazard curves', measuremem=False):
            duration = oq.investigation_time * oq.ses_per_logic_tree_path
            for rlzi in gmfadict:
                gmvs_by_sid = group_array(gmfadict[rlzi], 'sid')
                result[rlzi][POEMAP] = calc.gmvs_to_poe_map(
                    gmvs_by_sid, oq.imtls, oq.investigation_time, duration)
    return result
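
# Example of the group_array call above (hedged sketch of its semantics:
# split a structured array into groups keyed by a field value):
#
# >>> gmf = numpy.array([(0, 0.1), (1, 0.3), (0, 0.2)],
# ...                   [('sid', U32), ('gmv', F32)])
# >>> {sid: len(rows) for sid, rows in group_array(gmf, 'sid').items()}
# {0: 2, 1: 1}
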
@base.calculators.add('event_based')
class EventBasedCalculator(ClassicalCalculator):
    """
    Event based PSHA calculator generating the ground motion fields and
    the hazard curves from the ruptures, depending on the configuration
    parameters.
    """
    pre_calculator = 'event_based_rupture'
    core_task = compute_gmfs_and_curves
    is_stochastic = True

    def pre_execute(self):
        """
        Read the precomputed ruptures (or compute them on the fly) and
        prepare some empty files in the export directory to store the
        gmfs (if any). If there were pre-existing files, they will be
        erased.
        """
        super(EventBasedCalculator, self).pre_execute()
        rlzs_by_tr_id = self.rlzs_assoc.get_rlzs_by_trt_id()
        num_rlzs = {t: len(rlzs) for t, rlzs in rlzs_by_tr_id.items()}
        self.sesruptures = []
        for serial in self.datastore['sescollection']:
            sr = self.datastore['sescollection/' + serial]
            sr.set_weight(num_rlzs, {})
            self.sesruptures.append(sr)
        self.sesruptures.sort(key=operator.attrgetter('serial'))
        if self.oqparam.ground_motion_fields:
            calc.check_overflow(self)
            for rlz in self.rlzs_assoc.realizations:
                self.datastore.create_dset(
                    'gmf_data/%04d' % rlz.ordinal, calc.gmv_dt)

    def combine_curves_and_save_gmfs(self, acc, res):
        """
        Combine the hazard curves (if any) and save the gmfs (if any)
        sequentially; notice that the gmfs may come from different tasks
        in any order.

        :param acc: an accumulator for the hazard curves
        :param res: a dictionary rlzi -> [gmf_array, curves_by_imt]
        :returns: a new accumulator
        """
        sav_mon = self.monitor('saving gmfs')
        agg_mon = self.monitor('aggregating hcurves')
        for rlzi in res:
            gmfa, curves = res[rlzi]
            if gmfa is not None:
                with sav_mon:
                    hdf5.extend(self.datastore['gmf_data/%04d' % rlzi], gmfa)
            if curves is not None:  # aggregate hcurves
                with agg_mon:
                    self.agg_dicts(acc, {rlzi: curves})
        sav_mon.flush()
        agg_mon.flush()
        self.datastore.flush()
        return acc
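
    # Sketch of what hdf5.extend does with the gmf_data datasets (hedged:
    # a minimal h5py equivalent, not the actual baselib implementation):
    # append an array to a dataset created with an unlimited first dimension:
    #
    # >>> import h5py
    # >>> with h5py.File('/tmp/demo.hdf5', 'w') as f:
    # ...     dset = f.create_dataset('gmf', (0,), maxshape=(None,),
    # ...                             dtype=F32)
    # ...     data = numpy.ones(3, F32)
    # ...     dset.resize((len(dset) + len(data),))
    # ...     dset[-len(data):] = data
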

    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the ruptures according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
            return
        monitor = self.monitor(self.core_task.__name__)
        monitor.oqparam = oq
        min_iml = calc.fix_minimum_intensity(
            oq.minimum_intensity, oq.imtls)
        acc = parallel.apply_reduce(
            self.core_task.__func__,
            (self.sesruptures, self.sitecol, oq.imtls, self.rlzs_assoc,
             min_iml, monitor),
            concurrent_tasks=self.oqparam.concurrent_tasks,
            agg=self.combine_curves_and_save_gmfs,
            acc=ProbabilityMap(),
            key=operator.attrgetter('trt_id'),
            weight=operator.attrgetter('weight'))
        if oq.ground_motion_fields:
            self.datastore.set_nbytes('gmf_data')
        return acc
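
    # Sketch of the apply_reduce pattern used above (hedged: a simplified
    # serial stand-in, not openquake.commonlib.parallel.apply_reduce):
    # split the items into weight-balanced blocks, run the task on each
    # block and fold the partial results with `agg`:
    #
    # >>> def apply_reduce_serial(task, items, weight, max_weight, agg, acc):
    # ...     block, tot = [], 0
    # ...     for item in items:
    # ...         block.append(item)
    # ...         tot += weight(item)
    # ...         if tot >= max_weight:
    # ...             acc, block, tot = agg(acc, task(block)), [], 0
    # ...     if block:
    # ...         acc = agg(acc, task(block))
    # ...     return acc
    # >>> apply_reduce_serial(sum, [1, 2, 3, 4], lambda x: x, 3,
    # ...                     lambda a, r: a + r, 0)
    # 10
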

    def post_execute(self, result):
        """
        :param result: a dictionary rlzi -> hazard curves, or an empty
                       dictionary if hazard_curves_from_gmfs is false
        """
        oq = self.oqparam
        if not oq.hazard_curves_from_gmfs and not oq.ground_motion_fields:
            return
        elif oq.hazard_curves_from_gmfs:
            rlzs = self.rlzs_assoc.realizations
            dic = {}
            for rlzi in result:
                dic[rlzs[rlzi]] = array_of_curves(
                    result[rlzi], len(self.sitecol), oq.imtls)
            self.save_curves(dic)
        if oq.compare_with_classical:  # compute classical curves
            export_dir = os.path.join(oq.export_dir, 'cl')
            if not os.path.exists(export_dir):
                os.makedirs(export_dir)
            oq.export_dir = export_dir
            # one could also set oq.number_of_logic_tree_samples = 0
            self.cl = ClassicalCalculator(oq, self.monitor)
            # TODO: perhaps it is possible to avoid reprocessing the source
            # model; however, usually this is quite fast and does not
            # dominate the computation
            self.cl.run(hazard_calculation_id=self.datastore.calc_id)
            for imt in self.mean_curves.dtype.fields:
                rdiff, index = max_rel_diff_index(
                    self.cl.mean_curves[imt], self.mean_curves[imt])
                logging.warn('Relative difference with the classical '
                             'mean curves for IMT=%s: %d%% at site index %d',
                             imt, rdiff * 100, index)
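
# Example of the comparison logged above (hedged: assuming
# max_rel_diff_index returns the largest relative difference and the site
# index where it occurs):
#
# >>> ref = numpy.array([0.10, 0.20])
# >>> other = numpy.array([0.11, 0.30])
# >>> diff = numpy.abs(ref - other) / ref
# >>> int(diff.argmax())  # the largest difference (~50%) is at site 1
# 1
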