Source code for openquake.calculators.getters

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2018-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.
import collections
import itertools
import operator
import logging
import unittest.mock as mock
import numpy
from openquake.baselib import hdf5, datastore, general
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib import calc, probability_map, stats
from openquake.hazardlib.source.rupture import (
    EBRupture, BaseRupture, events_dt, RuptureProxy)
from openquake.risklib.riskinput import rsi2str
from openquake.commonlib.calc import _gmvs_to_haz_curve

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
by_taxonomy = operator.attrgetter('taxonomy')
code2cls = BaseRupture.init()


[docs]def build_stat_curve(poes, imtls, stat, weights): """ Build statistics by taking into account IMT-dependent weights """ assert len(poes) == len(weights), (len(poes), len(weights)) L = len(imtls.array) array = numpy.zeros((L, 1)) if isinstance(weights, list): # IMT-dependent weights # this is slower since the arrays are shorter for imt in imtls: slc = imtls(imt) ws = [w[imt] for w in weights] if sum(ws) == 0: # expect no data for this IMT continue array[slc] = stat(poes[:, slc], ws) else: array = stat(poes, weights) return probability_map.ProbabilityCurve(array)
[docs]def sig_eps_dt(imts): """ :returns: a composite data type for the sig_eps output """ lst = [('eid', U32), ('rlz_id', U16)] for imt in imts: lst.append(('sig_inter_' + imt, F32)) for imt in imts: lst.append(('eps_inter_' + imt, F32)) return numpy.dtype(lst)
[docs]class PmapGetter(object): """ Read hazard curves from the datastore for all realizations or for a specific realization. :param dstore: a DataStore instance or file system path to it :param sids: the subset of sites to consider (if None, all sites) """ def __init__(self, dstore, weights, sids=None, poes=()): self.dstore = dstore self.sids = dstore['sitecol'].sids if sids is None else sids if len(weights[0].dic) == 1: # no weights by IMT self.weights = numpy.array([w['weight'] for w in weights]) else: self.weights = weights self.poes = poes self.num_rlzs = len(weights) self.eids = None self.nbytes = 0 self.sids = sids @property def imts(self): return list(self.imtls)
[docs] def init(self): """ Read the poes and set the .data attribute with the hazard curves """ if hasattr(self, '_pmap_by_grp'): # already initialized return self._pmap_by_grp if isinstance(self.dstore, str): self.dstore = hdf5.File(self.dstore, 'r') else: self.dstore.open('r') # if not if self.sids is None: self.sids = self.dstore['sitecol'].sids oq = self.dstore['oqparam'] self.imtls = oq.imtls self.poes = self.poes or oq.poes self.rlzs_by_grp = self.dstore['full_lt'].get_rlzs_by_grp() # populate _pmap_by_grp self._pmap_by_grp = {} if 'poes' in self.dstore: # build probability maps restricted to the given sids ok_sids = set(self.sids) for grp, dset in self.dstore['poes'].items(): ds = dset['array'] L, G = ds.shape[1:] pmap = probability_map.ProbabilityMap(L, G) for idx, sid in enumerate(dset['sids'][()]): if sid in ok_sids: pmap[sid] = probability_map.ProbabilityCurve(ds[idx]) self._pmap_by_grp[grp] = pmap self.nbytes += pmap.nbytes return self._pmap_by_grp
# used in risk calculation where there is a single site per getter
[docs] def get_hazard(self, gsim=None): """ :param gsim: ignored :returns: R probability curves for the given site """ return self.get_pcurves(self.sids[0])
[docs] def get(self, rlzi, grp=None): """ :param rlzi: a realization index :param grp: None (all groups) or a string of the form "grp-XX" :returns: the hazard curves for the given realization """ self.init() assert self.sids is not None pmap = probability_map.ProbabilityMap(len(self.imtls.array), 1) grps = [grp] if grp is not None else sorted(self._pmap_by_grp) for grp in grps: for gsim_idx, rlzis in enumerate(self.rlzs_by_grp[grp]): for r in rlzis: if r == rlzi: pmap |= self._pmap_by_grp[grp].extract(gsim_idx) break return pmap
[docs] def get_pcurves(self, sid): # used in classical """ :returns: a list of R probability curves with shape L """ pmap_by_grp = self.init() L = len(self.imtls.array) pcurves = [probability_map.ProbabilityCurve(numpy.zeros((L, 1))) for _ in range(self.num_rlzs)] for grp, pmap in pmap_by_grp.items(): try: pc = pmap[sid] except KeyError: # no hazard for sid continue for gsim_idx, rlzis in enumerate(self.rlzs_by_grp[grp]): c = probability_map.ProbabilityCurve(pc.array[:, [gsim_idx]]) for rlzi in rlzis: pcurves[rlzi] |= c return pcurves
[docs] def get_pcurve(self, s, r, g): # used in disaggregation """ :param s: site ID :param r: realization ID :param g: group ID :returns: a probability curves with shape L (or None, if missing) """ grp = 'grp-%02d' % g pmap = self.init()[grp] try: pc = pmap[s] except KeyError: return L = len(self.imtls.array) pcurve = probability_map.ProbabilityCurve(numpy.zeros((L, 1))) for gsim_idx, rlzis in enumerate(self.rlzs_by_grp[grp]): for rlzi in rlzis: if rlzi == r: pcurve |= probability_map.ProbabilityCurve( pc.array[:, [gsim_idx]]) return pcurve
[docs] def items(self, kind=''): """ Extract probability maps from the datastore, possibly generating on the fly the ones corresponding to the individual realizations. Yields pairs (tag, pmap). :param kind: the kind of PoEs to extract; if not given, returns the realization if there is only one or the statistics otherwise. """ num_rlzs = len(self.weights) if not kind or kind == 'all': # use default if 'hcurves' in self.dstore: for k in sorted(self.dstore['hcurves']): yield k, self.dstore['hcurves/' + k][()] elif num_rlzs == 1: yield 'mean', self.get(0) return if 'poes' in self.dstore and kind in ('rlzs', 'all'): for rlzi in range(num_rlzs): hcurves = self.get(rlzi) yield 'rlz-%03d' % rlzi, hcurves elif 'poes' in self.dstore and kind.startswith('rlz-'): yield kind, self.get(int(kind[4:])) if 'hcurves' in self.dstore and kind == 'stats': for k in sorted(self.dstore['hcurves']): if not k.startswith('rlz'): yield k, self.dstore['hcurves/' + k][()]
[docs] def get_mean(self, grp=None): """ Compute the mean curve as a ProbabilityMap :param grp: if not None must be a string of the form "grp-XX"; in that case returns the mean considering only the contribution for group XX """ self.init() if len(self.weights) == 1: # one realization # the standard deviation is zero pmap = self.get(0, grp) for sid, pcurve in pmap.items(): array = numpy.zeros(pcurve.array.shape) array[:, 0] = pcurve.array[:, 0] pcurve.array = array return pmap elif grp: raise NotImplementedError('multiple realizations') L = len(self.imtls.array) pmap = probability_map.ProbabilityMap.build(L, 1, self.sids) for sid in self.sids: pmap[sid] = build_stat_curve( numpy.array([pc.array for pc in self.get_pcurves(sid)]), self.imtls, stats.mean_curve, self.weights) return pmap
[docs]class GmfDataGetter(collections.abc.Mapping): """ A dictionary-like object {sid: dictionary by realization index} """ def __init__(self, dstore, sids, num_rlzs): self.dstore = dstore self.sids = sids self.num_rlzs = num_rlzs assert len(sids) == 1, sids
[docs] def init(self): if hasattr(self, 'data'): # already initialized return self.dstore.open('r') # if not already open try: self.imts = self.dstore['gmf_data/imts'][()].split() except KeyError: # engine < 3.3 self.imts = list(self.dstore['oqparam'].imtls) self.rlzs = self.dstore['events']['rlz_id'] self.data = self[self.sids[0]] if not self.data: # no GMVs, return 0, counted in no_damage self.data = {rlzi: 0 for rlzi in range(self.num_rlzs)} # now some attributes set for API compatibility with the GmfGetter # number of ground motion fields # dictionary rlzi -> array(imts, events, nbytes) self.E = len(self.rlzs)
[docs] def get_hazard(self, gsim=None): """ :param gsim: ignored :returns: an dict rlzi -> datadict """ return self.data
def __getitem__(self, sid): dset = self.dstore['gmf_data/data'] idxs = self.dstore['gmf_data/indices'][sid] if idxs.dtype.name == 'uint32': # scenario idxs = [idxs] elif not idxs.dtype.names: # engine >= 3.2 idxs = zip(*idxs) data = [dset[start:stop] for start, stop in idxs] if len(data) == 0: # site ID with no data return {} return group_by_rlz(numpy.concatenate(data), self.rlzs) def __iter__(self): return iter(self.sids) def __len__(self): return len(self.sids)
time_dt = numpy.dtype( [('rup_id', U32), ('nsites', U16), ('time', F32), ('task_no', U16)])
[docs]class GmfGetter(object): """ An hazard getter with methods .get_gmfdata and .get_hazard returning ground motion values. """ def __init__(self, rupgetter, srcfilter, oqparam, amplifier=None): self.rlzs_by_gsim = rupgetter.rlzs_by_gsim self.rupgetter = rupgetter self.srcfilter = srcfilter self.sitecol = srcfilter.sitecol.complete self.oqparam = oqparam self.amplifier = amplifier self.min_iml = oqparam.min_iml self.N = len(self.sitecol) self.num_rlzs = sum(len(rlzs) for rlzs in self.rlzs_by_gsim.values()) self.sig_eps_dt = sig_eps_dt(oqparam.imtls) M32 = (F32, (len(oqparam.imtls),)) self.gmv_eid_dt = numpy.dtype([('gmv', M32), ('eid', U32)]) md = (calc.filters.IntegrationDistance(oqparam.maximum_distance) if isinstance(oqparam.maximum_distance, dict) else oqparam.maximum_distance) param = {'filter_distance': oqparam.filter_distance, 'imtls': oqparam.imtls, 'maximum_distance': md} self.cmaker = ContextMaker( rupgetter.trt, rupgetter.rlzs_by_gsim, param) self.correl_model = oqparam.correl_model
[docs] def gen_computers(self, mon): """ Yield a GmfComputer instance for each non-discarded rupture """ trt, samples = self.rupgetter.trt, self.rupgetter.samples with mon: proxies = self.rupgetter.get_proxies() for proxy in proxies: with mon: ebr = proxy.to_ebr(trt, samples) sids = self.srcfilter.close_sids(proxy, trt) sitecol = self.sitecol.filtered(sids) try: computer = calc.gmf.GmfComputer( ebr, sitecol, self.oqparam.imtls, self.cmaker, self.oqparam.truncation_level, self.correl_model, self.amplifier) except FarAwayRupture: continue # due to numeric errors ruptures within the maximum_distance # when written, can be outside when read; I found a case with # a distance of 99.9996936 km over a maximum distance of 100 km yield computer
@property def sids(self): return self.sitecol.sids @property def imtls(self): return self.oqparam.imtls @property def imts(self): return list(self.oqparam.imtls)
[docs] def get_gmfdata(self, mon): """ :returns: an array of the dtype (sid, eid, gmv) """ alldata = [] self.sig_eps = [] self.times = [] # rup_id, nsites, dt for computer in self.gen_computers(mon): data, dt = computer.compute_all( self.min_iml, self.rlzs_by_gsim, self.sig_eps) self.times.append((computer.rupture.id, len(computer.sids), dt)) alldata.append(data) if not alldata: return [] return numpy.concatenate(alldata)
[docs] def get_hazard_by_sid(self, data=None): """ :param data: if given, an iterator of records of dtype gmf_dt :returns: sid -> records """ if data is None: data = self.get_gmfdata() if len(data) == 0: return {} return general.group_array(data, 'sid')
[docs] def compute_gmfs_curves(self, rlzs, monitor): """ :param rlzs: an array of shapeE :returns: a dict with keys gmfdata, indices, hcurves """ oq = self.oqparam mon = monitor('getting ruptures', measuremem=True) hcurves = {} # key -> poes if oq.hazard_curves_from_gmfs: hc_mon = monitor('building hazard curves', measuremem=False) gmfdata = self.get_gmfdata(mon) # returned later hazard = self.get_hazard_by_sid(data=gmfdata) for sid, hazardr in hazard.items(): dic = group_by_rlz(hazardr, rlzs) for rlzi, array in dic.items(): with hc_mon: gmvs = array['gmv'] for imti, imt in enumerate(oq.imtls): poes = _gmvs_to_haz_curve( gmvs[:, imti], oq.imtls[imt], oq.ses_per_logic_tree_path) hcurves[rsi2str(rlzi, sid, imt)] = poes if not oq.ground_motion_fields: return dict(gmfdata=(), hcurves=hcurves) gmfdata = self.get_gmfdata(mon) if len(gmfdata) == 0: return dict(gmfdata=[]) indices = [] gmfdata.sort(order=('sid', 'eid')) start = stop = 0 for sid, rows in itertools.groupby(gmfdata['sid']): for row in rows: stop += 1 indices.append((sid, start, stop)) start = stop times = numpy.array([tup + (monitor.task_no,) for tup in self.times], time_dt) times.sort(order='rup_id') res = dict(gmfdata=gmfdata, hcurves=hcurves, times=times, sig_eps=numpy.array(self.sig_eps, self.sig_eps_dt), indices=numpy.array(indices, (U32, 3))) return res
[docs]def group_by_rlz(data, rlzs): """ :param data: a composite array of D elements with a field `eid` :param rlzs: an array of E >= D elements :returns: a dictionary rlzi -> data for each realization """ acc = general.AccumDict(accum=[]) for rec in data: acc[rlzs[rec['eid']]].append(rec) return {rlzi: numpy.array(recs) for rlzi, recs in acc.items()}
[docs]def gen_rgetters(dstore, slc=slice(None)): """ :yields: unfiltered RuptureGetters """ full_lt = dstore['full_lt'] trt_by_grp = full_lt.trt_by_grp samples = full_lt.get_samples_by_grp() rlzs_by_gsim = full_lt.get_rlzs_by_gsim_grp() rup_array = dstore['ruptures'][slc] nr = len(dstore['ruptures']) for grp_id, arr in general.group_array(rup_array, 'grp_id').items(): if not rlzs_by_gsim.get(grp_id, []): # the model has no sources continue for block in general.split_in_blocks(arr, len(arr) / nr): rgetter = RuptureGetter( [RuptureProxy(rec) for rec in block], dstore.filename, grp_id, trt_by_grp[grp_id], samples[grp_id], rlzs_by_gsim[grp_id]) yield rgetter
def _gen(arr, srcfilter, trt, samples): for rec in arr: sids = srcfilter.close_sids(rec, trt) if len(sids): yield RuptureProxy(rec, len(sids), samples)
[docs]def gen_rupture_getters(dstore, srcfilter, ct): """ :param dstore: a :class:`openquake.baselib.datastore.DataStore` :param srcfilter: a :class:`openquake.hazardlib.calc.filters.SourceFilter` :param ct: number of concurrent tasks :yields: filtered RuptureGetters """ full_lt = dstore['full_lt'] trt_by_grp = full_lt.trt_by_grp samples = full_lt.get_samples_by_grp() rlzs_by_gsim = full_lt.get_rlzs_by_gsim_grp() rup_array = dstore['ruptures'][()] items = list(general.group_array(rup_array, 'grp_id').items()) items.sort(key=lambda item: len(item[1])) # other weights were much worse maxweight = None while items: grp_id, rups = items.pop() # from the largest group if not rlzs_by_gsim[grp_id]: # this may happen if a source model has no sources, like # in event_based_risk/case_3 continue trt = trt_by_grp[grp_id] proxies = list(_gen(rups, srcfilter, trt, samples[grp_id])) if not maxweight: maxweight = sum(p.weight for p in proxies) / (ct // 2 or 1) blocks = list(general.block_splitter( proxies, maxweight, operator.attrgetter('weight'))) logging.info('Group %d: %d ruptures -> %d task(s)', grp_id, len(rups), len(blocks)) for block in blocks: rgetter = RuptureGetter( block, dstore.filename, grp_id, trt, samples[grp_id], rlzs_by_gsim[grp_id]) yield rgetter
[docs]def get_ebruptures(dstore): """ Extract EBRuptures from the datastore """ ebrs = [] for rgetter in gen_rgetters(dstore): for proxy in rgetter.get_proxies(): ebrs.append(proxy.to_ebr(rgetter.trt, rgetter.samples)) return ebrs
[docs]def get_rupdict(dstore): """ :returns: a dictionary rup_id->rup_dict """ dic = {} for i, ebr in enumerate(get_ebruptures(dstore)): dic['rup_%s' % i] = d = ebr.rupture.todict() for attr in ['srcidx', 'grp_id', 'n_occ', 'samples']: d[attr] = int(getattr(ebr, attr)) return dic
# this is never called directly; gen_rupture_getters is used instead
[docs]class RuptureGetter(object): """ :param proxies: a list of RuptureProxies :param filename: path to the HDF5 file containing a 'rupgeoms' dataset :param grp_id: source group index :param trt: tectonic region type string :param samples: number of samples of the group :param rlzs_by_gsim: dictionary gsim -> rlzs for the group """ def __init__(self, proxies, filename, grp_id, trt, samples, rlzs_by_gsim): self.proxies = proxies self.weight = sum(proxy.weight for proxy in proxies) self.filename = filename self.grp_id = grp_id self.trt = trt self.samples = samples self.rlzs_by_gsim = rlzs_by_gsim n_occ = sum(int(proxy['n_occ']) for proxy in proxies) self.num_events = n_occ if samples > 1 else n_occ * sum( len(rlzs) for rlzs in rlzs_by_gsim.values()) @property def num_ruptures(self): return len(self.proxies)
[docs] def get_eid_rlz(self): """ :returns: a composite array with the associations eid->rlz """ eid_rlz = [] for rup in self.proxies: ebr = EBRupture(mock.Mock(rup_id=rup['serial']), rup['srcidx'], self.grp_id, rup['n_occ'], self.samples) for rlz_id, eids in ebr.get_eids_by_rlz(self.rlzs_by_gsim).items(): for eid in eids: eid_rlz.append((eid + rup['e0'], rup['id'], rlz_id)) return numpy.array(eid_rlz, events_dt)
[docs] def get_rupdict(self): """ :returns: a dictionary with the parameters of the rupture """ assert len(self.proxies) == 1, 'Please specify a slice of length 1' dic = {'trt': self.trt, 'samples': self.samples} with datastore.read(self.filename) as dstore: rupgeoms = dstore['rupgeoms'] source_ids = dstore['source_info']['source_id'] rec = self.proxies[0].rec geom = rupgeoms[rec['gidx1']:rec['gidx2']].reshape( rec['sx'], rec['sy']) dic['lons'] = geom['lon'] dic['lats'] = geom['lat'] dic['deps'] = geom['depth'] rupclass, surclass = code2cls[rec['code']] dic['rupture_class'] = rupclass.__name__ dic['surface_class'] = surclass.__name__ dic['hypo'] = rec['hypo'] dic['occurrence_rate'] = rec['occurrence_rate'] dic['grp_id'] = rec['grp_id'] dic['n_occ'] = rec['n_occ'] dic['serial'] = rec['serial'] dic['mag'] = rec['mag'] dic['srcid'] = source_ids[rec['srcidx']] return dic
[docs] def get_proxies(self, min_mag=0): """ :returns: a list of RuptureProxies """ proxies = [] with datastore.read(self.filename) as dstore: rupgeoms = dstore['rupgeoms'] for proxy in self.proxies: if proxy['mag'] < min_mag: continue proxy.geom = rupgeoms[proxy['gidx1']:proxy['gidx2']].reshape( proxy['sx'], proxy['sy']) proxies.append(proxy) return proxies
def __len__(self): return len(self.proxies) def __repr__(self): wei = ' [w=%d]' % self.weight if hasattr(self, 'weight') else '' return '<%s grp_id=%d, %d rupture(s)%s>' % ( self.__class__.__name__, self.grp_id, len(self), wei)