# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import copy
import math
import time
import logging
import operator
import collections
import numpy
from openquake.baselib import performance, hdf5
from openquake.baselib.python3compat import decode
from openquake.baselib.general import (
groupby, group_array, gettemp, AccumDict, random_filter, cached_property)
from openquake.hazardlib import (
source, sourceconverter, probability_map, stats, contexts)
from openquake.hazardlib.gsim.gmpe_table import GMPETable
from openquake.commonlib import logictree
MINWEIGHT = source.MINWEIGHT
MAX_INT = 2 ** 31 - 1
U16 = numpy.uint16
U32 = numpy.uint32
I32 = numpy.int32
F32 = numpy.float32
weight = operator.attrgetter('weight')
rlz_dt = numpy.dtype([
('branch_path', 'S200'), ('gsims', 'S100'), ('weight', F32)])
[docs]def split_sources(srcs, min_mag):
"""
:param srcs: sources
:returns: a pair (split sources, split time)
"""
sources = []
split_time = {} # src_id -> dt
for src in srcs:
t0 = time.time()
if min_mag and src.get_min_max_mag()[0] < min_mag:
splits = []
for s in src:
if min_mag and s.get_min_max_mag()[0] < min_mag:
# discard some ruptures
s.min_mag = min_mag
s.num_ruptures = s.count_ruptures()
if s.num_ruptures:
splits.append(s)
else:
splits.append(s)
else:
splits = list(src)
split_time[src.source_id] = time.time() - t0
sources.extend(splits)
has_serial = hasattr(src, 'serial')
has_samples = hasattr(src, 'samples')
if len(splits) > 1:
start = 0
for i, split in enumerate(splits):
split.source_id = '%s:%s' % (src.source_id, i)
split.src_group_id = src.src_group_id
split.ngsims = src.ngsims
split.ndists = src.ndists
if has_serial:
nr = split.num_ruptures
split.serial = src.serial[start:start + nr]
start += nr
if has_samples:
split.samples = src.samples
elif splits: # single source
splits[0].ngsims = src.ngsims
splits[0].ndists = src.ndists
if has_serial:
splits[0].serial = src.serial
if has_samples:
splits[0].samples = src.samples
return sources, split_time
[docs]def gsim_names(rlz):
"""
Names of the underlying GSIMs separated by spaces
"""
return ' '.join(str(v) for v in rlz.gsim_rlz.value)
[docs]class LtRealization(object):
"""
Composite realization build on top of a source model realization and
a GSIM realization.
"""
def __init__(self, ordinal, sm_lt_path, gsim_rlz, weight):
self.ordinal = ordinal
self.sm_lt_path = tuple(sm_lt_path)
self.gsim_rlz = gsim_rlz
self.weight = weight
def __repr__(self):
return '<%d,%s,w=%s>' % (self.ordinal, self.uid, self.weight)
@property
def gsim_lt_path(self):
return self.gsim_rlz.lt_path
@property
def uid(self):
"""An unique identifier for effective realizations"""
return '_'.join(self.sm_lt_path) + '~' + self.gsim_rlz.uid
def __lt__(self, other):
return self.ordinal < other.ordinal
def __eq__(self, other):
return repr(self) == repr(other)
def __ne__(self, other):
return repr(self) != repr(other)
def __hash__(self):
return hash(repr(self))
[docs]def capitalize(words):
"""
Capitalize words separated by spaces.
"""
return ' '.join(w.capitalize() for w in decode(words).split(' '))
def _assert_equal_sources(nodes):
if hasattr(nodes[0], 'source_id'):
n0 = nodes[0]
for n in nodes[1:]:
n.assert_equal(n0, ignore=('id', 'src_group_id'))
else: # assume source nodes
n0 = nodes[0].to_str()
for n in nodes[1:]:
eq = n.to_str() == n0
if not eq:
f0 = gettemp(n0)
f1 = gettemp(n.to_str())
assert eq, 'different parameters for source %s, run meld %s %s' % (
n['id'], f0, f1)
return nodes
[docs]class RlzsAssoc(object):
"""
Realization association class. It should not be instantiated directly,
but only via the method :meth:
`openquake.commonlib.source.CompositeSourceModel.get_rlzs_assoc`.
:attr realizations: list of :class:`LtRealization` objects
:attr gsim_by_trt: list of dictionaries {trt: gsim}
:attr rlzs_assoc: dictionary {src_group_id, gsim: rlzs}
:attr rlzs_by_smodel: list of lists of realizations
For instance, for the non-trivial logic tree in
:mod:`openquake.qa_tests_data.classical.case_15`, which has 4 tectonic
region types and 4 + 2 + 2 realizations, there are the following
associations:
(0, 'BooreAtkinson2008()') ['#0-SM1-BA2008_C2003', '#1-SM1-BA2008_T2002']
(0, 'CampbellBozorgnia2008()') ['#2-SM1-CB2008_C2003', '#3-SM1-CB2008_T2002']
(1, 'Campbell2003()') ['#0-SM1-BA2008_C2003', '#2-SM1-CB2008_C2003']
(1, 'ToroEtAl2002()') ['#1-SM1-BA2008_T2002', '#3-SM1-CB2008_T2002']
(2, 'BooreAtkinson2008()') ['#4-SM2_a3pt2b0pt8-BA2008']
(2, 'CampbellBozorgnia2008()') ['#5-SM2_a3pt2b0pt8-CB2008']
(3, 'BooreAtkinson2008()') ['#6-SM2_a3b1-BA2008']
(3, 'CampbellBozorgnia2008()') ['#7-SM2_a3b1-CB2008']
"""
def __init__(self, csm_info):
self.seed = csm_info.seed
self.csm_info = csm_info
self.num_samples = csm_info.num_samples
self.gsim_by_trt = [] # rlz.ordinal -> {trt: gsim}
self.rlzs_by_smodel = {sm.ordinal: [] for sm in csm_info.source_models}
[docs] def get_rlzs_by_gsim(self, trt_or_grp_id, sm_id=None):
"""
:param trt_or_grp_id: a tectonic region type or a source group ID
:param sm_id: source model ordinal (or None)
:returns: a dictionary gsim -> rlzs
"""
if isinstance(trt_or_grp_id, (int, U32)): # grp_id
trt = self.csm_info.trt_by_grp[trt_or_grp_id]
sm_id = self.csm_info.get_sm_by_grp()[trt_or_grp_id]
else: # assume TRT string
trt = trt_or_grp_id
acc = collections.defaultdict(list)
if sm_id is None: # full dictionary
for rlz, gsim_by_trt in zip(self.realizations, self.gsim_by_trt):
acc[gsim_by_trt[trt]].append(rlz.ordinal)
else: # dictionary for the selected source model
for rlz in self.rlzs_by_smodel[sm_id]:
gsim_by_trt = self.gsim_by_trt[rlz.ordinal]
try: # if there is a single TRT
[gsim] = gsim_by_trt.values()
except ValueError: # there is more than 1 TRT
gsim = gsim_by_trt[trt]
acc[gsim].append(rlz.ordinal)
return collections.OrderedDict(
(gsim, numpy.array(acc[gsim], dtype=U16)) for gsim in sorted(acc))
[docs] def by_grp(self):
"""
:returns: a dictionary grp -> [(gsim_idx, rlzis), ...]
"""
dic = {} # grp -> [(gsim_idx, rlzis), ...]
for sm in self.csm_info.source_models:
for sg in sm.src_groups:
if not sg.eff_ruptures:
continue
rlzs_by_gsim = self.get_rlzs_by_gsim(sg.trt, sm.ordinal)
if not rlzs_by_gsim:
continue
dic['grp-%02d' % sg.id] = [
(gsim_idx, rlzs_by_gsim[gsim])
for gsim_idx, gsim in enumerate(rlzs_by_gsim)]
return dic
def _init(self):
"""
Finalize the initialization of the RlzsAssoc object by setting
the (reduced) weights of the realizations.
"""
if self.num_samples:
assert len(self.realizations) == self.num_samples, (
len(self.realizations), self.num_samples)
tot_weight = sum(rlz.weight for rlz in self.realizations)
for rlz in self.realizations:
rlz.weight /= tot_weight
else:
tot_weight = sum(rlz.weight for rlz in self.realizations)
if tot_weight == 0:
raise ValueError('All realizations have zero weight??')
elif abs(tot_weight - 1) > 1E-8:
# this may happen for rounding errors or because of the
# logic tree reduction; we ensure the sum of the weights is 1
for rlz in self.realizations:
rlz.weight = rlz.weight / tot_weight
@property
def realizations(self):
"""Flat list with all the realizations"""
return sum(self.rlzs_by_smodel.values(), [])
@property
def weights(self):
"""Array with the weight of the realizations"""
return numpy.array([rlz.weight for rlz in self.realizations])
[docs] def combine_pmaps(self, pmap_by_grp):
"""
:param pmap_by_grp: dictionary group string -> probability map
:returns: a list of probability maps, one per realization
"""
grp = list(pmap_by_grp)[0] # pmap_by_grp must be non-empty
num_levels = pmap_by_grp[grp].shape_y
pmaps = [probability_map.ProbabilityMap(num_levels, 1)
for _ in self.realizations]
array = self.by_grp()
for grp in pmap_by_grp:
for gsim_idx, rlzis in array[grp]:
pmap = pmap_by_grp[grp].extract(gsim_idx)
for rlzi in rlzis:
pmaps[rlzi] |= pmap
return pmaps
[docs] def compute_pmap_stats(self, pmap_by_grp, statfuncs):
"""
:param pmap_by_grp: dictionary group string -> probability map
:param statfuncs: a list of statistical functions
:returns: a probability map containing all statistics
"""
pmaps = self.combine_pmaps(pmap_by_grp)
return stats.compute_pmap_stats(pmaps, statfuncs, self.weights)
[docs] def get_rlz(self, rlzstr):
"""
Get a Realization instance for a string of the form 'rlz-\d+'
"""
mo = re.match('rlz-(\d+)', rlzstr)
if not mo:
return
return self.realizations[int(mo.group(1))]
def _add_realizations(self, offset, lt_model, all_trts, gsim_rlzs):
idx = numpy.arange(offset, offset + len(gsim_rlzs))
rlzs = []
for i, gsim_rlz in enumerate(gsim_rlzs):
weight = float(lt_model.weight) * float(gsim_rlz.weight)
rlz = LtRealization(idx[i], lt_model.path, gsim_rlz, weight)
self.gsim_by_trt.append(dict(zip(all_trts, gsim_rlz.value)))
rlzs.append(rlz)
self.rlzs_by_smodel[lt_model.ordinal] = rlzs
def __len__(self):
array = self.by_grp() # TODO: remove this
return sum(len(array[grp]) for grp in array)
def __repr__(self):
pairs = []
dic = self.by_grp()
for grp in sorted(dic):
grp_id = int(grp[4:])
gsims = self.csm_info.get_gsims(grp_id)
for gsim_idx, rlzis in dic[grp]:
if len(rlzis) > 10: # short representation
rlzis = ['%d realizations' % len(rlzis)]
pairs.append(('%s,%s' % (grp_id, gsims[gsim_idx]), rlzis))
return '<%s(size=%d, rlzs=%d)\n%s>' % (
self.__class__.__name__, len(self), len(self.realizations),
'\n'.join('%s: %s' % pair for pair in pairs))
LENGTH = 256
source_model_dt = numpy.dtype([
('name', hdf5.vstr),
('weight', F32),
('path', hdf5.vstr),
('num_rlzs', U32),
('samples', U32),
])
src_group_dt = numpy.dtype(
[('grp_id', U32),
('name', hdf5.vstr),
('trti', U16),
('effrup', I32),
('totrup', I32),
('sm_id', U32)])
[docs]def accept_path(path, ref_path):
"""
:param path: a logic tree path (list or tuple of strings)
:param ref_path: reference logic tree path
:returns: True if `path` is consistent with `ref_path`, False otherwise
>>> accept_path(['SM2'], ('SM2', 'a3b1'))
False
>>> accept_path(['SM2', '@'], ('SM2', 'a3b1'))
True
>>> accept_path(['@', 'a3b1'], ('SM2', 'a3b1'))
True
>>> accept_path('@@', ('SM2', 'a3b1'))
True
"""
if len(path) != len(ref_path):
return False
for a, b in zip(path, ref_path):
if a != '@' and a != b:
return False
return True
[docs]def get_field(data, field, default):
"""
:param data: a record with a field `field`, possibily missing
"""
try:
return data[field]
except ValueError: # field missing in old engines
return default
[docs]class CompositionInfo(object):
"""
An object to collect information about the composition of
a composite source model.
:param source_model_lt: a SourceModelLogicTree object
:param source_models: a list of LtSourceModel instances
"""
[docs] @classmethod
def fake(cls, gsimlt=None):
"""
:returns:
a fake `CompositionInfo` instance with the given gsim logic tree
object; if None, builds automatically a fake gsim logic tree
"""
weight = 1
gsim_lt = gsimlt or logictree.GsimLogicTree.from_('FromFile')
fakeSM = logictree.LtSourceModel(
'scenario', weight, 'b1',
[sourceconverter.SourceGroup('*', eff_ruptures=1)],
gsim_lt.get_num_paths(), ordinal=0, samples=1)
return cls(gsim_lt, seed=0, num_samples=0, source_models=[fakeSM],
totweight=0)
def __init__(self, gsim_lt, seed, num_samples, source_models, totweight=0):
self.gsim_lt = gsim_lt
self.seed = seed
self.num_samples = num_samples
self.source_models = source_models
self.tot_weight = totweight
self.init()
[docs] def init(self):
self.trt_by_grp = self.grp_by("trt")
if self.num_samples:
self.seed_samples_by_grp = {}
seed = self.seed
for sm in self.source_models:
for grp in sm.src_groups:
self.seed_samples_by_grp[grp.id] = seed, sm.samples
seed += sm.samples
@property
def gsim_rlzs(self):
"""
Build and cache the gsim logic tree realizations
"""
try:
return self._gsim_rlzs
except AttributeError:
self._gsim_rlzs = list(self.gsim_lt)
return self._gsim_rlzs
[docs] def get_gsims(self, grp_id):
"""
Get the GSIMs associated with the given group
"""
trt = self.trt_by_grp[grp_id]
if self.num_samples: # sampling
seed, samples = self.seed_samples_by_grp[grp_id]
numpy.random.seed(seed)
idxs = numpy.random.choice(len(self.gsim_rlzs), samples)
rlzs = [self.gsim_rlzs[i] for i in idxs]
else: # full enumeration
rlzs = None
return self.gsim_lt.get_gsims(trt, rlzs)
[docs] def get_info(self, sm_id):
"""
Extract a CompositionInfo instance containing the single
model of index `sm_id`.
"""
sm = self.source_models[sm_id]
num_samples = sm.samples if self.num_samples else 0
return self.__class__(
self.gsim_lt, self.seed, num_samples, [sm], self.tot_weight)
[docs] def get_samples_by_grp(self):
"""
:returns: a dictionary src_group_id -> source_model.samples
"""
return {grp.id: sm.samples for sm in self.source_models
for grp in sm.src_groups}
[docs] def get_rlzs_by_gsim_grp(self, sm_lt_path=None, trts=None):
"""
:returns: a dictionary src_group_id -> gsim -> rlzs
"""
self.rlzs_assoc = self.get_rlzs_assoc(sm_lt_path, trts)
dic = {grp.id: self.rlzs_assoc.get_rlzs_by_gsim(grp.id)
for sm in self.source_models for grp in sm.src_groups}
return dic
def __getnewargs__(self):
# with this CompositionInfo instances will be unpickled correctly
return self.seed, self.num_samples, self.source_models
[docs] def trt2i(self):
"""
:returns: trt -> trti
"""
trts = sorted(set(src_group.trt for sm in self.source_models
for src_group in sm.src_groups))
return {trt: i for i, trt in enumerate(trts)}
def __toh5__(self):
# save csm_info/sg_data, csm_info/sm_data in the datastore
trti = self.trt2i()
sg_data = []
sm_data = []
for sm in self.source_models:
trts = set(sg.trt for sg in sm.src_groups)
num_gsim_paths = self.gsim_lt.reduce(trts).get_num_paths()
sm_data.append((sm.names, sm.weight, '_'.join(sm.path),
num_gsim_paths, sm.samples))
for src_group in sm.src_groups:
sg_data.append((src_group.id, src_group.name,
trti[src_group.trt], src_group.eff_ruptures,
src_group.tot_ruptures, sm.ordinal))
return (dict(
sg_data=numpy.array(sg_data, src_group_dt),
sm_data=numpy.array(sm_data, source_model_dt)),
dict(seed=self.seed, num_samples=self.num_samples,
trts=hdf5.array_of_vstr(sorted(trti)),
gsim_lt_xml=str(self.gsim_lt),
gsim_fname=self.gsim_lt.fname,
tot_weight=self.tot_weight))
def __fromh5__(self, dic, attrs):
# TODO: this is called more times than needed, maybe we should cache it
sg_data = group_array(dic['sg_data'], 'sm_id')
sm_data = dic['sm_data']
vars(self).update(attrs)
self.gsim_fname = decode(self.gsim_fname)
if self.gsim_fname.endswith('.xml'):
# otherwise it would look in the current directory
GMPETable.GMPE_DIR = os.path.dirname(self.gsim_fname)
trts = sorted(self.trts)
tmp = gettemp(self.gsim_lt_xml, suffix='.xml')
self.gsim_lt = logictree.GsimLogicTree(tmp, trts)
else: # fake file with the name of the GSIM
self.gsim_lt = logictree.GsimLogicTree.from_(self.gsim_fname)
self.source_models = []
for sm_id, rec in enumerate(sm_data):
tdata = sg_data[sm_id]
srcgroups = [
sourceconverter.SourceGroup(
self.trts[data['trti']], id=data['grp_id'],
name=get_field(data, 'name', ''),
eff_ruptures=data['effrup'],
tot_ruptures=get_field(data, 'totrup', 0))
for data in tdata]
path = tuple(str(decode(rec['path'])).split('_'))
trts = set(sg.trt for sg in srcgroups)
sm = logictree.LtSourceModel(
rec['name'], rec['weight'], path, srcgroups,
rec['num_rlzs'], sm_id, rec['samples'])
self.source_models.append(sm)
self.init()
try:
os.remove(tmp) # gsim_lt file
except NameError: # tmp is defined only in the regular case, see above
pass
[docs] def get_num_rlzs(self, source_model=None):
"""
:param source_model: a LtSourceModel instance (or None)
:returns: the number of realizations per source model (or all)
"""
if source_model is None:
return sum(self.get_num_rlzs(sm) for sm in self.source_models)
if self.num_samples:
return source_model.samples
trts = set(sg.trt for sg in source_model.src_groups if sg.eff_ruptures)
if sum(sg.eff_ruptures for sg in source_model.src_groups) == 0:
return 0
return self.gsim_lt.reduce(trts).get_num_paths()
@property
def rlzs(self):
"""
:returns: an array of realizations
"""
realizations = self.get_rlzs_assoc().realizations
return numpy.array(
[(r.uid, gsim_names(r), r.weight) for r in realizations], rlz_dt)
[docs] def update_eff_ruptures(self, count_ruptures):
"""
:param count_ruptures: function or dict src_group_id -> num_ruptures
"""
for smodel in self.source_models:
for sg in smodel.src_groups:
sg.eff_ruptures = (count_ruptures(sg.id)
if callable(count_ruptures)
else count_ruptures[sg.id])
[docs] def get_rlzs_assoc(self, sm_lt_path=None, trts=None):
"""
:param sm_lt_path: logic tree path tuple used to select a source model
:param trts: tectonic region types to accept
"""
assoc = RlzsAssoc(self)
offset = 0
trtset = set(self.gsim_lt.tectonic_region_types)
for smodel in self.source_models:
# discard source models with non-acceptable lt_path
if sm_lt_path and not accept_path(smodel.path, sm_lt_path):
continue
# collect the effective tectonic region types and ruptures
trts_ = set()
for sg in smodel.src_groups:
if sg.eff_ruptures:
if (trts and sg.trt in trts) or not trts:
trts_.add(sg.trt)
# recompute the GSIM logic tree if needed
if trtset != trts_:
before = self.gsim_lt.get_num_paths()
gsim_lt = self.gsim_lt.reduce(trts_)
after = gsim_lt.get_num_paths()
if sm_lt_path and before > after:
# print the warning only when saving the logic tree,
# i.e. when called with sm_lt_path in store_source_info
logging.warn('Reducing the logic tree of %s from %d to %d '
'realizations', smodel.name, before, after)
gsim_rlzs = list(gsim_lt)
all_trts = gsim_lt.all_trts
else:
gsim_rlzs = self.gsim_rlzs
all_trts = self.gsim_lt.all_trts
rlzs = self._get_rlzs(smodel, gsim_rlzs, self.seed + offset)
assoc._add_realizations(offset, smodel, all_trts, rlzs)
offset += len(rlzs)
if assoc.realizations:
assoc._init()
return assoc
[docs] def get_source_model(self, src_group_id):
"""
Return the source model for the given src_group_id
"""
for smodel in self.source_models:
for src_group in smodel.src_groups:
if src_group.id == src_group_id:
return smodel
[docs] def get_grp_ids(self, sm_id):
"""
:returns: a list of source group IDs for the given source model ID
"""
return [sg.id for sg in self.source_models[sm_id].src_groups]
[docs] def get_sm_by_grp(self):
"""
:returns: a dictionary grp_id -> sm_id
"""
return {grp.id: sm.ordinal for sm in self.source_models
for grp in sm.src_groups}
[docs] def grp_by(self, name):
"""
:returns: a dictionary grp_id -> TRT string
"""
dic = {}
for smodel in self.source_models:
for src_group in smodel.src_groups:
dic[src_group.id] = getattr(src_group, name)
return dic
def _get_rlzs(self, smodel, all_rlzs, seed):
if self.num_samples:
# NB: the weights are considered when combining the results, not
# when sampling, therefore there are no weights in the function
# numpy.random.choice below
numpy.random.seed(seed)
idxs = numpy.random.choice(len(all_rlzs), smodel.samples)
rlzs = [all_rlzs[idx] for idx in idxs]
else: # full enumeration
rlzs = logictree.get_effective_rlzs(all_rlzs)
return rlzs
def __repr__(self):
info_by_model = collections.OrderedDict()
for sm in self.source_models:
info_by_model[sm.path] = (
'_'.join(map(decode, sm.path)),
decode(sm.names),
[sg.id for sg in sm.src_groups],
sm.weight,
self.get_num_rlzs(sm))
summary = ['%s, %s, grp=%s, weight=%s: %d realization(s)' % ibm
for ibm in info_by_model.values()]
return '<%s\n%s>' % (
self.__class__.__name__, '\n'.join(summary))
[docs]class CompositeSourceModel(collections.Sequence):
"""
:param source_model_lt:
a :class:`openquake.commonlib.logictree.SourceModelLogicTree` instance
:param source_models:
a list of :class:`openquake.hazardlib.sourceconverter.SourceModel`
tuples
"""
def __init__(self, gsim_lt, source_model_lt, source_models,
optimize_same_id):
self.gsim_lt = gsim_lt
self.source_model_lt = source_model_lt
self.source_models = source_models
self.optimize_same_id = optimize_same_id
self.source_info = ()
self.info = CompositionInfo(
gsim_lt, self.source_model_lt.seed,
self.source_model_lt.num_samples,
[sm.get_skeleton() for sm in self.source_models])
# dictionary src_group_id, source_id -> SourceInfo,
# populated by the .split_in_blocks method
self.infos = {}
try:
dupl_sources = self.check_dupl_sources()
except AssertionError:
# different sources with the same ID
self.has_dupl_sources = 0
else:
self.has_dupl_sources = len(dupl_sources)
[docs] def split_all(self, min_mag=0):
"""
Split all sources in the composite source model.
:param samples_factor: if given, sample the sources
:returns: a dictionary source_id -> split_time
"""
sample_factor = os.environ.get('OQ_SAMPLE_SOURCES')
split_time = AccumDict()
for sm in self.source_models:
for src_group in sm.src_groups:
self.add_infos(src_group)
if getattr(src_group, 'src_interdep', None) == 'mutex':
# mutex sources cannot be split, just set the mutex_weight
for src, sw in zip(src_group, src_group.srcs_weights):
src.mutex_weight = sw
else:
# split regular sources
srcs, stime = split_sources(src_group, min_mag)
for src in src_group:
s = src.source_id
self.infos[s].split_time = stime[s]
if sample_factor:
# debugging tip to reduce the size of a calculation
# OQ_SAMPLE_SOURCES=.01 oq engine --run job.ini
# will run a computation 100 times smaller
srcs = random_filter(srcs, float(sample_factor))
src_group.sources = srcs
split_time += stime
return split_time
[docs] def grp_by_src(self):
"""
:returns: a new CompositeSourceModel with one group per source
"""
smodels = []
grp_id = 0
for sm in self.source_models:
src_groups = []
smodel = sm.__class__(sm.names, sm.weight, sm.path, src_groups,
sm.num_gsim_paths, sm.ordinal, sm.samples)
for sg in sm.src_groups:
for src in sg.sources:
src.src_group_id = grp_id
src_groups.append(
sourceconverter.SourceGroup(
sg.trt, [src], name=src.source_id, id=grp_id))
grp_id += 1
smodels.append(smodel)
return self.__class__(self.gsim_lt, self.source_model_lt, smodels,
self.optimize_same_id)
[docs] def get_model(self, sm_id):
"""
Extract a CompositeSourceModel instance containing the single
model of index `sm_id`.
"""
sm = self.source_models[sm_id]
if self.source_model_lt.num_samples:
self.source_model_lt.num_samples = sm.samples
new = self.__class__(self.gsim_lt, self.source_model_lt, [sm],
self.optimize_same_id)
new.sm_id = sm_id
return new
[docs] def new(self, sources_by_grp):
"""
Generate a new CompositeSourceModel from the given dictionary.
:param sources_by_group: a dictionary grp_id -> sources
:returns: a new CompositeSourceModel instance
"""
source_models = []
for sm in self.source_models:
src_groups = []
for src_group in sm.src_groups:
sg = copy.copy(src_group)
sg.sources = sources_by_grp.get(sg.id, [])
src_groups.append(sg)
newsm = logictree.LtSourceModel(
sm.names, sm.weight, sm.path, src_groups,
sm.num_gsim_paths, sm.ordinal, sm.samples)
source_models.append(newsm)
new = self.__class__(self.gsim_lt, self.source_model_lt, source_models,
self.optimize_same_id)
new.info.update_eff_ruptures(new.get_num_ruptures().__getitem__)
new.infos = self.infos
return new
[docs] def get_weight(self, weight):
"""
:param weight: source weight function
:returns: total weight of the source model
"""
tot_weight = 0
for srcs in self.sources_by_trt.values():
tot_weight += sum(map(weight, srcs))
for grp in self.gen_mutex_groups():
tot_weight += sum(map(weight, grp))
self.info.tot_weight = tot_weight
return tot_weight
@property
def src_groups(self):
"""
Yields the SourceGroups inside each source model.
"""
for sm in self.source_models:
for src_group in sm.src_groups:
yield src_group
[docs] def get_nonparametric_sources(self):
"""
:returns: list of non parametric sources in the composite source model
"""
return [src for sm in self.source_models
for src_group in sm.src_groups
for src in src_group if hasattr(src, 'data')]
[docs] def check_dupl_sources(self): # used in print_csm_info
"""
Extracts duplicated sources, i.e. sources with the same source_id in
different source groups. Raise an exception if there are sources with
the same ID which are not duplicated.
:returns: a list of list of sources, ordered by source_id
"""
dd = collections.defaultdict(list)
for src_group in self.src_groups:
for src in src_group:
try:
srcid = src.source_id
except AttributeError: # src is a Node object
srcid = src['id']
dd[srcid].append(src)
return [_assert_equal_sources(srcs)
for srcid, srcs in sorted(dd.items()) if len(srcs) > 1]
[docs] def gen_mutex_groups(self):
"""
Yield groups of mutually exclusive sources
"""
for sg in self.src_groups:
if sg.src_interdep == 'mutex':
yield sg
[docs] def get_sources(self, kind='all'):
"""
Extract the sources contained in the source models by optionally
filtering and splitting them, depending on the passed parameter.
"""
assert kind in ('all', 'indep', 'mutex'), kind
sources = []
for sm in self.source_models:
for src_group in sm.src_groups:
if kind in ('all', src_group.src_interdep):
for src in src_group:
if sm.samples > 1:
src.samples = sm.samples
sources.append(src)
if kind == 'all' and not sources:
raise RuntimeError('All sources were filtered away!')
return sources
@cached_property
def sources_by_trt(self):
"""
Build a dictionary TRT string -> sources. Sources of kind "mutex"
(if any) are silently discarded.
"""
acc = AccumDict(accum=[])
for sm in self.source_models:
for grp in sm.src_groups:
if grp.src_interdep != 'mutex':
acc[grp.trt].extend(grp)
if self.optimize_same_id is False:
return acc
# extract a single source from multiple sources with the same ID
n = 0
tot = 0
dic = {}
for trt in acc:
dic[trt] = []
for grp in groupby(acc[trt], lambda x: x.source_id).values():
src = grp[0]
n += 1
tot += len(grp)
# src.src_group_id can be a list if get_sources_by_trt was
# called before
if len(grp) > 1 and not isinstance(src.src_group_id, list):
src.src_group_id = [s.src_group_id for s in grp]
dic[trt].append(src)
if n < tot:
logging.info('Reduced %d sources to %d sources with unique IDs',
tot, n)
return dic
[docs] def get_num_ruptures(self):
"""
:returns: the number of ruptures per source group ID
"""
return {grp.id: sum(src.num_ruptures for src in grp)
for grp in self.src_groups}
[docs] def init_serials(self, ses_seed):
"""
Generate unique seeds for each rupture with numpy.arange.
This should be called only in event based calculators
"""
sources = self.get_sources()
n = sum(src.num_ruptures for src in sources)
rup_serial = numpy.arange(ses_seed, ses_seed + n, dtype=numpy.uint32)
start = 0
for src in sources:
nr = src.num_ruptures
src.serial = rup_serial[start:start + nr]
start += nr
[docs] def get_maxweight(self, weight, concurrent_tasks, minweight=MINWEIGHT):
"""
Return an appropriate maxweight for use in the block_splitter
"""
totweight = self.get_weight(weight)
ct = concurrent_tasks or 1
mw = math.ceil(totweight / ct)
return max(mw, minweight)
[docs] def add_infos(self, sources):
"""
Populate the .infos dictionary src_id -> <SourceInfo>
"""
gsims = self.gsim_lt.values
for src in sources:
info = SourceInfo(src)
self.infos[info.source_id] = info
trt = src.tectonic_region_type
src.ngsims = len(gsims[trt])
src.ndists = contexts.get_num_distances(gsims[trt])
[docs] def get_floating_spinning_factors(self):
"""
:returns: (floating rupture factor, spinning rupture factor)
"""
data = []
for src in self.get_sources():
if hasattr(src, 'hypocenter_distribution'):
data.append(
(len(src.hypocenter_distribution.data),
len(src.nodal_plane_distribution.data)))
if not data:
return numpy.array([1, 1])
return numpy.array(data).mean(axis=0)
def __repr__(self):
"""
Return a string representation of the composite model
"""
models = ['%d-%s-%s,w=%s [%d src_group(s)]' % (
sm.ordinal, sm.name, '_'.join(sm.path), sm.weight,
len(sm.src_groups)) for sm in self.source_models]
return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(models))
def __getitem__(self, i):
"""Return the i-th source model"""
return self.source_models[i]
def __iter__(self):
"""Return an iterator over the underlying source models"""
return iter(self.source_models)
def __len__(self):
"""Return the number of underlying source models"""
return len(self.source_models)
# ########################## SourceManager ########################### #
[docs]class SourceInfo(object):
dt = numpy.dtype([
('source_id', (bytes, 100)), # 0
('source_class', (bytes, 30)), # 1
('num_ruptures', numpy.uint32), # 2
('calc_time', numpy.float32), # 3
('split_time', numpy.float32), # 4
('num_sites', numpy.float32), # 5
('num_split', numpy.uint32), # 6
('events', numpy.uint32), # 7
])
def __init__(self, src, calc_time=0, split_time=0, num_split=0):
self.source_id = src.source_id.rsplit(':', 1)[0]
self.source_class = src.__class__.__name__
self.num_ruptures = src.num_ruptures
self.num_sites = 0 # set later on
self.calc_time = calc_time
self.split_time = split_time
self.num_split = num_split
self.events = 0 # set in event based
def __repr__(self):
return '<%s>' % ' '.join('%s=%s' % (name, getattr(self, name))
for name in self.dt.names)