# Source code for openquake.hazardlib.source_reader

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import time
import zlib
import os.path
import pickle
import operator
import logging
import numpy

from openquake.baselib import parallel, general, hdf5
from openquake.hazardlib import nrml, sourceconverter, InvalidFile, calc
from openquake.hazardlib.source_group import CompositeSourceModel
from openquake.hazardlib.source.multi_fault import save_and_split
from openquake.hazardlib.lt import apply_uncertainties
from openquake.hazardlib.valid import basename

# key functions used to group/sort source-model objects by attribute
bybranch = operator.attrgetter('branch')
checksum = operator.attrgetter('checksum')
# dtype of one row of the `source_info` dataset (see create_source_info);
# the trailing numbers are the column indices
source_info_dt = numpy.dtype([
    ('source_id', hdf5.vstr),          # 0
    ('grp_id', numpy.uint16),          # 1
    ('code', (numpy.bytes_, 1)),       # 2
    ('calc_time', numpy.float32),      # 3
    ('num_sites', numpy.uint64),       # 4
    ('num_ruptures', numpy.uint32),    # 5
    ('weight', numpy.float32),         # 6
    ('mutex_weight', numpy.float64),   # 7
    ('trti', numpy.uint8),             # 8
])


def check_unique(ids, msg='', strict=True):
    """
    Raise a DuplicatedID exception if there are duplicated IDs
    """
    def complain(errmsg):
        # abort when strict, otherwise only record the problem in the log
        if strict:
            raise nrml.DuplicatedID(errmsg)
        logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)

    if isinstance(ids, dict):  # IDs grouped by key
        flat = sum(ids.values(), [])
        uniq, cnt = numpy.unique(flat, return_counts=True)
        for dupl in uniq[cnt > 1]:
            keys = [key for key in ids if dupl in ids[key]]
            if keys:
                complain('%r appears in %s %s' % (dupl, keys, msg))
        return
    uniq, cnt = numpy.unique(ids, return_counts=True)
    for value, occurrences in zip(uniq, cnt):
        if occurrences > 1:
            complain('%r appears %d times %s' % (value, occurrences, msg))
def create_source_info(csm, h5):
    """
    Creates source_info, trt_smrs, toms
    """
    # collect one row per source of the composite source model
    csm.source_info = csm.get_source_info()
    # avoid hdf5 damned bug by creating source_info in advance
    n = len(csm.source_info)
    h5.create_dataset('source_info', (n,), source_info_dt)
def trt_smrs(src):
    """Return the ``trt_smrs`` of the source as a (hashable) tuple."""
    return tuple(src.trt_smrs)
def _sample(srcs, sample, applied):
    # always keep the sources mentioned in applyToSources, then add a
    # random subset of all the sources; never return an empty list
    keep = [src for src in srcs if src.source_id in applied]
    sampled = general.random_filter(srcs, sample)
    return (keep + sampled) or [srcs[0]]
def read_source_model(fname, branch, converter, applied, sample, monitor):
    """
    :param fname: path to a source model XML file
    :param branch: source model logic tree branch ID
    :param converter: SourceConverter
    :param applied: list of source IDs within applyToSources
    :param sample: a string with the sampling factor (if any)
    :param monitor: a Monitor instance
    :returns: a SourceModel instance
    """
    start = time.time()
    [sm] = nrml.read_source_models([fname], converter)
    sm.branch = branch
    if sample:
        for group in sm.src_groups:
            if group.atomic:
                continue  # atomic groups are never sampled
            candidates = []
            for src in group:
                if src.source_id in applied:
                    # sources in applyToSources must be kept as they are
                    candidates.append(src)
                else:
                    candidates.extend(calc.filters.split_source(src))
            group.sources = _sample(candidates, float(sample), applied)
    sm.rtime = time.time() - start  # save the read time
    return {fname: sm}
