# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012-2026 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
:mod:`openquake.hazardlib.calc.stochastic` contains
:func:`stochastic_event_set`.
"""
import time
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import AccumDict, random_histogram
from openquake.baselib.performance import Monitor
from openquake.hazardlib.source.rupture import (
BaseRupture, EBRupture, rupture_dt)
from openquake.hazardlib.geo.surface.base import to_geom_lons_lats
I64 = numpy.int64
TWO16 = 2 ** 16 # 65,536
TWO30 = 2 ** 30 # 1,073,741,824
TWO32 = 2 ** 32 # 4,294,967,296
F64 = numpy.float64
U16 = numpy.uint16
U32 = numpy.uint32
U8 = numpy.uint8
I32 = numpy.int32
F32 = numpy.float32
MAX_RUPTURES = 2000
# ######################## rupture calculator ############################ #
# this is really fast
[docs]def get_rup_array(ebruptures, magdist):
"""
Convert a list of EBRuptures into a numpy composite array, by filtering
out the ruptures below the minimum msgnitude.
"""
if not BaseRupture._code:
BaseRupture.init() # initialize rupture codes
rups = []
geoms = []
for ebrupture in ebruptures:
rup = ebrupture.rupture
geom, lons, lats = to_geom_lons_lats(rup.surface)
hypo = rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z
minlon = numpy.nanmin(lons) # NaNs are in KiteSurfaces
minlat = numpy.nanmin(lats)
maxlon = numpy.nanmax(lons)
maxlat = numpy.nanmax(lats)
# apply magnitude filtering
if magdist(rup.mag) == 0:
continue
rate = getattr(rup, 'occurrence_rate', numpy.nan)
rupid = ebrupture.id + I64(int(ebrupture.source_id) * TWO30)
tup = (rupid, ebrupture.seed, ebrupture.source_id,
ebrupture.trt_smr, rup.code, ebrupture.n_occ, rup.mag, rup.rake,
rate, minlon, minlat, maxlon, maxlat, hypo, 0, 1, 0, '???')
rups.append(tup)
# we are storing the geometries as arrays of 32 bit floating points;
# the first element is the number of surfaces, then there are
# 2 * num_surfaces integers describing the first and second
# dimension of each surface, and then the lons, lats and deps of
# the underlying meshes of points; in event_based/case_1 there
# is a point source, i.e. planar surfaces, with shapes = [1, 4]
# and points.reshape(3, 4) containing lons, lats and depths;
# in classical/case_29 there is a non parametric source containing
# 2 KiteSurfaces with shapes=[8, 5, 8, 5] and 240 = 3*2*8*5 coordinates
geoms.append(geom)
if not rups:
return ()
dic = dict(geom=numpy.array(geoms, object))
# NB: PMFs for nonparametric ruptures are not saved since they
# are useless for the GMF computation
arr = numpy.array(rups, rupture_dt)
return hdf5.ArrayWrapper(arr, dic)
[docs]def sample_cluster(group, num_ses, ses_seed):
"""
Yields ruptures generated by a cluster of sources
:param group:
A sequence of sources of the same group
:param num_ses:
Number of stochastic event sets
:param ses_seed:
Global seed for rupture sampling
:yields:
dictionaries with keys rup_array, source_data, eff_ruptures
"""
eb_ruptures = []
# group[0] is a source. 'serial' creates a random seed from source_id and
# ses_seed
seed = group[0].serial(ses_seed)
rng = numpy.random.default_rng(seed)
[trt_smr] = set(src.trt_smr for src in group)
# Set the parameters required to compute the number of occurrences
# of the cluster
samples = getattr(group[0], 'samples', 1)
tom = group.temporal_occurrence_model
rate = getattr(tom, 'occurrence_rate', None)
# Compute the number of occurrences of the cluster
if rate is None: # time dependent cluster
if hasattr(group, 'grp_probability'):
grp_p = getattr(group, 'grp_probability')
tmp = rng.random(samples * num_ses)
tot_num_occ = numpy.sum(tmp < grp_p)
else:
msg = 'Rate or Cluster probability of occurrence not defined'
raise ValueError(msg)
else: # poissonian sources with ClusterPoissonTOM
tmp = rng.poisson(rate * tom.time_span * samples, num_ses)
tot_num_occ = numpy.sum(tmp)
# Check number of occurrences of the cluster
if tot_num_occ < 1:
return eb_ruptures
# Now process the sources included in the cluster. Possible cases:
# * Traditional cluster -> all the sources occur
# * Sources in the cluster are indepedent and the cluster TOM controls
# the occurrence
# * Sources are mutually exclusive and the cluster TOM controls the
# occurrence
if group.src_interdep is None:
allrups = []
rupids = []
# Loop over the sources in the cluster and add them to 'allrups'
for src in group:
cnt = 0
for rup in src.iter_ruptures():
rup.src_id = src.id
allrups.append(rup)
cnt += 1
rupids.extend(src.offset + numpy.arange(cnt))
# Create the EB ruptures
for rup, rupid in zip(allrups, rupids):
ebr = EBRupture(rup, rup.src_id, trt_smr, tot_num_occ, rupid)
ebr.seed = ebr.id + ses_seed
eb_ruptures.append(ebr)
elif group.src_interdep == 'indep':
eb_ruptures = _get_rups_from_indep_src(
group, tot_num_occ, trt_smr, seed, ses_seed)
elif group.src_interdep == 'mutex':
# Check that ruptures are independent
if not group.rup_interdep == 'indep':
raise NotImplementedError(
f'{group.src_interdep=}, {group.rup_interdep=}')
eb_ruptures = _get_rups_from_mutex_src(
group, tot_num_occ, trt_smr, seed, ses_seed)
else:
raise NotImplementedError(
f'{group.src_interdep=}, {group.rup_interdep=}')
return eb_ruptures
def _get_rups_from_mutex_src(group, tot_num_occ, trt_smr, seed, ses_seed):
eb_ruptures = []
# Compute the number of occurrences of each (mutex) source. One source
# is selected every time we have a realization of a cluster. In other
# words, the array 'src_noccs' must have a number of elements equal to
# 'tot_num_occ'
ws = [src.mutex_weight for src in group]
src_noccs = random_histogram(tot_num_occ, ws, seed)
msg = 'Src number of occurrences does not match the initial total number'
if numpy.sum(src_noccs) != tot_num_occ:
raise ValueError(msg)
# Find the index of the ruptures generated at each occurrence of
# a source and create the EBRuptures
for src, src_nocc in zip(group, src_noccs):
if src_nocc < 1:
continue
# Source seed
src_seed = src.serial(ses_seed)
rng = numpy.random.default_rng(src_seed)
# For each rlz, find the number ruptures produced by each source. This
# is a number comprised between 1 and the number of ruptures admitted
# by that source.
idxs = numpy.arange(1, src.num_ruptures + 1)
n_rups_rlz = rng.choice(idxs, src_nocc)
ids = numpy.empty((src_nocc, src.num_ruptures))
ids[:] = numpy.nan
_set_ids(n_rups_rlz, ids, src_seed, weights=None)
# Ruptures IDs
rupids = src.offset + numpy.arange(src.num_ruptures)
# Generate ruptures
idxs = numpy.arange(src.num_ruptures)
for i_rup, rup, rupid in zip(idxs, src.iter_ruptures(), rupids):
# Find the number of occurrences per rupture
n_occ = int(numpy.sum(ids == i_rup))
# Check if the rupture has a probability of occurrence
if hasattr(rup, 'probs_occur'):
msg = 'We support only len(probs_occur) == 2'
assert len(rup.probs_occur) == 2
n_occ = numpy.min([
int(numpy.round(n_occ * rup.probs_occur[1])),
n_occ
])
# Update the list with the ruptures
if n_occ:
ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid)
ebr.seed = ebr.id + ses_seed
eb_ruptures.append(ebr)
return eb_ruptures
def _get_rups_from_indep_src(group, tot_num_occ, trt_smr, seed, ses_seed):
eb_ruptures = []
# Find the number of occurrences per source
occ_per_src = _get_occs_indep_sources(tot_num_occ, group, seed)
# Create the ruptures per LT branch
if not group.rup_interdep == 'mutex':
raise NotImplementedError(
f'{group.src_interdep=}, {group.rup_interdep=}')
# Create the EBRuptures
for nocc, src in zip(occ_per_src, group):
if nocc < 1:
continue
# Get rupture weights
rwei = src.rup_weights
if not hasattr(src, 'rup_weights'):
rwei = 1.0 / src.num_ruptures
# Source seed
src_seed = src.serial(ses_seed)
rng = numpy.random.default_rng(src_seed)
# Get rupture index for each realization
tmp_rup_ids = numpy.arange(src.num_ruptures)
ridx = rng.choice(tmp_rup_ids, nocc, p=rwei)
# Set the rupture IDs for the current source
rupids = src.offset + numpy.arange(src.num_ruptures)
# Create EBruptures
tmp = zip(rupids, src.iter_ruptures())
for i_rup, (rupid, rup) in enumerate(tmp):
n_occ = sum(ridx == i_rup)
rup.src_id = src.id
ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid)
ebr.seed = ebr.id + ses_seed
eb_ruptures.append(ebr)
return eb_ruptures
def _get_occs_indep_sources(tot_num_occ, group, seed):
# Compute the number of occurrences for each source given a number of
# occurrences of the cluster 'tot_num_occ' and the whole 'group' object
# Get the number of sources included in each cluster and find the
# IDs of the sources for each realization. 'ids' is a numpy.ndarray
# with shape number_of_cluster_rlz x number_of_sources that provides
# the indexes of the sources activated in each cluster realization
ids = numpy.empty((tot_num_occ, len(group)))
ids[:] = numpy.nan
rng = numpy.random.default_rng(seed)
n_srcs_rlz = rng.choice(range(1, len(group) + 1), tot_num_occ)
_set_ids(n_srcs_rlz, ids, seed, None)
# Find the number of occurrences per source
occ_per_src = numpy.zeros((ids.shape[1]), dtype=int)
for i_src in range(ids.shape[1]):
occ_per_src[i_src] = int(numpy.sum(ids == i_src))
return occ_per_src
def _set_ids(n_srcs, ids, seed, weights=None):
# Set the indexes of the sources (or ruptures) in each realization.
# 'n_srcs' is a 1D array with a length corresponding to the number of
# rlzs; 'ids' is 2D array where the number of sources/ruptures is the
# number of rows and the number of sources/ruptures is the number of colums
rng = numpy.random.default_rng(seed)
if weights is None:
weights = numpy.ones((ids.shape[1])) * 1.0 / ids.shape[1]
src_idxs = numpy.arange(0, ids.shape[1], 1)
for irlz, n_src in enumerate(n_srcs):
ids[irlz, 0:n_src] = sorted(
rng.choice(src_idxs, n_src, replace=False))
[docs]def sample_ruptures(sources, param, monitor=Monitor()):
"""
:param sources:
a sequence of sources of the same group
:param param:
a dictionary with ses_per_logic_tree_path, ses_seed, magdist
:param monitor:
monitor instance
:yields:
dictionaries with keys rup_array, source_data
"""
# AccumDict of arrays with 3 elements nsites, nruptures, calc_time
source_data = AccumDict(accum=[])
# Compute and save stochastic event sets
num_ses = param['ses_per_logic_tree_path']
magdist = param['magdist']
grp_id = sources[0].grp_id
# Compute the number of occurrences of the source group. This is used
# for cluster groups or groups with mutually exclusive sources.
if getattr(sources, 'atomic', False):
t0 = time.time()
eb_ruptures = sample_cluster(sources, num_ses, param['ses_seed'])
dt = time.time() - t0
# populate source_data
tot = sum(src.num_ruptures for src in sources)
for src in sources:
source_data['src_id'].append(src.source_id)
source_data['nctxs'].append(src.nsites * src.num_ruptures)
source_data['nrups'].append(src.num_ruptures)
source_data['ctimes'].append(dt * src.num_ruptures / tot)
source_data['weight'].append(src.weight)
source_data['taskno'].append(monitor.task_no)
# Yield ruptures
er = sum(src.num_ruptures for src in sources)
dic = dict(
rup_array=get_rup_array(eb_ruptures, magdist),
source_data=source_data, eff_ruptures={grp_id: er})
yield AccumDict(dic)
else:
eb_ruptures = []
eff_ruptures = 0
source_data = AccumDict(accum=[])
for src in sources:
nr = src.num_ruptures
eff_ruptures += nr
if len(eb_ruptures) > MAX_RUPTURES:
# yield partial result to avoid running out of memory
yield AccumDict(dict(
rup_array=get_rup_array(eb_ruptures, magdist),
source_data={}, eff_ruptures={}))
eb_ruptures.clear()
samples = getattr(src, 'samples', 1)
t0 = time.time()
eb_ruptures.extend(
src.sample_ruptures(samples * num_ses, param['ses_seed']))
dt = time.time() - t0
source_data['src_id'].append(src.source_id)
source_data['nctxs'].append(src.nsites * nr)
source_data['nrups'].append(nr)
source_data['ctimes'].append(dt)
source_data['weight'].append(src.weight)
source_data['taskno'].append(monitor.task_no)
t0 = time.time()
rup_array = get_rup_array(eb_ruptures, magdist)
dt = time.time() - t0
if len(rup_array):
yield AccumDict(dict(rup_array=rup_array, source_data=source_data,
eff_ruptures={grp_id: eff_ruptures}))