# The Hazard Library
# Copyright (C) 2012-2025 GEM Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.source.point` defines :class:`PointSource`.
"""
import math
import copy
import numpy
from openquake.baselib.general import AccumDict, groupby_grid, Deduplicate
from openquake.hazardlib.geo import Point, geodetic
from openquake.hazardlib.geo.nodalplane import NodalPlane
from openquake.hazardlib.geo.surface.planar import (
build_planar, PlanarSurface, planin_dt, get_rupdims)
from openquake.hazardlib.pmf import PMF
from openquake.hazardlib.scalerel.point import PointMSR
from openquake.hazardlib.source.base import ParametricSeismicSource
from openquake.hazardlib.source.rupture import (
ParametricProbabilisticRupture, PointRupture)
from openquake.hazardlib.geo.utils import get_bounding_box, angular_distance
[docs]def msr_name(src):
"""
:returns: string representation of the MSR or "Undefined" if not applicable
"""
# NB: the MSR is None for characteristicFault sources
try:
return str(src.magnitude_scaling_relationship)
except AttributeError: # no MSR for nonparametric sources
return 'Undefined'
[docs]def calc_average(pointsources):
"""
:returns:
a dict with average strike, dip, rake, lon, lat, dep,
upper_seismogenic_depth, lower_seismogenic_depth
"""
node_w, dep_w, rate_w = [], [], []
acc = dict(lon=[], lat=[], strike=[], dip=[], rake=[], dep=[],
upper_seismogenic_depth=[], lower_seismogenic_depth=[],
rupture_aspect_ratio=[])
trt = pointsources[0].tectonic_region_type
for src in pointsources:
assert src.tectonic_region_type == trt
ws, ds = zip(*src.nodal_plane_distribution.data)
acc['strike'].extend([np.strike for np in ds])
acc['dip'].extend([np.dip for np in ds])
acc['rake'].extend([np.rake for np in ds])
node_w.extend(ws)
ws, deps = zip(*src.hypocenter_distribution.data)
acc['dep'].extend(deps)
dep_w.extend(ws)
acc['lon'].append(src.location.x)
acc['lat'].append(src.location.y)
acc['upper_seismogenic_depth'].append(src.upper_seismogenic_depth)
acc['lower_seismogenic_depth'].append(src.lower_seismogenic_depth)
acc['rupture_aspect_ratio'].append(src.rupture_aspect_ratio)
rate_w.append(sum(r for m, r in src.get_annual_occurrence_rates()))
for key in acc:
if key in ('dip', 'strike', 'rake'):
acc[key] = numpy.average(acc[key], weights=node_w)
elif key == 'dep':
acc[key] = numpy.average(acc[key], weights=dep_w)
else:
acc[key] = numpy.average(acc[key], weights=rate_w)
return acc
[docs]class PointSource(ParametricSeismicSource):
"""
Point source typology represents seismicity on a single geographical
location.
:param upper_seismogenic_depth:
Minimum depth an earthquake rupture can reach, in km.
:param lower_seismogenic_depth:
Maximum depth an earthquake rupture can reach, in km.
:param location:
:class:`~openquake.hazardlib.geo.point.Point` object
representing the location of the seismic source.
:param nodal_plane_distribution:
:class:`~openquake.hazardlib.pmf.PMF` object with values
that are instances
of :class:`openquake.hazardlib.geo.nodalplane.NodalPlane`.
Shows the distribution
of probability for rupture to have the certain nodal plane.
:param hypocenter_distribution:
:class:`~openquake.hazardlib.pmf.PMF` with values being float
numbers in km representing the depth of the hypocenter. Latitude
and longitude of the hypocenter is always set to ones of ``location``.
See also :class:`openquake.hazardlib.source.base.ParametricSeismicSource`
for description of other parameters.
:raises ValueError:
If upper seismogenic depth is below lower seismogenic
depth, if one or more of hypocenter depth values is shallower
than upper seismogenic depth or deeper than lower seismogenic depth.
"""
code = b'P'
MODIFICATIONS = {'set_lower_seismogenic_depth',
'set_upper_seismogenic_depth'}
ps_grid_spacing = 0 # updated in CollapsedPointSource
def __init__(self, source_id, name, tectonic_region_type,
mfd, rupture_mesh_spacing,
magnitude_scaling_relationship, rupture_aspect_ratio,
temporal_occurrence_model,
# point-specific parameters
upper_seismogenic_depth, lower_seismogenic_depth,
location, nodal_plane_distribution, hypocenter_distribution):
super().__init__(
source_id, name, tectonic_region_type, mfd, rupture_mesh_spacing,
magnitude_scaling_relationship, rupture_aspect_ratio,
temporal_occurrence_model)
if not lower_seismogenic_depth > upper_seismogenic_depth:
raise ValueError('lower seismogenic depth must be below '
'upper seismogenic depth')
for _prob, depth in hypocenter_distribution.data:
if depth > lower_seismogenic_depth:
raise ValueError(
f'{depth=} is below {lower_seismogenic_depth=}')
elif depth < upper_seismogenic_depth:
raise ValueError(
f'{depth=} is over {upper_seismogenic_depth=}')
if not upper_seismogenic_depth > geodetic.EARTH_ELEVATION:
raise ValueError(
"Upper seismogenic depth must be greater than the "
"maximum elevation on Earth's surface (-8.848 km)")
self.location = location
self.nodal_plane_distribution = nodal_plane_distribution
self.hypocenter_distribution = hypocenter_distribution
self.upper_seismogenic_depth = upper_seismogenic_depth
self.lower_seismogenic_depth = lower_seismogenic_depth
[docs] def restrict(self, nodalplane, depth):
"""
:returns: source restricted to a single nodal plane and depth
"""
new = copy.copy(self)
new.nodal_plane_distribution = PMF([(1., nodalplane)])
new.hypocenter_distribution = PMF([(1., depth)])
return new
[docs] def get_planin(self, magd=None, npd=None):
"""
:return: array of dtype planin_dt of shape (#mags, #planes)
"""
if magd is None:
magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()]
if npd is None:
npd = self.nodal_plane_distribution.data
msr = self.magnitude_scaling_relationship
planin = numpy.zeros((len(magd), len(npd)), planin_dt).view(
numpy.recarray)
mrate, mags = numpy.array(magd).T # shape (2, num_mags)
nrate = numpy.array([nrate for nrate, np in npd])
planin['rate'] = mrate[:, None] * nrate
for n, (nrate, np) in enumerate(npd):
arr = planin[:, n]
arr['area'] = msr.get_median_area(mags, np.rake)
arr['mag'] = mags
arr['strike'] = np.strike
arr['dip'] = np.dip
arr['rake'] = np.rake
return planin
[docs] def max_radius(self, maxdist):
"""
:returns: max radius + ps_grid_spacing * sqrt(2)/2
"""
self._get_max_rupture_projection_radius()
eff_radius = min(self.radius[-1], maxdist / 2)
return eff_radius + self.ps_grid_spacing * .707
[docs] def get_psdist(self, m, mag, psdist, magdist):
"""
:returns: the effective pointsource distance for the given magnitude
"""
eff_radius = min(self.radius[m], magdist[mag] / 2)
return eff_radius + self.ps_grid_spacing * .707 + psdist
def _get_max_rupture_projection_radius(self):
"""
Find a maximum radius of a circle on Earth surface enveloping a rupture
produced by this source.
:returns:
Half of maximum rupture's diagonal surface projection.
"""
if hasattr(self, 'radius'):
return self.radius[-1] # max radius
if isinstance(self.magnitude_scaling_relationship, PointMSR):
M = len(self.get_annual_occurrence_rates())
self.radius = numpy.zeros(M)
return self.radius[-1]
magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()]
self.radius = numpy.zeros(len(magd))
usd = self.upper_seismogenic_depth
lsd = self.lower_seismogenic_depth
rar = self.rupture_aspect_ratio
for m, planin in enumerate(self.get_planin()):
rup_length, rup_width, _ = get_rupdims(
usd, lsd, rar, planin.area[-1], planin.dip[-1])
# the projection radius is half of the rupture diagonal
self.radius[m] = math.sqrt(rup_length ** 2 + rup_width ** 2) / 2.0
return self.radius[-1] # max radius
[docs] def get_planar(self, shift_hypo=False, iruptures=False):
"""
:returns: a dictionary mag -> list of arrays of shape (U, 3)
"""
magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()]
if isinstance(self, CollapsedPointSource) and not iruptures:
out = AccumDict(accum=[])
for src in self.pointsources:
out += src.get_planar(shift_hypo)
return out
hdd = numpy.array(self.hypocenter_distribution.data)
clon, clat = self.location.x, self.location.y
usd = self.upper_seismogenic_depth
lsd = self.lower_seismogenic_depth
rar = self.rupture_aspect_ratio
planin = self.get_planin()
planar = build_planar(
planin, hdd, clon, clat, usd, lsd, rar, shift_hypo)
dic = {mag: [pla.reshape(-1, 3)] # MND3
for (_rate, mag), pla in zip(magd, planar)}
return dic
def _gen_ruptures(self, shift_hypo=False, step=1, iruptures=False):
magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()]
npd = self.nodal_plane_distribution.data
hdd = self.hypocenter_distribution.data
clon, clat = self.location.x, self.location.y
if step == 1:
# return full ruptures (one per magnitude)
planardict = self.get_planar(shift_hypo, iruptures)
for mag, [planar] in planardict.items():
for pla in planar.reshape(-1, 3):
surface = PlanarSurface.from_(pla)
_strike, _dip, rake = pla.sdr
rate = pla.wlr[2]
yield ParametricProbabilisticRupture(
mag, rake, self.tectonic_region_type,
Point(*pla.hypo), surface, rate,
self.temporal_occurrence_model)
else:
# return point ruptures (fast)
magd_ = list(enumerate(magd))
npd_ = list(enumerate(npd))
hdd_ = list(enumerate(hdd))
for m, (mrate, mag) in magd_[-1:]:
for n, (nrate, np) in npd_:
for d, (drate, cdep) in hdd_:
rate = mrate * nrate * drate
yield PointRupture(
mag, self.tectonic_region_type,
Point(clon, clat, cdep), rate,
self.temporal_occurrence_model,
self.lower_seismogenic_depth)
[docs] def iter_ruptures(self, **kwargs):
"""
Generate one rupture for each combination of magnitude, nodal plane
and hypocenter depth.
"""
return self._gen_ruptures(
shift_hypo=kwargs.get('shift_hypo'),
step=kwargs.get('step', 1))
# PointSource
[docs] def iruptures(self):
"""
Generate one rupture for each magnitude, called only if nphc > 1
"""
avg = calc_average([self]) # over nodal planes and hypocenters
np = NodalPlane(avg['strike'], avg['dip'], avg['rake'])
yield from self.restrict(np, avg['dep'])._gen_ruptures(iruptures=True)
[docs] def count_nphc(self):
"""
:returns: the number of nodal planes times the number of hypocenters
"""
return len(self.nodal_plane_distribution.data) * len(
self.hypocenter_distribution.data)
[docs] def count_ruptures(self):
"""
See :meth:
`openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`.
"""
return len(self.get_annual_occurrence_rates()) * self.count_nphc()
[docs] def modify_set_lower_seismogenic_depth(self, lsd):
"""
Modifies the current source geometry by replacing the original
lower seismogenic depth with the passed depth
"""
self.lower_seismogenic_depth = lsd
[docs] def modify_set_upper_seismogenic_depth(self, usd):
"""
Modifies the current source geometry by replacing the original
upper seismogenic depth with the passed depth
"""
self.upper_seismogenic_depth = usd
@property
def polygon(self):
"""
Polygon corresponding to the max_rupture_projection_radius
"""
radius = self._get_max_rupture_projection_radius()
return self.location.to_polygon(radius)
[docs] def get_bounding_box(self, maxdist):
"""
Bounding box of the point, enlarged by the maximum distance
"""
radius = self.max_radius(maxdist)
return get_bounding_box([self.location], maxdist + radius)
[docs] def wkt(self):
"""
:returns: the geometry as a WKT string
"""
loc = self.location
return 'POINT(%s %s)' % (loc.x, loc.y)
[docs]def psources_to_pdata(pointsources, name):
"""
Build a pdata dictionary from a list of homogeneous point sources
with the same tom, trt, msr.
"""
ps = pointsources[0]
dt = [(f, numpy.float32) for f in 'lon lat rar usd lsd'.split()]
array = numpy.empty(len(pointsources), dt)
for i, ps in enumerate(pointsources):
rec = array[i]
loc = ps.location
rec['lon'] = loc.x
rec['lat'] = loc.y
rec['rar'] = ps.rupture_aspect_ratio
rec['usd'] = ps.upper_seismogenic_depth
rec['lsd'] = ps.lower_seismogenic_depth
pdata = dict(name=name,
array=array,
tom=ps.temporal_occurrence_model,
trt=ps.tectonic_region_type,
rms=ps.rupture_mesh_spacing,
npd=Deduplicate([ps.nodal_plane_distribution
for ps in pointsources]),
hcd=Deduplicate([ps.hypocenter_distribution
for ps in pointsources]),
mfd=Deduplicate([ps.mfd for ps in pointsources]),
msr=Deduplicate([ps.magnitude_scaling_relationship
for ps in pointsources]))
return pdata
[docs]def pdata_to_psources(pdata):
"""
Generate point sources from a pdata dictionary
"""
name = pdata['name']
tom = pdata['tom']
trt = pdata['trt']
npd = pdata['npd']
hcd = pdata['hcd']
rms = pdata['rms']
mfd = pdata['mfd']
msr = pdata['msr']
out = []
for i, rec in enumerate(pdata['array']):
out.append(PointSource(
source_id=f'{name}:{i}',
name=name,
tectonic_region_type=trt,
mfd=mfd[i],
rupture_mesh_spacing=rms,
magnitude_scaling_relationship=msr[i],
rupture_aspect_ratio=rec['rar'],
upper_seismogenic_depth=rec['usd'],
lower_seismogenic_depth=rec['lsd'],
location=Point(rec['lon'], rec['lat']),
nodal_plane_distribution=npd[i],
hypocenter_distribution=hcd[i],
temporal_occurrence_model=tom))
return out
[docs]class CollapsedPointSource(PointSource):
"""
Source typology representing a cluster of point sources around a
specific location. The underlying sources must all have the same
tectonic region type, magnitude_scaling_relationship and
temporal_occurrence_model.
"""
code = b'p'
MODIFICATIONS = set()
def __init__(self, source_id, pointsources):
self.source_id = source_id
self.pdata = psources_to_pdata(pointsources, source_id)
self.tectonic_region_type = pointsources[0].tectonic_region_type
self.magnitude_scaling_relationship = (
pointsources[0].magnitude_scaling_relationship)
self.temporal_occurrence_model = (
pointsources[0].temporal_occurrence_model)
vars(self).update(calc_average(pointsources))
self.location = Point(self.lon, self.lat, self.dep)
self.nodal_plane_distribution = PMF(
[(1., NodalPlane(self.strike, self.dip, self.rake))])
self.hypocenter_distribution = PMF([(1., self.dep)])
@property
def pointsources(self):
"""
:returns: the underlying point sources
"""
return pdata_to_psources(self.pdata)
[docs] def get_annual_occurrence_rates(self):
"""
:returns: a list of pairs [(mag, mag_occur_rate), ...]
"""
acc = AccumDict(accum=0)
for psource in self.pointsources:
acc += dict(psource.get_annual_occurrence_rates())
return sorted(acc.items())
[docs] def count_nphc(self):
"""
:returns: the total number of nodal planes and hypocenters
"""
return sum(src.count_nphc() for src in self.pointsources)
[docs] def iter_ruptures(self, **kwargs):
"""
:returns: an iterator over the underlying ruptures
"""
step = kwargs.get('step', 1)
for src in self.pointsources[::-step]:
yield from src.iter_ruptures(**kwargs)
# CollapsedPointSource
[docs] def iruptures(self):
"""
:yields: the underlying ruptures with mean nodal plane and hypocenter
"""
np = NodalPlane(self.strike, self.dip, self.rake)
yield from self.restrict(np, self.location.z)._gen_ruptures(
iruptures=True)
[docs] def count_ruptures(self):
"""
:returns: the total number of underlying ruptures
"""
return sum(src.count_ruptures()
for src in pdata_to_psources(self.pdata))
[docs]def grid_point_sources(sources, ps_grid_spacing):
"""
:param sources:
a list of sources with the same grp_id (point sources and not)
:param ps_grid_spacing:
value of the point source grid spacing in km; if None, do nothing
:returns:
a dict grp_id -> list of non-point sources and collapsed point sources
"""
grp_id = sources[0].grp_id
for src in sources[1:]:
assert src.grp_id == grp_id, (src.grp_id, grp_id)
if not ps_grid_spacing:
return sources
out = [src for src in sources if not hasattr(src, 'location')]
ps = numpy.array([src for src in sources if hasattr(src, 'location')])
if len(ps) < 2: # nothing to collapse
return out + list(ps)
coords = numpy.zeros((len(ps), 3))
for p, psource in enumerate(ps):
coords[p, 0] = psource.location.x
coords[p, 1] = psource.location.y
coords[p, 2] = psource.location.z
if (len(numpy.unique(coords[:, 0])) == 1 or
len(numpy.unique(coords[:, 1])) == 1):
# degenerated rectangle, there is no grid, do not collapse
return out + list(ps)
deltax = angular_distance(ps_grid_spacing, lat=coords[:, 1].mean())
deltay = angular_distance(ps_grid_spacing)
grid = groupby_grid(coords[:, 0], coords[:, 1], deltax, deltay)
cnt = 0
for idxs in grid.values():
if len(idxs) > 1:
cnt += 1
name = 'cps-%03d-%04d' % (grp_id, cnt)
cps = CollapsedPointSource(name, ps[idxs]) # slow part
cps.grp_id = ps[0].grp_id
cps.trt_smr = ps[0].trt_smr
cps.ps_grid_spacing = ps_grid_spacing
out.append(cps)
else: # there is a single source
out.append(ps[idxs[0]])
return out
# used only in view('long_ruptures')
[docs]def get_rup_maxlen(src):
"""
:returns: the maximum rupture length for point sources and area sources
"""
if hasattr(src, 'nodal_plane_distribution'):
maxmag, _rate = src.get_annual_occurrence_rates()[-1]
lsd = src.lower_seismogenic_depth
usd = src.upper_seismogenic_depth
msr = src.magnitude_scaling_relationship
rar = src.rupture_aspect_ratio
lens = []
for _, np in src.nodal_plane_distribution.data:
area = msr.get_median_area(maxmag, np.rake)
dims = get_rupdims(usd, lsd, rar, area, np.dip)
lens.append(dims[0])
return max(lens)
return 0.