# NB: in classical this is called after reduce_sources, so ";" is not
# added if the same source appears multiple times, len(srcs) == 1
# in event based instead identical sources can appear multiple times
# but will have different seeds and produce different rupture sets
def _fix_dupl_ids(src_groups):
    # group every source by its source_id, across all the groups
    by_id = general.AccumDict(accum=[])
    for grp in src_groups:
        for src in grp.sources:
            by_id[src.source_id].append(src)
    # disambiguate duplicated IDs by appending a ";<counter>" suffix
    for src_id, dupl in by_id.items():
        if len(dupl) > 1:  # happens in logictree/case_01/rup.ini
            for idx, src in enumerate(dupl):
                src.source_id = '%s;%d' % (src.source_id, idx)
def check_branchID(branchID):
    """
    Forbids invalid characters .:; used in fragmentno
    """
    # the error messages are identical to the ones produced by the
    # original if/elif chain
    for char in '.:;':
        if char in branchID:
            raise InvalidFile(
                'branchID %r contains an invalid "%s"' % (branchID, char))
def check_duplicates(smdict, strict):
    """
    Check for duplicated source IDs, both inside each source model file
    and across the files belonging to the same branch.
    """
    # duplicates within each single file
    for smodel in smdict.values():
        ids = []
        for group in smodel.src_groups:
            ids.extend(src.source_id for src in group)
            if group.src_interdep == 'mutex':
                # mutex sources in the same group must have all the same
                # basename, i.e. the colon convention must be used
                names = set(map(basename, group))
                assert len(names) == 1, names
        check_unique(ids, 'in ' + smodel.fname, strict)
    # check duplicates in different files but in the same branch
    # the problem was discovered in the DOM model
    for branch, smodels in general.groupby(
            smdict.values(), bybranch).items():
        ids_by_fname = general.AccumDict(accum=[])
        for smodel in smodels:
            if isinstance(smodel, nrml.GeometryModel):
                # the section IDs are not checked since they not count
                # as real sources
                continue
            for group in smodel.src_groups:
                ids_by_fname[smodel.fname].extend(
                    src.source_id for src in group)
        check_unique(ids_by_fname, 'in branch %s' % branch, strict=strict)
    found = find_false_duplicates(smdict)
    if found:
        logging.warning('Found different sources with same ID %s',
                        general.shortlist(found))
def save_read_times(dstore, source_models):
    """
    Store how many seconds it took to read each source model file
    in a table (fname, rtime)
    """
    dtype = [('fname', hdf5.vstr), ('rtime', float)]
    rows = [(sm.fname, sm.rtime) for sm in source_models]
    dstore.create_dset('source_model_read_times', numpy.array(rows, dtype))
