Source code for openquake.calculators.getters

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2018-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.
import collections
import itertools
import operator
import logging
import unittest.mock as mock
import numpy
from openquake.baselib import hdf5, datastore, general
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib import calc, geo, probability_map
from openquake.hazardlib.geo.mesh import Mesh, RectangularMesh
from openquake.hazardlib.source.rupture import EBRupture, BaseRupture
from openquake.risklib.riskinput import rsi2str
from openquake.commonlib.calc import _gmvs_to_haz_curve

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
U64 = numpy.uint64
by_taxonomy = operator.attrgetter('taxonomy')
code2cls = BaseRupture.init()


def sig_eps_dt(imts):
    """
    :returns: a composite data type for the sig_eps output
    """
    lst = [('eid', U64), ('rlzi', U16)]
    for imt in imts:
        lst.append(('sig_' + imt, F32))
    for imt in imts:
        lst.append(('eps_' + imt, F32))
    return numpy.dtype(lst)
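
# Illustrative sketch: for two hypothetical IMTs the composite dtype is
#
#   >>> sig_eps_dt(['PGA', 'SA(1.0)']).names
#   ('eid', 'rlzi', 'sig_PGA', 'sig_SA(1.0)', 'eps_PGA', 'eps_SA(1.0)')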


class PmapGetter(object):
    """
    Read hazard curves from the datastore for all realizations or for a
    specific realization.

    :param dstore: a DataStore instance or file system path to it
    :param weights: a list of realization weights
    :param sids: the subset of sites to consider (if None, all sites)
    :param poes: a tuple of PoEs (empty by default)
    """
    def __init__(self, dstore, weights, sids=None, poes=()):
        self.dstore = dstore
        if len(weights[0].dic) == 1:  # no weights by IMT
            self.weights = numpy.array([w['weight'] for w in weights])
        else:
            self.weights = weights
        self.poes = poes
        self.num_rlzs = len(weights)
        self.eids = None
        self.nbytes = 0
        # NB: if sids is None it is resolved to all the sites in .init(),
        # where the datastore is guaranteed to be open
        self.sids = sids

    @property
    def imts(self):
        return list(self.imtls)

    def init(self):
        """
        Read the poes and build the probability maps, stored in the
        ._pmap_by_grp attribute
        """
        if hasattr(self, '_pmap_by_grp'):  # already initialized
            return self._pmap_by_grp
        if isinstance(self.dstore, str):
            self.dstore = hdf5.File(self.dstore, 'r')
        else:
            self.dstore.open('r')  # if not already open
        if self.sids is None:
            self.sids = self.dstore['sitecol'].sids
        oq = self.dstore['oqparam']
        self.imtls = oq.imtls
        self.poes = self.poes or oq.poes
        rlzs_by_grp = self.dstore['rlzs_by_grp']
        self.rlzs_by_grp = {k: dset[()] for k, dset in rlzs_by_grp.items()}
        # populate _pmap_by_grp
        self._pmap_by_grp = {}
        if 'poes' in self.dstore:
            # build probability maps restricted to the given sids
            ok_sids = set(self.sids)
            for grp, dset in self.dstore['poes'].items():
                ds = dset['array']
                L, G = ds.shape[1:]
                pmap = probability_map.ProbabilityMap(L, G)
                for idx, sid in enumerate(dset['sids'][()]):
                    if sid in ok_sids:
                        pmap[sid] = probability_map.ProbabilityCurve(ds[idx])
                self._pmap_by_grp[grp] = pmap
                self.nbytes += pmap.nbytes
        return self._pmap_by_grp

    def get_hazard(self, gsim=None):
        """
        :param gsim: ignored
        :returns: R probability curves for the first site in .sids
        """
        return self.get_pcurves(self.sids[0])

    def get(self, rlzi, grp=None):
        """
        :param rlzi: a realization index
        :param grp: None (all groups) or a string of the form "grp-XX"
        :returns: the hazard curves for the given realization
        """
        self.init()
        assert self.sids is not None
        pmap = probability_map.ProbabilityMap(len(self.imtls.array), 1)
        grps = [grp] if grp is not None else sorted(self._pmap_by_grp)
        for grp in grps:
            for gsim_idx, rlzis in enumerate(self.rlzs_by_grp[grp]):
                for r in rlzis:
                    if r == rlzi:
                        pmap |= self._pmap_by_grp[grp].extract(gsim_idx)
                        break
        return pmap
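
    # NB: the `|=` in .get() composes the contributions of independent
    # source groups: for hazardlib probability curves `a | b` is defined
    # as 1 - (1 - a) * (1 - b), i.e. the probability of exceedance given
    # independent sources.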

    def get_pcurves(self, sid):  # used in classical
        """
        :returns: a list of R probability curves with shape L
        """
        pmap_by_grp = self.init()
        L = len(self.imtls.array)
        pcurves = [probability_map.ProbabilityCurve(numpy.zeros((L, 1)))
                   for _ in range(self.num_rlzs)]
        for grp, pmap in pmap_by_grp.items():
            try:
                pc = pmap[sid]
            except KeyError:  # no hazard for sid
                continue
            for gsim_idx, rlzis in enumerate(self.rlzs_by_grp[grp]):
                c = probability_map.ProbabilityCurve(pc.array[:, [gsim_idx]])
                for rlzi in rlzis:
                    pcurves[rlzi] |= c
        return pcurves

    def items(self, kind=''):
        """
        Extract probability maps from the datastore, possibly generating
        on the fly the ones corresponding to the individual realizations.
        Yields pairs (tag, pmap).

        :param kind:
            the kind of PoEs to extract; if not given, returns the
            realization if there is only one or the statistics otherwise.
        """
        num_rlzs = len(self.weights)
        if not kind or kind == 'all':  # use default
            if 'hcurves' in self.dstore:
                for k in sorted(self.dstore['hcurves']):
                    yield k, self.dstore['hcurves/' + k][()]
            elif num_rlzs == 1:
                yield 'mean', self.get(0)
            return
        if 'poes' in self.dstore and kind in ('rlzs', 'all'):
            for rlzi in range(num_rlzs):
                hcurves = self.get(rlzi)
                yield 'rlz-%03d' % rlzi, hcurves
        elif 'poes' in self.dstore and kind.startswith('rlz-'):
            yield kind, self.get(int(kind[4:]))
        if 'hcurves' in self.dstore and kind == 'stats':
            for k in sorted(self.dstore['hcurves']):
                if not k.startswith('rlz'):
                    yield k, self.dstore['hcurves/' + k][()]

    def get_mean(self, grp=None):
        """
        Compute the mean curve as a ProbabilityMap

        :param grp:
            if not None must be a string of the form "grp-XX"; in that case
            returns the mean considering only the contribution for group XX
        """
        self.init()
        if len(self.weights) == 1:  # one realization
            # the standard deviation is zero
            pmap = self.get(0, grp)
            for sid, pcurve in pmap.items():
                array = numpy.zeros(pcurve.array.shape)
                array[:, 0] = pcurve.array[:, 0]
                pcurve.array = array
            return pmap
        else:
            raise NotImplementedError('multiple realizations')
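
# Minimal usage sketch for PmapGetter (assuming `dstore` and `weights` come
# from a completed classical calculation; names are illustrative):
#
#   getter = PmapGetter(dstore, weights)
#   getter.init()                      # reads the 'poes' datasets
#   pmap = getter.get(0)               # curves for realization 0, all groups
#   pcurves = getter.get_pcurves(sid)  # R probability curves for one site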


class GmfDataGetter(collections.abc.Mapping):
    """
    A dictionary-like object {sid: dictionary by realization index}
    """
    def __init__(self, dstore, sids, num_rlzs):
        self.dstore = dstore
        self.sids = sids
        self.num_rlzs = num_rlzs
        assert len(sids) == 1, sids

    def init(self):
        if hasattr(self, 'data'):  # already initialized
            return
        self.dstore.open('r')  # if not already open
        try:
            self.imts = self.dstore['gmf_data/imts'][()].split()
        except KeyError:  # engine < 3.3
            self.imts = list(self.dstore['oqparam'].imtls)
        self.rlzs = self.dstore['events']['rlz']
        self.data = self[self.sids[0]]
        if not self.data:  # no GMVs, return 0, counted in no_damage
            self.data = {rlzi: 0 for rlzi in range(self.num_rlzs)}
        # now some attributes set for API compatibility with the GmfGetter
        # number of ground motion fields
        # dictionary rlzi -> array(imts, events, nbytes)
        self.E = len(self.rlzs)

    def get_hazard(self, gsim=None):
        """
        :param gsim: ignored
        :returns: a dict rlzi -> datadict
        """
        return self.data

    def __getitem__(self, sid):
        dset = self.dstore['gmf_data/data']
        idxs = self.dstore['gmf_data/indices'][sid]
        if idxs.dtype.name == 'uint32':  # scenario
            idxs = [idxs]
        elif not idxs.dtype.names:  # engine >= 3.2
            idxs = zip(*idxs)
        data = [dset[start:stop] for start, stop in idxs]
        if len(data) == 0:  # site ID with no data
            return {}
        return group_by_rlz(numpy.concatenate(data), self.rlzs)

    def __iter__(self):
        return iter(self.sids)

    def __len__(self):
        return len(self.sids)
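
# Minimal usage sketch for GmfDataGetter (assuming an event based datastore
# with a 'gmf_data' group; the getter works on a single site ID):
#
#   getter = GmfDataGetter(dstore, [sid], num_rlzs)
#   getter.init()
#   getter.get_hazard()  # {rlzi: array of gmf records, or 0 if no GMVs}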


class GmfGetter(object):
    """
    A hazard getter with methods .get_gmfdata and .get_hazard returning
    ground motion values.
    """
    def __init__(self, rupgetter, srcfilter, oqparam):
        self.rlzs_by_gsim = rupgetter.rlzs_by_gsim
        self.rupgetter = rupgetter
        self.srcfilter = srcfilter
        self.sitecol = srcfilter.sitecol.complete
        self.oqparam = oqparam
        self.min_iml = oqparam.min_iml
        self.N = len(self.sitecol)
        self.num_rlzs = sum(len(rlzs) for rlzs in self.rlzs_by_gsim.values())
        self.gmv_dt = oqparam.gmf_data_dt()
        self.sig_eps_dt = sig_eps_dt(oqparam.imtls)
        M32 = (F32, len(oqparam.imtls))
        self.gmv_eid_dt = numpy.dtype([('gmv', M32), ('eid', U64)])
        self.cmaker = ContextMaker(
            rupgetter.trt, rupgetter.rlzs_by_gsim,
            calc.filters.IntegrationDistance(oqparam.maximum_distance)
            if isinstance(oqparam.maximum_distance, dict)
            else oqparam.maximum_distance,
            {'filter_distance': oqparam.filter_distance})
        self.correl_model = oqparam.correl_model

    @property
    def sids(self):
        return self.sitecol.sids

    @property
    def imtls(self):
        return self.oqparam.imtls

    @property
    def imts(self):
        return list(self.oqparam.imtls)

    def init(self):
        """
        Initialize the computers. Should be called on the workers
        """
        if hasattr(self, 'computers'):  # init already called
            return
        with hdf5.File(self.rupgetter.filename, 'r') as parent:
            self.weights = parent['weights'][()]
        self.computers = []
        for ebr in self.rupgetter.get_ruptures(self.srcfilter):
            sitecol = self.sitecol.filtered(ebr.sids)
            try:
                computer = calc.gmf.GmfComputer(
                    ebr, sitecol, self.oqparam.imtls, self.cmaker,
                    self.oqparam.truncation_level, self.correl_model)
            except FarAwayRupture:
                # due to numeric errors, ruptures within the maximum_distance
                # when written can be outside when read; I found a case with
                # a distance of 99.9996936 km over a maximum distance of
                # 100 km
                continue
            self.computers.append(computer)

    def gen_gmfs(self):
        """
        Compute the GMFs and yield arrays of the dtype
        (sid, eid, imti, gmv), one per rupture
        """
        self.sig_eps = []
        self.eid2rlz = {}
        for computer in self.computers:
            rup = computer.rupture
            sids = computer.sids
            eids_by_rlz = rup.get_eids_by_rlz(self.rlzs_by_gsim)
            data = []
            for gs, rlzs in self.rlzs_by_gsim.items():
                num_events = sum(len(eids_by_rlz[rlzi]) for rlzi in rlzs)
                if num_events == 0:
                    continue
                # NB: the trick for performance is to keep the call to
                # compute.compute outside of the loop over the realizations;
                # it is better to have few calls producing big arrays
                array, sig, eps = computer.compute(gs, num_events)
                array = array.transpose(1, 0, 2)  # from M, N, E to N, M, E
                for i, miniml in enumerate(self.min_iml):  # gmv < minimum
                    arr = array[:, i, :]
                    arr[arr < miniml] = 0
                n = 0
                for rlzi in rlzs:
                    eids = eids_by_rlz[rlzi]
                    e = len(eids)
                    if not e:
                        continue
                    for ei, eid in enumerate(eids):
                        gmf = array[:, :, n + ei]  # shape (N, M)
                        tot = gmf.sum(axis=0)  # shape (M,)
                        if not tot.sum():
                            continue
                        tup = tuple([eid, rlzi] + list(sig[:, n + ei]) +
                                    list(eps[:, n + ei]))
                        self.sig_eps.append(tup)
                        self.eid2rlz[eid] = rlzi
                        for sid, gmv in zip(sids, gmf):
                            if gmv.sum():
                                data.append((sid, eid, gmv))
                    n += e
            yield numpy.array(data, self.gmv_dt)

    def get_gmfdata(self):
        """
        :returns: an array of the dtype (sid, eid, imti, gmv)
        """
        alldata = list(self.gen_gmfs())
        if not alldata:
            return numpy.zeros(0, self.gmv_dt)
        return numpy.concatenate(alldata)

    def get_hazard_by_sid(self, data=None):
        """
        :param data: if given, an iterator of records of dtype gmf_dt
        :returns: sid -> records
        """
        if data is None:
            data = self.get_gmfdata()
        return general.group_array(data, 'sid')

    def compute_gmfs_curves(self, monitor):
        """
        :returns: a dict with keys gmfdata, indices, hcurves
        """
        oq = self.oqparam
        with monitor('GmfGetter.init', measuremem=True):
            self.init()
        hcurves = {}  # key -> poes
        if oq.hazard_curves_from_gmfs:
            hc_mon = monitor('building hazard curves', measuremem=False)
            with monitor('building hazard', measuremem=True):
                gmfdata = self.get_gmfdata()  # returned later
                hazard = self.get_hazard_by_sid(data=gmfdata)
            for sid, hazardr in hazard.items():
                dic = group_by_rlz(hazardr, self.eid2rlz)
                for rlzi, array in dic.items():
                    with hc_mon:
                        gmvs = array['gmv']
                        for imti, imt in enumerate(oq.imtls):
                            poes = _gmvs_to_haz_curve(
                                gmvs[:, imti], oq.imtls[imt],
                                oq.ses_per_logic_tree_path)
                            hcurves[rsi2str(rlzi, sid, imt)] = poes
        elif oq.ground_motion_fields:  # fast lane
            with monitor('building hazard', measuremem=True):
                gmfdata = self.get_gmfdata()
        else:
            return dict(gmfdata=(), hcurves=hcurves)
        if len(gmfdata) == 0:
            return dict(gmfdata=[])
        indices = []
        gmfdata.sort(order=('sid', 'eid'))
        start = stop = 0
        for sid, rows in itertools.groupby(gmfdata['sid']):
            for row in rows:
                stop += 1
            indices.append((sid, start, stop))
            start = stop
        res = dict(gmfdata=gmfdata, hcurves=hcurves,
                   sig_eps=numpy.array(self.sig_eps, self.sig_eps_dt),
                   indices=numpy.array(indices, (U32, 3)))
        return res
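
# NB: the `indices` array built in compute_gmfs_curves maps each site ID
# to the half-open row range [start, stop) of its records inside the
# gmfdata array, which is sorted by (sid, eid); i.e. gmfdata[start:stop]
# contains all the ground motion records for that site.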


def group_by_rlz(data, eid2rlz):
    """
    :param data: a composite array of D elements with a field `eid`
    :param eid2rlz: an array of E >= D elements or a dictionary
    :returns: a dictionary rlzi -> data for each realization
    """
    acc = general.AccumDict(accum=[])
    for rec in data:
        acc[eid2rlz[rec['eid']]].append(rec)
    return {rlzi: numpy.array(recs) for rlzi, recs in acc.items()}
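
# Illustrative sketch of group_by_rlz with hypothetical data:
#
#   >>> dt = numpy.dtype([('eid', U64), ('gmv', F32)])
#   >>> data = numpy.array([(0, .1), (1, .2), (2, .3)], dt)
#   >>> group_by_rlz(data, {0: 0, 1: 1, 2: 0})  # eid -> rlzi
#   {0: <array with eids 0 and 2>, 1: <array with eid 1>}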


def gen_rupture_getters(dstore, slc=slice(None), concurrent_tasks=1,
                        hdf5cache=None):
    """
    :yields: RuptureGetters
    """
    if dstore.parent:
        dstore = dstore.parent
    csm_info = dstore['csm_info']
    trt_by_grp = csm_info.grp_by("trt")
    samples = csm_info.get_samples_by_grp()
    rlzs_by_gsim = csm_info.get_rlzs_by_gsim_grp()
    rup_array = dstore['ruptures'][slc]
    maxweight = numpy.ceil(len(rup_array) / (concurrent_tasks or 1))
    nr, ne, first_event = 0, 0, 0
    for grp_id, arr in general.group_array(rup_array, 'grp_id').items():
        if not rlzs_by_gsim[grp_id]:
            # this may happen if a source model has no sources, like
            # in event_based_risk/case_3
            continue
        for block in general.block_splitter(arr, maxweight):
            rgetter = RuptureGetter(
                hdf5cache or dstore.filename, numpy.array(block), grp_id,
                trt_by_grp[grp_id], samples[grp_id], rlzs_by_gsim[grp_id],
                first_event)
            rgetter.weight = getattr(block, 'weight', len(block))
            first_event += rgetter.num_events
            yield rgetter
            nr += len(block)
            ne += rgetter.num_events
    logging.info('Read %d ruptures and %d events', nr, ne)
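
# Usage sketch for gen_rupture_getters (assuming `dstore` is an open
# calculation datastore containing a 'ruptures' dataset):
#
#   for rgetter in gen_rupture_getters(dstore, concurrent_tasks=4):
#       for ebr in rgetter.get_ruptures():
#           ...  # process each EBRupture of the block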


def get_maxloss_rupture(dstore, loss_type):
    """
    :param dstore: a DataStore instance
    :param loss_type: a loss type string
    :returns:
        EBRupture instance corresponding to the maximum loss for the
        given loss type
    """
    lti = dstore['oqparam'].lti[loss_type]
    ridx = dstore.get_attr('rup_loss_table', 'ridx')[lti]
    [rgetter] = gen_rupture_getters(dstore, slice(ridx, ridx + 1))
    [ebr] = rgetter.get_ruptures()
    return ebr


# this is never called directly; gen_rupture_getters is used instead
class RuptureGetter(object):
    """
    Iterable over ruptures.

    :param filename: path to an HDF5 file with a dataset named `ruptures`
    :param rup_indices: a list of rupture indices of the same group
    """
    def __init__(self, filename, rup_indices, grp_id, trt, samples,
                 rlzs_by_gsim, first_event=0):
        self.filename = filename
        self.rup_indices = rup_indices
        if not isinstance(rup_indices, list):  # it is a rup_array
            self.__dict__['rup_array'] = rup_indices
            self.__dict__['num_events'] = int(rup_indices['n_occ'].sum())
        self.grp_id = grp_id
        self.trt = trt
        self.samples = samples
        self.rlzs_by_gsim = rlzs_by_gsim
        self.first_event = first_event
        self.rlz2idx = {}
        nr = 0
        rlzi = []
        for gsim, rlzs in rlzs_by_gsim.items():
            assert not isinstance(gsim, str)
            for rlz in rlzs:
                self.rlz2idx[rlz] = nr
                rlzi.append(rlz)
                nr += 1
        self.rlzs = numpy.array(rlzi)

    @general.cached_property
    def rup_array(self):
        with hdf5.File(self.filename, 'r') as h5:
            return h5['ruptures'][self.rup_indices]  # must be a list

    @general.cached_property
    def num_events(self):
        n_occ = self.rup_array['n_occ'].sum()
        ne = n_occ if self.samples > 1 else n_occ * len(self.rlzs)
        return int(ne)

    @property
    def num_ruptures(self):
        return len(self.rup_indices)

    @property
    def num_rlzs(self):
        return len(self.rlz2idx)

    # used in ebrisk
    def set_weights(self, src_filter, num_taxonomies_by_site):
        """
        Set the `.weights` array (one weight per rupture) and the total
        `.weight` of the ruptures in the getter
        """
        weights = []
        for rup in self.rup_array:
            sids = src_filter.close_sids(rup, self.trt, rup['mag'])
            weights.append(num_taxonomies_by_site[sids].sum())
        self.weights = numpy.array(weights)
        self.weight = self.weights.sum()

    def split(self, maxweight):
        """
        :yields: RuptureGetters with weight <= maxweight
        """
        # NB: can be called only after .set_weights() has been called
        idx = {ri: i for i, ri in enumerate(self.rup_indices)}
        fe = self.first_event
        for rup_indices in general.block_splitter(
                self.rup_indices, maxweight,
                lambda ri: self.weights[idx[ri]]):
            if rup_indices:  # some indices may have weight 0 and are discarded
                rgetter = self.__class__(
                    self.filename, list(rup_indices), self.grp_id, self.trt,
                    self.samples, self.rlzs_by_gsim, fe)
                fe += rgetter.num_events
                rgetter.weight = sum(
                    self.weights[idx[ri]] for ri in rup_indices)
                yield rgetter

    def get_eid_rlz(self, monitor=None):
        """
        :returns: a composite array with the associations eid->rlz
        """
        eid_rlz = []
        for rup in self.rup_array:
            ebr = EBRupture(mock.Mock(serial=rup['serial']), rup['srcidx'],
                            self.grp_id, rup['n_occ'], self.samples)
            for rlz, eids in ebr.get_eids_by_rlz(self.rlzs_by_gsim).items():
                for eid in eids:
                    eid_rlz.append((eid, rlz))
        return numpy.array(eid_rlz, [('eid', U64), ('rlz', U16)])

    def get_rupdict(self):
        """
        :returns: a dictionary with the parameters of the rupture
        """
        assert len(self.rup_array) == 1, 'Please specify a slice of length 1'
        dic = {'trt': self.trt, 'samples': self.samples}
        with datastore.read(self.filename) as dstore:
            rupgeoms = dstore['rupgeoms']
            source_ids = dstore['source_info']['source_id']
            rec = self.rup_array[0]
            geom = rupgeoms[rec['gidx1']:rec['gidx2']].reshape(
                rec['sy'], rec['sz'])
            dic['lons'] = geom['lon']
            dic['lats'] = geom['lat']
            dic['deps'] = geom['depth']
            rupclass, surclass = code2cls[rec['code']]
            dic['rupture_class'] = rupclass.__name__
            dic['surface_class'] = surclass.__name__
            dic['hypo'] = rec['hypo']
            dic['occurrence_rate'] = rec['occurrence_rate']
            dic['grp_id'] = rec['grp_id']
            dic['n_occ'] = rec['n_occ']
            dic['serial'] = rec['serial']
            dic['mag'] = rec['mag']
            dic['srcid'] = source_ids[rec['srcidx']]
        return dic

    def get_ruptures(self, srcfilter=calc.filters.nofilter):
        """
        :returns: a list of EBRuptures filtered by bounding box
        """
        ebrs = []
        with datastore.read(self.filename) as dstore:
            rupgeoms = dstore['rupgeoms']
            for rec in self.rup_array:
                if srcfilter.integration_distance:
                    sids = srcfilter.close_sids(rec, self.trt, rec['mag'])
                    if len(sids) == 0:  # the rupture is far away
                        continue
                else:
                    sids = None
                mesh = numpy.zeros((3, rec['sy'], rec['sz']), F32)
                geom = rupgeoms[rec['gidx1']:rec['gidx2']].reshape(
                    rec['sy'], rec['sz'])
                mesh[0] = geom['lon']
                mesh[1] = geom['lat']
                mesh[2] = geom['depth']
                rupture_cls, surface_cls = code2cls[rec['code']]
                rupture = object.__new__(rupture_cls)
                rupture.serial = rec['serial']
                rupture.surface = object.__new__(surface_cls)
                rupture.mag = rec['mag']
                rupture.rake = rec['rake']
                rupture.hypocenter = geo.Point(*rec['hypo'])
                rupture.occurrence_rate = rec['occurrence_rate']
                rupture.tectonic_region_type = self.trt
                if surface_cls is geo.PlanarSurface:
                    rupture.surface = geo.PlanarSurface.from_array(
                        mesh[:, 0, :])
                elif surface_cls is geo.MultiSurface:
                    # mesh has shape (3, n, 4)
                    rupture.surface.__init__([
                        geo.PlanarSurface.from_array(mesh[:, i, :])
                        for i in range(mesh.shape[1])])
                elif surface_cls is geo.GriddedSurface:
                    # fault surface, strike and dip will be computed
                    rupture.surface.strike = rupture.surface.dip = None
                    rupture.surface.mesh = Mesh(*mesh)
                else:
                    # fault surface, strike and dip will be computed
                    rupture.surface.strike = rupture.surface.dip = None
                    rupture.surface.__init__(RectangularMesh(*mesh))
                grp_id = rec['grp_id']
                ebr = EBRupture(rupture, rec['srcidx'], grp_id,
                                rec['n_occ'], self.samples)
                # not implemented: rupture_slip_direction
                ebr.sids = sids
                ebrs.append(ebr)
        return ebrs

    def E2R(self, array, rlzi):
        """
        :param array: an array of shape (E, ...)
        :param rlzi: an array of E realization indices
        :returns: an aggregated array of shape (R, ...)
        """
        z = numpy.zeros((self.num_rlzs,) + array.shape[1:], array.dtype)
        for a, r in zip(array, rlzi):
            z[self.rlz2idx[r]] += a
        return z
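
    # Illustrative sketch of E2R with hypothetical numbers: given two
    # realizations, per-event losses array = [[10.], [20.], [30.]] and
    # rlzi = [0, 1, 0], E2R sums the rows event-by-event into one row per
    # realization and returns [[40.], [20.]].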

    def __len__(self):
        return len(self.rup_indices)

    def __repr__(self):
        wei = ' [w=%d]' % self.weight if hasattr(self, 'weight') else ''
        return '<%s grp_id=%d, %d rupture(s)%s>' % (
            self.__class__.__name__, self.grp_id, len(self.rup_indices), wei)