Source code for openquake.hazardlib.calc.hazard_curve

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

""":mod:`openquake.hazardlib.calc.hazard_curve` implements
:func:`calc_hazard_curves`. Here is an example of a classical PSHA
parallel calculator computing the hazard curves per each realization in less
than 20 lines of code:

.. code-block:: python

   import sys
   import logging
   from openquake.baselib import parallel
   from openquake.hazardlib.calc.filters import SourceFilter
   from openquake.hazardlib.calc.hazard_curve import calc_hazard_curves
   from openquake.commonlib import readinput

   def main(job_ini):
       logging.basicConfig(level=logging.INFO)
       oq = readinput.get_oqparam(job_ini)
       sitecol = readinput.get_site_collection(oq)
       src_filter = SourceFilter(sitecol, oq.maximum_distance)
       csm = readinput.get_composite_source_model(oq).filter(src_filter)
       rlzs_assoc = csm.info.get_rlzs_assoc()
       for i, sm in enumerate(csm.source_models):
           for rlz in rlzs_assoc.rlzs_by_smodel[i]:
               gsim_by_trt = rlzs_assoc.gsim_by_trt[rlz.ordinal]
               hcurves = calc_hazard_curves(
                   sm.src_groups, src_filter, oq.imtls,
                   gsim_by_trt, oq.truncation_level,
                   parallel.Starmap.apply)
           print('rlz=%s, hcurves=%s' % (rlz, hcurves))

   if __name__ == '__main__':
       main(sys.argv[1])  # path to a job.ini file

NB: the implementation in the engine is smarter and more
efficient. Here we start a parallel computation per each realization,
the engine manages all the realizations at once.
"""
from __future__ import division
import time
import operator
import numpy
from openquake.baselib.python3compat import zip
from openquake.baselib.performance import Monitor
from openquake.baselib.general import DictArray, groupby, AccumDict
from openquake.baselib.parallel import Sequential
from openquake.hazardlib.source import split_source
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib.gsim.base import GroundShakingIntensityModel
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.sourceconverter import SourceGroup


def classical(group, src_filter, gsims, param, monitor=Monitor()):
    """
    Compute the hazard curves for a set of sources belonging to the same
    tectonic region type for all the GSIMs associated to that TRT.
    The arguments are the same as in :func:`calc_hazard_curves`, except
    for ``gsims``, which is a list of GSIM instances.

    :returns:
        a dictionary {grp_id: pmap} with attributes .grp_ids, .calc_times,
        .eff_ruptures
    """
    if getattr(group, 'src_interdep', None) == 'mutex':
        mutex_weight = {src.source_id: weight for src, weight in
                        zip(group.sources, group.srcs_weights)}
        srcs = group.sources
    else:
        mutex_weight = None
        srcs = sum([split_source(src) for src in group], [])
    grp_ids = set()
    for src in group:
        grp_ids.update(src.src_group_ids)
    maxdist = src_filter.integration_distance
    with GroundShakingIntensityModel.forbid_instantiation():
        imtls = param['imtls']
        trunclevel = param.get('truncation_level')
        cmaker = ContextMaker(gsims, maxdist)
        ctx_mon = monitor('make_contexts', measuremem=False)
        poe_mon = monitor('get_poes', measuremem=False)
        pmap = AccumDict({grp_id: ProbabilityMap(len(imtls.array), len(gsims))
                          for grp_id in grp_ids})
        # AccumDict of arrays with 4 elements weight, nsites, calc_time, split
        pmap.calc_times = AccumDict(accum=numpy.zeros(4))
        pmap.eff_ruptures = AccumDict()  # grp_id -> num_ruptures
        for src, s_sites in src_filter(srcs):  # filter now
            t0 = time.time()
            indep = group.rup_interdep == 'indep' if mutex_weight else True
            poemap = cmaker.poe_map(
                src, s_sites, imtls, trunclevel, ctx_mon, poe_mon, indep)
            if mutex_weight:  # mutex sources
                weight = mutex_weight[src.source_id]
                for sid in poemap:
                    pcurve = pmap[group.id].setdefault(sid, 0)
                    pcurve += poemap[sid] * weight
            elif poemap:
                for grp_id in src.src_group_ids:
                    pmap[grp_id] |= poemap
            src_id = src.source_id.split(':', 1)[0]
            pmap.calc_times[src_id] += numpy.array(
                [src.weight, len(s_sites), time.time() - t0, 1])
            # storing the number of contributing ruptures too
            pmap.eff_ruptures += {grp_id: getattr(poemap, 'eff_ruptures', 0)
                                  for grp_id in src.src_group_ids}
        if mutex_weight and group.grp_probability is not None:
            pmap[group.id] *= group.grp_probability
        return pmap


def calc_hazard_curves(
        groups, ss_filter, imtls, gsim_by_trt, truncation_level=None,
        apply=Sequential.apply):
    """
    Compute hazard curves on a list of sites, given a set of seismic source
    groups and a dictionary of ground shaking intensity models (one per
    tectonic region type).

    Probability of ground motion exceedance is computed in different ways
    depending if the sources are independent or mutually exclusive.

    :param groups:
        A sequence of groups of seismic sources objects (instances of
        of :class:`~openquake.hazardlib.source.base.BaseSeismicSource`).
    :param ss_filter:
        A source filter over the site collection or the site collection itself
    :param imtls:
        Dictionary mapping intensity measure type strings
        to lists of intensity measure levels.
    :param gsim_by_trt:
        Dictionary mapping tectonic region types (members
        of :class:`openquake.hazardlib.const.TRT`) to
        :class:`~openquake.hazardlib.gsim.base.GMPE` or
        :class:`~openquake.hazardlib.gsim.base.IPE` objects.
    :param truncation_level:
        Float, number of standard deviations for truncation of the intensity
        distribution.
    :param maximum_distance:
        The integration distance, if any
    :returns:
        An array of size N, where N is the number of sites, which elements
        are records with fields given by the intensity measure types; the
        size of each field is given by the number of levels in ``imtls``.
    """
    # This is ensuring backward compatibility i.e. processing a list of
    # sources
    if not isinstance(groups[0], SourceGroup):  # sent a list of sources
        odic = groupby(groups, operator.attrgetter('tectonic_region_type'))
        groups = [SourceGroup(trt, odic[trt], 'src_group', 'indep', 'indep')
                  for trt in odic]
    for i, grp in enumerate(groups):
        for src in grp:
            if src.src_group_id is None:
                src.src_group_id = i
    if hasattr(ss_filter, 'sitecol'):  # a filter, as it should be
        sitecol = ss_filter.sitecol
    else:  # backward compatibility, a site collection was passed
        sitecol = ss_filter
        ss_filter = SourceFilter(sitecol, {})

    imtls = DictArray(imtls)
    param = dict(imtls=imtls, truncation_level=truncation_level)
    pmap = ProbabilityMap(len(imtls.array), 1)
    # Processing groups with homogeneous tectonic region
    gsim = gsim_by_trt[groups[0][0].tectonic_region_type]
    for group in groups:
        if group.src_interdep == 'mutex':  # do not split the group
            it = [classical(group, ss_filter, [gsim], param)]
        else:  # split the group and apply `classical` in parallel
            it = apply(
                classical, (group, ss_filter, [gsim], param),
                weight=operator.attrgetter('weight'))
        for res in it:
            for grp_id in res:
                pmap |= res[grp_id]
    return pmap.convert(imtls, len(sitecol.complete))