Source code for openquake.commonlib.source_reader

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import re
import copy
import os.path
import pickle
import operator
import logging
import collections
import gzip
import zlib
import numpy

from openquake.baselib import parallel, general, hdf5
from openquake.hazardlib import nrml, sourceconverter, InvalidFile, tom
from openquake.hazardlib.contexts import ContextMaker, basename
from openquake.hazardlib.calc.filters import magstr
from openquake.hazardlib.lt import apply_uncertainties
from openquake.hazardlib.geo.surface.kite_fault import kite_to_geom

TWO16 = 2 ** 16  # 65,536
TWO32 = 2 ** 32  # 4,294,967,296
by_id = operator.attrgetter('source_id')

CALC_TIME, NUM_SITES, NUM_RUPTURES, WEIGHT, MUTEX = 3, 4, 5, 6, 7

source_info_dt = numpy.dtype([
    ('source_id', hdf5.vstr),          # 0
    ('grp_id', numpy.uint16),          # 1
    ('code', (numpy.string_, 1)),      # 2
    ('calc_time', numpy.float32),      # 3
    ('num_sites', numpy.uint32),       # 4
    ('num_ruptures', numpy.uint32),    # 5
    ('weight', numpy.float32),         # 6
    ('mutex_weight', numpy.float64),   # 7
    ('trti', numpy.uint8),             # 8
])

