# The Hazard Library
# Copyright (C) 2013-2020 GEM Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.source.non_parametric` defines
:class:`NonParametricSeismicSource`
"""
import numpy
from openquake.hazardlib.source.base import BaseSeismicSource
from openquake.hazardlib.geo.surface.gridded import GriddedSurface
from openquake.hazardlib.geo.surface.multi import MultiSurface
from openquake.hazardlib.source.rupture import \
NonParametricProbabilisticRupture
from openquake.hazardlib.geo.utils import angular_distance, KM_TO_DEGREES
from openquake.hazardlib.geo.mesh import Mesh, point3d
from openquake.hazardlib.geo.point import Point
from openquake.hazardlib.pmf import PMF
F32 = numpy.float32
[docs]class NonParametricSeismicSource(BaseSeismicSource):
"""
Non Parametric Seismic Source explicitly defines earthquake ruptures in the
constructor. That is earthquake ruptures are not generated algorithmically
from a set of source parameters.
Ruptures' rectonic region types are overwritten by source tectonic region
type.
:param data:
List of tuples. Each tuple must contain two items. The first item must
be an instance of :class:`openquake.hazardlib.source.rupture.Rupture`.
The second item must be an instance of
:class:`openquake.hazardlib.pmf.PMF` describing the probability of the
rupture to occur N times (the PMF must be defined from a minimum number
of occurrences equal to 0)
"""
code = b'N'
MODIFICATIONS = set()
def __init__(self, source_id, name, tectonic_region_type, data):
super().__init__(source_id, name, tectonic_region_type)
self.data = data
[docs] def iter_ruptures(self, **kwargs):
"""
Get a generator object that yields probabilistic ruptures the source
consists of.
:returns:
Generator of instances of :class:`openquake.hazardlib.source.
rupture.NonParametricProbabilisticRupture`.
"""
for rup, pmf in self.data:
if rup.mag >= self.min_mag:
yield NonParametricProbabilisticRupture(
rup.mag, rup.rake, self.tectonic_region_type,
rup.hypocenter, rup.surface, pmf, weight=rup.weight)
def __iter__(self):
if len(self.data) == 1: # there is nothing to split
yield self
return
for i, rup_pmf in enumerate(self.data):
source_id = '%s:%d' % (self.source_id, i)
src = self.__class__(source_id, self.name,
self.tectonic_region_type, [rup_pmf])
src.num_ruptures = 1
src.grp_id = self.grp_id
yield src
[docs] def count_ruptures(self):
"""
See :meth:
`openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`.
"""
return len(self.data)
[docs] def get_min_max_mag(self):
"""
Return the minimum and maximum magnitudes of the ruptures generated
by the source
"""
min_mag = min(rup.mag for rup, pmf in self.data)
max_mag = max(rup.mag for rup, pmf in self.data)
return min_mag, max_mag
[docs] def get_bounding_box(self, maxdist):
"""
Bounding box containing all surfaces, enlarged by the maximum distance
"""
surfaces = []
for rup, _ in self.data:
if isinstance(rup.surface, MultiSurface):
for s in rup.surface.surfaces:
surfaces.append(s)
else:
surfaces.append(rup.surface)
multi_surf = MultiSurface(surfaces)
west, east, north, south = multi_surf.get_bounding_box()
a1 = maxdist * KM_TO_DEGREES
a2 = angular_distance(maxdist, north, south)
return west - a2, south - a1, east + a2, north + a1
[docs] def is_gridded(self):
"""
:returns: True if containing only GriddedRuptures, False otherwise
"""
for rup, _ in self.data:
if not isinstance(rup.surface, GriddedSurface):
return False
return True
def __toh5__(self):
assert self.is_gridded(), '%s is not gridded' % self
attrs = {'source_id': self.source_id, 'name': self.name,
'tectonic_region_type': self.tectonic_region_type}
dic = {'probs_occur': [], 'magnitude': [], 'rake': [],
'hypocenter': [], 'points': []}
for rup, pmf in self.data:
dic['probs_occur'].append([prob for (prob, _) in pmf.data])
dic['magnitude'].append(rup.mag)
dic['rake'].append(rup.rake)
dic['hypocenter'].append((rup.hypocenter.x, rup.hypocenter.y,
rup.hypocenter.z))
dic['points'].append(rup.surface.mesh.array)
dic['hypocenter'] = numpy.array(dic['hypocenter'], point3d)
return dic, attrs
def __fromh5__(self, dic, attrs):
vars(self).update(attrs)
self.data = []
for mag, rake, hp, probs, points in zip(
dic['magnitude'], dic['rake'], dic['hypocenter'],
dic['probs_occur'], dic['points']):
mesh = Mesh(points[0], points[1], points[2])
surface = GriddedSurface(mesh)
pmf = PMF([(prob, i) for i, prob in enumerate(probs)])
hypocenter = Point(hp['lon'], hp['lat'], hp['depth'])
rup = NonParametricProbabilisticRupture(
mag, rake, self.tectonic_region_type, hypocenter, surface, pmf)
self.data.append((rup, pmf))
def __repr__(self):
return '<%s gridded=%s>' % (self.__class__.__name__, self.is_gridded())
@property
def polygon(self):
"""
The convex hull of the underlying mesh of points
"""
lons = numpy.concatenate(
[rup.surface.mesh.lons.flatten() for rup, pmf in self.data])
lats = numpy.concatenate(
[rup.surface.mesh.lats.flatten() for rup, pmf in self.data])
points = numpy.zeros(len(lons), [('lon', F32), ('lat', F32)])
points['lon'] = lons
points['lat'] = lats
points = numpy.unique(points)
mesh = Mesh(points['lon'], points['lat'])
return mesh.get_convex_hull()
[docs] def wkt(self):
"""
:returns: the geometry as a WKT string
"""
return self.polygon.wkt
[docs] def get_one_rupture(self, rupture_mutex=False):
"""
Yields one random rupture from a source
"""
num_ruptures = self.count_ruptures()
if rupture_mutex:
weights = numpy.array([rup.weight for rup in self.iter_ruptures()])
else:
weights = numpy.ones((num_ruptures))*1./num_ruptures
idx = numpy.random.choice(range(num_ruptures), p=weights)
for i, rup in enumerate(self.iter_ruptures()):
if i == idx:
rup.rup_id = self.serial + i
rup.idx = idx
return rup