Source code for openquake.calculators.disaggregation

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Disaggregation calculator core functionality
"""
from __future__ import division
import math
import logging
import numpy

from openquake.baselib import hdf5
from openquake.baselib.general import split_in_blocks
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.baselib import parallel
from openquake.hazardlib import sourceconverter
from openquake.commonlib import calc
from openquake.calculators import base, classical

DISAGG_RES_FMT = 'disagg/poe-%(poe)s-rlz-%(rlz)s-%(imt)s-%(lon)s-%(lat)s'
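
# For instance (illustrative values, not from a real calculation), the
# template expands via ordinary %-interpolation:
#
#   >>> DISAGG_RES_FMT % dict(poe=0.1, rlz=0, imt='PGA', lon=10.0, lat=45.0)
#   'disagg/poe-0.1-rlz-0-PGA-10.0-45.0'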


def compute_disagg(src_filter, sources, src_group_id, rlzs_assoc,
                   trt_names, curves_dict, bin_edges, oqparam, monitor):
    # see https://bugs.launchpad.net/oq-engine/+bug/1279247 for an explanation
    # of the algorithm used
    """
    :param src_filter:
        a :class:`openquake.hazardlib.calc.filters.SourceFilter` instance
    :param sources:
        list of hazardlib source objects
    :param src_group_id:
        numeric ID of a SourceGroup instance
    :param rlzs_assoc:
        a :class:`openquake.commonlib.source.RlzsAssoc` instance
    :param trt_names:
        a tuple of names for the given tectonic region types
    :param curves_dict:
        a dictionary with the hazard curves for sites, realizations and IMTs
    :param bin_edges:
        a dictionary site_id -> edges
    :param oqparam:
        the parameters in the job.ini file
    :param monitor:
        monitor of the currently running job
    :returns:
        a dictionary of probability arrays, with composite key
        (sid, rlz.id, poe, imt, iml, trt_names)
    """
    sitecol = src_filter.sitecol
    trt_num = dict((trt, i) for i, trt in enumerate(trt_names))
    gsims = rlzs_assoc.gsims_by_grp_id[src_group_id]
    result = {}  # sid, rlz.id, poe, imt, iml, trt_names -> array
    collecting_mon = monitor('collecting bins')
    arranging_mon = monitor('arranging bins')
    for site, sid in zip(sitecol, sitecol.sids):
        # edges as wanted by disagg._arrange_data_in_bins
        try:
            edges = bin_edges[sid]
        except KeyError:
            # bin_edges for a given site are missing if the site is far away
            continue
        # generate source, rupture, sites once per site
        with collecting_mon:
            bdata = disagg._collect_bins_data(
                trt_num, sources, site, curves_dict[sid],
                src_group_id, rlzs_assoc, gsims, oqparam.imtls,
                oqparam.poes_disagg, oqparam.truncation_level,
                oqparam.num_epsilon_bins, oqparam.iml_disagg, monitor)
        for (rlzi, poe, imt), iml_pne_pairs in bdata.pnes.items():
            # extract the probabilities of non-exceedance for the
            # given realization, disaggregation PoE, and IMT
            iml = iml_pne_pairs[0][0]
            probs = numpy.array([p for (i, p) in iml_pne_pairs], float)
            # bins in a format handy for hazardlib
            bins = [bdata.mags, bdata.dists, bdata.lons, bdata.lats,
                    bdata.trts, None, probs]
            # call disagg._arrange_data_in_bins
            with arranging_mon:
                key = (sid, rlzi, poe, imt, iml, trt_names)
                matrix = disagg._arrange_data_in_bins(
                    bins, edges + (trt_names,))
                result[key] = numpy.array(
                    [fn(matrix) for fn in disagg.pmf_map.values()])
    return result

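
# A sketch of the shape of the dictionary returned by compute_disagg
# (hypothetical key values): each entry looks like
#
#   key = (sid, rlzi, poe, imt, iml, trt_names)
#       = (0, 1, 0.1, 'PGA', 0.25, ('Active Shallow Crust',))
#   result[key] = array with one PMF per function in disagg.pmf_map
#
# Partial results with the same key coming from different tasks are merged
# by DisaggregationCalculator.agg_result below.
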
@base.calculators.add('disaggregation')
class DisaggregationCalculator(classical.ClassicalCalculator):
    """
    Classical PSHA disaggregation calculator
    """

    def post_execute(self, nbytes_by_kind):
        """Performs the disaggregation"""
        self.full_disaggregation()

    def agg_result(self, acc, result):
        """
        Collect the results coming from compute_disagg into self.results,
        a dictionary with key (sid, rlz.id, poe, imt, iml, trt_names)
        and values which are probability arrays.

        :param acc: dictionary accumulating the results
        :param result: dictionary with the result coming from a task
        """
        for key, val in result.items():
            acc[key] = 1. - (1. - acc.get(key, 0)) * (1. - val)
        return acc
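
    # A worked example of the combination rule in agg_result (illustrative
    # numbers): if two tasks return PoEs of 0.1 and 0.2 for the same key,
    # treating them as independent contributions gives
    #
    #   1. - (1. - 0.1) * (1. - 0.2) == 1. - 0.9 * 0.8 == 0.28
    #
    # i.e. probabilities are composed through their complements (the
    # probabilities of non-exceedance), not summed.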
    def get_curves(self, sid):
        """
        Get all the relevant hazard curves for the given site ordinal.
        Returns a dictionary {(rlz_id, imt) -> curve}.
        """
        dic = {}
        imtls = self.oqparam.imtls
        pgetter = calc.PmapGetter(self.datastore)
        for rlz in self.rlzs_assoc.realizations:
            try:
                pmap = pgetter.get(numpy.array([sid]), rlz.ordinal)
            except ValueError:  # empty pmaps
                logging.info(
                    'hazard curve contains all zero probabilities; '
                    'skipping site %d, rlz=%d', sid, rlz.ordinal)
                continue
            if sid not in pmap:
                continue
            poes = pmap[sid].convert(imtls)
            for imt_str in imtls:
                if all(x == 0.0 for x in poes[imt_str]):
                    logging.info(
                        'hazard curve contains all zero probabilities; '
                        'skipping site %d, rlz=%d, IMT=%s',
                        sid, rlz.ordinal, imt_str)
                    continue
                dic[rlz.ordinal, imt_str] = poes[imt_str]
        return dic
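
    # The dictionary returned by get_curves is keyed on (rlz_ordinal, imt),
    # e.g. (hypothetical values)
    #
    #   {(0, 'PGA'): poes_array, (0, 'SA(0.1)'): poes_array, ...}
    #
    # a site whose curves contain only zero probabilities yields an empty
    # dictionary and is skipped by full_disaggregation below.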
    def full_disaggregation(self):
        """
        Run the disaggregation phase after hazard curve finalization.
        """
        oq = self.oqparam
        tl = self.oqparam.truncation_level
        bb_dict = self.datastore['bb_dict']
        sitecol = self.sitecol
        mag_bin_width = self.oqparam.mag_bin_width
        eps_edges = numpy.linspace(-tl, tl, self.oqparam.num_epsilon_bins + 1)
        logging.info('%d epsilon bins from %s to %s', len(eps_edges) - 1,
                     min(eps_edges), max(eps_edges))
        self.bin_edges = {}
        curves_dict = {sid: self.get_curves(sid) for sid in sitecol.sids}
        all_args = []
        num_trts = sum(len(sm.src_groups) for sm in self.csm.source_models)
        nblocks = math.ceil(oq.concurrent_tasks / num_trts)
        for smodel in self.csm.source_models:
            sm_id = smodel.ordinal
            trt_names = tuple(mod.trt for mod in smodel.src_groups)
            max_mag = max(mod.max_mag for mod in smodel.src_groups)
            min_mag = min(mod.min_mag for mod in smodel.src_groups)
            mag_edges = mag_bin_width * numpy.arange(
                int(numpy.floor(min_mag / mag_bin_width)),
                int(numpy.ceil(max_mag / mag_bin_width) + 1))
            logging.info('%d mag bins from %s to %s', len(mag_edges) - 1,
                         min_mag, max_mag)
            for src_group in smodel.src_groups:
                if src_group.id not in self.rlzs_assoc.gsims_by_grp_id:
                    continue  # the group has been filtered away
                for sid, site in zip(sitecol.sids, sitecol):
                    curves = curves_dict[sid]
                    if not curves:
                        continue  # skip zero-valued hazard curves
                    bb = bb_dict[sm_id, sid]
                    if not bb:
                        logging.info(
                            'location %s was too far, skipping '
                            'disaggregation', site.location)
                        continue
                    dist_edges, lon_edges, lat_edges = bb.bins_edges(
                        oq.distance_bin_width, oq.coordinate_bin_width)
                    logging.info(
                        '%d dist bins from %s to %s', len(dist_edges) - 1,
                        min(dist_edges), max(dist_edges))
                    logging.info(
                        '%d lon bins from %s to %s', len(lon_edges) - 1,
                        bb.west, bb.east)
                    logging.info(
                        '%d lat bins from %s to %s', len(lat_edges) - 1,
                        bb.south, bb.north)
                    self.bin_edges[sm_id, sid] = (
                        mag_edges, dist_edges, lon_edges, lat_edges,
                        eps_edges)
                bin_edges = {}
                for sid, site in zip(sitecol.sids, sitecol):
                    if (sm_id, sid) in self.bin_edges:
                        bin_edges[sid] = self.bin_edges[sm_id, sid]
                src_filter = SourceFilter(sitecol, oq.maximum_distance)
                split_sources = []
                for src in src_group:
                    for split, _sites in src_filter(
                            sourceconverter.split_source(src), sitecol):
                        split_sources.append(split)
                mon = self.monitor('disaggregation')
                for srcs in split_in_blocks(split_sources, nblocks):
                    all_args.append(
                        (src_filter, srcs, src_group.id, self.rlzs_assoc,
                         trt_names, curves_dict, bin_edges, oq, mon))
        results = parallel.Starmap(compute_disagg, all_args).reduce(
            self.agg_result)
        self.save_disagg_results(results)
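
    # A worked example of the magnitude binning in full_disaggregation
    # (illustrative values): with mag_bin_width=0.5, min_mag=4.7, max_mag=6.2
    #
    #   numpy.floor(4.7 / 0.5) -> 9,  numpy.ceil(6.2 / 0.5) -> 13
    #   0.5 * numpy.arange(9, 14) -> [4.5, 5., 5.5, 6., 6.5]
    #
    # so the edges are aligned to multiples of the bin width and always
    # enclose the [min_mag, max_mag] interval.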
    def save_disagg_results(self, results):
        """
        Save all the results of the disaggregation. NB: the number of results
        to save is #sites * #rlzs * #disagg_poes * #IMTs.

        :param results:
            a dictionary of probability arrays
        """
        # build a dictionary rlz.ordinal -> source_model.ordinal
        sm_id = {}
        for i, rlzs in self.rlzs_assoc.rlzs_by_smodel.items():
            for rlz in rlzs:
                sm_id[rlz.ordinal] = i

        # since an extremely small subset of the full disaggregation matrix
        # is saved this method can be run sequentially on the controller node
        for key, probs in sorted(results.items()):
            sid, rlz_id, poe, imt, iml, trt_names = key
            edges = self.bin_edges[sm_id[rlz_id], sid]
            self.save_disagg_result(
                sid, edges, trt_names, probs, rlz_id,
                self.oqparam.investigation_time, imt, iml, poe)

    def save_disagg_result(self, site_id, bin_edges, trt_names, matrix,
                           rlz_id, investigation_time, imt_str, iml, poe):
        """
        Save a computed disaggregation matrix to `hzrdr.disagg_result`
        (see :class:`~openquake.engine.db.models.DisaggResult`).

        :param site_id:
            id of the current site
        :param bin_edges:
            the 5-uple mag, dist, lon, lat, eps
        :param trt_names:
            the list of Tectonic Region Types
        :param matrix:
            a probability array
        :param rlz_id:
            ordinal of the realization to which the results belong
        :param float investigation_time:
            investigation time (years) for the calculation
        :param imt_str:
            intensity measure type string (PGA, SA, etc.)
        :param float iml:
            intensity measure level interpolated (using `poe`) from the
            hazard curve at the `site`
        :param float poe:
            disaggregation probability of exceedance value for this result
        """
        lon = self.sitecol.lons[site_id]
        lat = self.sitecol.lats[site_id]
        mag, dist, lons, lats, eps = bin_edges
        disp_name = DISAGG_RES_FMT % dict(
            poe=poe, rlz=rlz_id, imt=imt_str, lon=lon, lat=lat)
        self.datastore[disp_name] = {
            '_'.join(key): mat for key, mat in zip(disagg.pmf_map, matrix)}
        attrs = self.datastore.hdf5[disp_name].attrs
        attrs['rlzi'] = rlz_id
        attrs['imt'] = imt_str
        attrs['iml'] = iml
        attrs['poe'] = poe
        attrs['trts'] = hdf5.array_of_vstr(trt_names)
        attrs['mag_bin_edges'] = mag
        attrs['dist_bin_edges'] = dist
        attrs['lon_bin_edges'] = lons
        attrs['lat_bin_edges'] = lats
        attrs['eps_bin_edges'] = eps
        attrs['location'] = (lon, lat)
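
# A reading-back sketch (hypothetical dataset name and illustrative pmf keys,
# assuming the usual '_'-joined keys of disagg.pmf_map such as 'Mag', 'Dist',
# 'Mag_Dist', ...):
#
#   grp = dstore['disagg/poe-0.1-rlz-0-PGA-10.0-45.0']
#   mag_pmf = grp['Mag'][()]                 # 1D array over the mag bins
#   mag_edges = grp.attrs['mag_bin_edges']   # len(mag_pmf) + 1 edges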