[docs]def gzpik(obj): """ gzip and pickle a python object """ gz = gzip.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)) return numpy.frombuffer(gz, numpy.uint8)
[docs]def fragmentno(src): "Postfix after :.; as an integer" fragment = re.split('[:.;]', src.source_id, 1)[1] return int(fragment.replace('.', '').replace(';', ''))
[docs]def mutex_by_grp(src_groups): """ :returns: a composite array with boolean fields src_mutex, rup_mutex """ lst = [] for sg in src_groups: lst.append((sg.src_interdep == 'mutex', sg.rup_interdep == 'mutex')) return numpy.array(lst, [('src_mutex', bool), ('rup_mutex', bool)])
[docs]def create_source_info(csm, h5): """ Creates source_info, source_wkt, trt_smrs, toms """ data = {} # src_id -> row wkts = [] lens = [] for srcid, srcs in general.groupby( csm.get_sources(), basename).items(): src = srcs[0] num_ruptures = sum(src.num_ruptures for src in srcs) mutex = getattr(src, 'mutex_weight', 0) trti = csm.full_lt.trti.get(src.tectonic_region_type, -1) if src.code == b'p': code = b'p' else: code = csm.code.get(srcid, b'P') lens.append(len(src.trt_smrs)) row = [srcid, src.grp_id, code, 0, 0, num_ruptures, src.weight, mutex, trti] wkts.append(getattr(src, '_wkt', '')) data[srcid] = row srcid = len(data) - 1 for src in srcs: src.id = srcid logging.info('There are %d groups and %d sources with len(trt_smrs)=%.2f', len(csm.src_groups), len(data), numpy.mean(lens)) csm.source_info = data # src_id -> row num_srcs = len(csm.source_info) # avoid hdf5 damned bug by creating source_info in advance h5.create_dataset('source_info', (num_srcs,), source_info_dt) h5['mutex_by_grp'] = mutex_by_grp(csm.src_groups) h5['source_wkt'] = numpy.array(wkts, hdf5.vstr)
[docs]def trt_smrs(src): return tuple(src.trt_smrs)
[docs]def read_source_model(fname, converter, monitor): """ :param fname: path to a source model XML file :param converter: SourceConverter :param monitor: a Monitor instance :returns: a SourceModel instance """ [sm] = nrml.read_source_models([fname], converter) return {fname: sm}
# NB: called after the .checksum has been stored in reduce_sources def _check_dupl_ids(src_groups): sources = general.AccumDict(accum=[]) for sg in src_groups: for src in sg.sources: sources[src.source_id].append(src) first = True for src_id, srcs in sources.items(): if len(srcs) > 1: # duplicate IDs with different checksums, see cases 11, 13, 20 for i, src in enumerate(srcs): src.source_id = '%s;%d' % (src.source_id, i) if first: logging.info('There are multiple different sources with ' 'the same ID %s', srcs) first = False
[docs]def get_csm(oq, full_lt, h5=None): """ Build source models from the logic tree and to store them inside the `source_full_lt` dataset. """ converter = sourceconverter.SourceConverter( oq.investigation_time, oq.rupture_mesh_spacing, oq.complex_fault_mesh_spacing, oq.width_of_mfd_bin, oq.area_source_discretization, oq.minimum_magnitude, oq.source_id, discard_trts=[s.strip() for s in oq.discard_trts.split(',')], floating_x_step=oq.floating_x_step, floating_y_step=oq.floating_y_step, source_nodes=oq.source_nodes) full_lt.ses_seed = oq.ses_seed logging.info('Reading the source model(s) in parallel') # NB: the source models file are often NOT in the shared directory # (for instance in oq-engine/demos) so the processpool must be used dist = ('no' if os.environ.get('OQ_DISTRIBUTE') == 'no' else 'processpool') # NB: h5 is None in logictree_test.py allargs = [] for fname in full_lt.source_model_lt.info.smpaths: allargs.append((fname, converter)) smdict = parallel.Starmap(read_source_model, allargs, distribute=dist, h5=h5 if h5 else None).reduce() parallel.Starmap.shutdown() # save memory fix_geometry_sections(smdict, h5) logging.info('Applying uncertainties') groups = _build_groups(full_lt, smdict) # checking the changes changes = sum(sg.changes for sg in groups) if changes: logging.info('Applied {:_d} changes to the composite source model'. format(changes)) return _get_csm(full_lt, groups, oq.calculation_mode.startswith( ('event_based', 'ebrisk')))
[docs]def fix_geometry_sections(smdict, h5): """ If there are MultiFaultSources, fix the sections according to the GeometryModels (if any). """ gmodels = [] smodels = [] gfiles = [] for fname, mod in smdict.items(): if isinstance(mod, nrml.GeometryModel): gmodels.append(mod) gfiles.append(fname) elif isinstance(mod, nrml.SourceModel): smodels.append(mod) else: raise RuntimeError('Unknown model %s' % mod) # merge and reorder the sections sec_ids = [] sections = {} for gmod in gmodels: sec_ids.extend(gmod.sections) sections.update(gmod.sections) nrml.check_unique( sec_ids, 'section ID in files ' + ' '.join(gfiles)) s2i = {suid: i for i, suid in enumerate(sections)} for idx, sec in enumerate(sections.values()): sec.suid = idx if h5 and sections: h5.save_vlen('multi_fault_sections', [kite_to_geom(sec) for sec in sections.values()]) # fix the MultiFaultSources section_idxs = [] for smod in smodels: for sg in smod.src_groups: for src in sg: if hasattr(src, 'set_sections'): if not sections: raise RuntimeError('Missing geometryModel files!') if h5: src.hdf5path = h5.filename src.rupture_idxs = [tuple(s2i[idx] for idx in idxs) for idxs in src.rupture_idxs] for idxs in src.rupture_idxs: section_idxs.extend(idxs) cnt = collections.Counter(section_idxs) if cnt: mean_counts = numpy.mean(list(cnt.values())) logging.info('Section multiplicity = %.1f', mean_counts)
def _groups_ids(smlt_dir, smdict, fnames): # extract the source groups and ids from a sequence of source files groups = [] for fname in fnames: fullname = os.path.abspath(os.path.join(smlt_dir, fname)) groups.extend(smdict[fullname].src_groups) return groups, set(src.source_id for grp in groups for src in grp) def _build_groups(full_lt, smdict): # build all the possible source groups from the full logic tree smlt_file = full_lt.source_model_lt.filename smlt_dir = os.path.dirname(smlt_file) groups = [] frac = 1. / len(full_lt.sm_rlzs) for rlz in full_lt.sm_rlzs: src_groups, source_ids = _groups_ids( smlt_dir, smdict, rlz.value[0].split()) bset_values = full_lt.source_model_lt.bset_values(rlz.lt_path) if bset_values and bset_values[0][0].uncertainty_type == 'extendModel': while len(bset_values): (bset, value), *bset_values = bset_values extra, extra_ids = _groups_ids(smlt_dir, smdict, value.split()) common = source_ids & extra_ids if common: raise InvalidFile( '%s contains source(s) %s already present in %s' % (value, common, rlz.value)) src_groups.extend(extra) for src_group in src_groups: trt_smr = full_lt.get_trt_smr(src_group.trt, rlz.ordinal) sg = apply_uncertainties(bset_values, src_group) for src in sg: src.trt_smr = trt_smr # the smweight is used in event based sampling: # see oq-risk-tests etna src.smweight = rlz.weight if full_lt.num_samples else frac if rlz.samples > 1: src.samples = rlz.samples groups.append(sg) # check applyToSources sm_branch = rlz.lt_path[0] src_id = full_lt.source_model_lt.info.applytosources[sm_branch] for srcid in src_id: if srcid not in source_ids: raise ValueError( "The source %s is not in the source model," " please fix applyToSources in %s or the " "source model(s) %s" % (srcid, smlt_file, rlz.value[0].split())) return groups
[docs]def reduce_sources(sources_with_same_id): """ :param sources_with_same_id: a list of sources with the same source_id :returns: a list of truly unique sources, ordered by trt_smr """ out = [] for src in sources_with_same_id: dic = {k: v for k, v in vars(src).items() if k not in 'source_id trt_smr smweight samples'} src.checksum = zlib.adler32(pickle.dumps(dic, protocol=4)) for srcs in general.groupby( sources_with_same_id, operator.attrgetter('checksum')).values(): # duplicate sources: same id, same checksum src = srcs[0] if len(srcs) > 1: # happens in classical/case_20 src.trt_smr = tuple(s.trt_smr for s in srcs) else: src.trt_smr = src.trt_smr, out.append(src) out.sort(key=operator.attrgetter('trt_smr')) return out
def _get_csm(full_lt, groups, event_based): # 1. extract a single source from multiple sources with the same ID # 2. regroup the sources in non-atomic groups by TRT # 3. reorder the sources by source_id atomic = [] acc = general.AccumDict(accum=[]) for grp in groups: if grp and grp.atomic: atomic.append(grp) elif grp: acc[grp.trt].extend(grp) key = operator.attrgetter('source_id', 'code') src_groups = [] for trt in acc: lst = [] for srcs in general.groupby(acc[trt], key).values(): # NB: not reducing the sources in event based if len(srcs) > 1 and not event_based: srcs = reduce_sources(srcs) lst.extend(srcs) for sources in general.groupby(lst, trt_smrs).values(): # set ._wkt attribute (for later storage in the source_wkt dataset) for src in sources: # check on MultiFaultSources and NonParametricSources mesh_size = getattr(src, 'mesh_size', 0) if mesh_size > 1E6: msg = ('src "{}" has {:_d} underlying meshes with a total ' 'of {:_d} points!').format( src.source_id, src.count_ruptures(), mesh_size) logging.warning(msg) src._wkt = src.wkt() src_groups.append(sourceconverter.SourceGroup(trt, sources)) for ag in atomic: for src in ag: src._wkt = src.wkt() src_groups.extend(atomic) _check_dupl_ids(src_groups) for sg in src_groups: sg.sources.sort(key=operator.attrgetter('source_id')) return CompositeSourceModel(full_lt, src_groups)
[docs]class CompositeSourceModel: """ :param full_lt: a :class:`FullLogicTree` instance :param src_groups: a list of SourceGroups :param event_based: a flag True for event based calculations, flag otherwise """ def __init__(self, full_lt, src_groups): self.gsim_lt = full_lt.gsim_lt self.source_model_lt = full_lt.source_model_lt self.sm_rlzs = full_lt.sm_rlzs self.full_lt = full_lt self.src_groups = src_groups self.code = {} # srcid -> code for grp_id, sg in enumerate(src_groups): assert len(sg) # sanity check for src in sg: src.grp_id = grp_id if src.code != b'P': source_id = basename(src) self.code[source_id] = src.code # used for debugging; assume PoissonTOM; use read_cmakers instead def _get_cmakers(self, oq): cmakers = [] trt_smrs = self.get_trt_smrs() rlzs_by_gsim_list = self.full_lt.get_rlzs_by_gsim_list(trt_smrs) trts = list(self.full_lt.gsim_lt.values) num_eff_rlzs = len(self.full_lt.sm_rlzs) start = 0 for grp_id, rlzs_by_gsim in enumerate(rlzs_by_gsim_list): trti = trt_smrs[grp_id][0] // num_eff_rlzs cmaker = ContextMaker(trts[trti], rlzs_by_gsim, oq) cmaker.tom = tom.PoissonTOM(oq.investigation_time) cmaker.trti = trti cmaker.gidx = numpy.arange(start, start + len(rlzs_by_gsim)) cmaker.grp_id = grp_id start += len(rlzs_by_gsim) cmakers.append(cmaker) return cmakers
[docs] def get_trt_smrs(self): """ :returns: an array of trt_smrs (to be stored as an hdf5.vuint32 array) """ keys = [sg.sources[0].trt_smrs for sg in self.src_groups] assert len(keys) < TWO16, len(keys) return [numpy.array(trt_smrs, numpy.uint32) for trt_smrs in keys]
[docs] def get_sources(self, atomic=None): """ There are 3 options: atomic == None => return all the sources (default) atomic == True => return all the sources in atomic groups atomic == True => return all the sources not in atomic groups """ srcs = [] for src_group in self.src_groups: if atomic is None: # get all sources srcs.extend(src_group) elif atomic == src_group.atomic: srcs.extend(src_group) return srcs
# used only in calc_by_rlz.py
[docs] def get_groups(self, smr): """ :param smr: effective source model realization ID :returns: SourceGroups associated to the given `smr` """ src_groups = [] for sg in self.src_groups: trt_smr = self.full_lt.get_trt_smr(sg.trt, smr) src_group = copy.copy(sg) src_group.sources = [src for src in sg if trt_smr in src.trt_smrs] if len(src_group): src_groups.append(src_group) return src_groups
[docs] def get_mags_by_trt(self): """ :returns: a dictionary trt -> magnitudes in the sources as strings """ mags = general.AccumDict(accum=set()) # trt -> mags for sg in self.src_groups: for src in sg: if hasattr(src, 'mags'): # MultiFaultSource srcmags = {magstr(mag) for mag in src.mags} elif hasattr(src, 'data'): # nonparametric srcmags = {magstr(item[0].mag) for item in src.data} else: srcmags = {magstr(item[0]) for item in src.get_annual_occurrence_rates()} mags[sg.trt].update(srcmags) return {trt: sorted(mags[trt]) for trt in mags}
[docs] def get_floating_spinning_factors(self): """ :returns: (floating rupture factor, spinning rupture factor) """ data = [] for sg in self.src_groups: for src in sg: if hasattr(src, 'hypocenter_distribution'): data.append( (len(src.hypocenter_distribution.data), len(src.nodal_plane_distribution.data))) if not data: return numpy.array([1, 1]) return numpy.array(data).mean(axis=0)
[docs] def update_source_info(self, source_data): """ Update (eff_ruptures, num_sites, calc_time) inside the source_info """ assert len(source_data) < TWO32, len(source_data) for src_id, grp_id, nsites, weight, ctimes in zip( source_data['src_id'], source_data['grp_id'], source_data['nsites'], source_data['weight'], source_data['ctimes']): baseid = basename(src_id) row = self.source_info[baseid] row[CALC_TIME] += ctimes row[WEIGHT] += weight row[NUM_SITES] += nsites
[docs] def count_ruptures(self): """ Call src.count_ruptures() on each source. Slow. """ n = 0 for src in self.get_sources(): n += src.count_ruptures() return n
[docs] def fix_src_offset(self): """ Set the src.offset field for each source """ for srcs in general.groupby(self.get_sources(), basename).values(): offset = 0 if len(srcs) > 1: # order by split number srcs.sort(key=fragmentno) for src in srcs: src.offset = offset offset += src.num_ruptures if src.num_ruptures >= TWO32: raise ValueError( '%s contains more than 2**32 ruptures' % src)
# print(src, src.offset, offset)
[docs] def get_max_weight(self, oq): # used in preclassical """ :param oq: an OqParam instance :returns: total weight and max weight of the sources """ srcs = self.get_sources() tot_weight = 0 nr = 0 for src in srcs: nr += src.num_ruptures tot_weight += src.weight if src.code == b'C' and src.num_ruptures > 20_000: msg = ('{} is suspiciously large, containing {:_d} ' 'ruptures with complex_fault_mesh_spacing={} km') spc = oq.complex_fault_mesh_spacing logging.info(msg.format(src, src.num_ruptures, spc)) assert tot_weight max_weight = tot_weight / (oq.concurrent_tasks or 1) logging.info('tot_weight={:_d}, max_weight={:_d}, num_sources={:_d}'. format(int(tot_weight), int(max_weight), len(srcs))) heavy = [src for src in srcs if src.weight > max_weight] for src in sorted(heavy, key=lambda s: s.weight, reverse=True): logging.info('%s', src) if not heavy: maxsrc = max(srcs, key=lambda s: s.weight) logging.info('Heaviest: %s', maxsrc) return max_weight
def __toh5__(self): G = len(self.src_groups) arr = numpy.zeros(G + 1, hdf5.vuint8) for grp_id, grp in enumerate(self.src_groups): arr[grp_id] = gzpik(grp) arr[G] = gzpik(self.source_info) size = sum(len(val) for val in arr) logging.info(f'Storing {general.humansize(size)} ' 'of CompositeSourceModel') return arr, {} # tested in case_36 def __fromh5__(self, arr, attrs): objs = [pickle.loads(gzip.decompress(a.tobytes())) for a in arr] self.src_groups = objs[:-1] self.source_info = objs[-1] def __repr__(self): """ Return a string representation of the composite model """ contents = [] for sg in self.src_groups: arr = numpy.array(_strip_colons(sg)) line = f'grp_id={sg.sources[0].grp_id} {arr}' contents.append(line) return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(contents))
def _strip_colons(sources): ids = set() for src in sources: if ':' in src.source_id: ids.add(src.source_id.split(':')[0]) else: ids.add(src.source_id) return sorted(ids)