Source code for openquake.hazardlib.source.rupture

# coding: utf-8
# The Hazard Library
# Copyright (C) 2012-2019 GEM Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Module :mod:`openquake.hazardlib.source.rupture` defines classes
:class:`BaseRupture` and its subclasses
:class:`NonParametricProbabilisticRupture` and
:class:`ParametricProbabilisticRupture`
"""
import abc
import numpy
import math
import itertools
import collections
from openquake.baselib import general
from openquake.baselib.slots import with_slots
from openquake.hazardlib import geo, contexts
from openquake.hazardlib.geo.nodalplane import NodalPlane
from openquake.hazardlib.geo.mesh import RectangularMesh
from openquake.hazardlib.geo.point import Point
from openquake.hazardlib.geo.geodetic import geodetic_distance
from openquake.hazardlib.near_fault import (
    get_plane_equation, projection_pp, directp, average_s_rad, isochone_ratio)
from openquake.hazardlib.geo.surface.base import BaseSurface

U16 = numpy.uint16
U32 = numpy.uint32
TWO16 = 2 ** 16
TWO32 = 2 ** 32
pmf_dt = numpy.dtype([('prob', float), ('occ', U32)])
events_dt = numpy.dtype([('id', U32), ('rup_id', U32), ('rlz_id', U16)])


[docs]def to_checksum(cls1, cls2): """ Convert a pair of classes into a numeric code (uint8) """ names = '%s,%s' % (cls1.__name__, cls2.__name__) return sum(map(ord, names)) % 256
[docs]@with_slots class BaseRupture(metaclass=abc.ABCMeta): """ Rupture object represents a single earthquake rupture. :param mag: Magnitude of the rupture. :param rake: Rake value of the rupture. See :class:`~openquake.hazardlib.geo.nodalplane.NodalPlane`. :param tectonic_region_type: Rupture's tectonic regime. One of constants in :class:`openquake.hazardlib.const.TRT`. :param hypocenter: A :class:`~openquake.hazardlib.geo.point.Point`, rupture's hypocenter. :param surface: An instance of subclass of :class:`~openquake.hazardlib.geo.surface.base.BaseSurface`. Object representing the rupture surface geometry. :param rupture_slip_direction: Angle describing rupture propagation direction in decimal degrees. :raises ValueError: If magnitude value is not positive, or tectonic region type is unknown. NB: if you want to convert the rupture into XML, you should set the attribute surface_nodes to an appropriate value. """ _slots_ = '''mag rake tectonic_region_type hypocenter surface rupture_slip_direction weight'''.split() rup_id = 0 # set to a value > 0 by the engine _code = {}
[docs] @classmethod def init(cls): """ Initialize the class dictionary `._code` by encoding the bidirectional correspondence between an integer in the range 0..255 (the code) and a pair of classes (rupture_class, surface_class). This is useful when serializing the rupture to and from HDF5. :returns: {code: pair of classes} """ rupture_classes = [BaseRupture] + list(get_subclasses(BaseRupture)) surface_classes = list(get_subclasses(BaseSurface)) code2cls = {} for rup, sur in itertools.product(rupture_classes, surface_classes): chk = to_checksum(rup, sur) if chk in code2cls and code2cls[chk] != (rup, sur): raise ValueError('Non-unique checksum %d for %s, %s' % (chk, rup, sur)) cls._code[rup, sur] = chk code2cls[chk] = rup, sur return code2cls
def __init__(self, mag, rake, tectonic_region_type, hypocenter, surface, rupture_slip_direction=None, weight=None): if not mag > 0: raise ValueError('magnitude must be positive') NodalPlane.check_rake(rake) self.tectonic_region_type = tectonic_region_type self.rake = rake self.mag = mag self.hypocenter = hypocenter self.surface = surface self.rupture_slip_direction = rupture_slip_direction self.weight = weight @property def code(self): """Returns the code (integer in the range 0 .. 255) of the rupture""" return self._code[self.__class__, self.surface.__class__] get_probability_no_exceedance = ( contexts.RuptureContext.get_probability_no_exceedance)
[docs] def sample_number_of_occurrences(self, n=1): """ Randomly sample number of occurrences from temporal occurrence model probability distribution. .. note:: This method is using random numbers. In order to reproduce the same results numpy random numbers generator needs to be seeded, see http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.seed.html :returns: numpy array of size n with number of rupture occurrences """ raise NotImplementedError
[docs]class NonParametricProbabilisticRupture(BaseRupture): """ Probabilistic rupture for which the probability distribution for rupture occurrence is described through a generic probability mass function. :param pmf: Instance of :class:`openquake.hazardlib.pmf.PMF`. Values in the abscissae represent number of rupture occurrences (in increasing order, staring from 0) and values in the ordinates represent associated probabilities. Example: if, for a given time span, a rupture has probability ``0.8`` to not occurr, ``0.15`` to occur once, and ``0.05`` to occur twice, the ``pmf`` can be defined as :: pmf = PMF([(0.8, 0), (0.15, 1), 0.05, 2)]) :raises ValueError: If number of ruptures in ``pmf`` do not start from 0, are not defined in increasing order, and if they are not defined with unit step """ def __init__(self, mag, rake, tectonic_region_type, hypocenter, surface, pmf, rupture_slip_direction=None, weight=None): occ = numpy.array([occ for (prob, occ) in pmf.data]) if not occ[0] == 0: raise ValueError('minimum number of ruptures must be zero') if not numpy.all(numpy.sort(occ) == occ): raise ValueError( 'numbers of ruptures must be defined in increasing order') if not numpy.all(numpy.diff(occ) == 1): raise ValueError( 'numbers of ruptures must be defined with unit step') super().__init__( mag, rake, tectonic_region_type, hypocenter, surface, rupture_slip_direction, weight) # an array of probabilities with sum 1 self.probs_occur = numpy.array( [prob for (prob, occ) in pmf.data], numpy.float32) self.occurrence_rate = numpy.nan
[docs] def sample_number_of_occurrences(self, n=1): """ See :meth:`superclass method <.rupture.BaseRupture.sample_number_of_occurrences>` for spec of input and result values. Uses 'Inverse Transform Sampling' method. """ # compute cdf from pmf cdf = numpy.cumsum(self.probs_occur) n_occ = numpy.digitize(numpy.random.random(n), cdf) return n_occ
[docs]@with_slots class ParametricProbabilisticRupture(BaseRupture): """ :class:`Rupture` associated with an occurrence rate and a temporal occurrence model. :param occurrence_rate: Number of times rupture happens per year. :param temporal_occurrence_model: Temporal occurrence model assigned for this rupture. Should be an instance of :class:`openquake.hazardlib.tom.PoissonTOM`. :raises ValueError: If occurrence rate is not positive. """ _slots_ = BaseRupture._slots_ + [ 'occurrence_rate', 'temporal_occurrence_model'] def __init__(self, mag, rake, tectonic_region_type, hypocenter, surface, occurrence_rate, temporal_occurrence_model, rupture_slip_direction=None): if not occurrence_rate > 0: raise ValueError('occurrence rate must be positive') super().__init__( mag, rake, tectonic_region_type, hypocenter, surface, rupture_slip_direction) self.temporal_occurrence_model = temporal_occurrence_model self.occurrence_rate = occurrence_rate
[docs] def get_probability_one_or_more_occurrences(self): """ Return the probability of this rupture to occur one or more times. Uses :meth:`~openquake.hazardlib.tom.PoissonTOM.get_probability_one_or_more_occurrences` of an assigned temporal occurrence model. """ tom = self.temporal_occurrence_model rate = self.occurrence_rate return tom.get_probability_one_or_more_occurrences(rate)
[docs] def get_probability_one_occurrence(self): """ Return the probability of this rupture to occur exactly one time. Uses :meth: `openquake.hazardlib.tom.PoissonTOM.get_probability_n_occurrences` of an assigned temporal occurrence model. """ tom = self.temporal_occurrence_model rate = self.occurrence_rate return tom.get_probability_n_occurrences(rate, 1)
[docs] def sample_number_of_occurrences(self, n=1): """ Draw a random sample from the distribution and return a number of events to occur as an array of integers of size n. Uses :meth: `openquake.hazardlib.tom.PoissonTOM.sample_number_of_occurrences` of an assigned temporal occurrence model. """ r = self.occurrence_rate * self.temporal_occurrence_model.time_span return numpy.random.poisson(r, n)
[docs] def get_probability_no_exceedance(self, poes): """ See :meth:`superclass method <.rupture.BaseRupture.get_probability_no_exceedance>` for spec of input and result values. Uses :meth:`openquake.hazardlib.tom.PoissonTOM.get_probability_no_exceedance` """ tom = self.temporal_occurrence_model rate = self.occurrence_rate return tom.get_probability_no_exceedance(rate, poes)
[docs] def get_dppvalue(self, site): """ Get the directivity prediction value, DPP at a given site as described in Spudich et al. (2013). :param site: :class:`~openquake.hazardlib.geo.point.Point` object representing the location of the target site :returns: A float number, directivity prediction value (DPP). """ origin = self.surface.get_resampled_top_edge()[0] dpp_multi = [] index_patch = self.surface.hypocentre_patch_index( self.hypocenter, self.surface.get_resampled_top_edge(), self.surface.mesh.depths[0][0], self.surface.mesh.depths[-1][0], self.surface.get_dip()) idx_nxtp = True hypocenter = self.hypocenter while idx_nxtp: # E Plane Calculation p0, p1, p2, p3 = self.surface.get_fault_patch_vertices( self.surface.get_resampled_top_edge(), self.surface.mesh.depths[0][0], self.surface.mesh.depths[-1][0], self.surface.get_dip(), index_patch=index_patch) [normal, dist_to_plane] = get_plane_equation( p0, p1, p2, origin) pp = projection_pp(site, normal, dist_to_plane, origin) pd, e, idx_nxtp = directp( p0, p1, p2, p3, hypocenter, origin, pp) pd_geo = origin.point_at( (pd[0] ** 2 + pd[1] ** 2) ** 0.5, -pd[2], numpy.degrees(math.atan2(pd[0], pd[1]))) # determine the lower bound of E path value f1 = geodetic_distance(p0.longitude, p0.latitude, p1.longitude, p1.latitude) f2 = geodetic_distance(p2.longitude, p2.latitude, p3.longitude, p3.latitude) if f1 > f2: f = f1 else: f = f2 fs, rd, r_hyp = average_s_rad(site, hypocenter, origin, pp, normal, dist_to_plane, e, p0, p1, self.rupture_slip_direction) cprime = isochone_ratio(e, rd, r_hyp) dpp_exp = cprime * numpy.maximum(e, 0.1 * f) *\ numpy.maximum(fs, 0.2) dpp_multi.append(dpp_exp) # check if go through the next patch of the fault index_patch = index_patch + 1 if (len(self.surface.get_resampled_top_edge()) <= 2) and (index_patch >= len(self.surface.get_resampled_top_edge())): idx_nxtp = False elif index_patch >= len(self.surface.get_resampled_top_edge()): idx_nxtp = False elif idx_nxtp: hypocenter = pd_geo idx_nxtp = True # calculate DPP value of the site. dpp = numpy.log(numpy.sum(dpp_multi)) return dpp
[docs] def get_cdppvalue(self, target, buf=1.0, delta=0.01, space=2.): """ Get the directivity prediction value, centered DPP(cdpp) at a given site as described in Spudich et al. (2013), and this cdpp is used in Chiou and Young (2014) GMPE for near-fault directivity term prediction. :param target_site: A mesh object representing the location of the target sites. :param buf: A buffer distance in km to extend the mesh borders :param delta: The distance between two adjacent points in the mesh :param space: The tolerance for the distance of the sites (default 2 km) :returns: The centered directivity prediction value of Chiou and Young """ min_lon, max_lon, max_lat, min_lat = self.surface.get_bounding_box() min_lon -= buf max_lon += buf min_lat -= buf max_lat += buf lons = numpy.arange(min_lon, max_lon + delta, delta) # ex shape (233,) lats = numpy.arange(min_lat, max_lat + delta, delta) # ex shape (204,) mesh = RectangularMesh(*numpy.meshgrid(lons, lats)) mesh_rup = self.surface.get_min_distance(mesh).reshape(mesh.shape) # ex shape (204, 233) target_rup = self.surface.get_min_distance(target) # ex shape (2,) cdpp = numpy.zeros_like(target.lons) for i, (target_lon, target_lat) in enumerate( zip(target.lons, target.lats)): # indices around target_rup[i] around = (mesh_rup <= target_rup[i] + space) & ( mesh_rup >= target_rup[i] - space) dpp_target = self.get_dppvalue(Point(target_lon, target_lat)) dpp_mean = numpy.mean( [self.get_dppvalue(Point(lon, lat)) for lon, lat in zip(mesh.lons[around], mesh.lats[around])]) cdpp[i] = dpp_target - dpp_mean return cdpp
[docs]def get_subclasses(cls): """ :returns: the subclasses of `cls`, ordered by name """ for subclass in sorted(cls.__subclasses__(), key=lambda cls: cls.__name__): yield subclass for ssc in get_subclasses(subclass): yield ssc
[docs]def get_geom(surface, is_from_fault_source, is_multi_surface, is_gridded_surface): """ The following fields can be interpreted different ways, depending on the value of `is_from_fault_source`. If `is_from_fault_source` is True, each of these fields should contain a 2D numpy array (all of the same shape). Each triple of (lon, lat, depth) for a given index represents the node of a rectangular mesh. If `is_from_fault_source` is False, each of these fields should contain a sequence (tuple, list, or numpy array, for example) of 4 values. In order, the triples of (lon, lat, depth) represent top left, top right, bottom left, and bottom right corners of the the rupture's planar surface. Update: There is now a third case. If the rupture originated from a characteristic fault source with a multi-planar-surface geometry, `lons`, `lats`, and `depths` will contain one or more sets of 4 points, similar to how planar surface geometry is stored (see above). :param surface: a Surface instance :param is_from_fault_source: a boolean :param is_multi_surface: a boolean """ if is_from_fault_source: # for simple and complex fault sources, # rupture surface geometry is represented by a mesh surf_mesh = surface.mesh lons = surf_mesh.lons lats = surf_mesh.lats depths = surf_mesh.depths else: if is_multi_surface: # `list` of # openquake.hazardlib.geo.surface.planar.PlanarSurface # objects: surfaces = surface.surfaces # lons, lats, and depths are arrays with len == 4*N, # where N is the number of surfaces in the # multisurface for each `corner_*`, the ordering is: # - top left # - top right # - bottom left # - bottom right lons = numpy.concatenate([x.corner_lons for x in surfaces]) lats = numpy.concatenate([x.corner_lats for x in surfaces]) depths = numpy.concatenate([x.corner_depths for x in surfaces]) elif is_gridded_surface: # the surface mesh has shape (1, N) lons = surface.mesh.lons[0] lats = surface.mesh.lats[0] depths = surface.mesh.depths[0] else: # For area or point source, # rupture geometry is represented by a planar surface, # defined by 3D corner points lons = numpy.zeros((4)) lats = numpy.zeros((4)) depths = numpy.zeros((4)) # NOTE: It is important to maintain the order of these # corner points. TODO: check the ordering for i, corner in enumerate((surface.top_left, surface.top_right, surface.bottom_left, surface.bottom_right)): lons[i] = corner.longitude lats[i] = corner.latitude depths[i] = corner.depth return lons, lats, depths
[docs]class ExportedRupture(object): """ Simplified Rupture class with attributes rupid, events_by_ses, indices and others, used in export. :param rupid: rupture rup_id ID :param events_by_ses: dictionary ses_idx -> event records :param indices: site indices """ def __init__(self, rupid, n_occ, events_by_ses, indices=None): self.rupid = rupid self.n_occ = n_occ self.events_by_ses = events_by_ses self.indices = indices
[docs]def get_eids(rup_array, samples_by_grp, num_rlzs_by_grp): """ :param rup_array: a composite array with fields rup_id, n_occ and grp_id :param samples_by_grp: a dictionary grp_id -> samples :param num_rlzs_by_grp: a dictionary grp_id -> num_rlzs """ all_eids = [] for rup in rup_array: grp_id = rup['grp_id'] samples = samples_by_grp[grp_id] num_rlzs = num_rlzs_by_grp[grp_id] num_events = rup['n_occ'] if samples > 1 else rup['n_occ'] * num_rlzs eids = numpy.arange(num_events, dtype=U32) all_eids.append(eids) return numpy.concatenate(all_eids)
[docs]class EBRupture(object): """ An event based rupture. It is a wrapper over a hazardlib rupture object, containing an array of site indices affected by the rupture, as well as the IDs of the corresponding seismic events. """ def __init__(self, rupture, srcidx, grp_id, n_occ, samples=1, id=None): # NB: when reading an exported ruptures.xml the rup_id will be 0 # for the first rupture; it used to be the seed instead assert rupture.rup_id >= 0 # sanity check self.rupture = rupture self.srcidx = srcidx self.grp_id = grp_id self.n_occ = n_occ self.samples = samples self.id = id # id of the rupture on the DataStore, to be overridden @property def rup_id(self): """ Serial number of the rupture """ return self.rupture.rup_id
[docs] def get_eids_by_rlz(self, rlzs_by_gsim): """ :param n_occ: number of occurrences :params rlzs_by_gsim: a dictionary gsims -> rlzs array :param samples: number of samples in current source group :returns: a dictionary rlz index -> eids array """ j = 0 dic = {} if self.samples == 1: # full enumeration or akin to it for rlzs in rlzs_by_gsim.values(): for rlz in rlzs: dic[rlz] = numpy.arange(j, j + self.n_occ, dtype=U32) j += self.n_occ else: # associated eids to the realizations rlzs = numpy.concatenate(list(rlzs_by_gsim.values())) assert len(rlzs) == self.samples, (len(rlzs), self.samples) histo = general.random_histogram( self.n_occ, self.samples, self.rup_id) for rlz, n in zip(rlzs, histo): dic[rlz] = numpy.arange(j, j + n, dtype=U32) j += n return dic
[docs] def get_events(self, rlzs_by_gsim, e0=None): """ :returns: an array of events with fields eid, rlz """ all_eids, rlzs = [], [] for rlz, eids in self.get_eids_by_rlz(rlzs_by_gsim).items(): all_eids.extend(eids) rlzs.extend([rlz] * len(eids)) evs = U32(all_eids) + (e0 if e0 is not None else self.e0) rupids = [self.rup_id] * len(evs) return numpy.fromiter(zip(evs, rupids, rlzs), events_dt)
[docs] def get_eids(self, num_rlzs): """ :param num_rlzs: the number of realizations for the given group :returns: an array of event IDs """ num_events = self.n_occ if self.samples > 1 else self.n_occ * num_rlzs return numpy.arange(num_events, dtype=U32)
[docs] def get_events_by_ses(self, events, num_ses): """ :returns: a dictionary ses index -> events array """ numpy.random.seed(self.rup_id) sess = numpy.random.choice(num_ses, len(events)) + 1 events_by_ses = collections.defaultdict(list) for ses, event in zip(sess, events): events_by_ses[ses].append(event) for ses in events_by_ses: events_by_ses[ses] = numpy.array(events_by_ses[ses]) return events_by_ses
[docs] def get_ses_by_eid(self, rlzs_by_gsim, num_ses): events = self.get_events(rlzs_by_gsim) numpy.random.seed(self.rup_id) sess = numpy.random.choice(num_ses, len(events)) + 1 return dict(zip(events['id'], sess))
[docs] def export(self, rlzs_by_gsim, num_ses): """ Yield :class:`Rupture` objects, with all the attributes set, suitable for export in XML format. """ rupture = self.rupture events = self.get_events(rlzs_by_gsim, e0=TWO32 * self.rup_id) events_by_ses = self.get_events_by_ses(events, num_ses) new = ExportedRupture(self.id, self.n_occ, events_by_ses) if isinstance(rupture.surface, geo.ComplexFaultSurface): new.typology = 'complexFaultsurface' elif isinstance(rupture.surface, geo.SimpleFaultSurface): new.typology = 'simpleFaultsurface' elif isinstance(rupture.surface, geo.GriddedSurface): new.typology = 'griddedRupture' elif isinstance(rupture.surface, geo.MultiSurface): new.typology = 'multiPlanesRupture' else: new.typology = 'singlePlaneRupture' new.is_from_fault_source = iffs = isinstance( rupture.surface, (geo.ComplexFaultSurface, geo.SimpleFaultSurface)) new.is_gridded_surface = igs = isinstance( rupture.surface, geo.GriddedSurface) new.is_multi_surface = ims = isinstance( rupture.surface, geo.MultiSurface) new.lons, new.lats, new.depths = get_geom( rupture.surface, iffs, ims, igs) new.surface = rupture.surface new.strike = rupture.surface.get_strike() new.dip = rupture.surface.get_dip() new.rake = rupture.rake new.hypocenter = rupture.hypocenter new.tectonic_region_type = rupture.tectonic_region_type new.magnitude = new.mag = rupture.mag new.top_left_corner = None if iffs or ims or igs else ( new.lons[0], new.lats[0], new.depths[0]) new.top_right_corner = None if iffs or ims or igs else ( new.lons[1], new.lats[1], new.depths[1]) new.bottom_left_corner = None if iffs or ims or igs else ( new.lons[2], new.lats[2], new.depths[2]) new.bottom_right_corner = None if iffs or ims or igs else ( new.lons[3], new.lats[3], new.depths[3]) return new
def __repr__(self): return '<%s %d[%d]>' % ( self.__class__.__name__, self.rup_id, self.n_occ)