Source code for openquake.hazardlib.geo.line

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
 Module :mod:`openquake.hazardlib.geo.line` defines :class:`Line`.
"""
import numpy as np
from openquake.baselib.general import cached_property
from openquake.baselib.performance import compile
from openquake.hazardlib.geo import geodetic, utils, Point

TOLERANCE = 0.1
SMALL = 1e-2


def _update(rtra, rtra_prj, proj, pnt):
    xg, yg = proj(np.array([pnt[0]]), np.array([pnt[1]]), reverse=True)
    rtra.append(np.array([xg[0], yg[0], pnt[2]]))
    rtra_prj.append(pnt)


def _resample(line, sect_len, orig_extremes):
    # Returns array of resampled trace coordinates
    #
    # :param coo:
    #   A :class:`numpy.ndarray` instance with three columns and n-lines
    #   containing the coordinates of the polyline to be resampled.
    # :param sect_len:
    #   The resampling distance [km]
    # :param orig_extremes:
    #   A boolean. When true the last point in coo is also added.

    # Project the coordinates of the trace and save them in `txy`
    txy = line.proj(line.coo[:, 0], line.coo[:, 1], line.coo[:, 2]).T

    # Compute the total length of the original trace
    # tot_len = sum(utils.get_dist(txy[i], txy[i - 1]) for i in range(1, N))
    inc_len = 0.

    # Initialize the lists with the coordinates of the resampled trace
    rtra_prj = [txy[0]]
    rtra = [line.coo[0]]

    # Resampling
    idx_vtx = -1
    while True:

        # Computing distances from the reference point
        dis = utils.get_dist(txy, rtra_prj[-1])
        if idx_vtx > 0:

            # Fixing distances for points before the index
            dis[0:idx_vtx] = 100000

        # Index of the point on the trace with a distance just below the
        # sampling distance
        idx = np.where(dis <= sect_len, dis, -np.inf).argmax()

        # If the pick a point that is not the last one on the trace we
        # compute the new sample by interpolation
        if idx < len(dis) - 1:

            pnt = find_t(txy[idx + 1, :], txy[idx, :], rtra_prj[-1], sect_len)
            if np.isnan(pnt).any():
                raise ValueError('Did not find the intersection')
            _update(rtra, rtra_prj, line.proj, pnt)
            inc_len += sect_len

            # Adding more points still on the same segment
            delta = txy[idx + 1] - rtra_prj[-1]
            chk_dst = utils.get_dist(txy[idx + 1], rtra_prj[-1])
            rat = delta / chk_dst
            while chk_dst > sect_len * 0.9999:
                _update(rtra, rtra_prj, line.proj,
                        rtra_prj[-1] + sect_len * rat)
                inc_len += sect_len
                # This is the distance between the last resampled point
                # and the second vertex of the segment
                chk_dst = utils.get_dist(txy[idx + 1], rtra_prj[-1])
        else:

            same_dir = True
            if len(rtra) > 1:
                same_dir = _get_same_dir(rtra, line.coo)

            # This is the distance between the last sampled point and the last
            # point on the original edge
            dist_from_last = utils.get_dist(rtra_prj[-1], txy[-1])

            # We are processing the last point
            # if tot_len - inc_len > 0.5 * sect_len and not orig_extremes:
            if ((dist_from_last > sect_len / 2 and not orig_extremes and
                    same_dir) or
                (dist_from_last < sect_len / 2 and not orig_extremes and
                    not same_dir)):

                # Adding more points still on the same segment
                delta = txy[-1] - txy[-2]
                chk_dst = utils.get_dist(txy[-1], txy[-2])
                _update(rtra, rtra_prj, line.proj, rtra_prj[-1] +
                        sect_len * delta / chk_dst)
                inc_len += sect_len

            elif orig_extremes:
                # Adding last point
                rtra.append(line.coo[-1])
            break

        # Updating index
        idx_vtx = idx + 1

    return np.array(utils.clean_points(rtra))


def _get_same_dir(rtra, coo):

    # If the line is vertical
    if (np.abs(rtra[-2][0] - rtra[-1][0]) < SMALL and
            np.abs(rtra[-2][1] - rtra[-1][1]) < SMALL):
        same_dir = True
        if coo[-1, 2] < rtra[-1][2]:
            same_dir = False
        return same_dir

    # Azimuth of the resampled edge
    azim_rsmp_edge = geodetic.azimuth(rtra[-2][0], rtra[-2][1],
                                      rtra[-1][0], rtra[-1][1])
    # Azimuth from the last resampled edge and the last point on
    # the original edge
    azim_orig_edge = geodetic.azimuth(rtra[-1][0], rtra[-1][1],
                                      coo[-1, 0], coo[-1, 1])
    # Check
    same_dir = np.abs(azim_rsmp_edge - azim_orig_edge) < 30

    return same_dir


[docs]@compile('(f8[:,:],f8[:,:],f8[:],f8[:,:])') def line_get_tu(ui, ti, sl, weights): """ Compute the T and U quantitities. :param ui: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) :param ti: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) :param sl: A :class:`numpy.ndarray` instance with the segments' length :param weights: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) """ # Sum of weights - This has shape equal to the number of sites weight_sum = weights.sum(axis=0) # Compute T t_upp = (ti * weights / weight_sum.T).sum(axis=0) # Compute U u_upp = ui[0] * weights[0] for i in range(1, len(sl)): delta = np.sum(sl[:i]) u_upp += (ui[i] + delta) * weights[i] u_upp = (u_upp / weight_sum.T).T return t_upp, u_upp
[docs]@compile('(f8,f8,f8[:,:],f8[:],f8[:],f8[:,:],f8[:,:])') def get_ui_ti(lam0, phi0, coo, lons, lats, uhat, that): """ Compute the t and u coordinates. ti and ui have shape (num_segments x num_sites). """ N = len(lons) L = len(coo) # Sites projected coordinates sx, sy = utils.project_direct(lam0, phi0, lons, lats) # Polyline projected coordinates tx, ty = utils.project_direct(lam0, phi0, coo[:, 0], coo[:, 1]) # Initializing ti and ui coordinates ui = np.zeros((L - 1, N)) ti = np.zeros((L - 1, N)) # For each segment for i in range(L-1): dx = sx - tx[i] dy = sy - ty[i] ui[i] = dx * uhat[i, 0] + dy * uhat[i, 1] ti[i] = dx * that[i, 0] + dy * that[i, 1] return ui, ti
# has effect on case_65 with multifaultsources and rx0 distance # affects the performance of ~/oq-risk-tests/test/disaggregation/NZ
[docs]@compile('(f8[:,:],f8[:,:],f8[:])') def get_ti_weights(ui, ti, segments_len): """ :returns: (weights, idx_on_trace) """ S1, S2 = ui.shape weights = np.zeros_like(ui) terma = np.zeros_like(ui) term1 = np.zeros_like(ui) term2 = np.zeros_like(ui) idx_on_trace = np.zeros(S2, dtype=np.bool_) for i in range(S1): ti_ = ti[i] ui_ = ui[i] terma_ = terma[i] term1_ = term1[i] term2_ = term2[i] ws = weights[i] seglen = segments_len[i] # More general case cond0 = np.abs(ti_) >= TOLERANCE if cond0.any(): terma_[cond0] = seglen - ui_[cond0] term1_[cond0] = np.arctan(terma_[cond0] / ti_[cond0]) term2_[cond0] = np.arctan(-ui_[cond0] / ti_[cond0]) ws[cond0] = (term1_[cond0] - term2_[cond0]) / ti_[cond0] # Case for sites on the extension of one segment cond1 = np.abs(ti_) < TOLERANCE cond2 = np.logical_or(ui_ < 0. - TOLERANCE, ui_ > seglen + TOLERANCE) iii = np.logical_and(cond1, cond2) if len(iii): ws[iii] = 1. / (ui_[iii] - seglen) - 1. / ui_[iii] # Case for sites on one segment cond3 = np.logical_and(ui_ >= - TOLERANCE, ui_ <= seglen + TOLERANCE) jjj = np.logical_and(cond1, cond3) ws[jjj] = 1 / (-0.01 - seglen) + 1 / 0.01 idx_on_trace[jjj] = 1.0 return weights, idx_on_trace
[docs]@compile('(f8,f8,f8[:,:],f8[:],f8[:,:],f8[:,:],f8[:],f8[:])') def get_tuw(lam0, phi0, coo, slen, uhat, that, lons, lats): """ :returns: array of float32 of shape (N, 3) """ N = len(lons) out = np.empty((N, 3), np.float32) ui, ti = get_ui_ti(lam0, phi0, coo, lons, lats, uhat, that) weights, iot = get_ti_weights(ui, ti, slen) t, u = line_get_tu(ui, ti, slen, weights) t[iot] = 0.0 out[:, 0] = t out[:, 1] = u out[:, 2] = weights.sum(axis=0) return out
[docs]class Line(object): """ This class represents a geographical line, which is basically a sequence of geographical points. A line is defined by at least two points. :param points: The sequence of points defining this line. :type points: list of :class:`~openquake.hazardlib.geo.point.Point` instances """
[docs] @classmethod def from_coo(cls, coo): """ Build a Line object for an array of coordinates, assuming they have e been cleaned already, i.e. there are no adjacent duplicate points """ self = cls.__new__(cls) self.init(coo) return self
def __init__(self, points): points = utils.clean_points(points) # can remove points! self.init(np.array([[p.x, p.y, p.z] for p in points]))
[docs] def init(self, coo): self.coo = coo self.proj = utils.OrthographicProjection.from_lons_lats( self.coo[:, 0], self.coo[:, 1]) if len(coo) == 2: # segment p0, p1 = self.points self.length = p0.distance(p1) self.azimuth = p0.azimuth(p1) else: self.length = np.sum(self.get_lengths()) azimuths = self.get_azimuths() distances = geodetic.geodetic_distance( coo[:-1, 0], coo[:-1, 1], coo[1:, 0], coo[1:, 1]) self.azimuth = utils.angular_mean(azimuths, distances) % 360
@property def points(self): return [self[i] for i in range(len(self.coo))] def __eq__(self, other): """ >>> from openquake.hazardlib.geo.point import Point >>> points = [Point(1, 2), Point(3, 4)]; Line(points) == Line(points) True >>> Line(points) == Line(list(reversed(points))) False """ return np.allclose(self.coo, other.coo, atol=1E-6) def __ne__(self, other): """ >>> from openquake.hazardlib.geo.point import Point >>> Line([Point(1,2), Point(1,3)]) != Line([Point(1,2), Point(1,3)]) False >>> Line([Point(1,2), Point(1,3)]) != Line([Point(1,2), Point(1,4)]) True """ return not self.__eq__(other) def __len__(self): return len(self.coo) def __getitem__(self, i): return Point(*self.coo[i])
[docs] def flip(self): """ Returns a new line with the points flipped. Here is an example, taking advantage of the string representation of Lines in terms of geohashes of 5 letters (~2 km of precision): >>> line = Line([Point(1, 2), Point(1, 3)]) >>> print(line) s02eq_s089n >>> print(line.flip()) s089n_s02eq >>> line.get_azimuths() [0.0] >>> line.flip().get_azimuths() [180.0] >>> line = Line([Point(1, 0), Point(2, 0)]) >>> line.get_azimuths() [90.0] >>> line.flip().get_azimuths() [270.0] """ return self.from_coo(np.flip(self.coo, axis=0))
[docs] @classmethod def from_vectors(cls, lons, lats, deps=None): """ Creates a line from three numpy.ndarray instances containing longitude, latitude and depths values """ arrs = lons, lats if deps is not None: arrs = lons, lats, deps return cls([Point(*coo) for coo in zip(*arrs)])
[docs] def on_surface(self): """ Check if this line is defined on the surface (i.e. all points are on the surfance, depth=0.0). :returns bool: True if this line is on the surface, false otherwise. """ return all(point.on_surface() for point in self)
[docs] def horizontal(self): """ Check if this line is horizontal (i.e. all depths of points are equal). :returns bool: True if this line is horizontal, false otherwise. """ return all(p.depth == self.coo[0, 2] for p in self)
[docs] def get_azimuths(self): """ Return the azimuths of all the segments composing the polyline """ if len(self.coo) == 2: return [self[0].azimuth(self[1])] lons = self.coo[:, 0] lats = self.coo[:, 1] return geodetic.azimuth(lons[:-1], lats[:-1], lons[1:], lats[1:])
[docs] def average_azimuth(self): """ Calculate and return weighted average azimuth of all line's segments in decimal degrees. Uses formula from http://en.wikipedia.org/wiki/Mean_of_circular_quantities >>> from openquake.hazardlib.geo.point import Point as P >>> '%.1f' % Line([P(0, 0), P(1e-5, 1e-5)]).average_azimuth() '45.0' >>> '%.1f' % Line([P(0, 0), P(0, 1e-5), P(1e-5, 1e-5)] ... ).average_azimuth() '45.0' >>> line = Line([P(0, 0), P(-2e-5, 0), P(-2e-5, 1.154e-5)]) >>> '%.1f' % line.average_azimuth() '300.0' """ return self.azimuth
[docs] def resample(self, sect_len: float, orig_extremes=False): """ Resample this line into sections. The first point in the resampled line corresponds to the first point in the original line. Starting from the first point in the original line, a line segment is defined as the line connecting the last point in the resampled line and the next point in the original line. :param float sect_len: The length of the section, in km. :param bool original_extremes: A boolean controlling the way in which the last point is added. When true the first and last point match the original extremes. When false the last point is at a `sect_len` distance from the previous one, before or after the last point. :returns: A new line resampled into sections based on the given length. """ return Line.from_coo(_resample(self, sect_len, orig_extremes))
[docs] def get_lengths(self) -> np.ndarray: """ Calculate a numpy.ndarray instance with the length of the segments composing the polyline. :returns: Segments length in km. """ lengths = [] for i, point in enumerate(self): if i != 0: lengths.append(point.distance(self[i - 1])) return np.array(lengths)
[docs] def get_length(self) -> float: """ Calculate the length of the line as a sum of lengths of all its segments. :returns: Total length in km. """ return self.length
[docs] def keep_corners(self, delta): """ Removes the points where the change in direction is lower than a tolerance value and returns a new line. :param delta: An angle in decimal degrees """ coo = self.coo # Compute the azimuth of all the segments azim = geodetic.azimuth(coo[:-1, 0], coo[:-1, 1], coo[1:, 0], coo[1:, 1]) pidx = {0, coo.shape[0] - 1} idx, = np.nonzero(np.abs(np.diff(azim)) > delta) pidx = sorted(pidx | set(idx + 1)) return self.from_coo(coo[pidx])
[docs] def resample_to_num_points(self, num_points): """ Resample the line to a specified number of points. :param num_points: Integer number of points the resulting line should have. :returns: A new line with that many points as requested. """ assert len(self) > 1, "can not resample the line of one point" section_length = self.get_length() / (num_points - 1) resampled_points = [self[0]] segment = 0 acc_length = 0 last_segment_length = 0 points = self.points for i in range(num_points - 1): tot_length = (i + 1) * section_length while tot_length > acc_length and segment < len(points) - 1: last_segment_length = points[segment].distance( points[segment + 1]) acc_length += last_segment_length segment += 1 p1, p2 = points[segment - 1:segment + 1] offset = tot_length - (acc_length - last_segment_length) if offset < 1e-5: # forward geodetic transformations for very small distances # are very inefficient (and also unneeded). if target point # is just 1 cm away from original (non-resampled) line vertex, # don't even bother doing geodetic calculations. resampled = p1 else: resampled = p1.equally_spaced_points(p2, offset)[1] resampled_points.append(resampled) return Line(resampled_points)
[docs] def get_tuw(self, mesh): """ Computes the U and T coordinates of the GC2 method for a mesh of points. :param mesh: An instance of :class:`openquake.hazardlib.geo.mesh.Mesh` """ slen, uhat, that = self.sut_hat tuw = get_tuw(self.proj.lam0, self.proj.phi0, self.coo, slen, uhat, that, mesh.lons, mesh.lats) return tuw[:, 0], tuw[:, 1], tuw[:, 2]
[docs] def get_ui_ti(self, mesh, uhat, that): """ Compute the t and u coordinates. ti and ui have shape (num_segments x num_sites). """ return get_ui_ti(self.proj.lam0, self.proj.phi0, self.coo, mesh.lons, mesh.lats, uhat, that)
@cached_property def sut_hat(self): """ Return the unit vectors defining the local origin for each segment of the trace. :param sx: The vector with the x coordinates of the trace :param sy: The vector with the y coordinates of the trace :returns: Two arrays of size n x 3 (when n is the number of segments composing the trace """ # Projected coordinates sx, sy = self.proj(self.coo[:, 0], self.coo[:, 1]) slen = ((sx[1:] - sx[:-1])**2 + (sy[1:] - sy[:-1])**2)**0.5 sg = np.zeros((len(sx) - 1, 3)) sg[:, 0] = sx[1:] - sx[:-1] sg[:, 1] = sy[1:] - sy[:-1] uhat = get_versor(sg) that = get_versor(np.cross(sg, np.array([0, 0, 1]))) return slen, uhat, that def __str__(self): return utils.geohash5(self.coo)
[docs]def get_versor(arr): """ Returns the versor (i.e. a unit vector) of a vector """ norm = np.linalg.norm(arr, axis=1) assert (norm > 0).all(), norm return (arr.T / norm).T
[docs]@compile("(f8[:],f8[:],f8[:],f8)") def find_t(pnt0, pnt1, ref_pnt, distance): """ Find the point on the segment within `pnt0` and `pnt1` at `distance` from `ref_pnt`. See https://tinyurl.com/meyt4ft3 :param pnt0: A 1D :class:`numpy.ndarray` instance of length 3 :param pnt1: A 1D :class:`numpy.ndarray` instance of length 3 :param ref_pnt: A 1D :class:`numpy.ndarray` instance of length 3 :param distance: A float with the distance in km from `ref_pnt` to the point on the segment. :returns: A 1D :class:`numpy.ndarray` instance of length 3 """ x1 = pnt0[0] y1 = pnt0[1] z1 = pnt0[2] x2 = pnt1[0] y2 = pnt1[1] z2 = pnt1[2] x3 = ref_pnt[0] y3 = ref_pnt[1] z3 = ref_pnt[2] r = distance pa = (x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2 pb = 2 * ((x2 - x1) * (x1 - x3) + (y2 - y1) * (y1 - y3) + (z2 - z1) * (z1 - z3)) pc = (x3**2 + y3**2 + z3**2 + x1**2 + y1**2 + z1**2 - 2 * (x3 * x1 + y3 * y1 + z3 * z1) - r**2) chk = pb * pb - 4 * pa * pc # In this case the line is not intersecting the sphere if chk < 0: return np.array([np.nan, np.nan, np.nan]) # Computing the points of intersection pu = (-pb + (pb**2 - 4 * pa * pc)**0.5) / (2 * pa) x = x1 + pu * (x2 - x1) y = y1 + pu * (y2 - y1) z = z1 + pu * (z2 - z1) if (x >= min(x1, x2) and x <= max(x1, x2) and y >= min(y1, y2) and y <= max(y1, y2) and z >= min(z1, z2) and z <= max(z1, z2)): out = [x, y, z] else: pu = (-pb - (pb**2 - 4 * pa * pc)**0.5) / (2 * pa) x = x1 + pu * (x2 - x1) y = y1 + pu * (y2 - y1) z = z1 + pu * (z2 - z1) out = [x, y, z] return np.array(out)