def get_csm(oq, full_lt, dstore=None):
    """
    Build source models from the logic tree and to store them inside the
    `source_full_lt` dataset.

    :param oq: an OqParam-like object with the job parameters
    :param full_lt: a full logic tree object with ``source_model_lt``
    :param dstore: a datastore (None in logictree_test.py)
    :returns: a CompositeSourceModel instance
    """
    # build the converter from the job parameters
    converter = sourceconverter.SourceConverter(
        oq.investigation_time, oq.rupture_mesh_spacing,
        oq.complex_fault_mesh_spacing, oq.width_of_mfd_bin,
        oq.area_source_discretization, oq.minimum_magnitude, oq.source_id,
        discard_trts=[s.strip() for s in oq.discard_trts.split(',')],
        floating_x_step=oq.floating_x_step,
        floating_y_step=oq.floating_y_step,
        source_nodes=oq.source_nodes,
        infer_occur_rates=oq.infer_occur_rates)
    full_lt.ses_seed = oq.ses_seed
    logging.info('Reading the source model(s) in parallel')

    # NB: the source models file must be in the shared directory
    # NB: dstore is None in logictree_test.py
    allargs = []
    sdata = full_lt.source_model_lt.source_data
    allpaths = set(full_lt.source_model_lt.info.smpaths)
    dic = general.group_array(sdata, 'fname')
    smpaths = []
    # OQ_SAMPLE_SOURCES is a debugging facility to reduce the sources
    ss = os.environ.get('OQ_SAMPLE_SOURCES')
    # sources mentioned in applyToSources must never be discarded
    applied = set()
    for srcs in full_lt.source_model_lt.info.applytosources.values():
        applied.update(srcs)
    # one task per source model file
    for fname, rows in dic.items():
        path = os.path.abspath(
            os.path.join(full_lt.source_model_lt.basepath, fname))
        smpaths.append(path)
        allargs.append((path, rows[0]['branch'], converter, applied, ss))
    for path in allpaths - set(smpaths):  # geometry models
        allargs.append((path, '', converter, applied, ss))
    smdict = parallel.Starmap(read_source_model, allargs,
                              h5=dstore if dstore else None).reduce()
    parallel.Starmap.shutdown()  # save memory
    # sort by filename for reproducibility
    smdict = {k: smdict[k] for k in sorted(smdict)}
    if dstore:
        save_read_times(dstore, smdict.values())
    check_duplicates(smdict, strict=oq.disagg_by_src)
    logging.info('Applying uncertainties')
    groups = _build_groups(full_lt, smdict)

    # checking the changes
    changes = sum(sg.changes for sg in groups)
    if changes:
        logging.info('Applied {:_d} changes to the composite source model'.
                     format(changes))
    is_event_based = oq.calculation_mode.startswith(
        ('event_based', 'ebrisk'))
    csm = _get_csm(full_lt, groups, is_event_based)

    # store the src_mutex and grp_probability tables for mutex groups
    out = []
    probs = []
    for sg in csm.src_groups:
        if sg.src_interdep == 'mutex' and 'src_mutex' not in dstore:
            segments = []
            for src in sg:
                # the colon convention is mandatory for mutex sources
                segments.append(src.source_id.split(':')[1])
                t = (src.source_id, src.grp_id, src.count_ruptures(),
                     src.mutex_weight, sg.rup_interdep == 'mutex')
                out.append(t)
                probs.append((src.grp_id, sg.grp_probability))
            assert len(segments) == len(set(segments)), segments
    if out:
        dtlist = [('src_id', hdf5.vstr), ('grp_id', int),
                  ('num_ruptures', int), ('mutex_weight', float),
                  ('rup_mutex', bool)]
        dstore.create_dset('src_mutex', numpy.array(out, dtlist))
        lst = [('grp_id', int), ('probability', float)]
        dstore.create_dset('grp_probability', numpy.array(probs, lst))

    # split multifault sources if there is a single site
    try:
        sitecol = dstore['sitecol']
    except (KeyError, TypeError):
        # 'NoneType' object is not subscriptable
        sitecol = None
    else:
        # NB: in AELO we can have multiple vs30 on the same location
        lonlats = set(zip(sitecol.lons, sitecol.lats))
        if len(lonlats) > 1:
            sitecol = None
    # must be called *after* _fix_dupl_ids
    fix_geometry_sections(
        smdict, csm.src_groups, dstore.tempname if dstore else '',
        sitecol if oq.disagg_by_src and oq.use_rates else None)
    return csm
def add_checksums(srcs):
    """
    Build and attach a checksum to each source.

    The checksum is the adler32 of the pickled source attributes, except
    the ones (source_id, trt_smr, smweight, samples, branch) which may
    legitimately differ between copies of the same underlying source.
    """
    # NB: use an exact-name set; the previous substring test
    # (`k not in 'source_id trt_smr smweight samples branch'`) would also
    # wrongly discard any attribute whose name happens to be a substring,
    # like 'id', 'branch' or 'smr'
    skip = {'source_id', 'trt_smr', 'smweight', 'samples', 'branch'}
    for src in srcs:
        dic = {k: v for k, v in vars(src).items() if k not in skip}
        src.checksum = zlib.adler32(pickle.dumps(dic, protocol=4))
