Source code for openquake.calculators.disaggregation

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Disaggregation calculator core functionality
"""
from __future__ import division
import math
import logging
import numpy

from openquake.baselib import hdf5
from openquake.baselib.general import split_in_blocks
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.baselib import parallel
from openquake.hazardlib import sourceconverter
from openquake.commonlib import calc
from openquake.calculators import base, classical

DISAGG_RES_FMT = 'disagg/poe-%(poe)s-rlz-%(rlz)s-%(imt)s-%(lon)s-%(lat)s'


[docs]def compute_disagg(src_filter, sources, src_group_id, rlzs_assoc, trt_names, curves_dict, bin_edges, oqparam, monitor): # see https://bugs.launchpad.net/oq-engine/+bug/1279247 for an explanation # of the algorithm used """ :param src_filter: a :class:`openquake.hazardlib.calc.filter.SourceFilter` instance :param sources: list of hazardlib source objects :param src_group_id: numeric ID of a SourceGroup instance :param rlzs_assoc: a :class:`openquake.commonlib.source.RlzsAssoc` instance :param dict trt_names: a tuple of names for the given tectonic region type :param curves_dict: a dictionary with the hazard curves for sites, realizations and IMTs :param bin_egdes: a dictionary site_id -> edges :param oqparam: the parameters in the job.ini file :param monitor: monitor of the currently running job :returns: a dictionary of probability arrays, with composite key (sid, rlz.id, poe, imt, iml, trt_names). """ sitecol = src_filter.sitecol trt_num = dict((trt, i) for i, trt in enumerate(trt_names)) gsims = rlzs_assoc.gsims_by_grp_id[src_group_id] rlzs_by_gsim = rlzs_assoc.rlzs_by_gsim[src_group_id] result = {} # sid, rlz.id, poe, imt, iml, trt_names -> array collecting_mon = monitor('collecting bins') arranging_mon = monitor('arranging bins') for site, sid in zip(sitecol, sitecol.sids): # edges as wanted by disagg._arrange_data_in_bins try: edges = bin_edges[sid] except KeyError: # bin_edges for a given site are missing if the site is far away continue # generate source, rupture, sites once per site with collecting_mon: cmaker = ContextMaker(gsims, src_filter.integration_distance) bdata = disagg._collect_bins_data( trt_num, sources, site, curves_dict[sid], src_group_id, rlzs_by_gsim, cmaker, oqparam.imtls, oqparam.poes_disagg, oqparam.truncation_level, oqparam.num_epsilon_bins, oqparam.iml_disagg, monitor) for (rlzi, poe, imt), iml_pne_pairs in bdata.pnes.items(): # extract the probabilities of non-exceedance for the # given realization, disaggregation PoE, and IMT iml = iml_pne_pairs[0][0] probs = numpy.array([p for (i, p) in iml_pne_pairs], float) # bins in a format handy for hazardlib bins = [bdata.mags, bdata.dists, bdata.lons, bdata.lats, bdata.trts, None, probs] # call disagg._arrange_data_in_bins with arranging_mon: key = (sid, rlzi, poe, imt, iml, trt_names) matrix = disagg._arrange_data_in_bins( bins, edges + (trt_names,)) result[key] = numpy.array( [fn(matrix) for fn in disagg.pmf_map.values()]) return result
@base.calculators.add('disaggregation')
[docs]class DisaggregationCalculator(classical.ClassicalCalculator): """ Classical PSHA disaggregation calculator """
[docs] def post_execute(self, nbytes_by_kind): """Performs the disaggregation""" self.full_disaggregation()
[docs] def agg_result(self, acc, result): """ Collect the results coming from compute_disagg into self.results, a dictionary with key (sid, rlz.id, poe, imt, iml, trt_names) and values which are probability arrays. :param acc: dictionary accumulating the results :param result: dictionary with the result coming from a task """ for key, val in result.items(): acc[key] = 1. - (1. - acc.get(key, 0)) * (1. - val) return acc
[docs] def get_curves(self, sid): """ Get all the relevant hazard curves for the given site ordinal. Returns a dictionary {(rlz_id, imt) -> curve}. """ dic = {} imtls = self.oqparam.imtls pgetter = calc.PmapGetter(self.datastore) for rlz in self.rlzs_assoc.realizations: try: pmap = pgetter.get(numpy.array([sid]), rlz.ordinal) except ValueError: # empty pmaps logging.info( 'hazard curve contains all zero probabilities; ' 'skipping site %d, rlz=%d', sid, rlz.ordinal) continue if sid not in pmap: continue poes = pmap[sid].convert(imtls) for imt_str in imtls: if all(x == 0.0 for x in poes[imt_str]): logging.info( 'hazard curve contains all zero probabilities; ' 'skipping site %d, rlz=%d, IMT=%s', sid, rlz.ordinal, imt_str) continue dic[rlz.ordinal, imt_str] = poes[imt_str] return dic
[docs] def full_disaggregation(self): """ Run the disaggregation phase after hazard curve finalization. """ oq = self.oqparam tl = self.oqparam.truncation_level bb_dict = self.datastore['bb_dict'] sitecol = self.sitecol mag_bin_width = self.oqparam.mag_bin_width eps_edges = numpy.linspace(-tl, tl, self.oqparam.num_epsilon_bins + 1) logging.info('%d epsilon bins from %s to %s', len(eps_edges) - 1, min(eps_edges), max(eps_edges)) self.bin_edges = {} curves_dict = {sid: self.get_curves(sid) for sid in sitecol.sids} # determine the number of effective source groups sg_data = self.datastore['csm_info/sg_data'] num_grps = sum(1 for effrup in sg_data['effrup'] if effrup > 0) nblocks = math.ceil(oq.concurrent_tasks / num_grps) all_args = [] for smodel in self.csm.source_models: sm_id = smodel.ordinal trt_names = tuple(mod.trt for mod in smodel.src_groups) max_mag = max(mod.max_mag for mod in smodel.src_groups) min_mag = min(mod.min_mag for mod in smodel.src_groups) mag_edges = mag_bin_width * numpy.arange( int(numpy.floor(min_mag / mag_bin_width)), int(numpy.ceil(max_mag / mag_bin_width) + 1)) logging.info('%d mag bins from %s to %s', len(mag_edges) - 1, min_mag, max_mag) for src_group in smodel.src_groups: if src_group.id not in self.rlzs_assoc.rlzs_by_gsim: continue # the group has been filtered away for sid, site in zip(sitecol.sids, sitecol): curves = curves_dict[sid] if not curves: continue # skip zero-valued hazard curves bb = bb_dict[sm_id, sid] if not bb: logging.info( 'location %s was too far, skipping disaggregation', site.location) continue dist_edges, lon_edges, lat_edges = bb.bins_edges( oq.distance_bin_width, oq.coordinate_bin_width) logging.info( '[sid=%d] %d dist bins from %s to %s', sid, len(dist_edges) - 1, min(dist_edges), max(dist_edges)) logging.info( '[sid=%d] %d lon bins from %s to %s', sid, len(lon_edges) - 1, bb.west, bb.east) logging.info( '[sid=%d] %d lat bins from %s to %s', sid, len(lon_edges) - 1, bb.south, bb.north) self.bin_edges[sm_id, sid] = ( mag_edges, dist_edges, lon_edges, lat_edges, eps_edges) bin_edges = {} for sid, site in zip(sitecol.sids, sitecol): if (sm_id, sid) in self.bin_edges: bin_edges[sid] = self.bin_edges[sm_id, sid] src_filter = SourceFilter(sitecol, oq.maximum_distance) split_sources = [] for src in src_group: for split, _sites in src_filter( sourceconverter.split_source(src), sitecol): split_sources.append(split) mon = self.monitor('disaggregation') for srcs in split_in_blocks(split_sources, nblocks): all_args.append( (src_filter, srcs, src_group.id, self.rlzs_assoc, trt_names, curves_dict, bin_edges, oq, mon)) results = parallel.Starmap(compute_disagg, all_args).reduce( self.agg_result) self.save_disagg_results(results)
[docs] def save_disagg_results(self, results): """ Save all the results of the disaggregation. NB: the number of results to save is #sites * #rlzs * #disagg_poes * #IMTs. :param results: a dictionary of probability arrays """ # build a dictionary rlz.ordinal -> source_model.ordinal sm_id = {} for i, rlzs in self.rlzs_assoc.rlzs_by_smodel.items(): for rlz in rlzs: sm_id[rlz.ordinal] = i # since an extremely small subset of the full disaggregation matrix # is saved this method can be run sequentially on the controller node for key, probs in sorted(results.items()): sid, rlz_id, poe, imt, iml, trt_names = key edges = self.bin_edges[sm_id[rlz_id], sid] self.save_disagg_result( sid, edges, trt_names, probs, rlz_id, self.oqparam.investigation_time, imt, iml, poe)
[docs] def save_disagg_result(self, site_id, bin_edges, trt_names, matrix, rlz_id, investigation_time, imt_str, iml, poe): """ Save a computed disaggregation matrix to `hzrdr.disagg_result` (see :class:`~openquake.engine.db.models.DisaggResult`). :param site_id: id of the current site :param bin_edges: The 5-uple mag, dist, lon, lat, eps :param trt_names: The list of Tectonic Region Types :param matrix: A probability array :param rlz_id: ordinal of the realization to which the results belong. :param float investigation_time: Investigation time (years) for the calculation. :param imt_str: Intensity measure type string (PGA, SA, etc.) :param float iml: Intensity measure level interpolated (using `poe`) from the hazard curve at the `site`. :param float poe: Disaggregation probability of exceedance value for this result. """ lon = self.sitecol.lons[site_id] lat = self.sitecol.lats[site_id] mag, dist, lons, lats, eps = bin_edges disp_name = DISAGG_RES_FMT % dict( poe=poe, rlz=rlz_id, imt=imt_str, lon=lon, lat=lat) self.datastore[disp_name] = { '_'.join(key): mat for key, mat in zip(disagg.pmf_map, matrix)} attrs = self.datastore.hdf5[disp_name].attrs attrs['rlzi'] = rlz_id attrs['imt'] = imt_str attrs['iml'] = iml attrs['poe'] = poe attrs['trts'] = hdf5.array_of_vstr(trt_names) attrs['mag_bin_edges'] = mag attrs['dist_bin_edges'] = dist attrs['lon_bin_edges'] = lons attrs['lat_bin_edges'] = lats attrs['eps_bin_edges'] = eps attrs['location'] = (lon, lat)