Source code for openquake.hazardlib.source_reader

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import time
import zlib
import os.path
import pickle
import operator
import logging
import numpy

from openquake.baselib import parallel, performance, general, hdf5
from openquake.hazardlib import (
    geo, nrml, source, sourceconverter, InvalidFile, calc)
from openquake.hazardlib.source_group import CompositeSourceModel, get_unique
from openquake.hazardlib.source.multi_fault import save_and_split
from openquake.hazardlib.lt import apply_uncertainties
from openquake.hazardlib.valid import basename

TWO24 = 2**24
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
bybranch = operator.attrgetter('branch')
checksum = operator.attrgetter('checksum')
sampling_dt = numpy.dtype([
    ('trt_smr', U32),
    ('samples', U32)])

source_info_dt = numpy.dtype([
    ('source_id', hdf5.vstr),          # 0
    ('grp_id', U16),                   # 1
    ('code', (numpy.bytes_, 1)),       # 2
    ('calc_time', F32),                # 3
    ('num_ctxs', numpy.uint64),        # 4
    ('est_ctxs', numpy.uint64),        # 5
    ('num_ruptures', U32),             # 6
    ('weight', F32),                   # 7
    ('mul', U16),                      # 8
])


[docs]def sampling(samples, trt_smr): """ :returns: a structured array (trt_smr, samples) of length 1 """ return numpy.array([(trt_smr, samples)], sampling_dt)
# NB: blocksize is chosen so that event_based/case_35 works
[docs]def splitMF(sources, disagg_by_src, blocksize=1000): """ Split the MultiPointSources to avoid oceanic sources with 160M ruptures hanging during rupture sampling """ splits = [] for src in sources: if src.code == b'M' and len(src) > blocksize: for i, slc in enumerate(general.gen_slices(0, len(src), blocksize)): segment = source.MultiPointSource( source_id=f'{src.source_id}:{i}', name=src.name, tectonic_region_type=src.tectonic_region_type, mfd=src.mfd[slc], magnitude_scaling_relationship=( src.magnitude_scaling_relationship), rupture_aspect_ratio=src.rupture_aspect_ratio, upper_seismogenic_depth=src.upper_seismogenic_depth, lower_seismogenic_depth=src.lower_seismogenic_depth, nodal_plane_distribution=src.nodal_plane_distribution, hypocenter_distribution=src.hypocenter_distribution, mesh=geo.Mesh(src.mesh.lons[slc], src.mesh.lats[slc]), temporal_occurrence_model=src.temporal_occurrence_model) segment.sampling = src.sampling splits.append(segment) elif src.code == b'F' and not disagg_by_src: # use the colon convention only in absence of kendra-splitting for segment in src: segment.sampling = src.sampling splits.append(segment) else: splits.append(src) sources[:] = splits
[docs]def check_unique(ids, msg='', strict=True): """ Raise a DuplicatedID exception if there are duplicated IDs """ if isinstance(ids, dict): # ids by key all_ids = sum(ids.values(), []) unique, counts = numpy.unique(all_ids, return_counts=True) for dupl in unique[counts > 1]: keys = [k for k in ids if dupl in ids[k]] if keys: errmsg = '%r appears in %s %s' % (dupl, keys, msg) if strict: raise nrml.DuplicatedID(errmsg) else: logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg) return unique, counts = numpy.unique(ids, return_counts=True) for u, c in zip(unique, counts): if c > 1: errmsg = '%r appears %d times %s' % (u, c, msg) if strict: raise nrml.DuplicatedID(errmsg) else: logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)
[docs]def create_source_info(csm, h5): """ Creates source_info, trt_smrs, toms """ csm.source_info = csm.get_source_info() num_srcs = len(csm.source_info) # avoid hdf5 damned bug by creating source_info in advance h5.create_dataset('source_info', (num_srcs,), source_info_dt)
[docs]def trt_smrs(src): return tuple(src.trt_smrs)
def _sample(srcs, sample, applied): if not srcs: return [] out = [src for src in srcs if src.source_id in applied] rand = general.random_filter(srcs, sample) return (out + rand) or [srcs[0]]
[docs]def read_source_model(fname, branch, converter, applied, sample, monitor): """ :param fname: path to a source model XML file :param branch: source model logic tree branch ID :param converter: SourceConverter :param applied: list of source IDs within applyToSources :param sample: a string with the sampling factor (if any) :param monitor: a Monitor instance :returns: a SourceModel instance """ t0 = time.time() [sm] = nrml.read_source_models([fname], converter) sm.branch = branch for sg in sm.src_groups: if sample and not sg.atomic: srcs = [] for src in sg: if src.source_id in applied: srcs.append(src) else: srcs.extend(calc.filters.split_source(src)) sg.sources = _sample(srcs, float(sample), applied) sm.rtime = time.time() - t0 # save the read time return {fname: sm}
# NB:this is called after reduce_sources, so ";" is not added # if the same source appears multiple times, i.e. len(srcs) == 1
[docs]def add_semicolons(src_groups): """ Add semicolons to differentiate different sources with the same source_id and then sort the sources in each group by extended source_id """ sources = general.AccumDict(accum=[]) for sg in src_groups: for src in sg: sources[src.source_id].append(src) for src_id, srcs in sources.items(): srcs = get_unique(srcs) if len(srcs) > 1: # happens in logictree/case_01/rup.ini for i, src in enumerate(srcs): src.source_id = '%s;%d' % (src.source_id, i) for sg in src_groups: sg.sources.sort(key=operator.attrgetter('source_id'))
# tested in logictree/case_05 with 9 variations of # AreaSource 1 and SimpleFaultSource 2
[docs]def check_branchID(branchID): """ Forbids invalid characters .:; used in fragmentno """ if '.' in branchID: raise InvalidFile('branchID %r contains an invalid "."' % branchID) elif ':' in branchID: raise InvalidFile('branchID %r contains an invalid ":"' % branchID) elif ';' in branchID: raise InvalidFile('branchID %r contains an invalid ";"' % branchID)
[docs]def check_duplicates(smdict, strict): # check_duplicates in the same file for sm in smdict.values(): srcids = [] for sg in sm.src_groups: srcids.extend(src.source_id for src in sg) if sg.src_interdep == 'mutex': # mutex sources in the same group must have all the same # basename, i.e. the colon convention must be used basenames = set(map(basename, sg)) assert len(basenames) == 1, basenames check_unique(srcids, 'in ' + sm.fname, strict) # check duplicates in different files but in the same branch # the problem was discovered in the DOM model for branch, sms in general.groupby(smdict.values(), bybranch).items(): srcids = general.AccumDict(accum=[]) fnames = [] for sm in sms: if isinstance(sm, nrml.GeometryModel): # the section IDs are not checked since they not count # as real sources continue for sg in sm.src_groups: srcids[sm.fname].extend(src.source_id for src in sg) fnames.append(sm.fname) check_unique(srcids, 'in branch %s' % branch, strict=strict) found = add_bangs(smdict) if found: logging.info('Found different sources with same ID %s', general.shortlist(found))
[docs]def save_read_times(dstore, source_models): """ Store how many seconds it took to read each source model file in a table (fname, rtime) """ dt = [('fname', hdf5.vstr), ('rtime', float)] arr = numpy.array([(sm.fname, sm.rtime) for sm in source_models], dt) dstore.create_dset('source_model_read_times', arr)
[docs]def get_csm(oq, full_lt, dstore=None): """ Build source models from the logic tree and store them inside the `source_full_lt` dataset. """ converter = sourceconverter.SourceConverter( oq.investigation_time, oq.rupture_mesh_spacing, oq.complex_fault_mesh_spacing, oq.width_of_mfd_bin, oq.area_source_discretization, oq.minimum_magnitude, oq.source_id, discard_trts=[s.strip() for s in oq.discard_trts.split(',')], floating_x_step=oq.floating_x_step, floating_y_step=oq.floating_y_step, source_nodes=oq.source_nodes, infer_occur_rates=oq.infer_occur_rates) full_lt.ses_seed = oq.ses_seed logging.info('Reading the source model(s) in parallel') # NB: the source models file must be in the shared directory # NB: dstore is None in logictree_test.py allargs = [] sdata = full_lt.source_model_lt.source_data allpaths = set(full_lt.source_model_lt.info.smpaths) dic = general.group_array(sdata, 'fname') smpaths = [] ss = os.environ.get('OQ_SAMPLE_SOURCES') applied = set() for srcs in full_lt.source_model_lt.info.applytosources.values(): applied.update(srcs) for fname, rows in dic.items(): path = os.path.abspath( os.path.join(full_lt.source_model_lt.basepath, fname)) smpaths.append(path) allargs.append((path, rows[0]['branch'], converter, applied, ss)) for path in allpaths - set(smpaths): # geometry models allargs.append((path, '', converter, applied, ss)) smdict = parallel.Starmap(read_source_model, allargs, h5=dstore if dstore else None).reduce() parallel.Starmap.shutdown() # save memory smdict = {k: smdict[k] for k in sorted(smdict)} if dstore: save_read_times(dstore, smdict.values()) check_duplicates(smdict, strict=oq.disagg_by_src) # checking ps_grid_spacing pointlike_sources = 0 for sm in smdict.values(): for sg in sm.src_groups: for src in sg: if src.code in b'PAM': pointlike_sources += 1 break if (oq.strict and oq.mosaic_model and pointlike_sources and 'classical' in oq.calculation_mode and oq.ps_grid_spacing == 0 and not oq.sites): raise InvalidFile(f'{oq.inputs["job_ini"]}: ' 'missing ps_grid_spacing') return build_csm(oq, full_lt, smdict, dstore)
[docs]def build_csm(oq, full_lt, smdict, dstore): """ Applies uncertainties, builds the source groups and returns a CompositeSourceModel instance """ mon = performance.Monitor('_build_groups', measuremem=True) with mon: groups = _build_groups(full_lt, smdict) # fast logging.info(mon) logging.info('Building CompositeSourceModel') is_event_based = oq.calculation_mode.startswith(('event_based', 'ebrisk')) mon = performance.Monitor('_build_csm', measuremem=True) with mon: csm = _build_csm(oq, full_lt, groups, is_event_based) logging.info(mon) out = [] probs = [] for sg in csm.src_groups: if sg.src_interdep == 'mutex' and 'src_mutex' not in dstore: segments = [] for src in sg: segments.append(src.source_id.split(':')[1]) t = (src.source_id, src.grp_id, src.num_ruptures, src.mutex_weight, sg.rup_interdep == 'mutex') out.append(t) probs.append((src.grp_id, sg.grp_probability)) assert len(segments) == len(set(segments)), segments if out: dtlist = [('src_id', hdf5.vstr), ('grp_id', int), ('num_ruptures', int), ('mutex_weight', float), ('rup_mutex', bool)] dstore.create_dset('src_mutex', numpy.array(out, dtlist)) lst = [('grp_id', int), ('probability', float)] dstore.create_dset('grp_probability', numpy.array(probs, lst)) # add rupids_by_tag to multifault sources if there is a single site try: sitecol = dstore['sitecol'] except (KeyError, TypeError): # 'NoneType' object is not subscriptable sitecol = None else: # NB: in AELO we can have multiple vs30 on the same location lonlats = set(zip(sitecol.lons, sitecol.lats)) if len(lonlats) > 1: sitecol = None # must be called *after* add_semicolons t0 = time.time() secparams = fix_geometry_sections( smdict, csm.src_groups, dstore.tempname if dstore else '', sitecol if oq.disagg_by_src and oq.use_rates else None, split=not oq.calculation_mode.startswith('event_based')) if len(secparams): logging.info('Spent %.1f seconds in fix_geometry_sections', time.time()-t0) return csm
# called by reduce_sources
[docs]def add_checksums(srcs): """ Build and attach a checksum to each source """ for src in srcs: dic = {k: v for k, v in vars(src).items() if k not in 'source_id sampling branch'} src.checksum = zlib.adler32(pickle.dumps(dic, protocol=4))
# called before add_semicolons
[docs]def add_bangs(smdict): """ Discriminate different sources with same ID (false duplicates) and put an exclamation mark in their source ID """ acc = general.AccumDict(accum=[]) atomic = set() for smodel in smdict.values(): for sgroup in smodel.src_groups: for src in sgroup: src.branch = smodel.branch srcid = (src.source_id if sgroup.atomic else basename(src)) acc[srcid].append(src) if sgroup.atomic: atomic.add(src.source_id) found = [] for srcid, srcs in acc.items(): if len(srcs) > 1: # duplicated ID if any(src.source_id in atomic for src in srcs): raise RuntimeError('Sources in atomic groups cannot be ' 'duplicated: %s', srcid) if any(getattr(src, 'mutex_weight', 0) for src in srcs): raise RuntimeError('Mutually exclusive sources cannot be ' 'duplicated: %s', srcid) add_checksums(srcs) gb = general.AccumDict(accum=[]) for src in srcs: gb[checksum(src)].append(src) if len(gb) > 1: for same_checksum in gb.values(): for src in same_checksum: check_branchID(src.branch) src.source_id += '!%s' % src.branch found.append(srcid) return found
[docs]def replace(lst, splitdic, key): """ Replace a list of named elements with the split elements in splitdic """ new = [] for el in lst: tag = getattr(el, key) if tag in splitdic: new.extend(splitdic[tag]) else: new.append(el) lst[:] = new
[docs]def fix_geometry_sections(smdict, src_groups, hdf5path='', site1=None, split=True): """ If there are MultiFaultSources, fix the sections according to the GeometryModels (if any). """ gmodels = [] gfiles = [] for fname, mod in smdict.items(): if isinstance(mod, nrml.GeometryModel): gmodels.append(mod) gfiles.append(fname) # merge and reorder the sections sec_ids = [] sections = {} for gmod in gmodels: sec_ids.extend(gmod.sections) sections.update(gmod.sections) check_unique(sec_ids, 'section ID in files ' + ' '.join(gfiles)) if sections: # save in the temporary file sources and sections assert hdf5path, ('You forgot to pass the dstore to ' 'get_composite_source_model') mfsources = [] for sg in src_groups: for src in sg: if src.code == b'F': mfsources.append(src) if mfsources: split_dic, secparams = save_and_split( mfsources, sections, hdf5path, site1, split=split) for sg in src_groups: replace(sg.sources, split_dic, 'source_id') return secparams return ()
def _groups_ids(smlt_dir, smdict, fnames): # extract the source groups and ids from a sequence of source files groups = [] for fname in fnames: fullname = os.path.abspath(os.path.join(smlt_dir, fname)) groups.extend(smdict[fullname].src_groups) return groups, set(src.source_id for grp in groups for src in grp) def _build_groups(full_lt, smdict): # build all the possible source groups from the full logic tree smlt_file = full_lt.source_model_lt.filename smlt_dir = os.path.dirname(smlt_file) groups = [] for rlz in full_lt.sm_rlzs: if rlz.ordinal % 100 == 0: logging.info('Building source groups for rlz' f'#{rlz.ordinal}: {"_".join(rlz.lt_path)}') src_groups, source_ids = _groups_ids( smlt_dir, smdict, rlz.value[0].split()) bset_values = full_lt.source_model_lt.bset_values(rlz.lt_path) while (bset_values and bset_values[0][0].uncertainty_type == 'extendModel'): (_bset, value), *bset_values = bset_values extra, extra_ids = _groups_ids(smlt_dir, smdict, value.split()) common = source_ids & extra_ids if common: raise InvalidFile( '%s contains source(s) %s already present in %s' % (value, common, rlz.value)) src_groups.extend(extra) for src_group in src_groups: trti = 0 if full_lt.trti=={'*': 0} else full_lt.trti[src_group.trt] # an example of bsetvalues is in LogicTreeCase2ClassicalPSHA: # (<abGRAbsolute(3, applyToSources=['first'])>, (4.6, 1.1)) # (<abGRAbsolute(3, applyToSources=['second'])>, (3.3, 1.0)) # (<maxMagGRAbsolute(3, applyToSources=['first'])>, 7.0) # (<maxMagGRAbsolute(3, applyToSources=['second'])>, 7.5) sg = apply_uncertainties(bset_values, src_group) for src in sg: # tested in case_83_eb sampl = sampling(rlz.samples, trti * TWO24 + rlz.ordinal) if src.sampling is None: # the first time src.sampling = [sampl] else: # if the same source belongs to multiple realizations src.sampling.append(sampl) groups.append(sg) # check applyToSources sm_branch = rlz.lt_path[0] src_id = full_lt.source_model_lt.info.applytosources[sm_branch] for srcid in src_id: if srcid not in source_ids: raise ValueError( "The source %s is not in the source model," " please fix applyToSources in %s or the " "source model(s) %s" % (srcid, smlt_file, rlz.value[0].split())) return groups
[docs]def reduce_sources(sources_with_same_id, full_lt, event_based): """ :param sources_with_same_id: a list of sources with the same source_id :param full_lt: FullLogicTree instance :param event_based: flag True for event_based calculations :returns: a list of truly unique sources """ # first reduce identical sources having the same id(src) # tested in LogictreeTestCase.test_case_08, where <PoinstSource 2> # appears 3 times unique = get_unique(sources_with_same_id) out = [] add_checksums(unique) # in LogicTreeCase2ClassicalPSHA there 81 unique sources # grouped in 9 groups of 9 sources each with the same checksum for srcs in general.groupby(unique, checksum).values(): # NB: the simplest test featuring the same source in two # different source models is logictree/case_01 src = srcs[0] if len(srcs) > 1 and len(src.sampling) == 1: src.sampling = numpy.concatenate([s.sampling for s in srcs]) out.append(src) return out
[docs]def split_by_tom(sources): """ Groups together sources with the same TOM and collect multifault sources """ def key(src): tom = getattr(src, 'temporal_occurrence_model', None) return (tom.__class__.__name__, src.code == b'F') return general.groupby(sources, key).values()
def _build_csm(oq, full_lt, groups, event_based): acc = general.AccumDict(accum=[]) atomic = [] changes = 0 # concatenate sampling records, split MF sources, single out atomic groups for grp in groups: changes += grp.changes for src in grp: if isinstance(src.sampling, list): src.sampling = numpy.concatenate( src.sampling, dtype=sampling_dt) else: # already concatenated at the previous iteration; # happens for sources belonging to multiple groups, # such as <AreaSource 002> in classical case_10 pass splitMF(grp.sources, oq.disagg_by_src) if grp and grp.atomic: atomic.append(grp) elif grp: acc[grp.trt].extend(grp) if atomic: logging.info('Found %d atomic groups', len(atomic)) if changes: logging.info(f'Applied {changes:_d} changes to ' f'{len(groups):_d} source groups') # reduce identical sources by concatenating the sampling records, # then regroup by trt_smrs key = operator.attrgetter('source_id', 'code') src_groups = [] red_sources = 0 for trt in acc: lst = [] for srcs in general.groupby(acc[trt], key).values(): if len(srcs) > 1: # reduce_sources is ultra-fast srcs = reduce_sources(srcs, full_lt, event_based) red_sources += 1 lst.extend(srcs) for sources in general.groupby(lst, trt_smrs).values(): for grp in split_by_tom(sources): src_groups.append(sourceconverter.SourceGroup(trt, grp)) src_groups.extend(atomic) logging.info('reduce_sources was called %d times', red_sources) add_semicolons(src_groups) csm = CompositeSourceModel(oq, full_lt, src_groups) return csm