Source code for openquake.hazardlib.source.point

# The Hazard Library
# Copyright (C) 2012-2025 GEM Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.source.point` defines :class:`PointSource`.
"""
import math
import copy
import numpy
from openquake.baselib.general import AccumDict, groupby_grid, Deduplicate
from openquake.hazardlib.geo import Point, geodetic
from openquake.hazardlib.geo.nodalplane import NodalPlane
from openquake.hazardlib.geo.surface.planar import (
    build_planar, PlanarSurface, planin_dt, get_rupdims)
from openquake.hazardlib.pmf import PMF
from openquake.hazardlib.scalerel.point import PointMSR
from openquake.hazardlib.source.base import ParametricSeismicSource
from openquake.hazardlib.source.rupture import (
    ParametricProbabilisticRupture, PointRupture)
from openquake.hazardlib.geo.utils import get_bounding_box, angular_distance


[docs]def msr_name(src): """ :returns: string representation of the MSR or "Undefined" if not applicable """ # NB: the MSR is None for characteristicFault sources try: return str(src.magnitude_scaling_relationship) except AttributeError: # no MSR for nonparametric sources return 'Undefined'
[docs]def calc_average(pointsources): """ :returns: a dict with average strike, dip, rake, lon, lat, dep, upper_seismogenic_depth, lower_seismogenic_depth """ node_w, dep_w, rate_w = [], [], [] acc = dict(lon=[], lat=[], strike=[], dip=[], rake=[], dep=[], upper_seismogenic_depth=[], lower_seismogenic_depth=[], rupture_aspect_ratio=[]) trt = pointsources[0].tectonic_region_type for src in pointsources: assert src.tectonic_region_type == trt ws, ds = zip(*src.nodal_plane_distribution.data) acc['strike'].extend([np.strike for np in ds]) acc['dip'].extend([np.dip for np in ds]) acc['rake'].extend([np.rake for np in ds]) node_w.extend(ws) ws, deps = zip(*src.hypocenter_distribution.data) acc['dep'].extend(deps) dep_w.extend(ws) acc['lon'].append(src.location.x) acc['lat'].append(src.location.y) acc['upper_seismogenic_depth'].append(src.upper_seismogenic_depth) acc['lower_seismogenic_depth'].append(src.lower_seismogenic_depth) acc['rupture_aspect_ratio'].append(src.rupture_aspect_ratio) rate_w.append(sum(r for m, r in src.get_annual_occurrence_rates())) for key in acc: if key in ('dip', 'strike', 'rake'): acc[key] = numpy.average(acc[key], weights=node_w) elif key == 'dep': acc[key] = numpy.average(acc[key], weights=dep_w) else: acc[key] = numpy.average(acc[key], weights=rate_w) return acc
[docs]class PointSource(ParametricSeismicSource): """ Point source typology represents seismicity on a single geographical location. :param upper_seismogenic_depth: Minimum depth an earthquake rupture can reach, in km. :param lower_seismogenic_depth: Maximum depth an earthquake rupture can reach, in km. :param location: :class:`~openquake.hazardlib.geo.point.Point` object representing the location of the seismic source. :param nodal_plane_distribution: :class:`~openquake.hazardlib.pmf.PMF` object with values that are instances of :class:`openquake.hazardlib.geo.nodalplane.NodalPlane`. Shows the distribution of probability for rupture to have the certain nodal plane. :param hypocenter_distribution: :class:`~openquake.hazardlib.pmf.PMF` with values being float numbers in km representing the depth of the hypocenter. Latitude and longitude of the hypocenter is always set to ones of ``location``. See also :class:`openquake.hazardlib.source.base.ParametricSeismicSource` for description of other parameters. :raises ValueError: If upper seismogenic depth is below lower seismogenic depth, if one or more of hypocenter depth values is shallower than upper seismogenic depth or deeper than lower seismogenic depth. """ code = b'P' MODIFICATIONS = {'set_lower_seismogenic_depth', 'set_upper_seismogenic_depth'} ps_grid_spacing = 0 # updated in CollapsedPointSource def __init__(self, source_id, name, tectonic_region_type, mfd, rupture_mesh_spacing, magnitude_scaling_relationship, rupture_aspect_ratio, temporal_occurrence_model, # point-specific parameters upper_seismogenic_depth, lower_seismogenic_depth, location, nodal_plane_distribution, hypocenter_distribution): super().__init__( source_id, name, tectonic_region_type, mfd, rupture_mesh_spacing, magnitude_scaling_relationship, rupture_aspect_ratio, temporal_occurrence_model) if not lower_seismogenic_depth > upper_seismogenic_depth: raise ValueError('lower seismogenic depth must be below ' 'upper seismogenic depth') for _prob, depth in hypocenter_distribution.data: if depth > lower_seismogenic_depth: raise ValueError( f'{depth=} is below {lower_seismogenic_depth=}') elif depth < upper_seismogenic_depth: raise ValueError( f'{depth=} is over {upper_seismogenic_depth=}') if not upper_seismogenic_depth > geodetic.EARTH_ELEVATION: raise ValueError( "Upper seismogenic depth must be greater than the " "maximum elevation on Earth's surface (-8.848 km)") self.location = location self.nodal_plane_distribution = nodal_plane_distribution self.hypocenter_distribution = hypocenter_distribution self.upper_seismogenic_depth = upper_seismogenic_depth self.lower_seismogenic_depth = lower_seismogenic_depth
[docs] def restrict(self, nodalplane, depth): """ :returns: source restricted to a single nodal plane and depth """ new = copy.copy(self) new.nodal_plane_distribution = PMF([(1., nodalplane)]) new.hypocenter_distribution = PMF([(1., depth)]) return new
[docs] def get_planin(self, magd=None, npd=None): """ :return: array of dtype planin_dt of shape (#mags, #planes) """ if magd is None: magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()] if npd is None: npd = self.nodal_plane_distribution.data msr = self.magnitude_scaling_relationship planin = numpy.zeros((len(magd), len(npd)), planin_dt).view( numpy.recarray) mrate, mags = numpy.array(magd).T # shape (2, num_mags) nrate = numpy.array([nrate for nrate, np in npd]) planin['rate'] = mrate[:, None] * nrate for n, (nrate, np) in enumerate(npd): arr = planin[:, n] arr['area'] = msr.get_median_area(mags, np.rake) arr['mag'] = mags arr['strike'] = np.strike arr['dip'] = np.dip arr['rake'] = np.rake return planin
[docs] def max_radius(self, maxdist): """ :returns: max radius + ps_grid_spacing * sqrt(2)/2 """ self._get_max_rupture_projection_radius() eff_radius = min(self.radius[-1], maxdist / 2) return eff_radius + self.ps_grid_spacing * .707
[docs] def get_psdist(self, m, mag, psdist, magdist): """ :returns: the effective pointsource distance for the given magnitude """ eff_radius = min(self.radius[m], magdist[mag] / 2) return eff_radius + self.ps_grid_spacing * .707 + psdist
def _get_max_rupture_projection_radius(self): """ Find a maximum radius of a circle on Earth surface enveloping a rupture produced by this source. :returns: Half of maximum rupture's diagonal surface projection. """ if hasattr(self, 'radius'): return self.radius[-1] # max radius if isinstance(self.magnitude_scaling_relationship, PointMSR): M = len(self.get_annual_occurrence_rates()) self.radius = numpy.zeros(M) return self.radius[-1] magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()] self.radius = numpy.zeros(len(magd)) usd = self.upper_seismogenic_depth lsd = self.lower_seismogenic_depth rar = self.rupture_aspect_ratio for m, planin in enumerate(self.get_planin()): rup_length, rup_width, _ = get_rupdims( usd, lsd, rar, planin.area[-1], planin.dip[-1]) # the projection radius is half of the rupture diagonal self.radius[m] = math.sqrt(rup_length ** 2 + rup_width ** 2) / 2.0 return self.radius[-1] # max radius
[docs] def get_planar(self, shift_hypo=False, iruptures=False): """ :returns: a dictionary mag -> list of arrays of shape (U, 3) """ magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()] if isinstance(self, CollapsedPointSource) and not iruptures: out = AccumDict(accum=[]) for src in self.pointsources: out += src.get_planar(shift_hypo) return out hdd = numpy.array(self.hypocenter_distribution.data) clon, clat = self.location.x, self.location.y usd = self.upper_seismogenic_depth lsd = self.lower_seismogenic_depth rar = self.rupture_aspect_ratio planin = self.get_planin() planar = build_planar( planin, hdd, clon, clat, usd, lsd, rar, shift_hypo) dic = {mag: [pla.reshape(-1, 3)] # MND3 for (_rate, mag), pla in zip(magd, planar)} return dic
def _gen_ruptures(self, shift_hypo=False, step=1, iruptures=False): magd = [(r, mag) for mag, r in self.get_annual_occurrence_rates()] npd = self.nodal_plane_distribution.data hdd = self.hypocenter_distribution.data clon, clat = self.location.x, self.location.y if step == 1: # return full ruptures (one per magnitude) planardict = self.get_planar(shift_hypo, iruptures) for mag, [planar] in planardict.items(): for pla in planar.reshape(-1, 3): surface = PlanarSurface.from_(pla) _strike, _dip, rake = pla.sdr rate = pla.wlr[2] yield ParametricProbabilisticRupture( mag, rake, self.tectonic_region_type, Point(*pla.hypo), surface, rate, self.temporal_occurrence_model) else: # return point ruptures (fast) magd_ = list(enumerate(magd)) npd_ = list(enumerate(npd)) hdd_ = list(enumerate(hdd)) for m, (mrate, mag) in magd_[-1:]: for n, (nrate, np) in npd_: for d, (drate, cdep) in hdd_: rate = mrate * nrate * drate yield PointRupture( mag, self.tectonic_region_type, Point(clon, clat, cdep), rate, self.temporal_occurrence_model, self.lower_seismogenic_depth)
[docs] def iter_ruptures(self, **kwargs): """ Generate one rupture for each combination of magnitude, nodal plane and hypocenter depth. """ return self._gen_ruptures( shift_hypo=kwargs.get('shift_hypo'), step=kwargs.get('step', 1))
# PointSource
[docs] def iruptures(self): """ Generate one rupture for each magnitude, called only if nphc > 1 """ avg = calc_average([self]) # over nodal planes and hypocenters np = NodalPlane(avg['strike'], avg['dip'], avg['rake']) yield from self.restrict(np, avg['dep'])._gen_ruptures(iruptures=True)
[docs] def count_nphc(self): """ :returns: the number of nodal planes times the number of hypocenters """ return len(self.nodal_plane_distribution.data) * len( self.hypocenter_distribution.data)
[docs] def count_ruptures(self): """ See :meth: `openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`. """ return len(self.get_annual_occurrence_rates()) * self.count_nphc()
[docs] def modify_set_lower_seismogenic_depth(self, lsd): """ Modifies the current source geometry by replacing the original lower seismogenic depth with the passed depth """ self.lower_seismogenic_depth = lsd
[docs] def modify_set_upper_seismogenic_depth(self, usd): """ Modifies the current source geometry by replacing the original upper seismogenic depth with the passed depth """ self.upper_seismogenic_depth = usd
@property def polygon(self): """ Polygon corresponding to the max_rupture_projection_radius """ radius = self._get_max_rupture_projection_radius() return self.location.to_polygon(radius)
[docs] def get_bounding_box(self, maxdist): """ Bounding box of the point, enlarged by the maximum distance """ radius = self.max_radius(maxdist) return get_bounding_box([self.location], maxdist + radius)
[docs] def wkt(self): """ :returns: the geometry as a WKT string """ loc = self.location return 'POINT(%s %s)' % (loc.x, loc.y)
[docs]def psources_to_pdata(pointsources, name): """ Build a pdata dictionary from a list of homogeneous point sources with the same tom, trt, msr. """ ps = pointsources[0] dt = [(f, numpy.float32) for f in 'lon lat rar usd lsd'.split()] array = numpy.empty(len(pointsources), dt) for i, ps in enumerate(pointsources): rec = array[i] loc = ps.location rec['lon'] = loc.x rec['lat'] = loc.y rec['rar'] = ps.rupture_aspect_ratio rec['usd'] = ps.upper_seismogenic_depth rec['lsd'] = ps.lower_seismogenic_depth pdata = dict(name=name, array=array, tom=ps.temporal_occurrence_model, trt=ps.tectonic_region_type, rms=ps.rupture_mesh_spacing, npd=Deduplicate([ps.nodal_plane_distribution for ps in pointsources]), hcd=Deduplicate([ps.hypocenter_distribution for ps in pointsources]), mfd=Deduplicate([ps.mfd for ps in pointsources]), msr=Deduplicate([ps.magnitude_scaling_relationship for ps in pointsources])) return pdata
[docs]def pdata_to_psources(pdata): """ Generate point sources from a pdata dictionary """ name = pdata['name'] tom = pdata['tom'] trt = pdata['trt'] npd = pdata['npd'] hcd = pdata['hcd'] rms = pdata['rms'] mfd = pdata['mfd'] msr = pdata['msr'] out = [] for i, rec in enumerate(pdata['array']): out.append(PointSource( source_id=f'{name}:{i}', name=name, tectonic_region_type=trt, mfd=mfd[i], rupture_mesh_spacing=rms, magnitude_scaling_relationship=msr[i], rupture_aspect_ratio=rec['rar'], upper_seismogenic_depth=rec['usd'], lower_seismogenic_depth=rec['lsd'], location=Point(rec['lon'], rec['lat']), nodal_plane_distribution=npd[i], hypocenter_distribution=hcd[i], temporal_occurrence_model=tom)) return out
[docs]class CollapsedPointSource(PointSource): """ Source typology representing a cluster of point sources around a specific location. The underlying sources must all have the same tectonic region type, magnitude_scaling_relationship and temporal_occurrence_model. """ code = b'p' MODIFICATIONS = set() def __init__(self, source_id, pointsources): self.source_id = source_id self.pdata = psources_to_pdata(pointsources, source_id) self.tectonic_region_type = pointsources[0].tectonic_region_type self.magnitude_scaling_relationship = ( pointsources[0].magnitude_scaling_relationship) self.temporal_occurrence_model = ( pointsources[0].temporal_occurrence_model) vars(self).update(calc_average(pointsources)) self.location = Point(self.lon, self.lat, self.dep) self.nodal_plane_distribution = PMF( [(1., NodalPlane(self.strike, self.dip, self.rake))]) self.hypocenter_distribution = PMF([(1., self.dep)]) @property def pointsources(self): """ :returns: the underlying point sources """ return pdata_to_psources(self.pdata)
[docs] def get_annual_occurrence_rates(self): """ :returns: a list of pairs [(mag, mag_occur_rate), ...] """ acc = AccumDict(accum=0) for psource in self.pointsources: acc += dict(psource.get_annual_occurrence_rates()) return sorted(acc.items())
[docs] def count_nphc(self): """ :returns: the total number of nodal planes and hypocenters """ return sum(src.count_nphc() for src in self.pointsources)
[docs] def iter_ruptures(self, **kwargs): """ :returns: an iterator over the underlying ruptures """ step = kwargs.get('step', 1) for src in self.pointsources[::-step]: yield from src.iter_ruptures(**kwargs)
# CollapsedPointSource
[docs] def iruptures(self): """ :yields: the underlying ruptures with mean nodal plane and hypocenter """ np = NodalPlane(self.strike, self.dip, self.rake) yield from self.restrict(np, self.location.z)._gen_ruptures( iruptures=True)
[docs] def count_ruptures(self): """ :returns: the total number of underlying ruptures """ return sum(src.count_ruptures() for src in pdata_to_psources(self.pdata))
[docs]def grid_point_sources(sources, ps_grid_spacing): """ :param sources: a list of sources with the same grp_id (point sources and not) :param ps_grid_spacing: value of the point source grid spacing in km; if None, do nothing :returns: a dict grp_id -> list of non-point sources and collapsed point sources """ grp_id = sources[0].grp_id for src in sources[1:]: assert src.grp_id == grp_id, (src.grp_id, grp_id) if not ps_grid_spacing: return sources out = [src for src in sources if not hasattr(src, 'location')] ps = numpy.array([src for src in sources if hasattr(src, 'location')]) if len(ps) < 2: # nothing to collapse return out + list(ps) coords = numpy.zeros((len(ps), 3)) for p, psource in enumerate(ps): coords[p, 0] = psource.location.x coords[p, 1] = psource.location.y coords[p, 2] = psource.location.z if (len(numpy.unique(coords[:, 0])) == 1 or len(numpy.unique(coords[:, 1])) == 1): # degenerated rectangle, there is no grid, do not collapse return out + list(ps) deltax = angular_distance(ps_grid_spacing, lat=coords[:, 1].mean()) deltay = angular_distance(ps_grid_spacing) grid = groupby_grid(coords[:, 0], coords[:, 1], deltax, deltay) cnt = 0 for idxs in grid.values(): if len(idxs) > 1: cnt += 1 name = 'cps-%03d-%04d' % (grp_id, cnt) cps = CollapsedPointSource(name, ps[idxs]) # slow part cps.grp_id = ps[0].grp_id cps.trt_smr = ps[0].trt_smr cps.ps_grid_spacing = ps_grid_spacing out.append(cps) else: # there is a single source out.append(ps[idxs[0]]) return out
# used only in view('long_ruptures')
[docs]def get_rup_maxlen(src): """ :returns: the maximum rupture length for point sources and area sources """ if hasattr(src, 'nodal_plane_distribution'): maxmag, _rate = src.get_annual_occurrence_rates()[-1] lsd = src.lower_seismogenic_depth usd = src.upper_seismogenic_depth msr = src.magnitude_scaling_relationship rar = src.rupture_aspect_ratio lens = [] for _, np in src.nodal_plane_distribution.data: area = msr.get_median_area(maxmag, np.rake) dims = get_rupdims(usd, lsd, rar, area, np.dip) lens.append(dims[0]) return max(lens) return 0.