Source code for openquake.calculators.classical

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
import logging
import operator
import collections
from functools import partial, reduce
import numpy

from openquake.baselib import hdf5, parallel
from openquake.baselib.general import AccumDict, block_splitter
from openquake.hazardlib import sourceconverter
from openquake.hazardlib.geo.utils import get_spherical_bounding_box
from openquake.hazardlib.geo.utils import get_longitudinal_extent
from openquake.hazardlib.geo.geodetic import npoints_between
from openquake.hazardlib.calc.hazard_curve import (
    pmap_from_grp, ProbabilityMap)
from openquake.hazardlib.probability_map import PmapStats
from openquake.commonlib import datastore, source, calc
from openquake.calculators import base

U16 = numpy.uint16
F32 = numpy.float32
F64 = numpy.float64

HazardCurve = collections.namedtuple('HazardCurve', 'location poes')
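
# Illustrative sketch (not part of the module): a HazardCurve simply pairs a
# location with its probabilities of exceedance; the values below are
# hypothetical.
_example_curve = HazardCurve(location=(10.0, 45.0),
                             poes=numpy.array([0.1, 0.02]))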


def split_filter_source(src, sites, src_filter, random_seed):
    """
    :param src: a heavy source
    :param sites: sites affected by the source
    :param src_filter: a SourceFilter instance
    :param random_seed: used only for event based calculations
    :returns: a list of split sources
    """
    split_sources = []
    start = 0
    for split in sourceconverter.split_source(src):
        if random_seed:
            nr = split.num_ruptures
            split.serial = src.serial[start:start + nr]
            start += nr
        if src_filter.affected(split) is not None:
            split_sources.append(split)
    return split_sources

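# Illustrative sketch (not part of the module): the serial-number bookkeeping
# performed above when random_seed is set. The parent source carries one
# serial per rupture and each split receives the slice matching its own
# ruptures. _ToySplit and the numbers are hypothetical stand-ins.
_ToySplit = collections.namedtuple('_ToySplit', 'num_ruptures')
_parent_serial = numpy.arange(100, 110)  # 10 ruptures in the parent source
_start = 0
for _split in [_ToySplit(4), _ToySplit(6)]:
    _nr = _split.num_ruptures
    _serial_slice = _parent_serial[_start:_start + _nr]  # 100-103, then 104-109
    _start += _nr
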
class BBdict(AccumDict):
    """
    A serializable dictionary containing bounding box information
    """
    dt = numpy.dtype([('lt_model_id', U16), ('site_id', U16),
                      ('min_dist', F64), ('max_dist', F64),
                      ('east', F64), ('west', F64),
                      ('south', F64), ('north', F64)])

    def __toh5__(self):
        rows = []
        for lt_model_id, site_id in self:
            bb = self[lt_model_id, site_id]
            rows.append((lt_model_id, site_id, bb.min_dist, bb.max_dist,
                         bb.east, bb.west, bb.south, bb.north))
        return numpy.array(rows, self.dt), {}

    def __fromh5__(self, array, attrs):
        for row in array:
            lt_model_id = row['lt_model_id']
            site_id = row['site_id']
            bb = BoundingBox(lt_model_id, site_id)
            bb.min_dist = row['min_dist']
            bb.max_dist = row['max_dist']
            bb.east = row['east']
            bb.west = row['west']
            bb.north = row['north']
            bb.south = row['south']
            self[lt_model_id, site_id] = bb

# this is needed for the disaggregation
class BoundingBox(object):
    """
    A class to store the bounding box in distances, longitudes and
    magnitudes, given a source model and a site. This is used for
    disaggregation calculations. The goal is to determine the minimum
    and maximum distances of the ruptures generated from the model
    from the site; moreover the maximum and minimum longitudes and
    magnitudes are stored, taking into account the international
    date line.
    """
    def __init__(self, lt_model_id, site_id):
        self.lt_model_id = lt_model_id
        self.site_id = site_id
        self.min_dist = self.max_dist = 0
        self.east = self.west = self.south = self.north = 0
    def update(self, dists, lons, lats):
        """
        Compare the current bounding box with the values in the arrays
        dists, lons, lats and enlarge it if needed.

        :param dists: a sequence of distances
        :param lons: a sequence of longitudes
        :param lats: a sequence of latitudes
        """
        if self.min_dist:
            dists = [self.min_dist, self.max_dist] + dists
        if self.west:
            lons = [self.west, self.east] + lons
        if self.south:
            lats = [self.south, self.north] + lats
        self.min_dist, self.max_dist = min(dists), max(dists)
        self.west, self.east, self.north, self.south = \
            get_spherical_bounding_box(lons, lats)
    def update_bb(self, bb):
        """
        Compare the current bounding box with the given bounding box
        and enlarge it if needed.

        :param bb:
            an instance of :class:
            `openquake.engine.calculators.hazard.classical.core.BoundingBox`
        """
        if bb:  # the given bounding box must be non-empty
            self.update([bb.min_dist, bb.max_dist], [bb.west, bb.east],
                        [bb.south, bb.north])
    def bins_edges(self, dist_bin_width, coord_bin_width):
        """
        Define bin edges for disaggregation histograms from the bin data
        collected from the ruptures.

        :param dist_bin_width: distance_bin_width from job.ini
        :param coord_bin_width: coordinate_bin_width from job.ini
        """
        dist_edges = dist_bin_width * numpy.arange(
            int(self.min_dist / dist_bin_width),
            int(numpy.ceil(self.max_dist / dist_bin_width) + 1))
        west = numpy.floor(self.west / coord_bin_width) * coord_bin_width
        east = numpy.ceil(self.east / coord_bin_width) * coord_bin_width
        lon_extent = get_longitudinal_extent(west, east)
        lon_edges, _, _ = npoints_between(
            west, 0, 0, east, 0, 0,
            numpy.round(lon_extent / coord_bin_width) + 1)
        lat_edges = coord_bin_width * numpy.arange(
            int(numpy.floor(self.south / coord_bin_width)),
            int(numpy.ceil(self.north / coord_bin_width) + 1))
        return dist_edges, lon_edges, lat_edges
    def __bool__(self):
        """
        True if the bounding box is non empty.
        """
        return bool(self.max_dist - self.min_dist or
                    self.west - self.east or
                    self.north - self.south)
    __nonzero__ = __bool__

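# Illustrative sketch (not part of the module): how a BoundingBox is grown
# from rupture data and serialized through a BBdict. The distances (km),
# longitudes and latitudes below are hypothetical.
_bb = BoundingBox(lt_model_id=0, site_id=7)
_bb.update([12.5, 40.0], [30.0, 30.4], [40.0, 40.2])
assert _bb  # non-empty: max_dist - min_dist is 27.5 km
# the distance bin edges as computed in bins_edges, with a 10 km bin width
_dist_edges = 10.0 * numpy.arange(
    int(_bb.min_dist / 10.0), int(numpy.ceil(_bb.max_dist / 10.0) + 1))
# _dist_edges -> array([10., 20., 30., 40.])
_bbd = BBdict()
_bbd[0, 7] = _bb
_rows, _ = _bbd.__toh5__()  # structured array ready to be stored in HDF5
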
def classical(sources, src_filter, gsims, monitor):
    """
    :param sources:
        a non-empty sequence of sources of homogeneous tectonic region type
    :param src_filter: source filter
    :param gsims: a list of GSIMs for the current tectonic region type
    :param monitor: a monitor instance
    :returns: a ProbabilityMap with extra attributes .bbs and .grp_id
    """
    truncation_level = monitor.truncation_level
    imtls = monitor.imtls
    src_group_id = sources[0].src_group_id
    # sanity check: the src_group must be the same for all sources
    for src in sources[1:]:
        assert src.src_group_id == src_group_id
    if monitor.disagg:
        sm_id = monitor.sm_id
        bbs = [BoundingBox(sm_id, sid) for sid in src_filter.sitecol.sids]
    else:
        bbs = []
    pmap = pmap_from_grp(
        sources, src_filter, imtls, gsims, truncation_level,
        bbs=bbs, monitor=monitor)
    pmap.bbs = bbs
    pmap.grp_id = src_group_id
    return pmap

def saving_sources_by_task(iterargs, dstore):
    """
    Yield the iterargs again, while populating the 'task_sources' dataset
    """
    source_ids = []
    for args in iterargs:
        source_ids.append(' '.join(src.source_id for src in args[0]))
        yield args
    dstore['task_sources'] = numpy.array(source_ids, hdf5.vstr)

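# Illustrative sketch (not part of the module): saving_sources_by_task is a
# pass-through generator; it yields the task arguments unchanged and, once
# exhausted, records which source IDs went into each task. _ToySrc and the
# plain dict standing in for the datastore are hypothetical.
_ToySrc = collections.namedtuple('_ToySrc', 'source_id')
_fake_store = {}
_fake_args = [([_ToySrc('A'), _ToySrc('B')],), ([_ToySrc('C')],)]
_passed = list(saving_sources_by_task(_fake_args, _fake_store))
# _passed == _fake_args and _fake_store['task_sources'] contains 'A B', 'C'
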
@base.calculators.add('psha')
class PSHACalculator(base.HazardCalculator):
    """
    Classical PSHA calculator
    """
    core_task = classical
    source_info = datastore.persistent_attribute('source_info')
    def agg_dicts(self, acc, pmap):
        """
        Aggregate dictionaries of hazard curves by updating the accumulator.

        :param acc: accumulator dictionary
        :param pmap: a ProbabilityMap
        """
        with self.monitor('aggregate curves', autoflush=True):
            for src_id, nsites, calc_time in pmap.calc_times:
                src_id = src_id.split(':', 1)[0]
                info = self.infos[pmap.grp_id, src_id]
                info.calc_time += calc_time
                info.num_sites = max(info.num_sites, nsites)
                info.num_split += 1
            acc.eff_ruptures += pmap.eff_ruptures
            for bb in getattr(pmap, 'bbs', []):  # for disaggregation
                acc.bb_dict[bb.lt_model_id, bb.site_id].update_bb(bb)
            acc[pmap.grp_id] |= pmap
        self.datastore.flush()
        return acc
    def count_eff_ruptures(self, result_dict, src_group):
        """
        Returns the number of ruptures in the src_group (after filtering)
        or 0 if the src_group has been filtered away.

        :param result_dict: a dictionary with keys (grp_id, gsim)
        :param src_group: a SourceGroup instance
        """
        return result_dict.eff_ruptures.get(src_group.id, 0)
    def zerodict(self):
        """
        Initial accumulator, a dict grp_id -> ProbabilityMap(L, G)
        """
        zd = AccumDict()
        num_levels = len(self.oqparam.imtls.array)
        for grp in self.csm.src_groups:
            num_gsims = len(self.rlzs_assoc.gsims_by_grp_id[grp.id])
            zd[grp.id] = ProbabilityMap(num_levels, num_gsims)
        zd.calc_times = []
        zd.eff_ruptures = AccumDict()  # grp_id -> eff_ruptures
        zd.bb_dict = BBdict()
        if self.oqparam.poes_disagg or self.oqparam.iml_disagg:
            for sid in self.sitecol.sids:
                for smodel in self.csm.source_models:
                    zd.bb_dict[smodel.ordinal, sid] = BoundingBox(
                        smodel.ordinal, sid)
        return zd
    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the sources according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        monitor = self.monitor.new(
            self.core_task.__name__,
            truncation_level=oq.truncation_level,
            imtls=oq.imtls,
            maximum_distance=oq.maximum_distance,
            disagg=oq.poes_disagg or oq.iml_disagg,
            ses_per_logic_tree_path=oq.ses_per_logic_tree_path,
            seed=oq.random_seed)
        with self.monitor('managing sources', autoflush=True):
            src_groups = list(self.csm.src_groups)
            iterargs = saving_sources_by_task(
                self.gen_args(src_groups, oq, monitor), self.datastore)
            res = parallel.Starmap(
                self.core_task.__func__, iterargs).submit_all()
        acc = reduce(self.agg_dicts, res, self.zerodict())
        self.save_data_transfer(res)
        with self.monitor('store source_info', autoflush=True):
            self.store_source_info(self.infos)
        self.rlzs_assoc = self.csm.info.get_rlzs_assoc(
            partial(self.count_eff_ruptures, acc))
        self.datastore['csm_info'] = self.csm.info
        return acc
    def gen_args(self, src_groups, oq, monitor):
        """
        Used in the case of large source model logic trees.

        :param src_groups: a list of SourceGroup instances
        :param oq: a :class:`openquake.commonlib.oqvalidation.OqParam` instance
        :param monitor: a :class:`openquake.baselib.performance.Monitor`
        :yields: (sources, sites, gsims, monitor) tuples
        """
        ngroups = len(src_groups)
        maxweight = self.csm.get_maxweight(oq.concurrent_tasks)
        logging.info('Using a maxweight of %d', maxweight)
        nheavy = nlight = 0
        self.infos = {}
        for sg in src_groups:
            logging.info('Sending source group #%d of %d (%s, %d sources)',
                         sg.id + 1, ngroups, sg.trt, len(sg.sources))
            gsims = self.rlzs_assoc.gsims_by_grp_id[sg.id]
            if oq.poes_disagg or oq.iml_disagg:  # only for disaggregation
                monitor.sm_id = self.rlzs_assoc.sm_ids[sg.id]
            monitor.seed = self.rlzs_assoc.seed
            monitor.samples = self.rlzs_assoc.samples[sg.id]
            # light sources are chunked by weight
            # (see the block_splitter sketch after this class)
            light = [src for src in sg.sources if src.weight <= maxweight]
            for block in block_splitter(
                    light, maxweight,
                    weight=operator.attrgetter('weight')):
                for src in block:
                    self.infos[sg.id, src.source_id] = source.SourceInfo(src)
                yield block, self.src_filter, gsims, monitor
                nlight += 1
            heavy = [src for src in sg.sources if src.weight > maxweight]
            if not heavy:
                continue
            with self.monitor('split/filter heavy sources', autoflush=True):
                for src in heavy:
                    sites = self.src_filter.affected(src)
                    self.infos[sg.id, src.source_id] = source.SourceInfo(src)
                    sources = split_filter_source(
                        src, sites, self.src_filter, self.random_seed)
                    if len(sources) > 1:
                        logging.info(
                            'Splitting %s "%s" in %d sources',
                            src.__class__.__name__, src.source_id,
                            len(sources))
                    for block in block_splitter(
                            sources, maxweight,
                            weight=operator.attrgetter('weight')):
                        yield block, self.src_filter, gsims, monitor
                        nheavy += 1
        logging.info('Sent %d light and %d heavy tasks', nlight, nheavy)
    def store_source_info(self, infos):
        # save the calculation times for each source
        if infos:
            rows = sorted(
                infos.values(),
                key=operator.attrgetter('calc_time'),
                reverse=True)
            array = numpy.zeros(len(rows), source.SourceInfo.dt)
            for i, row in enumerate(rows):
                for name in array.dtype.names:
                    array[i][name] = getattr(row, name)
            self.source_info = array
            infos.clear()
        self.datastore.flush()
    def post_execute(self, pmap_by_grp_id):
        """
        Collect the hazard curves by realization and export them.

        :param pmap_by_grp_id: a dictionary grp_id -> hazard curves
        """
        if pmap_by_grp_id.bb_dict:
            self.datastore['bb_dict'] = pmap_by_grp_id.bb_dict
        grp_trt = self.csm.info.grp_trt()
        with self.monitor('saving probability maps', autoflush=True):
            for grp_id, pmap in pmap_by_grp_id.items():
                if pmap:  # pmap can be missing if the group is filtered away
                    key = 'poes/%04d' % grp_id
                    self.datastore[key] = pmap
                    self.datastore.set_attrs(key, trt=grp_trt[grp_id])
            if 'poes' in self.datastore:
                self.datastore.set_nbytes('poes')

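# Illustrative sketch (not part of the module): how gen_args chunks light
# sources with block_splitter so that each task receives a block of roughly
# maxweight worth of sources. _WSrc and the weights are hypothetical.
_WSrc = collections.namedtuple('_WSrc', 'source_id weight')
_light_srcs = [_WSrc('a', 10), _WSrc('b', 20), _WSrc('c', 25), _WSrc('d', 5)]
_blocks = list(block_splitter(
    _light_srcs, 40, weight=operator.attrgetter('weight')))
# each block is a list-like group of consecutive sources whose cumulative
# weight stays around the 40 limit (e.g. ['a', 'b'] and ['c', 'd'])
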
def build_hcurves_and_stats(pmap_by_grp, sids, pstats, rlzs_assoc, monitor):
    """
    :param pmap_by_grp: dictionary of probability maps by source group ID
    :param sids: array of site IDs
    :param pstats: instance of PmapStats
    :param rlzs_assoc: instance of RlzsAssoc
    :param monitor: instance of Monitor
    :returns: a dictionary kind -> ProbabilityMap

    The "kind" is a string of the form 'rlz-XXX', 'mean' or 'quantile-XXX'
    used to specify the kind of output.
    """
    if sum(len(pmap) for pmap in pmap_by_grp.values()) == 0:  # all empty
        return {}
    rlzs = rlzs_assoc.realizations
    with monitor('combine pmaps'):
        pmap_by_rlz = calc.combine_pmaps(rlzs_assoc, pmap_by_grp)
    with monitor('compute stats'):
        pmap_by_kind = dict(
            pstats.compute(sids, [pmap_by_rlz[rlz] for rlz in rlzs]))
    if monitor.individual_curves:
        for rlz in rlzs:
            pmap_by_kind['rlz-%03d' % rlz.ordinal] = pmap_by_rlz[rlz]
    return pmap_by_kind

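# Illustrative sketch (not part of the module): the kind of statistics that
# PmapStats computes, shown with plain numpy for a single site. Each row of
# the hypothetical _poes_by_rlz array is the PoE curve of one realization.
_poes_by_rlz = numpy.array([[0.10, 0.02],
                            [0.12, 0.03],
                            [0.08, 0.01]])
_rlz_weights = numpy.array([0.5, 0.3, 0.2])
_mean_curve = numpy.dot(_rlz_weights, _poes_by_rlz)      # weighted mean
_q85_curve = numpy.percentile(_poes_by_rlz, 85, axis=0)  # quantile 0.85
# the real PmapStats operates on ProbabilityMaps over all sites at once
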
@base.calculators.add('classical')
class ClassicalCalculator(PSHACalculator):
    """
    Classical PSHA calculator
    """
    pre_calculator = 'psha'
    core_task = build_hcurves_and_stats
    def execute(self):
        """
        Builds hcurves and stats from the stored PoEs
        """
        if 'poes' not in self.datastore:  # for short report
            return
        oq = self.oqparam
        rlzs = self.rlzs_assoc.realizations

        # initialize datasets
        N = len(self.sitecol)
        L = len(oq.imtls.array)
        attrs = dict(
            __pyclass__='openquake.hazardlib.probability_map.ProbabilityMap',
            sids=numpy.arange(N, dtype=numpy.uint32))
        nbytes = N * L * 4  # bytes per realization (32 bit floats)
        totbytes = 0
        if oq.individual_curves:
            for rlz in rlzs:
                self.datastore.create_dset(
                    'hcurves/rlz-%03d' % rlz.ordinal, F32,
                    (N, L, 1), attrs=attrs)
                totbytes += nbytes
        if oq.mean_hazard_curves and len(rlzs) > 1:
            self.datastore.create_dset(
                'hcurves/mean', F32, (N, L, 1), attrs=attrs)
            totbytes += nbytes
        for q in oq.quantile_hazard_curves:
            self.datastore.create_dset(
                'hcurves/quantile-%s' % q, F32, (N, L, 1), attrs=attrs)
            totbytes += nbytes
        if 'hcurves' in self.datastore:
            self.datastore.set_attrs('hcurves', nbytes=totbytes)
        self.datastore.flush()

        logging.info('Building hazard curves')
        with self.monitor('submitting poes', autoflush=True):
            pmap_by_grp = {
                int(group_id): self.datastore['poes/' + group_id]
                for group_id in self.datastore['poes']}
            res = parallel.Starmap(
                build_hcurves_and_stats,
                list(self.gen_args(pmap_by_grp))).submit_all()
        nbytes = reduce(self.save_hcurves, res, AccumDict())
        self.save_data_transfer(res)
        return nbytes
    def gen_args(self, pmap_by_grp):
        """
        :param pmap_by_grp: dictionary of ProbabilityMaps keyed by src_grp_id
        :yields: arguments for the function build_hcurves_and_stats
        """
        monitor = self.monitor.new(
            'build_hcurves_and_stats',
            individual_curves=self.oqparam.individual_curves)
        weights = (None if self.oqparam.number_of_logic_tree_samples
                   else [rlz.weight for rlz in self.rlzs_assoc.realizations])
        pstats = PmapStats(self.oqparam.quantile_hazard_curves, weights)
        num_rlzs = len(self.rlzs_assoc.realizations)
        for block in self.sitecol.split_in_tiles(num_rlzs):
            pg = {grp_id: pmap_by_grp[grp_id].filter(block.sids)
                  for grp_id in pmap_by_grp}
            yield pg, block.sids, pstats, self.rlzs_assoc, monitor
    def save_hcurves(self, acc, pmap_by_kind):
        """
        Works by side effect by saving hcurves and statistics on the
        datastore; the accumulator stores the number of bytes saved.

        :param acc: dictionary kind -> nbytes
        :param pmap_by_kind: a dictionary of ProbabilityMaps
        """
        with self.monitor('saving hcurves and stats', autoflush=True):
            oq = self.oqparam
            for kind in pmap_by_kind:
                if kind == 'mean' and not oq.mean_hazard_curves:
                    continue  # do not save the mean curves
                pmap = pmap_by_kind[kind]
                if pmap:
                    key = 'hcurves/' + kind
                    dset = self.datastore.getitem(key)
                    for sid in pmap:
                        dset[sid] = pmap[sid].array
                    # in the datastore we save 4 byte floats, thus we divide
                    # the memory consumption by 2: pmap.nbytes / 2
                    # (see the byte-accounting sketch after this class)
                    acc += {kind: pmap.nbytes // 2}
            self.datastore.flush()
        return acc
    def post_execute(self, acc):
        """Save the number of bytes for each dataset"""
        for kind, nbytes in acc.items():
            self.datastore.getitem('hcurves/' + kind).attrs['nbytes'] = nbytes

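# Illustrative sketch (not part of the module): the byte accounting used by
# ClassicalCalculator. Curves are saved as 32 bit floats, so one realization
# with N sites and L intensity levels takes N * L * 4 bytes, while the
# ProbabilityMap arrays in memory are 64 bit, hence the `pmap.nbytes // 2`
# in save_hcurves. N and L below are hypothetical.
_N, _L = 1000, 20
_bytes_per_rlz = _N * _L * 4  # 80000 bytes for one hazard curve dataset
assert numpy.zeros(_N * _L, F32).nbytes == numpy.zeros(_N * _L, F64).nbytes // 2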