# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`~openquake.hazardlib.calc.gmf` exports
:func:`ground_motion_fields`.
"""
import numpy
import pandas
from openquake.baselib.general import AccumDict
from openquake.baselib.performance import Monitor, compile
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.source.rupture import EBRupture, get_eid_rlz
from openquake.hazardlib.cross_correlation import NoCrossCorrelation
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.imt import from_string
U32 = numpy.uint32
F32 = numpy.float32
[docs]class CorrelationButNoInterIntraStdDevs(Exception):
    def __init__(self, corr, gsim):
        self.corr = corr
        self.gsim = gsim
    def __str__(self):
        return '''\
You cannot use the correlation model %s with the GSIM %s, \
that defines only the total standard deviation. If you want to use a \
correlation model you have to select a GMPE that provides the inter and \
intra event standard deviations.''' % (
            self.corr.__class__.__name__, self.gsim.__class__.__name__) 
[docs]@compile(["float64[:,:](float64[:,:], boolean)",
          "float64[:](float64[:], boolean)",
          "float64(float64, boolean)"])
def exp(vals, notMMI):
    """
    Exponentiate the values unless the IMT is MMI
    """
    if notMMI:
        return numpy.exp(vals)
    return vals 
[docs]@compile("(float32[:,:,:],float64[:,:],float64[:],float64[:],int64)")
def set_max_min(array, mean, max_iml, min_iml, mmi_index):
    N, M, E = array.shape
    # manage max_iml
    for m in range(M):
        iml = max_iml[m]
        for n in range(N):
            maxval = exp(mean[m, n], m!=mmi_index)
            for e in range(E):
                val = array[n, m, e]
                if val > iml:
                    array[n, m, e] = maxval
    # manage min_iml
    for n in range(N):
        for e in range(E):
            # set to zero only if all IMTs are below the thresholds
            if (array[n, :, e] < min_iml).all():
                array[n, :, e] = 0 
[docs]@compile("(uint32[:],uint32[:],uint32[:],uint32[:])")
def build_eid_sid_rlz(allrlzs, sids, eids, rlzs):
    eid_sid_rlz = numpy.zeros((3, len(sids) * len(eids)), U32)
    idx = 0
    for rlz in allrlzs:
        for eid in eids[rlzs == rlz]:
            for sid in sids:
                eid_sid_rlz[0, idx] = eid
                eid_sid_rlz[1, idx] = sid
                eid_sid_rlz[2, idx] = rlz
                idx += 1
    return eid_sid_rlz 
[docs]class GmfComputer(object):
    """
    Given an earthquake rupture, the GmfComputer computes
    ground shaking over a set of sites, by randomly sampling a ground
    shaking intensity model.
    :param rupture:
        EBRupture to calculate ground motion fields radiated from.
    :param :class:`openquake.hazardlib.site.SiteCollection` sitecol:
        a complete SiteCollection
    :param cmaker:
        a :class:`openquake.hazardlib.gsim.base.ContextMaker` instance
    :param correlation_model:
        Instance of a spatial correlation model object. See
        :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which
        case non-correlated ground motion fields are calculated.
        Correlation model is not used if ``truncation_level`` is zero.
    :param cross_correl:
        Instance of a cross correlation model object. See
        :mod:`openquake.hazardlib.cross_correlation`. Can be ``None``, in which
        case non-cross-correlated ground motion fields are calculated.
    :param amplifier:
        None or an instance of Amplifier
    :param sec_perils:
        Tuple of secondary perils. See
        :mod:`openquake.hazardlib.sep`. Can be ``None``, in which
        case no secondary perils need to be evaluated.
    """
    # The GmfComputer is called from the OpenQuake Engine. In that case
    # the rupture is an EBRupture instance containing a
    # :class:`openquake.hazardlib.source.rupture.Rupture` instance as an
    # attribute. Then the `.compute(gsim, num_events, ms)` method is called and
    # a matrix of size (M, N, E) is returned, where M is the number of
    # IMTs, N the number of affected sites and E the number of events. The
    # seed is extracted from the underlying rupture.
    def __init__(self, rupture, sitecol, cmaker, correlation_model=None,
                 cross_correl=None, amplifier=None, sec_perils=()):
        if len(sitecol) == 0:
            raise ValueError('No sites')
        elif len(cmaker.imtls) == 0:
            raise ValueError('No IMTs')
        elif len(cmaker.gsims) == 0:
            raise ValueError('No GSIMs')
        self.cmaker = cmaker
        self.imts = [from_string(imt) for imt in cmaker.imtls]
        self.cmaker = cmaker
        self.gsims = sorted(cmaker.gsims)
        self.correlation_model = correlation_model
        self.amplifier = amplifier
        self.sec_perils = sec_perils
        self.ebrupture = rupture
        self.seed = rupture.seed
        rupture = rupture.rupture  # the underlying rupture
        ctxs = list(cmaker.get_ctx_iter([rupture], sitecol))
        if not ctxs:
            raise FarAwayRupture
        [self.ctx] = ctxs
        self.N = len(self.ctx)
        if correlation_model:  # store the filtered sitecol
            self.sites = sitecol.complete.filtered(self.ctx.sids)
        self.cross_correl = cross_correl or NoCrossCorrelation(
            cmaker.truncation_level)
        self.gmv_fields = [f'gmv_{m}' for m in range(len(cmaker.imts))]
[docs]    def init_eid_rlz_sig_eps(self):
        """
        Initialize the attributes eid, rlz, sig, eps with shapes E, E, EM, EM
        """
        self.rlzs = numpy.concatenate(list(self.cmaker.gsims.values()))
        self.eid, self.rlz = get_eid_rlz(
            vars(self.ebrupture), self.rlzs, self.cmaker.scenario)
        self.E = E = len(self.eid)
        self.M = M = len(self.gmv_fields)
        self.sig = numpy.zeros((E, M), F32)  # same for all events
        self.eps = numpy.zeros((E, M), F32)  # not the same 
[docs]    def build_sig_eps(self, se_dt):
        """
        :returns: a structured array of size E with fields
                  (eid, rlz_id, sig_inter_IMT, eps_inter_IMT)
        """
        sig_eps = numpy.zeros(self.E, se_dt)
        sig_eps['eid'] = self.eid
        sig_eps['rlz_id'] = self.rlz
        for m, imt in enumerate(self.cmaker.imtls):
            sig_eps[f'sig_inter_{imt}'] = self.sig[:, m]
            sig_eps[f'eps_inter_{imt}'] = self.eps[:, m]
        return sig_eps 
[docs]    def update(self, data, array, rlzs, mean_stds, max_iml=None):
        """
        Updates the data dictionary with the values coming from the array
        of GMVs. Also indirectly updates the arrays .sig and .eps.
        """
        min_iml = self.cmaker.min_iml
        mag = self.ebrupture.rupture.mag
        mean = mean_stds[0]
        if len(mean.shape) == 3:  # shape (M, N, 1) for conditioned gmfs
            mean = mean[:, :, 0]
        mmi_index = -1
        for m, imt in enumerate(self.cmaker.imtls):
            if imt == 'MMI':
                mmi_index = m
        if max_iml is None:
            M = len(self.cmaker.imts)
            max_iml = numpy.full(M, numpy.inf, float)
        set_max_min(array, mean, max_iml, min_iml, mmi_index)
        data['gmv'].append(array)
        if self.sec_perils:
            n = 0
            for rlz in rlzs:
                eids = self.eid[self.rlz == rlz]
                E = len(eids)
                for e, eid in enumerate(eids):
                    gmfa = array[:, :, n + e].T  # shape (M, N)
                    for sp in self.sec_perils:
                        o = sp.compute(mag, zip(self.imts, gmfa), self.ctx)
                        for outkey, outarr in zip(sp.outputs, o):
                            data[outkey].append(outarr)
                n += E 
[docs]    def strip_zeros(self, data):
        """
        :returns: a DataFrame with the nonzero GMVs
        """
        # building an array of shape (3, NE)
        eid_sid_rlz = build_eid_sid_rlz(
            self.rlzs, self.ctx.sids, self.eid, self.rlz)
        for key, val in sorted(data.items()):
            data[key] = numpy.concatenate(data[key], axis=-1, dtype=F32)
        gmv = data.pop('gmv')  # shape (N, M, E)
        ok = gmv.sum(axis=1).T.reshape(-1) > 0
        for m, gmv_field in enumerate(self.gmv_fields):
            data[gmv_field] = gmv[:, m].T.reshape(-1)
        # build dataframe
        df = pandas.DataFrame(data)
        df['eid'] = eid_sid_rlz[0]
        df['sid'] = eid_sid_rlz[1]
        df['rlz'] = eid_sid_rlz[2]
        # remove the rows with all zero values
        return df[ok] 
[docs]    def compute_all(self, mean_stds, max_iml=None,
                    cmon=Monitor(), umon=Monitor()):
        """
        :returns: DataFrame with fields eid, rlz, sid, gmv_X, ...
        """
        self.init_eid_rlz_sig_eps()
        rng = numpy.random.default_rng(self.seed)
        data = AccumDict(accum=[])
        for g, (gs, rlzs) in enumerate(self.cmaker.gsims.items()):
            idxs, = numpy.where(numpy.isin(self.rlz, rlzs))
            E = len(idxs)
            if E == 0:  # crucial for performance
                continue
            with cmon:
                array = self.compute(gs, idxs, mean_stds[:, g], rng)  # NME
            with umon:
                self.update(data, array, rlzs, mean_stds[:, g], max_iml)
        with umon:
            return self.strip_zeros(data) 
[docs]    def compute(self, gsim, idxs, mean_stds, rng):
        """
        :param gsim: GSIM used to compute mean_stds
        :param idxs: affected indices
        :param mean_stds: array of shape (4, M, N)
        :param rng: random number generator for the rupture
        :returns: a 32 bit array of shape (N, M, E)
        """
        # sets self.eps
        M = len(self.imts)
        E = len(idxs)
        result = numpy.zeros(
            (len(self.imts), len(self.ctx.sids), E), F32)
        ccdist = self.cross_correl.distribution
        # build arrays of random numbers of shape (M, N, E) and (M, E)
        intra_eps = [ccdist.rvs((self.N, E), rng) for _ in range(M)]
        self.eps[idxs] = self.cross_correl.get_inter_eps(self.imts, E, rng).T
        for m, imt in enumerate(self.imts):
            try:
                result[m] = self._compute(
                    mean_stds[:, m], m, imt, gsim, intra_eps[m], idxs)
            except Exception as exc:
                raise RuntimeError(
                    '(%s, %s, %s): %s' %
                    (gsim, imt, exc.__class__.__name__, exc)
                ).with_traceback(exc.__traceback__)
        if self.amplifier:
            self.amplifier.amplify_gmfs(
                self.ctx.ampcode, result, self.imts, self.seed)
        return result.transpose(1, 0, 2) 
    def _compute(self, mean_stds, m, imt, gsim, intra_eps, idxs):
        # sets self.sig, returns gmf
        im = imt.string
        mean, sig, tau, phi = mean_stds
        if self.cmaker.truncation_level <= 1E-9:
            # for truncation_level = 0 there is only mean, no stds
            if self.correlation_model:
                raise ValueError('truncation_level=0 requires '
                                 'no correlation model')
            gmf = exp(mean, im!='MMI')[:, None].repeat(len(idxs), axis=1)
        elif gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES == {StdDev.TOTAL}:
            # If the GSIM provides only total standard deviation, we need
            # to compute mean and total standard deviation at the sites
            # of interest.
            # In this case, we also assume no correlation model is used.
            if self.correlation_model:
                raise CorrelationButNoInterIntraStdDevs(
                    self.correlation_model, gsim)
            gmf = exp(mean[:, None] + sig[:, None] * intra_eps, im!='MMI')
            self.sig[idxs, m] = numpy.nan
        else:
            # the [:, None] is used to implement multiplication by row;
            # for instance if  a = [1 2], b = [[1 2] [3 4]] then
            # a[:, None] * b = [[1 2] [6 8]] which is the expected result;
            # otherwise one would get multiplication by column [[1 4] [3 8]]
            intra_res = phi[:, None] * intra_eps  # shape (N, E)
            if self.correlation_model is not None:
                intra_res = self.correlation_model.apply_correlation(
                    self.sites, imt, intra_res, phi)
                if len(intra_res.shape) == 1:  # a vector
                    intra_res = intra_res[:, None]
            inter_res = tau[:, None] * self.eps[idxs, m]
            # shape (N, 1) * E => (N, E)
            gmf = exp(mean[:, None] + intra_res + inter_res, im!='MMI')
            self.sig[idxs, m] = tau.max()  # from shape (N, 1) => scalar
        return gmf  # shapes (N, E) 
# this is not used in the engine; it is still useful for usage in IPython
# when demonstrating hazardlib capabilities
[docs]def ground_motion_fields(rupture, sites, imts, gsim, truncation_level,
                         realizations, correlation_model=None, seed=0):
    """
    Given an earthquake rupture, the ground motion field calculator computes
    ground shaking over a set of sites, by randomly sampling a ground shaking
    intensity model. A ground motion field represents a possible 'realization'
    of the ground shaking due to an earthquake rupture.
    .. note::
     This calculator is using random numbers. In order to reproduce the
     same results numpy random numbers generator needs to be seeded, see
     http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.seed.html
    :param openquake.hazardlib.source.rupture.Rupture rupture:
        Rupture to calculate ground motion fields radiated from.
    :param openquake.hazardlib.site.SiteCollection sites:
        Sites of interest to calculate GMFs.
    :param imts:
        List of intensity measure type objects (see
        :mod:`openquake.hazardlib.imt`).
    :param gsim:
        Ground-shaking intensity model, instance of subclass of either
        :class:`~openquake.hazardlib.gsim.base.GMPE` or
        :class:`~openquake.hazardlib.gsim.base.IPE`.
    :param truncation_level:
        Float, number of standard deviations for truncation of the intensity
        distribution
    :param realizations:
        Integer number of GMF simulations to compute.
    :param correlation_model:
        Instance of correlation model object. See
        :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which case
        non-correlated ground motion fields are calculated. Correlation model
        is not used if ``truncation_level`` is zero.
    :param int seed:
        The seed used in the numpy random number generator
    :returns:
        Dictionary mapping intensity measure type objects (same
        as in parameter ``imts``) to 2d numpy arrays of floats,
        representing different simulations of ground shaking intensity
        for all sites in the collection. First dimension represents
        sites and second one is for simulations.
    """
    cmaker = ContextMaker(rupture.tectonic_region_type, {gsim: U32([0])},
                          dict(truncation_level=truncation_level,
                               imtls={str(imt): numpy.array([0.])
                                      for imt in imts}))
    cmaker.scenario = True
    ebr = EBRupture(
        rupture, source_id=0, trt_smr=0, n_occ=realizations, id=0, e0=0)
    ebr.seed = seed
    gc = GmfComputer(ebr, sites, cmaker, correlation_model)
    mean_stds = cmaker.get_mean_stds([gc.ctx])[:, 0]  # shape (4, M, N)
    gc.init_eid_rlz_sig_eps()
    res = gc.compute(gsim, U32([0]), mean_stds,
                     numpy.random.default_rng(seed))
    return {imt: res[:, m] for m, imt in enumerate(gc.imts)}