# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import collections
import itertools
import operator
import logging
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import (
AccumDict, groupby, group_array, get_array, block_splitter)
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib import calc, geo, probability_map, stats
from openquake.hazardlib.geo.mesh import Mesh, RectangularMesh
from openquake.hazardlib.source.rupture import BaseRupture, EBRupture, classes
from openquake.risklib.riskinput import rsi2str
from openquake.commonlib.calc import _gmvs_to_haz_curve
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
U64 = numpy.uint64
BaseRupture.init() # initialize rupture codes
[docs]class PmapGetter(object):
"""
Read hazard curves from the datastore for all realizations or for a
specific realization.
:param dstore: a DataStore instance or file system path to it
:param sids: the subset of sites to consider (if None, all sites)
:param rlzs_assoc: a RlzsAssoc instance (if None, infers it)
"""
def __init__(self, dstore, rlzs_assoc=None, sids=None):
self.dstore = dstore
self.sids = dstore['sitecol'].sids if sids is None else sids
self.rlzs_assoc = rlzs_assoc or dstore['csm_info'].get_rlzs_assoc()
self.eids = None
self.nbytes = 0
self.sids = sids
@property
def weights(self):
return [rlz.weight for rlz in self.rlzs_assoc.realizations]
[docs] def init(self):
"""
Read the poes and set the .data attribute with the hazard curves
"""
if hasattr(self, 'data'): # already initialized
return
if isinstance(self.dstore, str):
self.dstore = hdf5.File(self.dstore, 'r')
else:
self.dstore.open('r') # if not
if self.sids is None:
self.sids = self.dstore['sitecol'].sids
oq = self.dstore['oqparam']
self.imtls = oq.imtls
self.poes = oq.poes
self.data = collections.OrderedDict()
try:
hcurves = self.get_hcurves(self.imtls) # shape (R, N)
except IndexError: # no data
return
for sid, hcurve_by_rlz in zip(self.sids, hcurves.T):
self.data[sid] = datadict = {}
for rlzi, hcurve in enumerate(hcurve_by_rlz):
datadict[rlzi] = lst = [None for imt in self.imtls]
for imti, imt in enumerate(self.imtls):
lst[imti] = hcurve[imt] # imls
@property
def pmap_by_grp(self):
"""
:returns: dictionary "grp-XXX" -> ProbabilityMap instance
"""
if hasattr(self, '_pmap_by_grp'): # already called
return self._pmap_by_grp
# populate _pmap_by_grp
self._pmap_by_grp = {}
if 'poes' in self.dstore:
# build probability maps restricted to the given sids
ok_sids = set(self.sids)
for grp, dset in self.dstore['poes'].items():
ds = dset['array']
L, I = ds.shape[1:]
pmap = probability_map.ProbabilityMap(L, I)
for idx, sid in enumerate(dset['sids'].value):
if sid in ok_sids:
pmap[sid] = probability_map.ProbabilityCurve(ds[idx])
self._pmap_by_grp[grp] = pmap
self.nbytes += pmap.nbytes
return self._pmap_by_grp
[docs] def get_hazard(self, gsim=None):
"""
:param gsim: ignored
:returns: an OrderedDict rlzi -> datadict
"""
return self.data
[docs] def get(self, rlzi, grp=None):
"""
:param rlzi: a realization index
:param grp: None (all groups) or a string of the form "grp-XX"
:returns: the hazard curves for the given realization
"""
self.init()
assert self.sids is not None
pmap = probability_map.ProbabilityMap(len(self.imtls.array), 1)
grps = [grp] if grp is not None else sorted(self.pmap_by_grp)
array = self.rlzs_assoc.by_grp()
for grp in grps:
for gsim_idx, rlzis in array[grp]:
for r in rlzis:
if r == rlzi:
pmap |= self.pmap_by_grp[grp].extract(gsim_idx)
break
return pmap
[docs] def get_pmaps(self, sids): # used in classical
"""
:param sids: an array of S site IDs
:returns: a list of R probability maps
"""
return self.rlzs_assoc.combine_pmaps(self.pmap_by_grp)
[docs] def get_hcurves(self, imtls=None):
"""
:param imtls: intensity measure types and levels
:returns: an array of (R, N) hazard curves
"""
self.init()
if imtls is None:
imtls = self.imtls
pmaps = [pmap.convert2(imtls, self.sids)
for pmap in self.get_pmaps(self.sids)]
return numpy.array(pmaps)
[docs] def items(self, kind=''):
"""
Extract probability maps from the datastore, possibly generating
on the fly the ones corresponding to the individual realizations.
Yields pairs (tag, pmap).
:param kind:
the kind of PoEs to extract; if not given, returns the realization
if there is only one or the statistics otherwise.
"""
num_rlzs = len(self.weights)
if not kind: # use default
if 'hcurves' in self.dstore:
for k in sorted(self.dstore['hcurves']):
yield k, self.dstore['hcurves/' + k].value
elif num_rlzs == 1:
yield 'mean', self.get(0)
return
if 'poes' in self.dstore and kind in ('rlzs', 'all'):
for rlzi in range(num_rlzs):
hcurves = self.get(rlzi)
yield 'rlz-%03d' % rlzi, hcurves
elif 'poes' in self.dstore and kind.startswith('rlz-'):
yield kind, self.get(int(kind[4:]))
if 'hcurves' in self.dstore and kind in ('stats', 'all'):
for k in sorted(self.dstore['hcurves']):
yield k, self.dstore['hcurves/' + k].value
[docs] def get_mean(self, grp=None):
"""
Compute the mean curve as a ProbabilityMap
:param grp:
if not None must be a string of the form "grp-XX"; in that case
returns the mean considering only the contribution for group XX
"""
self.init()
if len(self.weights) == 1: # one realization
# the standard deviation is zero
pmap = self.get(0, grp)
for sid, pcurve in pmap.items():
array = numpy.zeros(pcurve.array.shape[:-1] + (2,))
array[:, 0] = pcurve.array[:, 0]
pcurve.array = array
return pmap
else: # multiple realizations, assume hcurves/mean is there
dic = ({g: self.dstore['poes/' + g] for g in self.dstore['poes']}
if grp is None else {grp: self.dstore['poes/' + grp]})
return self.rlzs_assoc.compute_pmap_stats(
dic, [stats.mean_curve, stats.std_curve])
[docs]class GmfDataGetter(collections.Mapping):
"""
A dictionary-like object {sid: dictionary by realization index}
"""
def __init__(self, dstore, sids, num_rlzs, imtls):
self.dstore = dstore
self.sids = sids
self.num_rlzs = num_rlzs
self.imtls = imtls
[docs] def init(self):
if hasattr(self, 'data'): # already initialized
return
self.dstore.open('r') # if not already open
self.eids = self.dstore['events']['eid']
self.eids.sort()
self.data = collections.OrderedDict()
for sid in self.sids:
self.data[sid] = data = self[sid]
if not data: # no GMVs, return 0, counted in no_damage
self.data[sid] = {rlzi: 0 for rlzi in range(self.num_rlzs)}
# dictionary eid -> index
if self.eids is not None:
self.eid2idx = dict(zip(self.eids, range(len(self.eids))))
# now some attributes set for API compatibility with the GmfGetter
# number of ground motion fields
# dictionary rlzi -> array(imts, events, nbytes)
self.gmdata = AccumDict(accum=numpy.zeros(len(self.imtls) + 1, F32))
[docs] def get_hazard(self, gsim=None):
"""
:param gsim: ignored
:returns: an OrderedDict rlzi -> datadict
"""
return self.data
def __getitem__(self, sid):
dset = self.dstore['gmf_data/data']
idxs = self.dstore['gmf_data/indices'][sid]
if idxs.dtype.name == 'uint32': # scenario
idxs = [idxs]
elif not idxs.dtype.names: # engine >= 3.2
idxs = zip(*idxs)
data = [dset[start:stop] for start, stop in idxs]
if len(data) == 0: # site ID with no data
return {}
return group_array(numpy.concatenate(data), 'rlzi')
def __iter__(self):
return iter(self.sids)
def __len__(self):
return len(self.sids)
[docs]class GmfGetter(object):
"""
An hazard getter with methods .gen_gmv and .get_hazard returning
ground motion values.
"""
def __init__(self, rlzs_by_gsim, ebruptures, sitecol, oqparam,
min_iml, samples=1):
assert sitecol is sitecol.complete, sitecol
self.rlzs_by_gsim = rlzs_by_gsim
self.ebruptures = ebruptures
self.sitecol = sitecol
self.oqparam = oqparam
self.min_iml = min_iml
self.N = len(self.sitecol.complete)
self.num_rlzs = sum(len(rlzs) for rlzs in self.rlzs_by_gsim.values())
self.I = len(oqparam.imtls)
self.gmv_dt = numpy.dtype(
[('sid', U32), ('eid', U64), ('gmv', (F32, (self.I,)))])
self.gmv_eid_dt = numpy.dtype(
[('gmv', (F32, (self.I,))), ('eid', U64)])
self.cmaker = ContextMaker(
rlzs_by_gsim,
calc.filters.IntegrationDistance(oqparam.maximum_distance)
if isinstance(oqparam.maximum_distance, dict)
else oqparam.maximum_distance,
{'filter_distance': oqparam.filter_distance})
self.correl_model = oqparam.correl_model
self.samples = samples
@property
def sids(self):
return self.sitecol.sids
@property
def imtls(self):
return self.oqparam.imtls
[docs] def init(self):
"""
Initialize the computers. Should be called on the workers
"""
if hasattr(self, 'eids'): # init already called
return
self.computers = []
eids = []
for ebr in self.ebruptures:
try:
computer = calc.gmf.GmfComputer(
ebr, self.sitecol, self.oqparam.imtls, self.cmaker,
self.oqparam.truncation_level, self.correl_model)
except FarAwayRupture:
# due to numeric errors ruptures within the maximum_distance
# when written can be outside when read; I found a case with
# a distance of 99.9996936 km over a maximum distance of 100 km
continue
self.computers.append(computer)
eids.append(ebr.events['eid'])
self.eids = numpy.concatenate(eids) if eids else []
# dictionary rlzi -> array(imtls, events, nbytes)
self.gmdata = AccumDict(accum=numpy.zeros(self.I + 1, F32))
# dictionary eid -> index
self.eid2idx = dict(zip(self.eids, range(len(self.eids))))
[docs] def gen_gmv(self):
"""
Compute the GMFs for the given realization and populate the .gmdata
array. Yields tuples of the form (sid, eid, imti, gmv).
"""
sample = 0 # in case of sampling the realizations have a corresponding
# sample number from 0 to the number of samples of the given src model
for gs in self.rlzs_by_gsim: # OrderedDict
rlzs = self.rlzs_by_gsim[gs]
for computer in self.computers:
rup = computer.rupture
sids = computer.sids
if self.samples > 1:
# events of the current slice of realizations
all_eids = [get_array(rup.events, sample=s)['eid']
for s in range(sample, sample + len(rlzs))]
else:
all_eids = [rup.events['eid']] * len(rlzs)
num_events = sum(len(eids) for eids in all_eids)
# NB: the trick for performance is to keep the call to
# compute.compute outside of the loop over the realizations
# it is better to have few calls producing big arrays
array = computer.compute(gs, num_events).transpose(1, 0, 2)
# shape (N, I, E)
for i, miniml in enumerate(self.min_iml): # gmv < minimum
arr = array[:, i, :]
arr[arr < miniml] = 0
n = 0
for r, rlzi in enumerate(rlzs):
e = len(all_eids[r])
gmdata = self.gmdata[rlzi]
gmdata[-1] += e # increase number of events
for ei, eid in enumerate(all_eids[r]):
gmf = array[:, :, n + ei] # shape (N, I)
tot = gmf.sum(axis=0) # shape (I,)
if not tot.sum():
continue
for i, val in enumerate(tot):
gmdata[i] += val
for sid, gmv in zip(sids, gmf):
if gmv.sum():
yield rlzi, sid, eid, gmv
n += e
sample += len(rlzs)
[docs] def get_hazard(self, data=None):
"""
:param data: if given, an iterator of records of dtype gmf_data_dt
:returns: an array (rlzi, sid, imti) -> array(gmv, eid)
"""
if data is None:
data = self.gen_gmv()
hazard = numpy.array([collections.defaultdict(list)
for _ in range(self.N)])
for rlzi, sid, eid, gmv in data:
hazard[sid][rlzi].append((gmv, eid))
for haz in hazard:
for rlzi in haz:
haz[rlzi] = numpy.array(haz[rlzi], self.gmv_eid_dt)
return hazard
[docs] def compute_gmfs_curves(self, monitor):
"""
:returns: a dict with keys gmdata, gmfdata, indices, hcurves
"""
oq = self.oqparam
dt = oq.gmf_data_dt()
with monitor('GmfGetter.init', measuremem=True):
self.init()
hcurves = {} # key -> poes
if oq.hazard_curves_from_gmfs:
hc_mon = monitor('building hazard curves', measuremem=False)
duration = oq.investigation_time * oq.ses_per_logic_tree_path
with monitor('building hazard', measuremem=True):
gmfdata = numpy.fromiter(self.gen_gmv(), dt)
hazard = self.get_hazard(data=gmfdata)
for sid, hazardr in zip(self.sids, hazard):
for rlzi, array in hazardr.items():
if len(array) == 0: # no data
continue
with hc_mon:
gmvs = array['gmv']
for imti, imt in enumerate(oq.imtls):
poes = _gmvs_to_haz_curve(
gmvs[:, imti], oq.imtls[imt],
oq.investigation_time, duration)
hcurves[rsi2str(rlzi, sid, imt)] = poes
elif oq.ground_motion_fields: # fast lane
with monitor('building hazard', measuremem=True):
gmfdata = numpy.fromiter(self.gen_gmv(), dt)
else:
return {}
indices = []
gmfdata.sort(order=('sid', 'rlzi', 'eid'))
start = stop = 0
for sid, rows in itertools.groupby(gmfdata['sid']):
for row in rows:
stop += 1
indices.append((sid, start, stop))
start = stop
res = dict(gmfdata=gmfdata, hcurves=hcurves, gmdata=self.gmdata,
indices=numpy.array(indices, (U32, 3)))
return res
[docs]def get_ruptures_by_grp(dstore, slice_=slice(None)):
"""
Extracts the ruptures corresponding to the given slice. If missing,
extract all ruptures.
:returns: a dictionary grp_id -> list of EBRuptures
"""
if slice_.stop is None:
n = len(dstore['ruptures']) - (slice_.start or 0)
logging.info('Reading %d ruptures from the datastore', n)
rgetter = RuptureGetter(dstore, slice_)
return groupby(rgetter, operator.attrgetter('grp_id'))
[docs]def get_maxloss_rupture(dstore, loss_type):
"""
:param dstore: a DataStore instance
:param loss_type: a loss type string
:returns:
EBRupture instance corresponding to the maximum loss for the
given loss type
"""
lti = dstore['oqparam'].lti[loss_type]
ridx = dstore.get_attr('rup_loss_table', 'ridx')[lti]
[ebr] = RuptureGetter(dstore, slice(ridx, ridx + 1))
return ebr
[docs]class RuptureGetter(object):
"""
Iterable over ruptures.
:param dstore:
a DataStore instance with a dataset names `ruptures`
:param mask:
which ruptures to read; it can be:
- None: read all ruptures
- a slice
- a boolean mask
- a list of integers
:param grp_id:
the group ID of the ruptures, if they are homogeneous, or None
"""
[docs] @classmethod
def from_(cls, dstore):
"""
:returns: a dictionary grp_id -> RuptureGetter instance
"""
array = dstore['ruptures'].value
grp_ids = numpy.unique(array['grp_id'])
return {grp_id: cls(dstore, array['grp_id'] == grp_id, grp_id)
for grp_id in grp_ids}
def __init__(self, dstore, mask=None, grp_id=None):
self.dstore = dstore
self.mask = slice(None) if mask is None else mask
self.grp_id = grp_id
[docs] def split(self, block_size):
"""
Split a RuptureGetter in multiple getters, each one containing a block
of ruptures.
:param block_size:
maximum length of the rupture blocks
:returns:
`RuptureGetters` containing `block_size` ruptures and with
an attribute `.n_events` counting the total number of events
"""
getters = []
indices, = self.mask.nonzero()
for block in block_splitter(indices, block_size):
idxs = list(block) # not numpy.int_(block)!
rgetter = self.__class__(self.dstore, idxs, self.grp_id)
rup = self.dstore['ruptures'][idxs]
# use int below, otherwise n_events would be a numpy.uint64
rgetter.n_events = int((rup['eidx2'] - rup['eidx1']).sum())
getters.append(rgetter)
return getters
def __iter__(self):
self.dstore.open('r') # if needed
attrs = self.dstore.get_attrs('ruptures')
code2cls = {} # code -> rupture_cls, surface_cls
for key, val in attrs.items():
if key.startswith('code_'):
code2cls[int(key[5:])] = [classes[v] for v in val.split()]
grp_trt = self.dstore['csm_info'].grp_by("trt")
events = self.dstore['events']
ruptures = self.dstore['ruptures'][self.mask]
rupgeoms = self.dstore['rupgeoms'][self.mask]
# NB: ruptures.sort(order='serial') causes sometimes a SystemError:
# <ufunc 'greater'> returned a result with an error set
# this is way I am sorting with Python and not with numpy below
data = sorted((serial, ridx) for ridx, serial in enumerate(
ruptures['serial']))
for serial, ridx in data:
rec = ruptures[ridx]
evs = events[rec['eidx1']:rec['eidx2']]
if self.grp_id is not None and self.grp_id != rec['grp_id']:
continue
mesh = numpy.zeros((3, rec['sy'], rec['sz']), F32)
for i, arr in enumerate(rupgeoms[ridx]): # i = 0, 1, 2
mesh[i] = arr.reshape(rec['sy'], rec['sz'])
rupture_cls, surface_cls = code2cls[rec['code']]
rupture = object.__new__(rupture_cls)
rupture.serial = serial
rupture.surface = object.__new__(surface_cls)
rupture.mag = rec['mag']
rupture.rake = rec['rake']
rupture.seed = rec['seed']
rupture.hypocenter = geo.Point(*rec['hypo'])
rupture.occurrence_rate = rec['occurrence_rate']
rupture.tectonic_region_type = grp_trt[rec['grp_id']]
pmfx = rec['pmfx']
if pmfx != -1:
rupture.pmf = self.dstore['pmfs'][pmfx]
if surface_cls is geo.PlanarSurface:
rupture.surface = geo.PlanarSurface.from_array(mesh[:, 0, :])
elif surface_cls is geo.MultiSurface:
# mesh has shape (3, n, 4)
rupture.surface.__init__([
geo.PlanarSurface.from_array(mesh[:, i, :])
for i in range(mesh.shape[1])])
elif surface_cls is geo.GriddedSurface:
# fault surface, strike and dip will be computed
rupture.surface.strike = rupture.surface.dip = None
rupture.surface.mesh = Mesh(*mesh)
else:
# fault surface, strike and dip will be computed
rupture.surface.strike = rupture.surface.dip = None
rupture.surface.__init__(RectangularMesh(*mesh))
ebr = EBRupture(rupture, (), evs)
ebr.eidx1 = rec['eidx1']
ebr.eidx2 = rec['eidx2']
# not implemented: rupture_slip_direction
yield ebr
def __len__(self):
if hasattr(self.mask, 'start'): # is a slice
if self.mask.start is None and self.mask.stop is None:
return len(self.dstore['ruptures'])
else:
return self.mask.stop - self.mask.start
elif isinstance(self.mask, list):
# NB: h5py wants lists, not arrays of indices
return len(self.mask)
else: # is a boolean mask
return self.mask.sum()