Source code for openquake.hazardlib.calc.filters

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2026 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import ast
import logging
import operator
from contextlib import contextmanager
import numpy
import pandas
from scipy.spatial import KDTree, distance
from scipy.interpolate import interp1d

from openquake.baselib.parallel import Starmap
from openquake.hazardlib import site
from openquake.hazardlib.geo.mesh import Mesh
from openquake.hazardlib.geo.utils import (
    KM_TO_DEGREES, angular_distance, get_bounding_box,
    get_longitudinal_extent, BBoxError, spherical_to_cartesian)

F32 = numpy.float32
U32 = numpy.uint32
MINMAG = 2.5
MAXMAG = 10.2  # to avoid breaking PAC
MAX_DISTANCE = 2000  # km, ultra big distance used if there is no filter
trt_smr = operator.attrgetter('trt_smr')

[docs]class FilteredAway(Exception): pass
[docs]def magstr(mag): """ :returns: a string representation of the magnitude """ return '%.2f' % numpy.float32(mag)
[docs]def get_dparam(surface, sites, param): if param == 'rrup': dist = surface.get_min_distance(sites) elif param == 'tuw': tuw0 = surface.tor.get_tuw(sites) tuw1 = surface.tor.flip().get_tuw(sites) arr = numpy.array([tuw0, tuw1]) # shape (2, 3, N) dist = arr.transpose(2, 0, 1) # shape (N, 2, 3) elif param == 'rjb': dist = surface.get_joyner_boore_distance(sites) elif param == 'clon_clat': t = surface.get_closest_points(sites) # tested in classical/case_83 dist = t.array.T[:, 0:2] # shape (N, 2) elif param == 'rx': dist = surface.get_rx_distance(sites) elif param == 'ry0': dist = surface.get_ry0_distance(sites) else: raise ValueError('Unknown distance measure %r' % param) return dist
[docs]def get_distances(rupture, sites, param): """ :param rupture: a rupture :param sites: a mesh of points or a site collection :param param: the kind of distance to compute (default rjb) :param dcache: distance cache dictionary or None if disabled :returns: an array of distances from the given sites """ surf = rupture.surface if not surf: # PointRupture if param == 'clon_clat': dist = numpy.empty((len(sites), 2)) dist[:, 0] = rupture.hypocenter.x dist[:, 1] = rupture.hypocenter.y else: dist = rupture.hypocenter.distance_to_mesh(sites) elif param == 'rrup': dist = get_dparam(surf, sites, 'rrup') elif param == 'rx': dist = get_dparam(surf, sites, 'rx') elif param == 'ry0': dist = get_dparam(surf, sites, 'ry0') elif param == 'rjb': dist = get_dparam(surf, sites, 'rjb') elif param == 'rhypo': dist = rupture.hypocenter.distance_to_mesh(sites) elif param == 'repi': dist = rupture.hypocenter.distance_to_mesh(sites, with_depths=False) elif param == 'rcdpp': dist = rupture.get_cdppvalue(sites) elif param == 'azimuth': dist = surf.get_azimuth(sites) elif param == 'azimuthcp': dist = surf.get_azimuth_of_closest_point(sites) elif param == 'clon_clat': dist = get_dparam(surf, sites, param) elif param == "rvolc": # Volcanic distance not yet supported, defaulting to zero dist = numpy.zeros_like(sites.lons) else: raise ValueError('Unknown distance measure %r' % param) return dist
[docs]@contextmanager def context(src): """ Used to add the source_id to the error message. To be used as with context(src): operation_with(src) Typically the operation is filtering a source, that can fail for tricky geometries. """ try: yield except Exception as err: raise err.__class__('An error occurred with source id=%s. Error: %s' % (src.source_id, err)) from None
[docs]def getdefault(dic_with_default, key): """ :param dic_with_default: a dictionary with a 'default' key :param key: a key that may be present in the dictionary or not :returns: the value associated to the key, or to 'default' """ try: return dic_with_default[key] except KeyError: return dic_with_default['default']
[docs]def unique_sorted(items): """ Check that the items are unique and sorted """ if len(set(items)) < len(items): raise ValueError('Found duplicates in %s' % items) elif items != sorted(items): raise ValueError('%s is not ordered' % items) return items
# used for the maximum distance parameter in the job.ini file
[docs]def floatdict(value): """ :param value: input string corresponding to a literal Python number or dictionary :returns: a Python dictionary key -> number >>> floatdict("200") {'default': 200} >>> floatdict("{'active shallow crust': 250., 'default': 200}") {'active shallow crust': 250.0, 'default': 200} """ value = ast.literal_eval(value) if isinstance(value, (int, float, list)): return {'default': value} return value
[docs]def magdepdist(pairs=((MINMAG, 1000), (MAXMAG, 1000))): """ :param pairs: a list of pairs [(mag, dist), ...] :returns: a scipy.interpolate.interp1d function """ mags, dists = zip(*pairs) return interp1d(mags, dists, bounds_error=False, fill_value=0.)
[docs]def upper_maxdist(idist): """ :returns: the maximum distance in a dictionary trt->dists """ return max(idist[trt][-1][1] for trt in idist)
[docs]class IntegrationDistance(dict): """ A dictionary trt -> [(mag, dist), ...] """
[docs] @classmethod def new(cls, value): """ :param value: string to be converted :returns: IntegrationDistance dictionary >>> md = IntegrationDistance.new('50') >>> md {'default': [(2.5, 50), (10.2, 50)]} """ if value == 'magdist': items_by_trt = {'default': [(3, 0), (6, 150), (10, 600)]} else: items_by_trt = floatdict(value) self = cls() for trt, items in items_by_trt.items(): if isinstance(items, list): pairs = unique_sorted([tuple(it) for it in items]) for mag, dist in pairs: if mag < MINMAG or mag > MAXMAG: raise ValueError('Invalid magnitude %s' % mag) self[trt] = pairs else: # assume scalar distance assert items >= 0, items self[trt] = [(MINMAG, items), (MAXMAG, items)] return self
# tested in case_miriam, case_75 and ebdamage/case_15
[docs] def cut(self, min_mag_by_trt): """ Cut the lower magnitudes. For instance >>> maxdist = IntegrationDistance.new('[(4., 50), (8., 200.)]') >>> maxdist.cut({'default': 5.}) >>> maxdist {'default': [(5.0, 87.5), (8.0, 200.0)]} >>> maxdist = IntegrationDistance.new('200') >>> maxdist.cut({"Active Shallow Crust": 5.2, "default": 4.}) >>> maxdist {'default': [(4.0, 200.0), (10.2, 200)], 'Active Shallow Crust': [(5.2, 200.0), (10.2, 200)]} """ if 'default' not in self: maxval = max(self.values(), key=lambda val: max(dist for mag, dist in val)) self['default'] = maxval if 'default' not in min_mag_by_trt: min_mag_by_trt['default'] = min(min_mag_by_trt.values()) for trt in set(self) | set(min_mag_by_trt): min_mag = getdefault(min_mag_by_trt, trt) if not min_mag: continue first = (min_mag, float(self(trt)(min_mag))) magdists = [(mag, dist) for (mag, dist) in self[trt] if mag >= min_mag] if min_mag < magdists[0][0]: self[trt] = [first] + magdists else: self[trt] = magdists
def __call__(self, trt): return magdepdist(self[trt]) def __missing__(self, trt): assert 'default' in self, 'missing "default" key in maximum_distance' return self['default']
[docs] def get_bounding_box(self, lon, lat, trt=None): """ Build a bounding box around the given lon, lat by computing the maximum_distance at the given tectonic region type and magnitude. :param lon: longitude :param lat: latitude :param trt: tectonic region type, possibly None :returns: min_lon, min_lat, max_lon, max_lat """ if trt is None: # take the greatest integration distance maxdist = max(self.max().values()) else: # get the integration distance for the given TRT maxdist = self[trt][-1][1] a1 = min(maxdist * KM_TO_DEGREES, 90) a2 = min(angular_distance(maxdist, lat), 180) return lon - a2, lat - a1, lon + a2, lat + a1
[docs] def get_dist_bins(self, trt, nbins=51): """ :returns: an array of distance bins, from 10m to maxdist """ return .01 + numpy.arange(nbins) * self(trt) / (nbins - 1)
[docs]def split_source(src): """ :param src: a splittable (or not splittable) source :returns: the underlying sources (or the source itself) """ from openquake.hazardlib.source import splittable # avoid circular import if not splittable(src): return [src] splits = list(src) if len(splits) == 1: return [src] has_hdf5 = hasattr(src, 'hdf5path') has_samples = hasattr(src, 'samples') has_smweight = hasattr(src, 'smweight') has_scaling_rate = hasattr(src, 'scaling_rate') has_grp_id = hasattr(src, 'grp_id') grp_id = getattr(src, 'grp_id', 0) # 0 in hazardlib offset = src.offset for i, split in enumerate(splits): split.offset = offset split.source_id = '%s.%s' % (src.source_id, i) split.trt_smr = src.trt_smr split.grp_id = grp_id split.id = src.id if has_hdf5: split.hdf5path = src.hdf5path if has_samples: split.samples = src.samples if has_smweight: split.smweight = src.smweight if has_scaling_rate: split.scaling_rate = src.scaling_rate if has_grp_id: split.grp_id = src.grp_id offset += split.num_ruptures #split.nsites = src.nsites return splits
[docs]def filter_rups(ruptures, sitetree, orig_sids, num_assets, dist, mon): """ :param ruptures: array of ruptures with the same magnitude :param sitetree: kdtree for the reduced sites :param num_assets: dictionary site_id -> number of assets on that site :param dist: integration distance at that magnitude :returns: ruptures close to the global site collection """ hypos = ruptures['hypo'] kr = KDTree(spherical_to_cartesian( hypos[:, 0], hypos[:, 1], hypos[::, 2])) all_sids = kr.query_ball_tree(sitetree, dist, eps=.1) out = [] for r, sids in enumerate(all_sids): if sids: rup = ruptures[r] # NB: if there are stations num_assets[sid] can be empty rup['nsites'] = sum(num_assets.get(s, 1) for sid in sids for s in orig_sids[sid]) out.append(rup) return numpy.array(out)
# NB: this is fast because of KDTree and the magdist # NB: the magdist here is hard-coded and independent from oq # NB: sitecol.lower_res(5) is needed to save memory: for India # with 1.3M sites the filtering was taking 83 GB, now only 6 GB
[docs]def close_ruptures(ruptures, sitecol, assetcol=None, h5=None, magdist=magdepdist( [(3, 0.), (4, 40.), (5., 100.), (6., 200.), (7., 300.), (8., 400.), (9., 700.), (11., 1200.)])): """ :param ruptures: an array of rupture records :param sitecol: a SiteCollection instance :param assetcol: an AssetCollection or None :param h5: hdf5 where to save the performance info :returns: the ruptures close to the sites """ if assetcol: sids, counts = numpy.unique(assetcol.array['site_id'], return_counts=1) num_assets = dict(zip(sids, counts)) else: num_assets = {} sites, orig_sids = sitecol.lower_res(5) # H3 edge of 10 km if len(sites) < len(sitecol): logging.info('Reducing %s->%d sites', sitecol, len(sites)) mags = numpy.round(ruptures['mag'], 1) ks = KDTree(spherical_to_cartesian(sites.lons, sites.lats, sites.depths)) smap = Starmap(filter_rups, distribute='no' if len(mags) < 1000 else None, h5=h5) for mag in F32(numpy.arange(3, 11, .1)): ok = mags == mag if ok.sum() == 0: # no ruptures in this magnitude range continue smap.submit((ruptures[ok], ks, orig_sids, num_assets, magdist(mag))) if len(ruptures) > 100_000: logging.info('Considering %d ruptures of magnitude %s', ok.sum(), mag) arrays = [arr for arr in smap if len(arr)] assert arrays, "All ruptures have been prefiltered out!" return numpy.concatenate(arrays, dtype=arrays[0].dtype)
default = IntegrationDistance({'default': [(MINMAG, 1000), (MAXMAG, 1000)]})
[docs]def rup_radius(rup): """ Maximum distance from the rupture mesh to the hypocenter """ hypo = rup.hypocenter xyz = spherical_to_cartesian(hypo.x, hypo.y, hypo.z).reshape(1, 3) radius = distance.cdist(rup.surface.mesh.xyz, xyz).max(axis=0) return radius
[docs]def filter_site_array_around(array, rup, dist): """ :param array: array with fields 'lon', 'lat' :param rup: a rupture object :param dist: integration distance in km :returns: slice to the rupture """ # first raw filtering hypo = rup.hypocenter x, y, z = hypo.x, hypo.y, hypo.z xyz_all = spherical_to_cartesian(array['lon'], array['lat'], 0) xyz = spherical_to_cartesian(x, y, z) tree = KDTree(xyz_all) # NB: on macOS query_ball returns the indices in a different order # than on linux and windows, hence the need to sort idxs = tree.query_ball_point(xyz, dist + rup_radius(rup), eps=.001) idxs.sort() # then fine filtering r_sites = site.SiteCollection.from_(array[idxs]) dists = get_distances(rup, r_sites, 'rrup') ids, = numpy.where(dists < dist) if len(ids) < len(idxs): logging.info('Filtered %d/%d sites', len(ids), len(idxs)) return r_sites.array[ids]
[docs]class RuptureFilter(object): """ Filter arrays/dataframes with lon, lat around a rupture """ def __init__(self, rup, dist): self.rup = rup self.dist = dist def __call__(self, array_df): if isinstance(array_df, pandas.DataFrame): # computing all the distances the slow way if hasattr(array_df, 'lon'): # when reading the exposure.csv files mesh = Mesh(F32(array_df.lon), F32(array_df.lat)) else: # when reading the exposure.hdf5 file mesh = Mesh(F32(array_df.LONGITUDE), F32(array_df.LATITUDE)) dists = get_distances(self.rup, mesh, 'rrup') return array_df[dists < self.dist] # this is called when reading the site model from eexposure.hdf5 return filter_site_array_around(array_df, self.rup, self.dist)
[docs] def filter(self, lons, lats): """ :returns: (mask, rup-sites distances) """ mesh = Mesh(F32(lons), F32(lats)) dists = get_distances(self.rup, mesh, 'rrup') return dists < self.dist, dists
def __repr__(self): return '<%s mag=%.1f dist=%.0f>' % (self.__class__.__name__, self.rup.mag, self.dist)
[docs]class SourceFilter(object): """ Filter objects have a .filter method yielding filtered sources and the IDs of the sites within the given maximum distance. Filter the sources by using `self.sitecol.within_bbox` which is based on numpy. """ multiplier = 1 # not reduce def __init__(self, sitecol, integration_distance=default): self.sitecol = sitecol self.integration_distance = integration_distance self.slc = slice(None) # TODO: check if we can remove this
[docs] def reduce(self, multiplier=5): """ Reduce the SourceFilter to a subset of sites """ idxs = numpy.arange(0, len(self.sitecol), multiplier) sc = object.__new__(site.SiteCollection) sc.array = self.sitecol[idxs] sc.complete = self.sitecol.complete new = self.__class__(sc, self.integration_distance) new.multiplier = multiplier return new
[docs] def get_enlarged_box(self, src, maxdist): """ Get the enlarged bounding box of a source. :param src: a source object :param maxdist: a scalar maximum distance :returns: a bounding box (min_lon, min_lat, max_lon, max_lat) """ try: bbox = get_bounding_box(src, maxdist) except (FilteredAway, BBoxError): # this is expected around the poles and will be ignored raise except Exception: logging.error(f'Error in source {src.source_id}', exc_info=True) raise return bbox
[docs] def get_close_sites(self, source, trt=None): """ Returns the sites within the integration distance from the source, or None. """ sids = self.close_sids(source, trt) if len(sids): return self.sitecol.complete.filtered(sids)
[docs] def split(self, sources): """ :yields: pairs (split, sites) """ for src, _sites in self.filter(sources): for s in split_source(src): sites = self.get_close_sites(s) if sites is not None: yield s, sites
# used in source and rupture prefiltering: it should not discard too much
[docs] def close_sids(self, src_or_rec, trt=None, maxdist=None): """ :param src_or_rec: a source or a rupture record :param trt: passed only if src_or_rec is a rupture record :returns: the site indices within the maximum_distance of the hypocenter, plus the maximum size of the bounding box """ assert self.sitecol is not None if not self.integration_distance: # do not filter return self.sitecol.sids if trt: # rupture proxy if not hasattr(self.integration_distance, 'x'): raise ValueError( 'The SourceFilter was instantiated with ' 'maximum_distance and not maximum_distance(trt)') dlon = get_longitudinal_extent( src_or_rec['minlon'], src_or_rec['maxlon']) / 2. dlat = (src_or_rec['maxlat'] - src_or_rec['minlat']) / 2. lon, lat, dep = src_or_rec['hypo'] dist = self.integration_distance(src_or_rec['mag']) + numpy.sqrt( dlon**2 + dlat**2) / KM_TO_DEGREES dist += 10 # added 10 km of buffer to guard against numeric errors # the test most sensitive to the buffer effect is in oq-risk-tests, # case_ucerf/job_eb.ini; without buffer, sites can be discarded # even if within the maximum_distance return self._close_sids(lon, lat, dep, dist) else: # source trt = src_or_rec.tectonic_region_type if maxdist is None: if hasattr(self.integration_distance, 'y'): # interp1d maxdist = self.integration_distance.y[-1] else: maxdist = getdefault(self.integration_distance, trt)[-1][1] loc = getattr(src_or_rec, 'location', None) if loc: # special case for pointlike sources maxdist += src_or_rec.max_radius(maxdist) return self._close_sids(loc.x, loc.y, loc.z, maxdist) try: bbox = self.get_enlarged_box(src_or_rec, maxdist) except FilteredAway: # can be raised by multiFaultSources return U32([]) except BBoxError: # do not filter return self.sitecol.sids return self.sitecol.within_bbox(bbox)
def _close_sids(self, lon, lat, dep, dist): if not hasattr(self, 'kdt'): self.kdt = KDTree(self.sitecol.xyz) xyz = spherical_to_cartesian(lon, lat, dep) ids = U32(self.kdt.query_ball_point(xyz, dist, eps=.001)) ids.sort() # for cross-platform consistency return self.sitecol.sids[ids] # works also for filtered sitecol
[docs] def filter(self, sources): """ :param sources: a sequence of sources :yields: pairs (sources, sites) """ if self.sitecol is None: # nofilter for src in sources: yield src, None return for src in sources: sids = self.close_sids(src) if len(sids): yield src, self.sitecol.filtered(sids)
[docs] def get_close(self, secparams): """ :param secparams: a structured array with fields tl0, tl1, tr0, tr1 :returns: an array with the number of close sites per secparams """ xyz = self.sitecol.xyz tl0, tl1, tr0, tr1 = (secparams['tl0'], secparams['tl1'], secparams['tr0'], secparams['tr1']) distl = distance.cdist(xyz, spherical_to_cartesian(tl0, tl1)) distr = distance.cdist(xyz, spherical_to_cartesian(tr0, tr1)) dists = numpy.min([distl, distr], axis=0) # shape (N, S) maxdist = self.integration_distance.y[-1] return (dists <= maxdist).sum(axis=0) # shape (N, S) => S
def __getitem__(self, slc): if slc.start is None and slc.stop is None: return self sitecol = object.__new__(self.sitecol.__class__) sitecol.array = self.sitecol[slc] sitecol.complete = self.sitecol.complete return self.__class__(sitecol, self.integration_distance)
nofilter = SourceFilter(None, {})