Source code for openquake.hazardlib.source.non_parametric

# The Hazard Library
# Copyright (C) 2013-2022 GEM Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.source.non_parametric` defines
:class:`NonParametricSeismicSource`
"""
import numpy
from openquake.baselib.general import block_splitter
from openquake.hazardlib.source.base import BaseSeismicSource
from openquake.hazardlib.geo.surface.gridded import GriddedSurface
from openquake.hazardlib.geo.surface.multi import MultiSurface
from openquake.hazardlib.source.rupture import \
    NonParametricProbabilisticRupture
from openquake.hazardlib.geo.utils import angular_distance, KM_TO_DEGREES
from openquake.hazardlib.geo.mesh import Mesh
from openquake.hazardlib.geo.point import Point
from openquake.hazardlib.pmf import PMF

F32 = numpy.float32
U32 = numpy.uint32


[docs]class NonParametricSeismicSource(BaseSeismicSource): """ Non Parametric Seismic Source explicitly defines earthquake ruptures in the constructor. That is earthquake ruptures are not generated algorithmically from a set of source parameters. Ruptures' rectonic region types are overwritten by source tectonic region type. :param data: List of tuples. Each tuple must contain two items. The first item must be an instance of :class:`openquake.hazardlib.source.rupture.Rupture`. The second item must be an instance of :class:`openquake.hazardlib.pmf.PMF` describing the probability of the rupture to occur N times (the PMF must be defined from a minimum number of occurrences equal to 0) """ code = b'N' MODIFICATIONS = set() def __init__(self, source_id, name, tectonic_region_type, data, weights=None): super().__init__(source_id, name, tectonic_region_type) self.data = data if weights is not None: assert len(weights) == len(data) for (rup, pmf), weight in zip(data, weights): rup.weight = weight
[docs] def iter_ruptures(self, **kwargs): """ Get a generator object that yields probabilistic ruptures the source consists of. :returns: Generator of instances of :class:`openquake.hazardlib.source. rupture.NonParametricProbabilisticRupture`. """ for rup, pmf in self.data: if rup.mag >= self.min_mag: yield NonParametricProbabilisticRupture( rup.mag, rup.rake, self.tectonic_region_type, rup.hypocenter, rup.surface, pmf, weight=rup.weight)
[docs] def few_ruptures(self): """ Fast version of iter_ruptures used in estimate_weight """ for i, (rup, pmf) in enumerate(self.data): if i % 50 == 0 and rup.mag >= self.min_mag: yield NonParametricProbabilisticRupture( rup.mag, rup.rake, self.tectonic_region_type, rup.hypocenter, rup.surface, pmf, weight=rup.weight)
def __iter__(self): if len(self.data) == 1: # there is nothing to split yield self return for i, block in enumerate(block_splitter(self.data, 100)): source_id = '%s:%d' % (self.source_id, i) src = self.__class__(source_id, self.name, self.tectonic_region_type, block) src.num_ruptures = len(block) src.trt_smr = self.trt_smr yield src
[docs] def count_ruptures(self): """ See :meth: `openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`. """ return len(self.data)
[docs] def get_min_max_mag(self): """ Return the minimum and maximum magnitudes of the ruptures generated by the source """ min_mag = min(rup.mag for rup, pmf in self.data) max_mag = max(rup.mag for rup, pmf in self.data) return min_mag, max_mag
[docs] def get_bounding_box(self, maxdist): """ Bounding box containing the surfaces, enlarged by the maximum distance """ surfaces = [] for rup, _ in self.data: if isinstance(rup.surface, MultiSurface): surfaces.extend(rup.surface.surfaces) else: surfaces.append(rup.surface) multi_surf = MultiSurface(surfaces) west, east, north, south = multi_surf.get_bounding_box() a1 = maxdist * KM_TO_DEGREES a2 = angular_distance(maxdist, north, south) return west - a2, south - a1, east + a2, north + a1
[docs] def is_gridded(self): """ :returns: True if containing only GriddedRuptures, False otherwise """ for rup, _ in self.data: if not isinstance(rup.surface, GriddedSurface): return False return True
[docs] def todict(self): """ Convert a GriddedSource into a dictionary of arrays """ assert self.is_gridded(), '%s is not gridded' % self n = len(self.data) m = sum(len(rup.surface.mesh) for rup, pmf in self.data) p = len(self.data[0][1].data) dic = {'probs_occur': numpy.zeros((n, p)), 'magnitude': numpy.zeros(n), 'rake': numpy.zeros(n), 'hypocenter': numpy.zeros((n, 3), F32), 'mesh3d': numpy.zeros((m, 3), F32), 'slice': numpy.zeros((n, 2), U32)} start = 0 for i, (rup, pmf) in enumerate(self.data): dic['probs_occur'][i] = [prob for (prob, _) in pmf.data] dic['magnitude'][i] = rup.mag dic['rake'][i] = rup.rake dic['hypocenter'][i] = (rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z) mesh = rup.surface.mesh.array.T # shape (npoints, 3) dic['mesh3d'][start: start + len(mesh)] = mesh dic['slice'][i] = start, start + len(mesh) start += len(mesh) return dic
[docs] def fromdict(self, dic, weights=None): """ Populate a GriddedSource with ruptures """ assert not self.data, '%s is not empty' % self i = 0 for mag, rake, hp, probs, (start, stop) in zip( dic['magnitude'], dic['rake'], dic['hypocenter'], dic['probs_occur'], dic['slice']): mesh = Mesh(dic['mesh3d'][start:stop, 0], dic['mesh3d'][start:stop, 1], dic['mesh3d'][start:stop, 2]) surface = GriddedSurface(mesh) pmf = PMF([(prob, i) for i, prob in enumerate(probs)]) hypocenter = Point(hp[0], hp[1], hp[2]) rup = NonParametricProbabilisticRupture( mag, rake, self.tectonic_region_type, hypocenter, surface, pmf, weight=None if weights is None else weights[i]) self.data.append((rup, pmf)) i += 1
def __repr__(self): return '<%s %s gridded=%s>' % ( self.__class__.__name__, self.source_id, self.is_gridded()) @property def mesh_size(self): """ :returns: the number of points in the underlying meshes (reduced) """ n = 0 for rup in self.few_ruptures(): if isinstance(rup.surface, MultiSurface): for sfc in rup.surface.surfaces: n += len(sfc.mesh) else: n += len(rup.surface.mesh) return n @property def polygon(self): """ The convex hull of a few subsurfaces """ lons, lats = [], [] for rup in self.few_ruptures(): if isinstance(rup.surface, MultiSurface): for sfc in rup.surface.surfaces: lons.extend(sfc.mesh.lons.flat) lats.extend(sfc.mesh.lats.flat) else: lons.extend(rup.surface.mesh.lons.flat) lats.extend(rup.surface.mesh.lats.flat) condition = numpy.isfinite(lons).astype(int) lons = numpy.extract(condition, lons) lats = numpy.extract(condition, lats) points = numpy.zeros(len(lons), [('lon', F32), ('lat', F32)]) points['lon'] = numpy.round(lons, 5) points['lat'] = numpy.round(lats, 5) points = numpy.unique(points) mesh = Mesh(points['lon'], points['lat']) return mesh.get_convex_hull()
[docs] def wkt(self): """ :returns: the geometry as a WKT string """ return self.polygon.wkt
[docs] def get_one_rupture(self, ses_seed, rupture_mutex=False): """ Yields one random rupture from a source """ num_ruptures = self.count_ruptures() if rupture_mutex: weights = numpy.array([rup.weight for rup in self.iter_ruptures()]) else: weights = numpy.ones((num_ruptures)) / num_ruptures idx = numpy.random.choice(range(num_ruptures), p=weights) serial = self.serial(ses_seed) for i, rup in enumerate(self.iter_ruptures()): if i == idx: rup.rup_id = serial + i rup.idx = idx return rup