Source code for openquake.hazardlib.calc.stochastic

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2026 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`openquake.hazardlib.calc.stochastic` contains
:func:`stochastic_event_set`.
"""
import time
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import AccumDict, random_histogram
from openquake.baselib.performance import Monitor
from openquake.hazardlib.source.rupture import (
    BaseRupture, EBRupture, rupture_dt)
from openquake.hazardlib.geo.surface.base import to_geom_lons_lats

I64 = numpy.int64
TWO16 = 2 ** 16  # 65,536
TWO30 = 2 ** 30  # 1,073,741,824
TWO32 = 2 ** 32  # 4,294,967,296
F64 = numpy.float64
U16 = numpy.uint16
U32 = numpy.uint32
U8 = numpy.uint8
I32 = numpy.int32
F32 = numpy.float32
MAX_RUPTURES = 2000


# ######################## rupture calculator ############################ #


# this is really fast
[docs]def get_rup_array(ebruptures, magdist): """ Convert a list of EBRuptures into a numpy composite array, by filtering out the ruptures below the minimum msgnitude. """ if not BaseRupture._code: BaseRupture.init() # initialize rupture codes rups = [] geoms = [] for ebrupture in ebruptures: rup = ebrupture.rupture geom, lons, lats = to_geom_lons_lats(rup.surface) hypo = rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z minlon = numpy.nanmin(lons) # NaNs are in KiteSurfaces minlat = numpy.nanmin(lats) maxlon = numpy.nanmax(lons) maxlat = numpy.nanmax(lats) # apply magnitude filtering if magdist(rup.mag) == 0: continue rate = getattr(rup, 'occurrence_rate', numpy.nan) rupid = ebrupture.id + I64(int(ebrupture.source_id) * TWO30) tup = (rupid, ebrupture.seed, ebrupture.source_id, ebrupture.trt_smr, rup.code, ebrupture.n_occ, rup.mag, rup.rake, rate, minlon, minlat, maxlon, maxlat, hypo, 0, 1, 0, '???') rups.append(tup) # we are storing the geometries as arrays of 32 bit floating points; # the first element is the number of surfaces, then there are # 2 * num_surfaces integers describing the first and second # dimension of each surface, and then the lons, lats and deps of # the underlying meshes of points; in event_based/case_1 there # is a point source, i.e. planar surfaces, with shapes = [1, 4] # and points.reshape(3, 4) containing lons, lats and depths; # in classical/case_29 there is a non parametric source containing # 2 KiteSurfaces with shapes=[8, 5, 8, 5] and 240 = 3*2*8*5 coordinates geoms.append(geom) if not rups: return () dic = dict(geom=numpy.array(geoms, object)) # NB: PMFs for nonparametric ruptures are not saved since they # are useless for the GMF computation arr = numpy.array(rups, rupture_dt) return hdf5.ArrayWrapper(arr, dic)
[docs]def sample_cluster(group, num_ses, ses_seed): """ Yields ruptures generated by a cluster of sources :param group: A sequence of sources of the same group :param num_ses: Number of stochastic event sets :param ses_seed: Global seed for rupture sampling :yields: dictionaries with keys rup_array, source_data, eff_ruptures """ eb_ruptures = [] # group[0] is a source. 'serial' creates a random seed from source_id and # ses_seed seed = group[0].serial(ses_seed) rng = numpy.random.default_rng(seed) [trt_smr] = set(src.trt_smr for src in group) # Set the parameters required to compute the number of occurrences # of the cluster samples = getattr(group[0], 'samples', 1) tom = group.temporal_occurrence_model rate = getattr(tom, 'occurrence_rate', None) # Compute the number of occurrences of the cluster if rate is None: # time dependent cluster if hasattr(group, 'grp_probability'): grp_p = getattr(group, 'grp_probability') tmp = rng.random(samples * num_ses) tot_num_occ = numpy.sum(tmp < grp_p) else: msg = 'Rate or Cluster probability of occurrence not defined' raise ValueError(msg) else: # poissonian sources with ClusterPoissonTOM tmp = rng.poisson(rate * tom.time_span * samples, num_ses) tot_num_occ = numpy.sum(tmp) # Check number of occurrences of the cluster if tot_num_occ < 1: return eb_ruptures # Now process the sources included in the cluster. Possible cases: # * Traditional cluster -> all the sources occur # * Sources in the cluster are indepedent and the cluster TOM controls # the occurrence # * Sources are mutually exclusive and the cluster TOM controls the # occurrence if group.src_interdep is None: allrups = [] rupids = [] # Loop over the sources in the cluster and add them to 'allrups' for src in group: cnt = 0 for rup in src.iter_ruptures(): rup.src_id = src.id allrups.append(rup) cnt += 1 rupids.extend(src.offset + numpy.arange(cnt)) # Create the EB ruptures for rup, rupid in zip(allrups, rupids): ebr = EBRupture(rup, rup.src_id, trt_smr, tot_num_occ, rupid) ebr.seed = ebr.id + ses_seed eb_ruptures.append(ebr) elif group.src_interdep == 'indep': eb_ruptures = _get_rups_from_indep_src( group, tot_num_occ, trt_smr, seed, ses_seed) elif group.src_interdep == 'mutex': # Check that ruptures are independent if not group.rup_interdep == 'indep': raise NotImplementedError( f'{group.src_interdep=}, {group.rup_interdep=}') eb_ruptures = _get_rups_from_mutex_src( group, tot_num_occ, trt_smr, seed, ses_seed) else: raise NotImplementedError( f'{group.src_interdep=}, {group.rup_interdep=}') return eb_ruptures
def _get_rups_from_mutex_src(group, tot_num_occ, trt_smr, seed, ses_seed): eb_ruptures = [] # Compute the number of occurrences of each (mutex) source. One source # is selected every time we have a realization of a cluster. In other # words, the array 'src_noccs' must have a number of elements equal to # 'tot_num_occ' ws = [src.mutex_weight for src in group] src_noccs = random_histogram(tot_num_occ, ws, seed) msg = 'Src number of occurrences does not match the initial total number' if numpy.sum(src_noccs) != tot_num_occ: raise ValueError(msg) # Find the index of the ruptures generated at each occurrence of # a source and create the EBRuptures for src, src_nocc in zip(group, src_noccs): if src_nocc < 1: continue # Source seed src_seed = src.serial(ses_seed) rng = numpy.random.default_rng(src_seed) # For each rlz, find the number ruptures produced by each source. This # is a number comprised between 1 and the number of ruptures admitted # by that source. idxs = numpy.arange(1, src.num_ruptures + 1) n_rups_rlz = rng.choice(idxs, src_nocc) ids = numpy.empty((src_nocc, src.num_ruptures)) ids[:] = numpy.nan _set_ids(n_rups_rlz, ids, src_seed, weights=None) # Ruptures IDs rupids = src.offset + numpy.arange(src.num_ruptures) # Generate ruptures idxs = numpy.arange(src.num_ruptures) for i_rup, rup, rupid in zip(idxs, src.iter_ruptures(), rupids): # Find the number of occurrences per rupture n_occ = int(numpy.sum(ids == i_rup)) # Check if the rupture has a probability of occurrence if hasattr(rup, 'probs_occur'): msg = 'We support only len(probs_occur) == 2' assert len(rup.probs_occur) == 2 n_occ = numpy.min([ int(numpy.round(n_occ * rup.probs_occur[1])), n_occ ]) # Update the list with the ruptures if n_occ: ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid) ebr.seed = ebr.id + ses_seed eb_ruptures.append(ebr) return eb_ruptures def _get_rups_from_indep_src(group, tot_num_occ, trt_smr, seed, ses_seed): eb_ruptures = [] # Find the number of occurrences per source occ_per_src = _get_occs_indep_sources(tot_num_occ, group, seed) # Create the ruptures per LT branch if not group.rup_interdep == 'mutex': raise NotImplementedError( f'{group.src_interdep=}, {group.rup_interdep=}') # Create the EBRuptures for nocc, src in zip(occ_per_src, group): if nocc < 1: continue # Get rupture weights rwei = src.rup_weights if not hasattr(src, 'rup_weights'): rwei = 1.0 / src.num_ruptures # Source seed src_seed = src.serial(ses_seed) rng = numpy.random.default_rng(src_seed) # Get rupture index for each realization tmp_rup_ids = numpy.arange(src.num_ruptures) ridx = rng.choice(tmp_rup_ids, nocc, p=rwei) # Set the rupture IDs for the current source rupids = src.offset + numpy.arange(src.num_ruptures) # Create EBruptures tmp = zip(rupids, src.iter_ruptures()) for i_rup, (rupid, rup) in enumerate(tmp): n_occ = sum(ridx == i_rup) rup.src_id = src.id ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid) ebr.seed = ebr.id + ses_seed eb_ruptures.append(ebr) return eb_ruptures def _get_occs_indep_sources(tot_num_occ, group, seed): # Compute the number of occurrences for each source given a number of # occurrences of the cluster 'tot_num_occ' and the whole 'group' object # Get the number of sources included in each cluster and find the # IDs of the sources for each realization. 'ids' is a numpy.ndarray # with shape number_of_cluster_rlz x number_of_sources that provides # the indexes of the sources activated in each cluster realization ids = numpy.empty((tot_num_occ, len(group))) ids[:] = numpy.nan rng = numpy.random.default_rng(seed) n_srcs_rlz = rng.choice(range(1, len(group) + 1), tot_num_occ) _set_ids(n_srcs_rlz, ids, seed, None) # Find the number of occurrences per source occ_per_src = numpy.zeros((ids.shape[1]), dtype=int) for i_src in range(ids.shape[1]): occ_per_src[i_src] = int(numpy.sum(ids == i_src)) return occ_per_src def _set_ids(n_srcs, ids, seed, weights=None): # Set the indexes of the sources (or ruptures) in each realization. # 'n_srcs' is a 1D array with a length corresponding to the number of # rlzs; 'ids' is 2D array where the number of sources/ruptures is the # number of rows and the number of sources/ruptures is the number of colums rng = numpy.random.default_rng(seed) if weights is None: weights = numpy.ones((ids.shape[1])) * 1.0 / ids.shape[1] src_idxs = numpy.arange(0, ids.shape[1], 1) for irlz, n_src in enumerate(n_srcs): ids[irlz, 0:n_src] = sorted( rng.choice(src_idxs, n_src, replace=False))
[docs]def sample_ruptures(sources, param, monitor=Monitor()): """ :param sources: a sequence of sources of the same group :param param: a dictionary with ses_per_logic_tree_path, ses_seed, magdist :param monitor: monitor instance :yields: dictionaries with keys rup_array, source_data """ # AccumDict of arrays with 3 elements nsites, nruptures, calc_time source_data = AccumDict(accum=[]) # Compute and save stochastic event sets num_ses = param['ses_per_logic_tree_path'] magdist = param['magdist'] grp_id = sources[0].grp_id # Compute the number of occurrences of the source group. This is used # for cluster groups or groups with mutually exclusive sources. if getattr(sources, 'atomic', False): t0 = time.time() eb_ruptures = sample_cluster(sources, num_ses, param['ses_seed']) dt = time.time() - t0 # populate source_data tot = sum(src.num_ruptures for src in sources) for src in sources: source_data['src_id'].append(src.source_id) source_data['nctxs'].append(src.nsites * src.num_ruptures) source_data['nrups'].append(src.num_ruptures) source_data['ctimes'].append(dt * src.num_ruptures / tot) source_data['weight'].append(src.weight) source_data['taskno'].append(monitor.task_no) # Yield ruptures er = sum(src.num_ruptures for src in sources) dic = dict( rup_array=get_rup_array(eb_ruptures, magdist), source_data=source_data, eff_ruptures={grp_id: er}) yield AccumDict(dic) else: eb_ruptures = [] eff_ruptures = 0 source_data = AccumDict(accum=[]) for src in sources: nr = src.num_ruptures eff_ruptures += nr if len(eb_ruptures) > MAX_RUPTURES: # yield partial result to avoid running out of memory yield AccumDict(dict( rup_array=get_rup_array(eb_ruptures, magdist), source_data={}, eff_ruptures={})) eb_ruptures.clear() samples = getattr(src, 'samples', 1) t0 = time.time() eb_ruptures.extend( src.sample_ruptures(samples * num_ses, param['ses_seed'])) dt = time.time() - t0 source_data['src_id'].append(src.source_id) source_data['nctxs'].append(src.nsites * nr) source_data['nrups'].append(nr) source_data['ctimes'].append(dt) source_data['weight'].append(src.weight) source_data['taskno'].append(monitor.task_no) t0 = time.time() rup_array = get_rup_array(eb_ruptures, magdist) dt = time.time() - t0 if len(rup_array): yield AccumDict(dict(rup_array=rup_array, source_data=source_data, eff_ruptures={grp_id: eff_ruptures}))