Source code for openquake.calculators.disaggregation

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Disaggregation calculator core functionality
"""
from __future__ import division
import sys
import math
import logging
from collections import namedtuple
import numpy

from openquake.baselib.general import split_in_blocks
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.imt import from_string
from openquake.hazardlib.site import SiteCollection
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.commonlib import parallel
from openquake.commonlib.calc import gen_ruptures_for_site
from openquake.calculators import base, classical

DISAGG_RES_FMT = 'disagg/poe-%(poe)s-rlz-%(rlz)s-%(imt)s-%(lon)s-%(lat)s'


# a 6-uple containing float 4 arrays mags, dists, lons, lats,
# 1 int array trts and a list of dictionaries pnes
BinData = namedtuple('BinData', 'mags, dists, lons, lats, trts, pnes')


def _collect_bins_data(trt_num, source_ruptures, site, curves, trt_model_id,
                       rlzs_assoc, gsims, imtls, poes, truncation_level,
                       n_epsilons, mon):
    # returns a BinData instance
    sitecol = SiteCollection([site])
    mags = []
    dists = []
    lons = []
    lats = []
    trts = []
    pnes = []
    sitemesh = sitecol.mesh
    make_ctxt = mon('making contexts', measuremem=False)
    disagg_poe = mon('disaggregate_poe', measuremem=False)
    cmaker = ContextMaker(gsims)
    for source, ruptures in source_ruptures:
        try:
            tect_reg = trt_num[source.tectonic_region_type]
            for rupture in ruptures:
                with make_ctxt:
                    sctx, rctx, dctx = cmaker.make_contexts(sitecol, rupture)
                # extract rupture parameters of interest
                mags.append(rupture.mag)
                dists.append(dctx.rjb[0])  # single site => single distance
                [closest_point] = rupture.surface.get_closest_points(sitemesh)
                lons.append(closest_point.longitude)
                lats.append(closest_point.latitude)
                trts.append(tect_reg)

                pne_dict = {}
                # a dictionary rlz.id, poe, imt_str -> prob_no_exceed
                for gsim in gsims:
                    gs = str(gsim)
                    for imt_str, imls in imtls.iteritems():
                        imt = from_string(imt_str)
                        imls = numpy.array(imls[::-1])
                        for rlz in rlzs_assoc[trt_model_id, gs]:
                            rlzi = rlz.ordinal
                            curve_poes = curves[rlzi, imt_str][::-1]
                            for poe in poes:
                                iml = numpy.interp(poe, curve_poes, imls)
                                # compute probability of exceeding iml given
                                # the current rupture and epsilon_bin, that is
                                # ``P(IMT >= iml | rup, epsilon_bin)``
                                # for each of the epsilon bins
                                with disagg_poe:
                                    [poes_given_rup_eps] = \
                                        gsim.disaggregate_poe(
                                            sctx, rctx, dctx, imt, iml,
                                            truncation_level, n_epsilons)
                                pne = rupture.get_probability_no_exceedance(
                                    poes_given_rup_eps)
                                pne_dict[rlzi, poe, imt_str] = (iml, pne)

                pnes.append(pne_dict)
        except Exception as err:
            etype, err, tb = sys.exc_info()
            msg = 'An error occurred with source id=%s. Error: %s'
            msg %= (source.source_id, err)
            raise etype, msg, tb

    return BinData(numpy.array(mags, float),
                   numpy.array(dists, float),
                   numpy.array(lons, float),
                   numpy.array(lats, float),
                   numpy.array(trts, int),
                   pnes)


@parallel.litetask
[docs]def compute_disagg(sitecol, sources, trt_model_id, rlzs_assoc, trt_names, curves_dict, bin_edges, oqparam, monitor): # see https://bugs.launchpad.net/oq-engine/+bug/1279247 for an explanation # of the algorithm used """ :param sitecol: a :class:`openquake.hazardlib.site.SiteCollection` instance :param sources: list of hazardlib source objects :param trt_model_id: numeric ID of a TrtModel instance :param rlzs_assoc: a :class:`openquake.commonlib.source.RlzsAssoc` instance :param dict trt_names: a tuple of names for the given tectonic region type :param curves_dict: a dictionary with the hazard curves for sites, realizations and IMTs :param bin_egdes: a dictionary site_id -> edges :param oqparam: the parameters in the job.ini file :param monitor: monitor of the currently running job :returns: a dictionary of probability arrays, with composite key (sid, rlz.id, poe, imt, iml, trt_names). """ trt = sources[0].tectonic_region_type try: max_dist = oqparam.maximum_distance[trt] except KeyError: max_dist = oqparam.maximum_distance['default'] trt_num = dict((trt, i) for i, trt in enumerate(trt_names)) gsims = rlzs_assoc.gsims_by_trt_id[trt_model_id] result = {} # sid, rlz.id, poe, imt, iml, trt_names -> array collecting_mon = monitor('collecting bins') arranging_mon = monitor('arranging bins') for site, sid in zip(sitecol, sitecol.sids): # edges as wanted by disagg._arrange_data_in_bins try: edges = bin_edges[sid] except KeyError: # bin_edges for a given site are missing if the site is far away continue # generate source, rupture, sites once per site source_ruptures = list( gen_ruptures_for_site(site, sources, max_dist, monitor)) if not source_ruptures: continue with collecting_mon: bdata = _collect_bins_data( trt_num, source_ruptures, site, curves_dict[sid], trt_model_id, rlzs_assoc, gsims, oqparam.imtls, oqparam.poes_disagg, oqparam.truncation_level, oqparam.num_epsilon_bins, monitor) if not bdata.pnes: # no contributions for this site continue for poe in oqparam.poes_disagg: for imt in oqparam.imtls: for gsim in gsims: for rlz in rlzs_assoc[trt_model_id, gsim]: rlzi = rlz.ordinal # extract the probabilities of non-exceedance for the # given realization, disaggregation PoE, and IMT iml_pne_pairs = [pne[rlzi, poe, imt] for pne in bdata.pnes] iml = iml_pne_pairs[0][0] probs = numpy.array( [p for (i, p) in iml_pne_pairs], float) # bins in a format handy for hazardlib bins = [bdata.mags, bdata.dists, bdata.lons, bdata.lats, bdata.trts, None, probs] # call disagg._arrange_data_in_bins with arranging_mon: key = (sid, rlzi, poe, imt, iml, trt_names) matrix = disagg._arrange_data_in_bins( bins, edges + (trt_names,)) result[key] = numpy.array( [fn(matrix) for fn in disagg.pmf_map.values()]) return result
@base.calculators.add('disaggregation')
[docs]class DisaggregationCalculator(classical.ClassicalCalculator): """ Classical PSHA disaggregation calculator """
[docs] def post_execute(self, result=None): super(DisaggregationCalculator, self).post_execute(result) self.full_disaggregation(result)
[docs] def agg_result(self, acc, result): """ Collect the results coming from compute_disagg into self.results, a dictionary with key (sid, rlz.id, poe, imt, iml, trt_names) and values which are probability arrays. :param acc: dictionary accumulating the results :param result: dictionary with the result coming from a task """ for key, val in result.iteritems(): acc[key] = 1. - (1. - acc.get(key, 0)) * (1. - val) return acc
[docs] def get_curves(self, sid): """ Get all the relevant hazard curves for the given site ordinal. Returns a dictionary {(rlz_id, imt) -> curve}. """ dic = {} for rlz in self.rlzs_assoc.realizations: poes = self.datastore['hcurves/rlz-%03d' % rlz.ordinal][sid] for imt_str in self.oqparam.imtls: if all(x == 0.0 for x in poes[imt_str]): logging.info( 'hazard curve contains all zero probabilities; ' 'skipping site %d, rlz=%d, IMT=%s', sid, rlz.ordinal, imt_str) continue dic[rlz.ordinal, imt_str] = poes[imt_str] return dic
[docs] def full_disaggregation(self, curves_by_trt_gsim): """ Run the disaggregation phase after hazard curve finalization. """ oq = self.oqparam tl = self.oqparam.truncation_level sitecol = self.sitecol mag_bin_width = self.oqparam.mag_bin_width eps_edges = numpy.linspace(-tl, tl, self.oqparam.num_epsilon_bins + 1) logging.info('%d epsilon bins from %s to %s', len(eps_edges) - 1, min(eps_edges), max(eps_edges)) self.bin_edges = {} curves_dict = {sid: self.get_curves(sid) for sid in sitecol.sids} all_args = [] num_trts = sum(len(sm.trt_models) for sm in self.csm.source_models) nblocks = math.ceil(oq.concurrent_tasks / num_trts) for smodel in self.csm.source_models: sm_id = smodel.ordinal trt_names = tuple(mod.trt for mod in smodel.trt_models) max_mag = max(mod.max_mag for mod in smodel.trt_models) min_mag = min(mod.min_mag for mod in smodel.trt_models) mag_edges = mag_bin_width * numpy.arange( int(numpy.floor(min_mag / mag_bin_width)), int(numpy.ceil(max_mag / mag_bin_width) + 1)) logging.info('%d mag bins from %s to %s', len(mag_edges) - 1, min_mag, max_mag) for trt_model in smodel.trt_models: for sid, site in zip(sitecol.sids, sitecol): curves = curves_dict[sid] if not curves: continue # skip zero-valued hazard curves bb = curves_by_trt_gsim.bb_dict[sm_id, sid] if not bb: logging.info( 'location %s was too far, skipping disaggregation', site.location) continue dist_edges, lon_edges, lat_edges = bb.bins_edges( oq.distance_bin_width, oq.coordinate_bin_width) logging.info( '%d dist bins from %s to %s', len(dist_edges) - 1, min(dist_edges), max(dist_edges)) logging.info( '%d lon bins from %s to %s', len(lon_edges) - 1, bb.west, bb.east) logging.info( '%d lat bins from %s to %s', len(lon_edges) - 1, bb.south, bb.north) self.bin_edges[sm_id, sid] = ( mag_edges, dist_edges, lon_edges, lat_edges, eps_edges) bin_edges = {} for sid, site in zip(sitecol.sids, sitecol): if (sm_id, sid) in self.bin_edges: bin_edges[sid] = self.bin_edges[sm_id, sid] for srcs in split_in_blocks(trt_model, nblocks): all_args.append( (sitecol, srcs, trt_model.id, self.rlzs_assoc, trt_names, curves_dict, bin_edges, oq, self.monitor)) results = parallel.starmap(compute_disagg, all_args).reduce( self.agg_result) self.save_disagg_results(results)
[docs] def save_disagg_results(self, results): """ Save all the results of the disaggregation. NB: the number of results to save is #sites * #rlzs * #disagg_poes * #IMTs. :param results: a dictionary of probability arrays """ # build a dictionary rlz.ordinal -> source_model.ordinal sm_id = {} for i, rlzs in enumerate(self.rlzs_assoc.rlzs_by_smodel): for rlz in rlzs: sm_id[rlz.ordinal] = i # since an extremely small subset of the full disaggregation matrix # is saved this method can be run sequentially on the controller node for key, probs in sorted(results.iteritems()): sid, rlz_id, poe, imt, iml, trt_names = key edges = self.bin_edges[sm_id[rlz_id], sid] self.save_disagg_result( sid, edges, trt_names, probs, rlz_id, self.oqparam.investigation_time, imt, iml, poe)
[docs] def save_disagg_result(self, site_id, bin_edges, trt_names, matrix, rlz_id, investigation_time, imt_str, iml, poe): """ Save a computed disaggregation matrix to `hzrdr.disagg_result` (see :class:`~openquake.engine.db.models.DisaggResult`). :param site_id: id of the current site :param bin_edges: The 5-uple mag, dist, lon, lat, eps :param trt_names: The list of Tectonic Region Types :param matrix: A probability array :param rlz_id: ordinal of the realization to which the results belong. :param float investigation_time: Investigation time (years) for the calculation. :param imt_str: Intensity measure type string (PGA, SA, etc.) :param float iml: Intensity measure level interpolated (using `poe`) from the hazard curve at the `site`. :param float poe: Disaggregation probability of exceedance value for this result. """ lon = self.sitecol.lons[site_id] lat = self.sitecol.lats[site_id] mag, dist, lons, lats, eps = bin_edges disp_name = DISAGG_RES_FMT % dict( poe=poe, rlz=rlz_id, imt=imt_str, lon=lon, lat=lat) self.datastore[disp_name] = matrix attrs = self.datastore.hdf5[disp_name].attrs attrs['rlzi'] = rlz_id attrs['imt'] = imt_str attrs['iml'] = iml attrs['poe'] = poe attrs['trts'] = trt_names attrs['mag_bin_edges'] = mag attrs['dist_bin_edges'] = dist attrs['lon_bin_edges'] = lons attrs['lat_bin_edges'] = lats attrs['eps_bin_edges'] = eps attrs['location'] = (lon, lat)