# called before _fix_dupl_ids
def find_false_duplicates(smdict):
    """
    Discriminate different sources with same ID (false duplicates)
    and make their IDs unique by appending '!<branchID>'.

    :param smdict: a dictionary fname -> source model
    :returns: the list of source IDs which were false duplicates
    """
    # group the sources by (base) source ID across all the models
    acc = general.AccumDict(accum=[])
    atomic = set()
    for smodel in smdict.values():
        for sgroup in smodel.src_groups:
            for src in sgroup:
                src.branch = smodel.branch
                srcid = (src.source_id if sgroup.atomic
                         else basename(src))
                acc[srcid].append(src)
                if sgroup.atomic:
                    atomic.add(src.source_id)
    found = []
    for srcid, srcs in acc.items():
        if len(srcs) > 1:  # duplicated ID
            # NB: the messages are now %-formatted; the original passed
            # srcid as an extra positional argument to RuntimeError,
            # logging-style, so it was never interpolated
            if any(src.source_id in atomic for src in srcs):
                raise RuntimeError('Sources in atomic groups cannot be '
                                   'duplicated: %s' % srcid)
            if any(getattr(src, 'mutex_weight', 0) for src in srcs):
                raise RuntimeError('Mutually exclusive sources cannot be '
                                   'duplicated: %s' % srcid)
            add_checksums(srcs)
            # group the sources sharing the ID by their checksum
            gb = general.AccumDict(accum=[])
            for src in srcs:
                gb[checksum(src)].append(src)
            if len(gb) > 1:  # false duplicates: disambiguate the IDs
                for same_checksum in gb.values():
                    for src in same_checksum:
                        check_branchID(src.branch)
                        src.source_id += '!%s' % src.branch
                found.append(srcid)
    return found
def replace(lst, splitdic, key):
    """
    Replace a list of named elements with the split elements in splitdic
    """
    # elements whose `key` attribute appears in splitdic are expanded
    # into their split versions; the others are kept as they are
    replaced = []
    for item in lst:
        replaced.extend(splitdic.get(getattr(item, key), [item]))
    lst[:] = replaced  # modify the list in place
def fix_geometry_sections(smdict, src_groups, hdf5path='', site1=None):
    """
    If there are MultiFaultSources, fix the sections according to the
    GeometryModels (if any).
    """
    geom_files = [fname for fname, mod in smdict.items()
                  if isinstance(mod, nrml.GeometryModel)]
    # merge and reorder the sections coming from all the geometry files
    sec_ids = []
    sections = {}
    for fname in geom_files:
        sec_ids.extend(smdict[fname].sections)
        sections.update(smdict[fname].sections)
    check_unique(sec_ids, 'section ID in files ' + ' '.join(geom_files))
    if not sections:
        return ()
    # save in the temporary file sources and sections
    assert hdf5path, ('You forgot to pass the dstore to '
                      'get_composite_source_model')
    multifaults = [src for sg in src_groups for src in sg
                   if src.code == b'F']
    if not multifaults:
        return ()
    split_dic, secparams = save_and_split(
        multifaults, sections, hdf5path, site1)
    for sg in src_groups:
        replace(sg.sources, split_dic, 'source_id')
    return secparams
def _groups_ids(smlt_dir, smdict, fnames):
    # extract the source groups and ids from a sequence of source files
    groups = []
    for fname in fnames:
        fullname = os.path.abspath(os.path.join(smlt_dir, fname))
        groups.extend(smdict[fullname].src_groups)
    return groups, set(src.source_id for grp in groups for src in grp)


