# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os.path
import pickle
import operator
import logging
import zlib
import numpy
from openquake.baselib import parallel, general, hdf5, python3compat
from openquake.hazardlib import nrml, sourceconverter, InvalidFile, calc
from openquake.hazardlib.source.multi_fault import save
from openquake.hazardlib.valid import basename, fragmentno
from openquake.hazardlib.lt import apply_uncertainties
U16 = numpy.uint16
TWO16 = 2 ** 16 # 65,536
TWO24 = 2 ** 24 # 16,777,216
TWO30 = 2 ** 30 # 1,073,741,24
TWO32 = 2 ** 32 # 4,294,967,296
bybranch = operator.attrgetter('branch')
CALC_TIME, NUM_SITES, NUM_RUPTURES, WEIGHT, MUTEX = 3, 4, 5, 6, 7
source_info_dt = numpy.dtype([
('source_id', hdf5.vstr), # 0
('grp_id', numpy.uint16), # 1
('code', (numpy.bytes_, 1)), # 2
('calc_time', numpy.float32), # 3
('num_sites', numpy.uint32), # 4
('num_ruptures', numpy.uint32), # 5
('weight', numpy.float32), # 6
('mutex_weight', numpy.float64), # 7
('trti', numpy.uint8), # 8
])
checksum = operator.attrgetter('checksum')
[docs]def check_unique(ids, msg='', strict=True):
"""
Raise a DuplicatedID exception if there are duplicated IDs
"""
if isinstance(ids, dict): # ids by key
all_ids = sum(ids.values(), [])
unique, counts = numpy.unique(all_ids, return_counts=True)
for dupl in unique[counts > 1]:
keys = [k for k in ids if dupl in ids[k]]
if keys:
errmsg = '%r appears in %s %s' % (dupl, keys, msg)
if strict:
raise nrml.DuplicatedID(errmsg)
else:
logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)
return
unique, counts = numpy.unique(ids, return_counts=True)
for u, c in zip(unique, counts):
if c > 1:
errmsg = '%r appears %d times %s' % (u, c, msg)
if strict:
raise nrml.DuplicatedID(errmsg)
else:
logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)
[docs]def zpik(obj):
"""
zip and pickle a python object
"""
gz = zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))
return numpy.frombuffer(gz, numpy.uint8)
[docs]def mutex_by_grp(src_groups):
"""
:returns: a composite array with boolean fields src_mutex, rup_mutex
"""
lst = []
for sg in src_groups:
lst.append((sg.src_interdep == 'mutex', sg.rup_interdep == 'mutex'))
return numpy.array(lst, [('src_mutex', bool), ('rup_mutex', bool)])
[docs]def build_rup_mutex(src_groups):
"""
:returns: a composite array with fields (grp_id, src_id, rup_id, weight)
"""
lst = []
dtlist = [('grp_id', numpy.uint16), ('src_id', numpy.uint32),
('rup_id', numpy.int64), ('weight', numpy.float64)]
for sg in src_groups:
if sg.rup_interdep == 'mutex':
for src in sg:
for i, (rup, _) in enumerate(src.data):
lst.append((src.grp_id, src.id, i, rup.weight))
return numpy.array(lst, dtlist)
[docs]def create_source_info(csm, h5):
"""
Creates source_info, source_wkt, trt_smrs, toms
"""
data = {} # src_id -> row
wkts = []
lens = []
for srcid, srcs in general.groupby(
csm.get_sources(), basename).items():
src = srcs[0]
num_ruptures = sum(src.num_ruptures for src in srcs)
mutex = getattr(src, 'mutex_weight', 0)
trti = csm.full_lt.trti.get(src.tectonic_region_type, 0)
if src.code == b'p':
code = b'p'
else:
code = csm.code.get(srcid, b'P')
lens.append(len(src.trt_smrs))
row = [srcid, src.grp_id, code, 0, 0, num_ruptures,
src.weight, mutex, trti]
wkts.append(getattr(src, '_wkt', ''))
data[srcid] = row
logging.info('There are %d groups and %d sources with len(trt_smrs)=%.2f',
len(csm.src_groups), len(data), numpy.mean(lens))
csm.source_info = data # src_id -> row
num_srcs = len(csm.source_info)
# avoid hdf5 damned bug by creating source_info in advance
h5.create_dataset('source_info', (num_srcs,), source_info_dt)
h5['mutex_by_grp'] = mutex_by_grp(csm.src_groups)
h5['rup_mutex'] = build_rup_mutex(csm.src_groups)
h5['source_wkt'] = numpy.array(wkts, hdf5.vstr)
[docs]def trt_smrs(src):
return tuple(src.trt_smrs)
[docs]def read_source_model(fname, branch, converter, monitor):
"""
:param fname: path to a source model XML file
:param branch: source model logic tree branch ID
:param converter: SourceConverter
:param monitor: a Monitor instance
:returns: a SourceModel instance
"""
[sm] = nrml.read_source_models([fname], converter)
sm.branch = branch
return {fname: sm}
# NB: in classical this is called after reduce_sources, so ";" is not
# added if the same source appears multiple times, len(srcs) == 1
# in event based instead identical sources can appear multiple times
# but will have different seeds and produce different rupture sets
def _fix_dupl_ids(src_groups):
sources = general.AccumDict(accum=[])
for sg in src_groups:
for src in sg.sources:
sources[src.source_id].append(src)
for src_id, srcs in sources.items():
if len(srcs) > 1:
# happens in logictree/case_01/rup.ini
for i, src in enumerate(srcs):
src.source_id = '%s;%d' % (src.source_id, i)
[docs]def check_branchID(branchID):
"""
Forbids invalid characters .:; used in fragmentno
"""
if '.' in branchID:
raise InvalidFile('branchID %r contains an invalid "."' % branchID)
elif ':' in branchID:
raise InvalidFile('branchID %r contains an invalid ":"' % branchID)
elif ';' in branchID:
raise InvalidFile('branchID %r contains an invalid ";"' % branchID)
[docs]def check_duplicates(smdict, strict):
# check_duplicates in the same file
for sm in smdict.values():
srcids = []
for sg in sm.src_groups:
srcids.extend(src.source_id for src in sg)
if sg.src_interdep == 'mutex':
# mutex sources in the same group must have all the same
# basename, i.e. the colon convention must be used
basenames = set(map(basename, sg))
assert len(basenames) == 1, basenames
check_unique(srcids, 'in ' + sm.fname, strict)
# check duplicates in different files but in the same branch
# the problem was discovered in the DOM model
for branch, sms in general.groupby(smdict.values(), bybranch).items():
srcids = general.AccumDict(accum=[])
fnames = []
for sm in sms:
if isinstance(sm, nrml.GeometryModel):
# the section IDs are not checked since they not count
# as real sources
continue
for sg in sm.src_groups:
srcids[sm.fname].extend(src.source_id for src in sg)
fnames.append(sm.fname)
check_unique(srcids, 'in branch %s' % branch, strict=strict)
found = find_false_duplicates(smdict)
if found:
logging.warning('Found different sources with same ID %s',
general.shortlist(found))
[docs]def get_csm(oq, full_lt, dstore=None):
"""
Build source models from the logic tree and to store
them inside the `source_full_lt` dataset.
"""
converter = sourceconverter.SourceConverter(
oq.investigation_time, oq.rupture_mesh_spacing,
oq.complex_fault_mesh_spacing, oq.width_of_mfd_bin,
oq.area_source_discretization, oq.minimum_magnitude,
oq.source_id,
discard_trts=[s.strip() for s in oq.discard_trts.split(',')],
floating_x_step=oq.floating_x_step,
floating_y_step=oq.floating_y_step,
source_nodes=oq.source_nodes,
infer_occur_rates=oq.infer_occur_rates)
full_lt.ses_seed = oq.ses_seed
logging.info('Reading the source model(s) in parallel')
# NB: the source models file must be in the shared directory
# NB: dstore is None in logictree_test.py
allargs = []
sdata = full_lt.source_model_lt.source_data
allpaths = set(full_lt.source_model_lt.info.smpaths)
dic = general.group_array(sdata, 'fname')
smpaths = []
for fname, rows in dic.items():
path = os.path.abspath(
os.path.join(full_lt.source_model_lt.basepath, fname))
smpaths.append(path)
allargs.append((path, rows[0]['branch'], converter))
for path in allpaths - set(smpaths): # geometry models
allargs.append((path, '', converter))
smdict = parallel.Starmap(read_source_model, allargs,
h5=dstore if dstore else None).reduce()
smdict = {k: smdict[k] for k in sorted(smdict)}
parallel.Starmap.shutdown() # save memory
check_duplicates(smdict, strict=oq.disagg_by_src)
logging.info('Applying uncertainties')
groups = _build_groups(full_lt, smdict)
# checking the changes
changes = sum(sg.changes for sg in groups)
if changes:
logging.info('Applied {:_d} changes to the composite source model'.
format(changes))
is_event_based = oq.calculation_mode.startswith(('event_based', 'ebrisk'))
if oq.sites and len(oq.sites) == 1:
# disable wkt in single-site calculations to avoid excessive slowdown
set_wkt = False
else:
set_wkt = True
logging.info('Setting src._wkt')
csm = _get_csm(full_lt, groups, is_event_based, set_wkt)
out = []
probs = []
for sg in csm.src_groups:
if sg.src_interdep == 'mutex' and 'src_mutex' not in dstore:
segments = []
for src in sg:
segments.append(int(src.source_id.split(':')[1]))
t = (src.source_id, src.grp_id,
src.count_ruptures(),src.mutex_weight)
out.append(t)
probs.append((src.grp_id, sg.grp_probability))
assert len(segments) == len(set(segments)), segments
if out:
dtlist = [('src_id', hdf5.vstr), ('grp_id', int),
('num_ruptures', int), ('mutex_weight', float)]
dstore.create_dset('src_mutex', numpy.array(out, dtlist),
fillvalue=None)
lst = [('grp_id', int), ('probability', float)]
dstore.create_dset('grp_probability', numpy.array(probs, lst),
fillvalue=None)
# must be called *after* _fix_dupl_ids
fix_geometry_sections(smdict, csm, dstore)
return csm
[docs]def add_checksums(srcs):
"""
Build and attach a checksum to each source
"""
for src in srcs:
dic = {k: v for k, v in vars(src).items()
if k not in 'source_id trt_smr smweight samples branch'}
src.checksum = zlib.adler32(pickle.dumps(dic, protocol=4))
# called before _fix_dupl_ids
[docs]def find_false_duplicates(smdict):
"""
Discriminate different sources with same ID (false duplicates)
and put a question mark in their source ID
"""
acc = general.AccumDict(accum=[])
atomic = set()
for smodel in smdict.values():
for sgroup in smodel.src_groups:
for src in sgroup:
src.branch = smodel.branch
srcid = (src.source_id if sgroup.atomic
else basename(src))
acc[srcid].append(src)
if sgroup.atomic:
atomic.add(src.source_id)
found = []
for srcid, srcs in acc.items():
if len(srcs) > 1: # duplicated ID
if any(src.source_id in atomic for src in srcs):
raise RuntimeError('Sources in atomic groups cannot be '
'duplicated: %s', srcid)
if any(getattr(src, 'mutex_weight', 0) for src in srcs):
raise RuntimeError('Mutually exclusive sources cannot be '
'duplicated: %s', srcid)
add_checksums(srcs)
gb = general.AccumDict(accum=[])
for src in srcs:
gb[checksum(src)].append(src)
if len(gb) > 1:
for same_checksum in gb.values():
for src in same_checksum:
check_branchID(src.branch)
src.source_id += '!%s' % src.branch
found.append(srcid)
return found
[docs]def fix_geometry_sections(smdict, csm, dstore):
"""
If there are MultiFaultSources, fix the sections according to the
GeometryModels (if any).
"""
gmodels = []
gfiles = []
for fname, mod in smdict.items():
if isinstance(mod, nrml.GeometryModel):
gmodels.append(mod)
gfiles.append(fname)
# merge and reorder the sections
sec_ids = []
sections = {}
for gmod in gmodels:
sec_ids.extend(gmod.sections)
sections.update(gmod.sections)
check_unique(sec_ids, 'section ID in files ' + ' '.join(gfiles))
if sections:
# save in the temporary file
assert dstore, ('You forgot to pass the dstore to '
'get_composite_source_model')
mfsources = []
for sg in csm.src_groups:
for src in sg:
if src.code == b'F':
mfsources.append(src)
save(mfsources, sections, dstore.tempname)
def _groups_ids(smlt_dir, smdict, fnames):
# extract the source groups and ids from a sequence of source files
groups = []
for fname in fnames:
fullname = os.path.abspath(os.path.join(smlt_dir, fname))
groups.extend(smdict[fullname].src_groups)
return groups, set(src.source_id for grp in groups for src in grp)
def _build_groups(full_lt, smdict):
# build all the possible source groups from the full logic tree
smlt_file = full_lt.source_model_lt.filename
smlt_dir = os.path.dirname(smlt_file)
groups = []
frac = 1. / len(full_lt.sm_rlzs)
for rlz in full_lt.sm_rlzs:
src_groups, source_ids = _groups_ids(
smlt_dir, smdict, rlz.value[0].split())
bset_values = full_lt.source_model_lt.bset_values(rlz.lt_path)
while (bset_values and
bset_values[0][0].uncertainty_type == 'extendModel'):
(bset, value), *bset_values = bset_values
extra, extra_ids = _groups_ids(smlt_dir, smdict, value.split())
common = source_ids & extra_ids
if common:
raise InvalidFile(
'%s contains source(s) %s already present in %s' %
(value, common, rlz.value))
src_groups.extend(extra)
for src_group in src_groups:
sg = apply_uncertainties(bset_values, src_group)
full_lt.set_trt_smr(sg, smr=rlz.ordinal)
for src in sg:
# the smweight is used in event based sampling:
# see oq-risk-tests etna
src.smweight = rlz.weight if full_lt.num_samples else frac
if rlz.samples > 1:
src.samples = rlz.samples
groups.append(sg)
# check applyToSources
sm_branch = rlz.lt_path[0]
src_id = full_lt.source_model_lt.info.applytosources[sm_branch]
for srcid in src_id:
if srcid not in source_ids:
raise ValueError(
"The source %s is not in the source model,"
" please fix applyToSources in %s or the "
"source model(s) %s" % (srcid, smlt_file,
rlz.value[0].split()))
return groups
[docs]def reduce_sources(sources_with_same_id, full_lt):
"""
:param sources_with_same_id: a list of sources with the same source_id
:returns: a list of truly unique sources, ordered by trt_smr
"""
out = []
add_checksums(sources_with_same_id)
for srcs in general.groupby(sources_with_same_id, checksum).values():
# duplicate sources: same id, same checksum
src = srcs[0]
if len(srcs) > 1: # happens in logictree/case_07
src.trt_smr = tuple(s.trt_smr for s in srcs)
else:
src.trt_smr = src.trt_smr,
out.append(src)
out.sort(key=operator.attrgetter('trt_smr'))
return out
def _get_csm(full_lt, groups, event_based, set_wkt):
# 1. extract a single source from multiple sources with the same ID
# 2. regroup the sources in non-atomic groups by TRT
# 3. reorder the sources by source_id
atomic = []
acc = general.AccumDict(accum=[])
for grp in groups:
if grp and grp.atomic:
atomic.append(grp)
elif grp:
acc[grp.trt].extend(grp)
key = operator.attrgetter('source_id', 'code')
src_groups = []
for trt in acc:
lst = []
for srcs in general.groupby(acc[trt], key).values():
# NB: not reducing the sources in event based
if len(srcs) > 1 and not event_based:
srcs = reduce_sources(srcs, full_lt)
lst.extend(srcs)
for sources in general.groupby(lst, trt_smrs).values():
if set_wkt:
# set ._wkt attribute (for later storage in the source_wkt
# dataset); this is slow
msg = ('src "{}" has {:_d} underlying meshes with a total '
'of {:_d} points!')
for src in [s for s in sources if s.code == b'N']:
# check on NonParametricSources
mesh_size = getattr(src, 'mesh_size', 0)
if mesh_size > 1E6:
logging.warning(msg.format(
src.source_id, src.count_ruptures(), mesh_size))
src._wkt = src.wkt()
src_groups.append(sourceconverter.SourceGroup(trt, sources))
if set_wkt:
for ag in atomic:
for src in ag:
src._wkt = src.wkt()
src_groups.extend(atomic)
_fix_dupl_ids(src_groups)
# optionally sample the sources
ss = os.environ.get('OQ_SAMPLE_SOURCES')
for sg in src_groups:
sg.sources.sort(key=operator.attrgetter('source_id'))
if ss:
srcs = []
for src in sg:
srcs.extend(calc.filters.split_source(src))
sg.sources = general.random_filter(srcs, float(ss)) or [srcs[0]]
return CompositeSourceModel(full_lt, src_groups)
[docs]class CompositeSourceModel:
"""
:param full_lt:
a :class:`FullLogicTree` instance
:param src_groups:
a list of SourceGroups
:param event_based:
a flag True for event based calculations, flag otherwise
"""
def __init__(self, full_lt, src_groups):
self.src_groups = src_groups
self.init(full_lt)
[docs] def init(self, full_lt):
self.full_lt = full_lt
self.gsim_lt = full_lt.gsim_lt
self.source_model_lt = full_lt.source_model_lt
self.sm_rlzs = full_lt.sm_rlzs
# initialize the code dictionary
self.code = {} # srcid -> code
for grp_id, sg in enumerate(self.src_groups):
assert len(sg) # sanity check
for src in sg:
src.grp_id = grp_id
if src.code != b'P':
source_id = basename(src)
self.code[source_id] = src.code
[docs] def get_trt_smrs(self):
"""
:returns: an array of trt_smrs (to be stored as an hdf5.vuint32 array)
"""
keys = [sg.sources[0].trt_smrs for sg in self.src_groups]
assert len(keys) < TWO16, len(keys)
return [numpy.array(trt_smrs, numpy.uint32) for trt_smrs in keys]
[docs] def get_sources(self, atomic=None):
"""
There are 3 options:
atomic == None => return all the sources (default)
atomic == True => return all the sources in atomic groups
atomic == True => return all the sources not in atomic groups
"""
srcs = []
for src_group in self.src_groups:
if atomic is None: # get all sources
srcs.extend(src_group)
elif atomic == src_group.atomic:
srcs.extend(src_group)
return srcs
[docs] def get_basenames(self):
"""
:returns: a sorted list of source names stripped of the suffixes
"""
sources = set()
for src in self.get_sources():
sources.add(basename(src, ';:.').split('!')[0])
return sorted(sources)
[docs] def get_mags_by_trt(self, maximum_distance):
"""
:param maximum_distance: dictionary trt -> magdist interpolator
:returns: a dictionary trt -> magnitudes in the sources as strings
"""
mags = general.AccumDict(accum=set()) # trt -> mags
for sg in self.src_groups:
for src in sg:
mags[sg.trt].update(src.get_magstrs())
out = {}
for trt in mags:
minmag = maximum_distance(trt).x[0]
out[trt] = sorted(m for m in mags[trt] if float(m) >= minmag)
return out
[docs] def get_floating_spinning_factors(self):
"""
:returns: (floating rupture factor, spinning rupture factor)
"""
data = []
for sg in self.src_groups:
for src in sg:
if hasattr(src, 'hypocenter_distribution'):
data.append(
(len(src.hypocenter_distribution.data),
len(src.nodal_plane_distribution.data)))
if not data:
return numpy.array([1, 1])
return numpy.array(data).mean(axis=0)
[docs] def update_source_info(self, source_data):
"""
Update (eff_ruptures, num_sites, calc_time) inside the source_info
"""
assert len(source_data) < TWO24, len(source_data)
for src_id, nsites, weight, ctimes in python3compat.zip(
source_data['src_id'], source_data['nsites'],
source_data['weight'], source_data['ctimes']):
baseid = basename(src_id)
row = self.source_info[baseid]
row[CALC_TIME] += ctimes
row[WEIGHT] += weight
row[NUM_SITES] += nsites
[docs] def count_ruptures(self):
"""
Call src.count_ruptures() on each source. Slow.
"""
n = 0
for src in self.get_sources():
n += src.count_ruptures()
return n
[docs] def fix_src_offset(self):
"""
Set the src.offset field for each source
"""
src_id = 0
for srcs in general.groupby(self.get_sources(), basename).values():
offset = 0
if len(srcs) > 1: # order by split number
srcs.sort(key=fragmentno)
for src in srcs:
src.id = src_id
src.offset = offset
if not src.num_ruptures:
src.num_ruptures = src.count_ruptures()
offset += src.num_ruptures
if src.num_ruptures >= TWO30:
raise ValueError(
'%s contains more than 2**30 ruptures' % src)
# print(src, src.offset, offset)
src_id += 1
[docs] def get_max_weight(self, oq): # used in preclassical
"""
:param oq: an OqParam instance
:returns: total weight and max weight of the sources
"""
srcs = self.get_sources()
tot_weight = 0
nr = 0
for src in srcs:
nr += src.num_ruptures
tot_weight += src.weight
if src.code == b'C' and src.num_ruptures > 20_000:
msg = ('{} is suspiciously large, containing {:_d} '
'ruptures with complex_fault_mesh_spacing={} km')
spc = oq.complex_fault_mesh_spacing
logging.info(msg.format(src, src.num_ruptures, spc))
assert tot_weight
max_weight = tot_weight / (oq.concurrent_tasks or 1)
logging.info('tot_weight={:_d}, max_weight={:_d}, num_sources={:_d}'.
format(int(tot_weight), int(max_weight), len(srcs)))
heavy = [src for src in srcs if src.weight > max_weight]
for src in sorted(heavy, key=lambda s: s.weight, reverse=True):
logging.info('%s', src)
if not heavy:
maxsrc = max(srcs, key=lambda s: s.weight)
logging.info('Heaviest: %s', maxsrc)
return max_weight
def __toh5__(self):
G = len(self.src_groups)
arr = numpy.zeros(G + 1, hdf5.vuint8)
for grp_id, grp in enumerate(self.src_groups):
arr[grp_id] = zpik(grp)
arr[G] = zpik(self.source_info)
size = sum(len(val) for val in arr)
logging.info(f'Storing {general.humansize(size)} '
'of CompositeSourceModel')
return arr, {}
# tested in case_36
def __fromh5__(self, arr, attrs):
objs = [pickle.loads(zlib.decompress(a.tobytes())) for a in arr]
self.src_groups = objs[:-1]
self.source_info = objs[-1]
def __repr__(self):
"""
Return a string representation of the composite model
"""
contents = []
for sg in self.src_groups:
arr = numpy.array(_strip_colons(sg))
line = f'grp_id={sg.sources[0].grp_id} {arr}'
contents.append(line)
return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(contents))
def _strip_colons(sources):
ids = set()
for src in sources:
if ':' in src.source_id:
ids.add(src.source_id.split(':')[0])
else:
ids.add(src.source_id)
return sorted(ids)