Source code for openquake.hazardlib.site

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Module :mod:`openquake.hazardlib.site` defines :class:`Site`.
"""
import numpy
import pandas
from scipy.spatial import distance
from shapely import geometry
from openquake.baselib.general import not_equal, get_duplicates
from openquake.hazardlib.geo.utils import (
    fix_lon, cross_idl, _GeographicObjects, geohash, geohash3, CODE32,
    spherical_to_cartesian, get_middle_point)
from openquake.hazardlib.geo.geodetic import npoints_towards
from openquake.hazardlib.geo.mesh import Mesh

U32LIMIT = 2 ** 32
ampcode_dt = (numpy.bytes_, 4)
param = dict(
    vs30measured='reference_vs30_type',
    vs30='reference_vs30_value',
    z1pt0='reference_depth_to_1pt0km_per_sec',
    z2pt5='reference_depth_to_2pt5km_per_sec',
    backarc='reference_backarc')


# TODO: equivalents of calculate_z1pt0 and calculate_z2pt5
# are inside some GSIM implementations, we should avoid duplication
[docs]def calculate_z1pt0(vs30): ''' Reads an array of vs30 values (in m/s) and returns the depth to the 1.0 km/s velocity horizon (in m) Ref: Chiou & Youngs (2014) California model :param vs30: the shear wave velocity (in m/s) at a depth of 30m ''' c1 = 571 ** 4. c2 = 1360.0 ** 4. return numpy.exp((-7.15 / 4.0) * numpy.log((vs30 ** 4. + c1) / (c2 + c1)))
[docs]def calculate_z2pt5(vs30): ''' Reads an array of vs30 values (in m/s) and returns the depth to the 2.5 km/s velocity horizon (in km) Ref: Campbell, K.W. & Bozorgnia, Y., 2014. 'NGA-West2 ground motion model for the average horizontal components of PGA, PGV, and 5pct damped linear acceleration response spectra.' Earthquake Spectra, 30(3), pp.1087–1114. :param vs30: the shear wave velocity (in m/s) at a depth of 30 m ''' c1 = 7.089 c2 = -1.144 return numpy.exp(c1 + numpy.log(vs30) * c2)
[docs]def rnd5(lons): return numpy.round(lons, 5)
[docs]class Site(object): """ Site object represents a geographical location defined by its position as well as its soil characteristics. :param location: Instance of :class:`~openquake.hazardlib.geo.point.Point` representing where the site is located. :param vs30: Average shear wave velocity in the top 30 m, in m/s. :param z1pt0: Vertical distance from earth surface to the layer where seismic waves start to propagate with a speed above 1.0 km/sec, in meters. :param z2pt5: Vertical distance from earth surface to the layer where seismic waves start to propagate with a speed above 2.5 km/sec, in km. :raises ValueError: If any of ``vs30``, ``z1pt0`` or ``z2pt5`` is zero or negative. .. note:: :class:`Sites <Site>` are pickleable """ def __init__(self, location, vs30=numpy.nan, z1pt0=numpy.nan, z2pt5=numpy.nan, **extras): if not numpy.isnan(vs30) and vs30 <= 0: raise ValueError('vs30 must be positive') if not numpy.isnan(z1pt0) and z1pt0 <= 0: raise ValueError('z1pt0 must be positive') if not numpy.isnan(z2pt5) and z2pt5 <= 0: raise ValueError('z2pt5 must be positive') self.location = location self.vs30 = vs30 self.z1pt0 = z1pt0 self.z2pt5 = z2pt5 for param, val in extras.items(): assert param in site_param_dt, param setattr(self, param, val) def __str__(self): """ >>> import openquake.hazardlib >>> loc = openquake.hazardlib.geo.point.Point(1, 2, 3) >>> str(Site(loc, 760.0, 100.0, 5.0)) '<Location=<Latitude=2.000000, Longitude=1.000000, Depth=3.0000>, \ Vs30=760.0000, Depth1.0km=100.0000, Depth2.5km=5.0000>' """ return ( "<Location=%s, Vs30=%.4f, Depth1.0km=%.4f, " "Depth2.5km=%.4f>") % ( self.location, self.vs30, self.z1pt0, self.z2pt5) def __hash__(self): return hash((self.location.x, self.location.y)) def __eq__(self, other): return (self.location.x, self.location.y) == ( other.location.x, other.location.y) def __repr__(self): """ >>> import openquake.hazardlib >>> loc = openquake.hazardlib.geo.point.Point(1, 2, 3) >>> site = Site(loc, 760.0, 100.0, 5.0) >>> str(site) == repr(site) True """ return self.__str__()
def _extract(array_or_float, indices): try: # if array return array_or_float[indices] except TypeError: # if float return array_or_float # dtype of each valid site parameter site_param_dt = { 'sids': numpy.uint32, 'site_id': numpy.uint32, 'lon': numpy.float64, 'lat': numpy.float64, 'depth': numpy.float64, 'vs30': numpy.float64, 'kappa0': numpy.float64, 'vs30measured': bool, 'z1pt0': numpy.float64, 'z2pt5': numpy.float64, 'siteclass': (numpy.bytes_, 1), 'geohash': (numpy.bytes_, 6), 'z1pt4': numpy.float64, 'backarc': numpy.uint8, # 0=forearc,1=backarc,2=alongarc 'xvf': numpy.float64, 'soiltype': numpy.uint32, 'bas': bool, # Parameters for site amplification 'ampcode': ampcode_dt, 'ec8': (numpy.bytes_, 1), 'ec8_p18': (numpy.bytes_, 2), 'h800': numpy.float64, 'geology': (numpy.bytes_, 20), 'amplfactor': numpy.float64, 'ch_ampl03': numpy.float64, 'ch_ampl06': numpy.float64, 'ch_phis2s03': numpy.float64, 'ch_phis2s06': numpy.float64, 'ch_phiss03': numpy.float64, 'ch_phiss06': numpy.float64, 'f0': numpy.float64, # Fundamental period and and amplitude of HVRSR spectra 'THV': numpy.float64, 'PHV': numpy.float64, # parameters for secondary perils 'friction_mid': numpy.float64, 'cohesion_mid': numpy.float64, 'saturation': numpy.float64, 'dry_density': numpy.float64, 'Fs': numpy.float64, 'crit_accel': numpy.float64, 'unit': (numpy.bytes_, 5), 'liq_susc_cat': (numpy.bytes_, 2), 'dw': numpy.float64, 'yield_acceleration': numpy.float64, 'slope': numpy.float64, 'relief': numpy.float64, 'gwd': numpy.float64, 'cti': numpy.float64, 'dc': numpy.float64, 'dr': numpy.float64, 'dwb': numpy.float64, 'zwb': numpy.float64, 'tri': numpy.float64, 'hwater': numpy.float64, 'precip': numpy.float64, # parameters for YoudEtAl2002 'freeface_ratio': numpy.float64, 'T_15': numpy.float64, 'D50_15': numpy.float64, 'F_15': numpy.float64, 'T_eq': numpy.float64, # other parameters 'custom_site_id': (numpy.bytes_, 8), 'region': numpy.uint32, 'in_cshm': bool # used in mcverry }
[docs]class SiteCollection(object): """\ A collection of :class:`sites <Site>`. Instances of this class are intended to represent a large collection of sites in a most efficient way in terms of memory usage. The most common usage is to instantiate it as `SiteCollection.from_points`, by passing the set of required parameters, which must be a subset of the following parameters: %s .. note:: If a :class:`SiteCollection` is created from sites containing only lon and lat, iterating over the collection will yield :class:`Sites <Site>` with a reference depth of 0.0 (the sea level). Otherwise, it is possible to model the sites on a realistic topographic surface by specifying the `depth` of each site. :param sites: A list of instances of :class:`Site` class. """ % '\n'.join(' - %s: %s' % item for item in sorted(site_param_dt.items()) if item[0] not in ('lon', 'lat')) req_site_params = ()
[docs] @classmethod def from_usgs_shakemap(cls, shakemap_array): """ Build a site collection from a shakemap array """ self = object.__new__(cls) self.complete = self n = len(shakemap_array) dtype = numpy.dtype([(p, site_param_dt[p]) for p in 'sids lon lat depth vs30'.split()]) self.array = arr = numpy.zeros(n, dtype) arr['sids'] = numpy.arange(n, dtype=numpy.uint32) arr['lon'] = shakemap_array['lon'] arr['lat'] = shakemap_array['lat'] arr['depth'] = numpy.zeros(n) arr['vs30'] = shakemap_array['vs30'] return self
[docs] @classmethod # this is the method used by the engine def from_points(cls, lons, lats, depths=None, sitemodel=None, req_site_params=()): """ Build the site collection from :param lons: a sequence of longitudes :param lats: a sequence of latitudes :param depths: a sequence of depths (or None) :param sitemodel: None or an object containing site parameters as attributes :param req_site_params: a sequence of required site parameters, possibly empty """ assert len(lons) < U32LIMIT, len(lons) if depths is None: depths = numpy.zeros(len(lons)) assert len(lons) == len(lats) == len(depths), (len(lons), len(lats), len(depths)) self = object.__new__(cls) self.complete = self self.req_site_params = req_site_params req = ['sids', 'lon', 'lat', 'depth'] + sorted( par for par in req_site_params if par not in ('lon', 'lat')) if 'vs30' in req and 'vs30measured' not in req: req.append('vs30measured') dtype = numpy.dtype([(p, site_param_dt[p]) for p in req]) self.array = arr = numpy.zeros(len(lons), dtype) arr['sids'] = numpy.arange(len(lons), dtype=numpy.uint32) arr['lon'] = fix_lon(numpy.array(lons)) arr['lat'] = numpy.array(lats) arr['depth'] = numpy.array(depths) if sitemodel is None: pass elif hasattr(sitemodel, 'reference_vs30_value'): self.set_global_params(sitemodel, req_site_params) else: if hasattr(sitemodel, 'dtype'): names = set(sitemodel.dtype.names) sm = sitemodel else: sm = vars(sitemodel) names = set(sm) & set(req_site_params) for name in names: if name not in ('lon', 'lat'): self._set(name, sm[name]) dupl = get_duplicates(self.array, 'lon', 'lat') if dupl: # raise a decent error message displaying only the first 9 # duplicates (there could be millions) n = len(dupl) dots = ' ...' if n > 9 else '' items = list(dupl.items())[:9] raise ValueError('There are %d duplicate sites %s%s' % (n, items, dots)) return self
[docs] @classmethod def from_planar(cls, rup, point='TC', toward_azimuth=90, direction='positive', hdist=100, step=5., req_site_params=()): """ :param rup: a rupture built with `rupture.get_planar` :return: a :class:`openquake.hazardlib.site.SiteCollection` instance """ sfc = rup.surface if point == 'TC': pnt = sfc.get_top_edge_centroid() lon, lat = pnt.x, pnt.y elif point == 'BC': lon, lat = get_middle_point( sfc.corner_lons[2], sfc.corner_lats[2], sfc.corner_lons[3], sfc.corner_lats[3]) else: idx = {'TL': 0, 'TR': 1, 'BR': 2, 'BL': 3}[point] lon = sfc.corner_lons[idx] lat = sfc.corner_lats[idx] depth = 0 vdist = 0 npoints = hdist / step strike = rup.surface.strike pointsp = [] pointsn = [] if direction in ['positive', 'both']: azi = (strike + toward_azimuth) % 360 pointsp = npoints_towards( lon, lat, depth, azi, hdist, vdist, npoints) if direction in ['negative', 'both']: idx = 0 if direction == 'negative' else 1 azi = (strike + toward_azimuth + 180) % 360 pointsn = npoints_towards( lon, lat, depth, azi, hdist, vdist, npoints) if len(pointsn): lons = reversed(pointsn[0][idx:]) lats = reversed(pointsn[1][idx:]) else: lons = pointsp[0] lats = pointsp[1] return cls.from_points(lons, lats, None, rup, req_site_params)
def _set(self, param, value): if param not in self.array.dtype.names: self.add_col(param, site_param_dt[param]) self.array[param] = value xyz = Mesh.xyz
[docs] def set_global_params( self, oq, req_site_params=('z1pt0', 'z2pt5', 'backarc')): """ Set the global site parameters (vs30, vs30measured, z1pt0, z2pt5, backarc) """ self._set('vs30', oq.reference_vs30_value) self._set('vs30measured', oq.reference_vs30_type == 'measured') if 'z1pt0' in req_site_params: self._set('z1pt0', oq.reference_depth_to_1pt0km_per_sec) if 'z2pt5' in req_site_params: self._set('z2pt5', oq.reference_depth_to_2pt5km_per_sec) if 'backarc' in req_site_params: self._set('backarc', oq.reference_backarc)
[docs] def filtered(self, indices): """ :param indices: a subset of indices in the range [0 .. tot_sites - 1] :returns: a filtered SiteCollection instance if `indices` is a proper subset of the available indices, otherwise returns the full SiteCollection """ if indices is None or len(indices) == len(self): return self new = object.__new__(self.__class__) indices = numpy.uint32(indices) new.array = self.array[indices] new.complete = self.complete return new
[docs] def reduce(self, nsites): """ :returns: a filtered SiteCollection with around nsites (if nsites<=N) """ N = len(self.complete) n = N // nsites if n <= 1: return self sids, = numpy.where(self.complete.sids % n == 0) return self.filtered(sids)
[docs] def add_col(self, colname, dtype, values=None): """ Add a column to the underlying array """ names = self.array.dtype.names dtlist = [(name, self.array.dtype[name]) for name in names] dtlist.append((colname, dtype)) arr = numpy.zeros(len(self), dtlist) for name in names: arr[name] = self.array[name] if values is not None: arr[colname] = values self.array = arr
[docs] def make_complete(self): """ Turns the site collection into a complete one, if needed """ # reset the site indices from 0 to N-1 and set self.complete to self self.array['sids'] = numpy.arange(len(self), dtype=numpy.uint32) self.complete = self
[docs] def one(self): """ :returns: a SiteCollection with a site of the minimal vs30 """ if 'vs30' in self.array.dtype.names: idx = self.array['vs30'].argmin() else: idx = 0 return self.filtered([self.sids[idx]])
# used in preclassical
[docs] def get_cdist(self, rec_or_loc): """ :param rec_or_loc: a record with field 'hypo' or a Point instance :returns: array of N euclidean distances from rec['hypo'] """ try: lon, lat, dep = rec_or_loc['hypo'] except TypeError: lon, lat, dep = rec_or_loc.x, rec_or_loc.y, rec_or_loc.z xyz = spherical_to_cartesian(lon, lat, dep).reshape(1, 3) return distance.cdist(self.xyz, xyz)[:, 0]
def __init__(self, sites): """ Build a complete SiteCollection from a list of Site objects """ extra = [(p, site_param_dt[p]) for p in sorted(vars(sites[0])) if p in site_param_dt] dtlist = [(p, site_param_dt[p]) for p in ('sids', 'lon', 'lat', 'depth')] + extra self.array = arr = numpy.zeros(len(sites), dtlist) self.complete = self for i, site in enumerate(sites): arr['sids'][i] = getattr(site, 'id', i) arr['lon'][i] = site.location.longitude arr['lat'][i] = site.location.latitude arr['depth'][i] = site.location.depth for p, dt in extra: arr[p][i] = getattr(site, p) # NB: in test_correlation.py we define a SiteCollection with # non-unique sites, so we cannot do an # assert len(numpy.unique(self[['lon', 'lat']])) == len(self) def __eq__(self, other): return not self.__ne__(other) def __ne__(self, other): return not_equal(self.array, other.array) def __toh5__(self): names = self.array.dtype.names cols = ' '.join(names) return {n: self.array[n] for n in names}, {'__pdcolumns__': cols} def __fromh5__(self, dic, attrs): if isinstance(dic, dict): # engine >= 3.11 params = attrs['__pdcolumns__'].split() dtype = numpy.dtype([(p, site_param_dt[p]) for p in params]) self.array = numpy.zeros(len(dic['sids']), dtype) for p in dic: self.array[p] = dic[p][()] else: # old engine, dic is actually a structured array self.array = dic self.complete = self @property def mesh(self): """Return a mesh with the given lons, lats, and depths""" return Mesh(self['lon'], self['lat'], self['depth'])
[docs] def at_sea_level(self): """True if all depths are zero""" return (self.depths == 0).all()
# used in the engine
[docs] def split_max(self, max_sites): """ Split a SiteCollection into SiteCollection instances """ return self.split(numpy.ceil(len(self) / max_sites))
[docs] def split(self, ntiles): """ :param ntiles: number of tiles to generate (rounded if float) :returns: self if there are <=1 tiles, otherwise the tiles """ # if ntiles > nsites produce N tiles with a single site each ntiles = min(int(numpy.ceil(ntiles)), len(self)) if ntiles <= 1: return [self] tiles = [] for i in range(ntiles): sc = SiteCollection.__new__(SiteCollection) # smart trick to split in "homogenous" tiles sc.array = self.array[self.sids % ntiles == i] sc.complete = self tiles.append(sc) return tiles
[docs] def split_in_tiles(self, hint): """ Split a SiteCollection into a set of tiles with contiguous site IDs """ if hint > len(self): hint = len(self) tiles = [] for sids in numpy.array_split(self.sids, hint): assert len(sids), 'Cannot split %s in %d tiles' % (self, hint) sc = SiteCollection.__new__(SiteCollection) sc.array = self.complete.array[sids] sc.complete = self.complete tiles.append(sc) return tiles
[docs] def split_by_gh3(self): """ Split a SiteCollection into a set of tiles with the same geohash3 """ gh3s = geohash3(self.lons, self.lats) gb = pandas.DataFrame(dict(sid=self.sids, gh3=gh3s)).groupby('gh3') tiles = [] for gh3, df in gb: sc = SiteCollection.__new__(SiteCollection) sc.array = self.complete.array[df.sid] sc.complete = self.complete sc.gh3 = gh3 tiles.append(sc) return tiles
[docs] def count_close(self, location, distance): """ :returns: the number of sites within the distance from the location """ return (self.get_cdist(location) < distance).sum()
def __iter__(self): """ Iterate through all :class:`sites <Site>` in the collection, yielding one at a time. """ params = self.array.dtype.names[4:] # except sids, lons, lats, depths sids = self.sids for i, location in enumerate(self.mesh): kw = {p: self.array[i][p] for p in params} s = Site(location, **kw) s.id = sids[i] yield s
[docs] def filter(self, mask): """ Create a SiteCollection with only a subset of sites. :param mask: Numpy array of boolean values of the same length as the site collection. ``True`` values should indicate that site with that index should be included into the filtered collection. :returns: A new :class:`SiteCollection` instance, unless all the values in ``mask`` are ``True``, in which case this site collection is returned, or if all the values in ``mask`` are ``False``, in which case method returns ``None``. New collection has data of only those sites that were marked for inclusion in the mask. """ assert len(mask) == len(self), (len(mask), len(self)) if mask.all(): # all sites satisfy the filter, return # this collection unchanged return self if not mask.any(): # no sites pass the filter, return None return None # extract indices of Trues from the mask indices, = mask.nonzero() return self.filtered(indices)
[docs] def assoc(self, site_model, assoc_dist, ignore=()): """ Associate the `site_model` parameters to the sites. Log a warning if the site parameters are more distant than `assoc_dist`. :returns: the site model array reduced to the hazard sites """ m1, m2 = site_model[['lon', 'lat']], self[['lon', 'lat']] if len(m1) != len(m2) or (m1 != m2).any(): # associate _sitecol, site_model, _discarded = _GeographicObjects( site_model).assoc(self, assoc_dist, 'warn') ok = set(self.array.dtype.names) & set(site_model.dtype.names) - set( ignore) - {'lon', 'lat', 'depth'} for name in ok: self._set(name, site_model[name]) for name in set(self.array.dtype.names) - set(site_model.dtype.names): if name == 'vs30measured': self._set(name, 0) # default # NB: by default reference_vs30_type == 'measured' is 1 # but vs30measured is 0 (the opposite!!) # sanity check for param in self.req_site_params: if param in ignore: continue dt = site_param_dt[param] if dt is numpy.float64 and (self.array[param] == 0.).all(): raise ValueError('The site parameter %s is always zero: please' ' check the site model' % param) return site_model
[docs] def extend(self, lons, lats): """ Extend the site collection to additional (and different) points. Used for station_data in conditioned GMFs. """ assert len(lons) == len(lats), (len(lons), len(lats)) orig = set(zip(rnd5(self.lons), rnd5(self.lats))) new = set(zip(rnd5(lons), rnd5(lats))) - orig if not new: return self lons, lats = zip(*sorted(new)) N1 = len(self) N2 = len(lons) array = numpy.zeros(N1 + N2, self.array.dtype) array[:N1] = self.array array[N1:]['sids'] = numpy.arange(N1, N1+N2) array[N1:]['lon'] = lons array[N1:]['lat'] = lats sitecol = object.__new__(self.__class__) sitecol.array = array sitecol.complete = sitecol return sitecol
[docs] def within(self, region): """ :param region: a shapely polygon :returns: a filtered SiteCollection of sites within the region """ mask = numpy.array([ geometry.Point(rec['lon'], rec['lat']).within(region) for rec in self.array]) return self.filter(mask)
[docs] def within_bbox(self, bbox): """ :param bbox: a quartet (min_lon, min_lat, max_lon, max_lat) :returns: site IDs within the bounding box """ min_lon, min_lat, max_lon, max_lat = bbox lons, lats = self['lon'], self['lat'] if cross_idl(lons.min(), lons.max(), min_lon, max_lon): lons = lons % 360 min_lon, max_lon = min_lon % 360, max_lon % 360 mask = (min_lon < lons) * (lons < max_lon) * \ (min_lat < lats) * (lats < max_lat) return mask.nonzero()[0]
[docs] def geohash(self, length): """ :param length: length of the geohash in the range 1..8 :returns: an array of N geohashes, one per site """ ln = numpy.uint8(length) arr = CODE32[geohash(self['lon'], self['lat'], ln)] return [row.tobytes() for row in arr]
[docs] def num_geohashes(self, length): """ :param length: length of the geohash in the range 1..8 :returns: number of distinct geohashes in the site collection """ return len(numpy.unique(self.geohash(length)))
[docs] def calculate_z1pt0(self): """ Compute the column z1pt0 from the vs30 """ self.array['z1pt0'] = calculate_z1pt0(self.vs30)
[docs] def calculate_z2pt5(self): """ Compute the column z2pt5 from the vs30 using a formula for NGA-West2 """ self.array['z2pt5'] = calculate_z2pt5(self.vs30)
def __getstate__(self): return dict(array=self.array, complete=self.complete) def __getitem__(self, sid): """ Return a site record """ return self.array[sid] def __getattr__(self, name): if name in ('lons', 'lats', 'depths'): # legacy names return self.array[name[:-1]] if name not in site_param_dt: raise AttributeError(name) return self.array[name] def __len__(self): """ Return the number of sites in the collection. """ return len(self.array) def __repr__(self): total_sites = len(self.complete.array) return '<SiteCollection with %d/%d sites>' % ( len(self), total_sites)