# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2022 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.geo.surface.multi` defines
:class:`MultiSurface`.
"""
import copy
import numpy as np
from shapely.geometry import Polygon
from openquake.hazardlib.geo.surface.base import BaseSurface
from openquake.hazardlib.geo.mesh import Mesh
from openquake.hazardlib.geo import utils
from openquake.hazardlib import geo
from openquake.hazardlib.geo.surface import (
PlanarSurface, SimpleFaultSurface, ComplexFaultSurface)
from openquake.hazardlib.geo.multiline import get_tu, MultiLine
[docs]class MultiSurface(BaseSurface):
"""
Represent a surface as a collection of independent surface elements.
:param surfaces:
List of instances of subclasses of
:class:`~openquake.hazardlib.geo.surface.base.BaseSurface`
each representing a surface geometry element.
"""
@property
def surface_nodes(self):
"""
:returns:
a list of surface nodes from the underlying single node surfaces
"""
if type(self.surfaces[0]).__name__ == 'PlanarSurface':
return [surf.surface_nodes[0] for surf in self.surfaces]
return [surf.surface_nodes for surf in self.surfaces]
[docs] @classmethod
def from_csv(cls, fname: str):
"""
:param fname:
path to a CSV file with header (lon, lat, dep) and 4 x P
rows describing planes in terms of corner points in the order
topleft, topright, bottomright, bottomleft
:returns:
a MultiSurface made of P planar surfaces
"""
surfaces = []
tmp = np.genfromtxt(fname, delimiter=',', comments='#', skip_header=1)
tmp = tmp.reshape(-1, 4, 3, order='A')
for i in range(tmp.shape[0]):
arr = tmp[i, :, :]
surfaces.append(PlanarSurface.from_ucerf(arr))
return cls(surfaces)
@property
def mesh(self):
"""
:returns: mesh corresponding to the whole multi surface
"""
lons = []
lats = []
deps = []
for m in [surface.mesh for surface in self.surfaces]:
ok = np.isfinite(m.lons) & np.isfinite(m.lats)
lons.append(m.lons[ok])
lats.append(m.lats[ok])
deps.append(m.depths[ok])
return Mesh(np.concatenate(lons), np.concatenate(lats),
np.concatenate(deps))
def __init__(self, surfaces: list, tol: float = 1):
"""
Intialize a multi surface object from a list of surfaces
:param list surfaces:
A list of instances of subclasses of
:class:`openquake.hazardlib.geo.surface.BaseSurface`
:param float tol:
A float in decimal degrees representing the tolerance admitted in
representing the rupture trace.
"""
self.surfaces = surfaces
self.tol = tol
self._set_tor()
self.areas = None
self.tut = None
self.uut = None
self.site_mesh = None
def _set_tor(self):
"""
Computes the list of the vertical surface projections of the top of
the ruptures from the set of surfaces defining the multi fault.
We represent the surface projection of each top of rupture with an
instance of a :class:`openquake.hazardlib.geo.multiline.Multiline`
"""
tors = []
classes = (ComplexFaultSurface, SimpleFaultSurface)
for srfc in self.surfaces:
if isinstance(srfc, geo.surface.kite_fault.KiteSurface):
lo, la = srfc.get_tor()
line = geo.Line.from_vectors(lo, la)
line.keep_corners(self.tol)
tors.append(line)
elif isinstance(srfc, PlanarSurface):
lo = []
la = []
for pnt in [srfc.top_left, srfc.top_right]:
lo.append(pnt.longitude)
la.append(pnt.latitude)
tors.append(geo.line.Line.from_vectors(lo, la))
elif isinstance(srfc, classes):
lons = srfc.mesh.lons[0, :]
lats = srfc.mesh.lats[0, :]
coo = np.array([[lo, la] for lo, la in zip(lons, lats)])
line = geo.line.Line.from_vectors(coo[:, 0], coo[:, 1])
line.keep_corners(self.tol)
tors.append(line)
else:
raise ValueError(f"Surface {str(srfc)} not supported")
# Set the multiline representing the rupture traces i.e. vertical
# projections at the surface of the top of ruptures
self.tors = geo.MultiLine(tors)
[docs] def get_min_distance(self, mesh):
"""
For each point in ``mesh`` compute the minimum distance to each
surface element and return the smallest value.
See :meth:`superclass method
<.base.BaseSurface.get_min_distance>`
for spec of input and result values.
"""
dists = [surf.get_min_distance(mesh) for surf in self.surfaces]
return np.min(dists, axis=0)
[docs] def get_closest_points(self, mesh):
"""
For each point in ``mesh`` find the closest surface element, and return
the corresponding closest point.
See :meth:`superclass method
<.base.BaseSurface.get_closest_points>`
for spec of input and result values.
"""
# first, for each point in mesh compute minimum distance to each
# surface. The distance matrix is flattend, because mesh can be of
# an arbitrary shape. By flattening we obtain a ``distances`` matrix
# for which the first dimension represents the different surfaces
# and the second dimension the mesh points.
dists = np.array(
[surf.get_min_distance(mesh).flatten() for surf in self.surfaces]
)
# find for each point in mesh the index of closest surface
idx = dists == np.min(dists, axis=0)
# loop again over surfaces. For each surface compute the closest
# points, and associate them to the mesh points for which the surface
# is the closest. Note that if a surface is not the closest to any of
# the mesh points then the calculation is skipped
lons = np.empty_like(mesh.lons.flatten())
lats = np.empty_like(mesh.lats.flatten())
depths = None if mesh.depths is None else \
np.empty_like(mesh.depths.flatten())
for i, surf in enumerate(self.surfaces):
if not idx[i, :].any():
continue
cps = surf.get_closest_points(mesh)
lons[idx[i, :]] = cps.lons.flatten()[idx[i, :]]
lats[idx[i, :]] = cps.lats.flatten()[idx[i, :]]
if depths is not None:
depths[idx[i, :]] = cps.depths.flatten()[idx[i, :]]
lons = lons.reshape(mesh.lons.shape)
lats = lats.reshape(mesh.lats.shape)
if depths is not None:
depths = depths.reshape(mesh.depths.shape)
return Mesh(lons, lats, depths)
[docs] def get_joyner_boore_distance(self, mesh):
"""
For each point in mesh compute the Joyner-Boore distance to all the
surface elements and return the smallest value.
See :meth:`superclass method
<.base.BaseSurface.get_joyner_boore_distance>`
for spec of input and result values.
"""
# For each point in mesh compute the Joyner-Boore distance to all the
# surfaces and return the shortest one.
dists = [
surf.get_joyner_boore_distance(mesh) for surf in self.surfaces]
return np.min(dists, axis=0)
[docs] def get_top_edge_depth(self):
"""
Compute top edge depth of each surface element and return area-weighted
average value (in km).
"""
areas = self._get_areas()
depths = np.array([np.mean(surf.get_top_edge_depth()) for surf
in self.surfaces])
ted = np.sum(areas * depths) / np.sum(areas)
assert np.isfinite(ted).all()
return ted
[docs] def get_strike(self):
"""
Compute strike of each surface element and return area-weighted average
value (in range ``[0, 360]``) using formula from:
http://en.wikipedia.org/wiki/Mean_of_circular_quantities
Note that the original formula has been adapted to compute a weighted
rather than arithmetic mean.
"""
areas = self._get_areas()
strikes = np.array([surf.get_strike() for surf in self.surfaces])
v1 = (np.sum(areas * np.sin(np.radians(strikes))) /
np.sum(areas))
v2 = (np.sum(areas * np.cos(np.radians(strikes))) /
np.sum(areas))
return np.degrees(np.arctan2(v1, v2)) % 360
[docs] def get_dip(self):
"""
Compute dip of each surface element and return area-weighted average
value (in range ``(0, 90]``).
Given that dip values are constrained in the range (0, 90], the simple
formula for weighted mean is used.
"""
areas = self._get_areas()
dips = np.array([surf.get_dip() for surf in self.surfaces])
ok = np.logical_and(np.isfinite(dips), np.isfinite(areas))[0]
dips = dips[ok]
areas = areas[ok]
dip = np.sum(areas * dips) / np.sum(areas)
return dip
[docs] def get_width(self):
"""
Compute width of each surface element, and return area-weighted
average value (in km).
"""
areas = self._get_areas()
widths = np.array([surf.get_width() for surf in self.surfaces])
return np.sum(areas * widths) / np.sum(areas)
[docs] def get_area(self):
"""
Return sum of surface elements areas (in squared km).
"""
return np.sum(self._get_areas())
[docs] def get_bounding_box(self):
"""
Compute bounding box for each surface element, and then return
the bounding box of all surface elements' bounding boxes.
:return:
A tuple of four items. These items represent western, eastern,
northern and southern borders of the bounding box respectively.
Values are floats in decimal degrees.
"""
lons = []
lats = []
for surf in self.surfaces:
west, east, north, south = surf.get_bounding_box()
lons.extend([west, east])
lats.extend([north, south])
return utils.get_spherical_bounding_box(lons, lats)
[docs] def get_middle_point(self):
"""
If :class:`MultiSurface` is defined by a single surface, simply
returns surface's middle point, otherwise find surface element closest
to the surface's bounding box centroid and return corresponding
middle point.
Note that the concept of middle point for a multi surface is ambiguous
and alternative definitions may be possible. However, this method is
mostly used to define the hypocenter location for ruptures described
by a multi surface
(see :meth:`openquake.hazardlib.source.characteristic.CharacteristicFaultSource.iter_ruptures`).
This is needed because when creating fault based sources, the rupture's
hypocenter locations are not explicitly defined, and therefore an
automated way to define them is required.
"""
if len(self.surfaces) == 1:
return self.surfaces[0].get_middle_point()
west, east, north, south = self.get_bounding_box()
longitude, latitude = utils.get_middle_point(west, north, east, south)
dists = []
for surf in self.surfaces:
dists.append(
surf.get_min_distance(Mesh(np.array([longitude]),
np.array([latitude]),
None)))
dists = np.array(dists).flatten()
idx = dists == np.min(dists)
return np.array(self.surfaces)[idx][0].get_middle_point()
[docs] def get_surface_boundaries(self):
los, las = self.surfaces[0].get_surface_boundaries()
poly = Polygon((lo, la) for lo, la in zip(los, las))
for i in range(1, len(self.surfaces)):
los, las = self.surfaces[i].get_surface_boundaries()
polyt = Polygon([(lo, la) for lo, la in zip(los, las)])
poly = poly.union(polyt)
coo = np.array([[lo, la] for lo, la in list(poly.exterior.coords)])
return coo[:, 0], coo[:, 1]
[docs] def get_surface_boundaries_3d(self):
lons = []
lats = []
deps = []
for surf in self.surfaces:
lons_surf, lats_surf, deps_surf = surf.get_surface_boundaries_3d()
lons.extend(lons_surf)
lats.extend(lats_surf)
deps.extend(deps_surf)
return lons, lats, deps
def _get_areas(self):
"""
Return surface elements area values in a numpy array.
"""
if self.areas is None:
self.areas = []
for surf in self.surfaces:
self.areas.append(surf.get_area())
self.areas = np.array(self.areas)
return self.areas
def _set_tu(self, mesh: Mesh):
"""
Set the values of T and U
"""
if self.tors is None:
self._set_tor()
if self.tors.shift is None:
self.tors._set_coordinate_shift()
tupps = []
uupps = []
weis = []
for line in self.tors.lines:
tu, uu, we = line.get_tu(mesh)
tupps.append(tu)
uupps.append(uu)
weis.append(np.squeeze(np.sum(we, axis=0)))
# `get_tu` is a function in the multiline module
uut, tut = get_tu(self.tors.shift, tupps, uupps, weis)
self.uut = uut
self.tut = tut
self.site_mesh = mesh
[docs] def get_rx_distance(self, mesh):
"""
:param mesh:
An instance of :class:`openquake.hazardlib.geo.mesh.Mesh` with the
coordinates of the sites.
:returns:
A :class:`numpy.ndarray` instance with the Rx distance. Note that
the Rx distance is directly taken from the GC2 t-coordinate.
"""
# This checks that the info stored is consistent with the mesh of
# points used
condition2 = (self.site_mesh is not None and self.site_mesh != mesh)
if (self.uut is None) or condition2:
self._set_tu(mesh)
rx = self.tut[0] if len(self.tut[0].shape) > 1 else self.tut
return rx
[docs] def get_ry0_distance(self, mesh):
"""
:param mesh:
"""
if self.tors is None:
self._set_tor()
return self.tors.get_ry0_distance(mesh)
"""
def get_ry0_distance(self, mesh):
if self.uut is None or self.site_mesh != mesh:
self._set_tu(mesh)
if self.tors.u_max is None:
self.tors.set_u_max()
ry0 = np.zeros_like(mesh.lons)
ry0[self.uut < 0] = abs(self.uut[self.uut < 0])
condition = self.uut > self.tors.u_max
ry0[condition] = self.uut[condition] - self.tors.u_max
out = ry0[0] if len(ry0.shape) > 1 else ry0
return out
"""
def _update_cache(rup, sites, params, dcache):
""" Updating distance cache """
# This is a list with the IDs of the surfaces representing the geometry of
# the rupture in question
suids = []
# Updating the cache with the distances for the surfaces not yet
# considered
for srf in rup.surface.surfaces:
suids.append(srf.suid)
if srf.suid not in dcache:
dcache[srf.suid] = {}
for param in params:
# This function returns the distances that will be added to the
# cache. In case of Rx and Ry0, the information cache will
# include the ToR of each surface as well as the GC2 t and u
# coordinates for each section.
distances = _get_distances(srf, sites, param)
# Save information into the cache for the current surfac.
for key in distances.keys():
dcache[srf.suid][key] = distances[key]
return dcache, suids
[docs]def get_distdic(rup, sites, params, dcache):
"""
Calculates the distances for multi-surfaces using a cache.
:param rup:
An instance of :class:`openquake.hazardlib.source.rupture.BaseRupture`
:param sites:
A list of sites or a site collection
:param params:
A list of keys defining the required rupture-distance parameters
:param dcache:
A dictionary of dictionaries with the distances. The first key is the
surface ID and the second one is the type of distance. In a traditional
calculation dcache is instatianted by in the `get_ctxs` method of the
:class:`openquake.hazardlib.contexts.ContextMaker`
:returns:
A dictionary with the computed distances for the rupture in input
"""
# Update the distance cache
dcache, suids = _update_cache(rup, sites, params, dcache)
# Computing distances using the cache
output = {}
for param in params:
if param not in output:
distances, keys = _get_distances_from_cache(dcache, suids, param)
for dst, key in zip(distances, keys):
output[key] = dst
return output
def _get_distances_from_cache(dcache: dict, suids: list, param: str):
"""
:param dcache:
See description in
:method:`openquake.hazardlib.geo.surface.multi.get_distdic`
:param suids:
A list with the IDs of the surfaces to consider
:param param:
A string defining a rupture-site metric (e.g. 'rjb')
"""
if param in ['rjb', 'rrup']:
distances = dcache[suids[0]][param]
# This is looping over all the surface IDs composing the rupture
for suid in suids:
distances = np.minimum(distances, dcache[suid][param])
params = [param]
distances = [distances]
elif param in ['rx', 'ry0']:
# The computed distances. In this case we are not going to add them to
# the cache since they cannot be reused
distances, params = _get_rx_ry0_from_cache(dcache, suids, param)
else:
raise ValueError("Unknown distance measure %r" % param)
return distances, params
def _get_distances(surface, sites, param):
"""
:param surface:
An instance of :class:`openquake.hazardlib.geo.surface.BaseSurface`
:param sites:
An instance of :class:`openquake.hazardlib.geo.Mesh`
:param param:
A string defining a rupture-site metric (e.g. 'rjb')
:returns:
The requested distances
"""
if param == 'rrup':
dist = surface.get_min_distance(sites)
elif param == 'rjb':
dist = surface.get_joyner_boore_distance(sites)
elif param in ['rx', 'ry0']:
# In this case we compute the GC2 coordinates for the surface
tor_lo, tor_la = surface.get_tor()
tor = geo.line.Line.from_vectors(tor_lo, tor_la)
t_upp, u_upp, wei = tor.get_tu(sites)
wei_sum = np.squeeze(np.sum(wei, axis=0))
# Get the u-coordinate on the last vertex of the trace
mesh = Mesh(tor_lo[[0, -1]], tor_la[[0, -1]])
_, uu, _ = tor.get_tu(mesh)
# Save data
dists = {'tor': tor, 't_upp': t_upp, 'u_upp': u_upp, 'wei': wei_sum,
'umax': max(uu)}
else:
raise ValueError(f'Unknown distance measure {param}')
if param in ['rrup', 'rjb']:
dists = {param: dist}
return dists
def _get_rx_ry0_from_cache(dcache, suids, param):
"""
See :function:`openquake.hazardlib.geo.surface.multi._get_distances`
:returns:
Two lists. One with distances and one with the corresponding distance
names.
"""
# Get the multiline used to compute distances
multil = _get_multi_line(dcache, suids)
# Get GC coordinates
multil.set_tu()
# Get Rx and Ry0
rx = multil.get_rx_distance()
ry0 = multil.get_ry0_distance()
return [rx, ry0], ['rx', 'ry0']
def _get_multi_line(dcache, suids):
# Retrieve info from the cache
lines = [dcache[key]['tor'] for key in suids]
# Create the multiline
multil = MultiLine(lines)
revert = multil.set_overall_strike()
soidx = multil._set_origin()
revert = revert[soidx]
# Load data in cache
tupps = [dcache[suids[i]]['t_upp'] for i in soidx]
uupps = [dcache[suids[i]]['u_upp'] for i in soidx]
weis = [dcache[suids[i]]['wei'] for i in soidx]
umax = [dcache[suids[i]]['umax'] for i in soidx]
# Change sign to the reverted surface traces
for i, flag in enumerate(revert):
if flag:
tupps[i] = -tupps[i]
uupps[i] = -(uupps[i] - umax[i])
# Fixing shift
multil._set_coordinate_shift()
multil.set_u_max()
multil.tupps = tupps
multil.uupps = uupps
multil.weis = np.array(weis) # shape (N, 3)
multil.weis.flags.writeable = False # nobody must change the weights
return multil