def _build_groups(full_lt, smdict):
    # build all the possible source groups from the full logic tree
    smlt_file = full_lt.source_model_lt.filename
    smlt_dir = os.path.dirname(smlt_file)
    groups = []
    # weight fraction used when the logic tree is fully enumerated
    frac = 1. / len(full_lt.sm_rlzs)
    for rlz in full_lt.sm_rlzs:
        src_groups, source_ids = _groups_ids(
            smlt_dir, smdict, rlz.value[0].split())
        bset_values = full_lt.source_model_lt.bset_values(rlz.lt_path)
        # consume the leading extendModel branchsets, adding their groups
        while (bset_values and
               bset_values[0][0].uncertainty_type == 'extendModel'):
            (_bset, value), *bset_values = bset_values
            extra, extra_ids = _groups_ids(smlt_dir, smdict, value.split())
            common = source_ids & extra_ids
            if common:
                raise InvalidFile(
                    '%s contains source(s) %s already present in %s' %
                    (value, common, rlz.value))
            src_groups.extend(extra)
        for src_group in src_groups:
            # an example of bsetvalues is in LogicTreeCase2ClassicalPSHA:
            # (<abGRAbsolute(3, applyToSources=['first'])>, (4.6, 1.1))
            # (<abGRAbsolute(3, applyToSources=['second'])>, (3.3, 1.0))
            # (<maxMagGRAbsolute(3, applyToSources=['first'])>, 7.0)
            # (<maxMagGRAbsolute(3, applyToSources=['second'])>, 7.5)
            sg = apply_uncertainties(bset_values, src_group)
            full_lt.set_trt_smr(sg, smr=rlz.ordinal)
            for src in sg:
                # the smweight is used in event based sampling:
                # see oq-risk-tests etna
                src.smweight = rlz.weight if full_lt.num_samples else frac
                if rlz.samples > 1:
                    src.samples = rlz.samples
            groups.append(sg)
        # check applyToSources: every source mentioned there must exist
        sm_branch = rlz.lt_path[0]
        src_id = full_lt.source_model_lt.info.applytosources[sm_branch]
        for srcid in src_id:
            if srcid not in source_ids:
                raise ValueError(
                    "The source %s is not in the source model,"
                    " please fix applyToSources in %s or the "
                    "source model(s) %s" % (srcid, smlt_file,
                                            rlz.value[0].split()))
    return groups
def reduce_sources(sources_with_same_id, full_lt):
    """
    :param sources_with_same_id: a list of sources with the same source_id
    :returns: a list of truly unique sources, ordered by trt_smr
    """
    add_checksums(sources_with_same_id)
    unique = []
    for dupl in general.groupby(sources_with_same_id, checksum).values():
        # sources with the same id *and* the same checksum are true
        # duplicates: keep the first one and merge the trt_smr values
        first = dupl[0]
        if len(dupl) > 1:  # happens in logictree/case_07
            first.trt_smr = tuple(src.trt_smr for src in dupl)
        else:
            first.trt_smr = first.trt_smr,
        unique.append(first)
    return sorted(unique, key=operator.attrgetter('trt_smr'))
def split_by_tom(sources):
    """
    Groups together sources with the same TOM
    """
    def tom_name(src):
        # sources without a temporal_occurrence_model end up in 'NoneType'
        tom = getattr(src, 'temporal_occurrence_model', None)
        return type(tom).__name__
    return general.groupby(sources, tom_name).values()
def _get_csm(full_lt, groups, event_based):
    # 1. extract a single source from multiple sources with the same ID
    # 2. regroup the sources in non-atomic groups by TRT
    # 3. reorder the sources by source_id
    atomic = []
    acc = general.AccumDict(accum=[])  # trt -> sources
    for grp in groups:
        if grp and grp.atomic:
            atomic.append(grp)
        elif grp:
            acc[grp.trt].extend(grp)
    key = operator.attrgetter('source_id', 'code')
    src_groups = []
    for trt in acc:
        lst = []
        for srcs in general.groupby(acc[trt], key).values():
            # NB: not reducing the sources in event based
            if len(srcs) > 1 and not event_based:
                srcs = reduce_sources(srcs, full_lt)
            lst.extend(srcs)
        # regroup by trt_smrs and then by temporal occurrence model
        for sources in general.groupby(lst, trt_smrs).values():
            for grp in split_by_tom(sources):
                src_groups.append(sourceconverter.SourceGroup(trt, grp))
    src_groups.extend(atomic)
    _fix_dupl_ids(src_groups)
    # sort by source_id
    for sg in src_groups:
        sg.sources.sort(key=operator.attrgetter('source_id'))
    return CompositeSourceModel(full_lt, src_groups)