Source code for openquake.hazardlib.calc.gmf

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Module :mod:`~openquake.hazardlib.calc.gmf` exports
:func:`ground_motion_fields`.
"""
import numpy
import pandas

from openquake.baselib.general import AccumDict
from openquake.baselib.performance import Monitor, compile
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.source.rupture import EBRupture, get_eid_rlz
from openquake.hazardlib.cross_correlation import NoCrossCorrelation
from openquake.hazardlib.contexts import ContextMaker, FarAwayRupture
from openquake.hazardlib.imt import from_string

U32 = numpy.uint32
F32 = numpy.float32

[docs]class CorrelationButNoInterIntraStdDevs(Exception): def __init__(self, corr, gsim): self.corr = corr self.gsim = gsim def __str__(self): return '''\ You cannot use the correlation model %s with the GSIM %s, \ that defines only the total standard deviation. If you want to use a \ correlation model you have to select a GMPE that provides the inter and \ intra event standard deviations.''' % ( self.corr.__class__.__name__, self.gsim.__class__.__name__)
[docs]@compile(["float64[:,:](float64[:,:], boolean)", "float64[:](float64[:], boolean)", "float64(float64, boolean)"]) def exp(vals, notMMI): """ Exponentiate the values unless the IMT is MMI """ if notMMI: return numpy.exp(vals) return vals
[docs]@compile("(float32[:,:,:],float64[:,:],float64[:],float64[:],int64)") def set_max_min(array, mean, max_iml, min_iml, mmi_index): N, M, E = array.shape # manage max_iml for m in range(M): iml = max_iml[m] for n in range(N): # capping the gmv at the median value if val > max_iml[m] maxval = exp(mean[m, n], m!=mmi_index) for e in range(E): val = array[n, m, e] if val > iml: array[n, m, e] = maxval # manage min_iml for n in range(N): for e in range(E): # set to zero only if all IMTs are below the thresholds if (array[n, :, e] < min_iml).all(): array[n, :, e] = 0
[docs]@compile("(uint32[:],uint32[:],uint32[:],uint32[:])") def build_eid_sid_rlz(allrlzs, sids, eids, rlzs): eid_sid_rlz = numpy.zeros((3, len(sids) * len(eids)), U32) idx = 0 for rlz in allrlzs: for eid in eids[rlzs == rlz]: for sid in sids: eid_sid_rlz[0, idx] = eid eid_sid_rlz[1, idx] = sid eid_sid_rlz[2, idx] = rlz idx += 1 return eid_sid_rlz
[docs]def calc_gmf_simplified(ebrupture, sitecol, cmaker): """ A simplified version of the GmfComputer for event based calculations. Used only for pedagogical purposes. Here is an example of usage: from unittest.mock import Mock import numpy from openquake.hazardlib import valid, contexts, site, geo from openquake.hazardlib.source.rupture import EBRupture, build_planar from openquake.hazardlib.calc.gmf import calc_gmf_simplified, GmfComputer imts = ['PGA'] rlzs = numpy.arange(3, dtype=numpy.uint32) rlzs_by_gsim = {valid.gsim('BooreAtkinson2008'): rlzs} lons = [0., 0.] lats = [0., 1.] siteparams = Mock(reference_vs30_value=760.) sitecol = site.SiteCollection.from_points(lons, lats, sitemodel=siteparams) hypo = geo.point.Point(0, .5, 20) rup = build_planar(hypo, mag=7., rake=0.) cmaker = contexts.simple_cmaker(rlzs_by_gsim, imts, truncation_level=3.) ebr = EBRupture(rup, 0, 0, n_occ=2, id=1) ebr.seed = 42 print(cmaker) print(sitecol.array) print(ebr) gmfa = calc_gmf_simplified(ebr, sitecol, cmaker) print(gmfa) # numbers considering the full site collection sites = site.SiteCollection.from_points([0], [1], sitemodel=siteparams) gmfa = calc_gmf_simplified(ebr, sites, cmaker) print(gmfa) # different numbers considering half of the site collection """ N = len(sitecol) M = len(cmaker.imtls) [ctx] = cmaker.get_ctx_iter([ebrupture.rupture], sitecol) mean, sig, tau, phi = cmaker.get_mean_stds([ctx]) # shapes (G, M, N) rlzs = numpy.concatenate(list(cmaker.gsims.values())) eid, rlz = get_eid_rlz(vars(ebrupture), rlzs, False) rng = numpy.random.default_rng(ebrupture.seed) cross_correl = NoCrossCorrelation(cmaker.truncation_level) ccdist = cross_correl.distribution gmfs = [] for g, (gs, rlzs) in enumerate(cmaker.gsims.items()): idxs, = numpy.where(numpy.isin(rlz, rlzs)) E = len(idxs) # build arrays of random numbers of shape (M, N, E) and (M, E) intra_eps = [ccdist.rvs((N, E), rng) for _ in range(M)] eps = numpy.zeros((E, M), F32) eps[idxs] = cross_correl.get_inter_eps(cmaker.imtls, E, rng).T gmf = numpy.zeros((M, N, E)) for m, imt in enumerate(cmaker.imtls): intra_res = phi[g, m, :, None] * intra_eps # shape (N, E) inter_res = tau[g, m, :, None] * eps[idxs, m] # shape (N, E) gmf[m] = numpy.exp(mean[g, m, :, None] + intra_res + inter_res) gmfs.append(gmf) return numpy.concatenate(gmfs) # shape (M, N, E)
[docs]class GmfComputer(object): """ Given an earthquake rupture, the GmfComputer computes ground shaking over a set of sites, by randomly sampling a ground shaking intensity model. :param rupture: EBRupture to calculate ground motion fields radiated from. :param :class:`openquake.hazardlib.site.SiteCollection` sitecol: a complete SiteCollection :param cmaker: a :class:`openquake.hazardlib.gsim.base.ContextMaker` instance :param correlation_model: Instance of a spatial correlation model object. See :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which case non-correlated ground motion fields are calculated. Correlation model is not used if ``truncation_level`` is zero. :param cross_correl: Instance of a cross correlation model object. See :mod:`openquake.hazardlib.cross_correlation`. Can be ``None``, in which case non-cross-correlated ground motion fields are calculated. :param amplifier: None or an instance of Amplifier :param sec_perils: Tuple of secondary perils. See :mod:`openquake.hazardlib.sep`. Can be ``None``, in which case no secondary perils need to be evaluated. """ # The GmfComputer is called from the OpenQuake Engine. In that case # the rupture is an EBRupture instance containing a # :class:`openquake.hazardlib.source.rupture.Rupture` instance as an # attribute. Then the `.compute(gsim, num_events, ms)` method is called and # a matrix of size (M, N, E) is returned, where M is the number of # IMTs, N the number of affected sites and E the number of events. The # seed is extracted from the underlying rupture. def __init__(self, rupture, sitecol, cmaker, correlation_model=None, cross_correl=None, amplifier=None, sec_perils=()): if len(sitecol) == 0: raise ValueError('No sites') elif len(cmaker.imtls) == 0: raise ValueError('No IMTs') elif len(cmaker.gsims) == 0: raise ValueError('No GSIMs') self.cmaker = cmaker self.imts = [from_string(imt) for imt in cmaker.imtls] self.cmaker = cmaker self.gsims = sorted(cmaker.gsims) self.correlation_model = correlation_model self.amplifier = amplifier self.sec_perils = sec_perils self.ebrupture = rupture self.seed = rupture.seed rupture = rupture.rupture # the underlying rupture ctxs = list(cmaker.get_ctx_iter([rupture], sitecol)) if not ctxs: raise FarAwayRupture [self.ctx] = ctxs self.N = len(self.ctx) if correlation_model: # store the filtered sitecol self.sites = sitecol.complete.filtered(self.ctx.sids) self.cross_correl = cross_correl or NoCrossCorrelation( cmaker.truncation_level) self.gmv_fields = [f'gmv_{m}' for m in range(len(cmaker.imts))]
[docs] def init_eid_rlz_sig_eps(self): """ Initialize the attributes eid, rlz, sig, eps with shapes E, E, EM, EM """ self.rlzs = numpy.concatenate(list(self.cmaker.gsims.values())) self.eid, self.rlz = get_eid_rlz( vars(self.ebrupture), self.rlzs, self.cmaker.scenario) self.E = E = len(self.eid) self.M = M = len(self.gmv_fields) self.sig = numpy.zeros((E, M), F32) # same for all events self.eps = numpy.zeros((E, M), F32) # not the same
[docs] def build_sig_eps(self, se_dt): """ :returns: a structured array of size E with fields (eid, rlz_id, sig_inter_IMT, eps_inter_IMT) """ sig_eps = numpy.zeros(self.E, se_dt) sig_eps['eid'] = self.eid sig_eps['rlz_id'] = self.rlz for m, imt in enumerate(self.cmaker.imtls): sig_eps[f'sig_inter_{imt}'] = self.sig[:, m] sig_eps[f'eps_inter_{imt}'] = self.eps[:, m] return sig_eps
[docs] def update(self, data, array, rlzs, mean_stds, max_iml=None): """ Updates the data dictionary with the values coming from the array of GMVs. Also indirectly updates the arrays .sig and .eps. """ min_iml = self.cmaker.min_iml mag = self.ebrupture.rupture.mag mean = mean_stds[0] if len(mean.shape) == 3: # shape (M, N, 1) for conditioned gmfs mean = mean[:, :, 0] mmi_index = -1 for m, imt in enumerate(self.cmaker.imtls): if imt == 'MMI': mmi_index = m if max_iml is None: M = len(self.cmaker.imts) max_iml = numpy.full(M, numpy.inf, float) set_max_min(array, mean, max_iml, min_iml, mmi_index) data['gmv'].append(array) if self.sec_perils: n = 0 for rlz in rlzs: eids = self.eid[self.rlz == rlz] E = len(eids) for e, eid in enumerate(eids): gmfa = array[:, :, n + e].T # shape (M, N) for sp in self.sec_perils: o = sp.compute(mag, zip(self.imts, gmfa), self.ctx) for outkey, outarr in zip(sp.outputs, o): data[outkey].append(outarr) n += E
[docs] def strip_zeros(self, data): """ :returns: a DataFrame with the nonzero GMVs """ # building an array of shape (3, NE) eid_sid_rlz = build_eid_sid_rlz( self.rlzs, self.ctx.sids, self.eid, self.rlz) for key, val in sorted(data.items()): data[key] = numpy.concatenate(data[key], axis=-1, dtype=F32) gmv = data.pop('gmv') # shape (N, M, E) ok = gmv.sum(axis=1).T.reshape(-1) > 0 for m, gmv_field in enumerate(self.gmv_fields): data[gmv_field] = gmv[:, m].T.reshape(-1) # build dataframe df = pandas.DataFrame(data) df['eid'] = eid_sid_rlz[0] df['sid'] = eid_sid_rlz[1] df['rlz'] = eid_sid_rlz[2] # remove the rows with all zero values return df[ok]
[docs] def compute_all(self, mean_stds, max_iml=None, cmon=Monitor(), umon=Monitor()): """ :returns: DataFrame with fields eid, rlz, sid, gmv_X, ... """ self.init_eid_rlz_sig_eps() rng = numpy.random.default_rng(self.seed) data = AccumDict(accum=[]) for g, (gs, rlzs) in enumerate(self.cmaker.gsims.items()): idxs, = numpy.where(numpy.isin(self.rlz, rlzs)) E = len(idxs) if E == 0: # crucial for performance continue with cmon: array = self.compute(gs, idxs, mean_stds[:, g], rng) # NME with umon: self.update(data, array, rlzs, mean_stds[:, g], max_iml) with umon: return self.strip_zeros(data)
[docs] def compute(self, gsim, idxs, mean_stds, rng): """ :param gsim: GSIM used to compute mean_stds :param idxs: affected indices :param mean_stds: array of shape (4, M, N) :param rng: random number generator for the rupture :returns: a 32 bit array of shape (N, M, E) """ # sets self.eps M = len(self.imts) E = len(idxs) result = numpy.zeros( (len(self.imts), len(self.ctx.sids), E), F32) ccdist = self.cross_correl.distribution # build arrays of random numbers of shape (M, N, E) and (M, E) intra_eps = [ccdist.rvs((self.N, E), rng) for _ in range(M)] self.eps[idxs] = self.cross_correl.get_inter_eps(self.imts, E, rng).T for m, imt in enumerate(self.imts): try: result[m] = self._compute( mean_stds[:, m], m, imt, gsim, intra_eps[m], idxs) except Exception as exc: raise RuntimeError( '(%s, %s, %s): %s' % (gsim, imt, exc.__class__.__name__, exc) ).with_traceback(exc.__traceback__) if self.amplifier: self.amplifier.amplify_gmfs( self.ctx.ampcode, result, self.imts, self.seed) return result.transpose(1, 0, 2)
def _compute(self, mean_stds, m, imt, gsim, intra_eps, idxs): # sets self.sig, returns gmf im = imt.string mean, sig, tau, phi = mean_stds if self.cmaker.truncation_level <= 1E-9: # for truncation_level = 0 there is only mean, no stds if self.correlation_model: raise ValueError('truncation_level=0 requires ' 'no correlation model') gmf = exp(mean, im!='MMI')[:, None].repeat(len(idxs), axis=1) elif gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES == {StdDev.TOTAL}: # If the GSIM provides only total standard deviation, we need # to compute mean and total standard deviation at the sites # of interest. # In this case, we also assume no correlation model is used. if self.correlation_model: raise CorrelationButNoInterIntraStdDevs( self.correlation_model, gsim) gmf = exp(mean[:, None] + sig[:, None] * intra_eps, im!='MMI') self.sig[idxs, m] = numpy.nan else: # the [:, None] is used to implement multiplication by row; # for instance if a = [1 2], b = [[1 2] [3 4]] then # a[:, None] * b = [[1 2] [6 8]] which is the expected result; # otherwise one would get multiplication by column [[1 4] [3 8]] intra_res = phi[:, None] * intra_eps # shape (N, E) if self.correlation_model is not None: intra_res = self.correlation_model.apply_correlation( self.sites, imt, intra_res, phi) if len(intra_res.shape) == 1: # a vector intra_res = intra_res[:, None] inter_res = tau[:, None] * self.eps[idxs, m] # shape (N, 1) * E => (N, E) gmf = exp(mean[:, None] + intra_res + inter_res, im!='MMI') self.sig[idxs, m] = tau.max() # from shape (N, 1) => scalar return gmf # shapes (N, E)
# this is not used in the engine; it is still useful for usage in IPython # when demonstrating hazardlib capabilities
[docs]def ground_motion_fields(rupture, sites, imts, gsim, truncation_level, realizations, correlation_model=None, seed=0): """ Given an earthquake rupture, the ground motion field calculator computes ground shaking over a set of sites, by randomly sampling a ground shaking intensity model. A ground motion field represents a possible 'realization' of the ground shaking due to an earthquake rupture. .. note:: This calculator is using random numbers. In order to reproduce the same results numpy random numbers generator needs to be seeded, see http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.seed.html :param openquake.hazardlib.source.rupture.Rupture rupture: Rupture to calculate ground motion fields radiated from. :param openquake.hazardlib.site.SiteCollection sites: Sites of interest to calculate GMFs. :param imts: List of intensity measure type objects (see :mod:`openquake.hazardlib.imt`). :param gsim: Ground-shaking intensity model, instance of subclass of either :class:`~openquake.hazardlib.gsim.base.GMPE` or :class:`~openquake.hazardlib.gsim.base.IPE`. :param truncation_level: Float, number of standard deviations for truncation of the intensity distribution :param realizations: Integer number of GMF simulations to compute. :param correlation_model: Instance of correlation model object. See :mod:`openquake.hazardlib.correlation`. Can be ``None``, in which case non-correlated ground motion fields are calculated. Correlation model is not used if ``truncation_level`` is zero. :param int seed: The seed used in the numpy random number generator :returns: Dictionary mapping intensity measure type objects (same as in parameter ``imts``) to 2d numpy arrays of floats, representing different simulations of ground shaking intensity for all sites in the collection. First dimension represents sites and second one is for simulations. """ cmaker = ContextMaker(rupture.tectonic_region_type, {gsim: U32([0])}, dict(truncation_level=truncation_level, imtls={str(imt): numpy.array([0.]) for imt in imts})) cmaker.scenario = True ebr = EBRupture( rupture, source_id=0, trt_smr=0, n_occ=realizations, id=0, e0=0) ebr.seed = seed gc = GmfComputer(ebr, sites, cmaker, correlation_model) mean_stds = cmaker.get_mean_stds([gc.ctx])[:, 0] # shape (4, M, N) gc.init_eid_rlz_sig_eps() res = gc.compute(gsim, U32([0]), mean_stds, numpy.random.default_rng(seed)) return {imt: res[:, m] for m, imt in enumerate(gc.imts)}