# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import copy
import logging
import collections
from urllib.parse import unquote_plus
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import (
AccumDict, group_array, cached_property)
from openquake.risklib import scientific, riskmodels


class ValidationError(Exception):
    pass


U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32


class TaxonomyMapping(dict):
"""
A dictionary taxonomy -> kind -> {ids, weights}
serializable to HDF5 as an array with fields
taxonomy, fragility_ids, fragility_weights, consequence_ids,
consequence_weights, vulnerability_ids, vulnerability_weights.
"""
dt = numpy.dtype([('taxonomy', hdf5.vstr),
('fragility_ids', hdf5.vstr),
('fragility_weights', hdf5.vfloat64),
('consequence_ids', hdf5.vstr),
('consequence_weights', hdf5.vfloat64),
('vulnerability_ids', hdf5.vstr),
('vulnerability_weights', hdf5.vfloat64)])
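
    # For illustration only (hypothetical IDs), an entry of the mapping may
    # look like:
    #   {'RC': {'fragility': dict(ids=['RC_frag'], weights=[1.0]),
    #           'consequence': dict(ids=['RC_cons'], weights=[1.0]),
    #           'vulnerability': dict(ids=['RC_vuln'], weights=[1.0])}}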

    def __toh5__(self):
data = []
for taxonomy, dic in self.items():
row = (taxonomy,
' '.join(dic['fragility']['ids']),
numpy.array(dic['fragility']['weights']),
' '.join(dic['consequence']['ids']),
numpy.array(dic['consequence']['weights']),
' '.join(dic['vulnerability']['ids']),
numpy.array(dic['vulnerability']['weights']))
data.append(row)
return numpy.array(data, self.dt), {}

    def __fromh5__(self, array, dic):
for rec in array:
f = dict(ids=rec['fragility_ids'].split(),
weights=rec['fragility_weights'])
c = dict(ids=rec['consequence_ids'].split(),
weights=rec['consequence_weights'])
v = dict(ids=rec['vulnerability_ids'].split(),
weights=rec['vulnerability_weights'])
self[rec['taxonomy']] = dict(
fragility=f, consequence=c, vulnerability=v)


def get_assets_by_taxo(assets, epspath=None):
"""
:param assets: an array of assets
:param epspath: hdf5 file where the epsilons are (or None)
    :returns: a dictionary taxonomy -> assets, with attributes .eps and .idxs
"""
assets_by_taxo = AccumDict(group_array(assets, 'taxonomy'))
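    # the .idxs attribute gives the positions that sort the taxonomy-grouped
    # assets back into ascending 'ordinal' order (used in get_output below)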
assets_by_taxo.idxs = numpy.argsort(numpy.concatenate([
a['ordinal'] for a in assets_by_taxo.values()]))
assets_by_taxo.eps = {}
if epspath is None: # no epsilons
return assets_by_taxo
# otherwise read the epsilons and group them by taxonomy
with hdf5.File(epspath, 'r') as h5:
dset = h5['epsilon_matrix']
for taxo, assets in assets_by_taxo.items():
lst = [dset[aid] for aid in assets['ordinal']]
assets_by_taxo.eps[taxo] = numpy.array(lst)
return assets_by_taxo


class CompositeRiskModel(collections.Mapping):
"""
    A container (riskid, kind) -> riskmodel

:param oqparam:
an :class:`openquake.commonlib.oqvalidation.OqParam` instance
:param tmap:
a taxonomy mapping
:param fragdict:
a dictionary riskid -> loss_type -> fragility functions
:param vulndict:
a dictionary riskid -> loss_type -> vulnerability function
:param consdict:
a dictionary riskid -> loss_type -> consequence functions
:param retrodict:
a dictionary riskid -> loss_type -> vulnerability function
"""

    @classmethod
def read(cls, dstore):
"""
:param dstore: a DataStore instance
:returns: a :class:`CompositeRiskModel` instance
"""
oqparam = dstore['oqparam']
tmap = (dstore['taxonomy_mapping'] if 'taxonomy_mapping' in dstore
else {})
crm = dstore.getitem('risk_model')
# building dictionaries riskid -> loss_type -> risk_func
fragdict, vulndict, consdict, retrodict = (
AccumDict(), AccumDict(), AccumDict(), AccumDict())
fragdict.limit_states = crm.attrs['limit_states']
for quoted_id, rm in crm.items():
riskid = unquote_plus(quoted_id)
fragdict[riskid] = {}
vulndict[riskid] = {}
consdict[riskid] = {}
retrodict[riskid] = {}
for lt_kind in rm:
lt, kind = lt_kind.rsplit('-', 1)
rf = dstore['risk_model/%s/%s' % (quoted_id, lt_kind)]
if kind == 'consequence':
consdict[riskid][lt, kind] = rf
elif kind == 'fragility': # rf is a FragilityFunctionList
try:
rf = rf.build(
fragdict.limit_states,
oqparam.continuous_fragility_discretization,
oqparam.steps_per_interval)
except ValueError as err:
raise ValueError('%s: %s' % (riskid, err))
fragdict[riskid][lt, kind] = rf
else: # rf is a vulnerability function
rf.init()
if lt.endswith('_retrofitted'):
# strip _retrofitted, since len('_retrofitted') = 12
retrodict[riskid][lt[:-12], kind] = rf
else:
vulndict[riskid][lt, kind] = rf
return CompositeRiskModel(
oqparam, tmap, fragdict, vulndict, consdict, retrodict)

    def __init__(self, oqparam, tmap, fragdict, vulndict, consdict, retrodict):
self.tmap = tmap
self.damage_states = []
self._riskmodels = {} # riskid -> riskmodel
self.consequences = sum(len(vals) for vals in consdict.values())
if sum(len(v) for v in fragdict.values()):
# classical_damage/scenario_damage calculator
if oqparam.calculation_mode in ('classical', 'scenario'):
# case when the risk files are in the job_hazard.ini file
oqparam.calculation_mode += '_damage'
if 'exposure' not in oqparam.inputs:
raise RuntimeError(
'There are risk files in %r but not '
'an exposure' % oqparam.inputs['job_ini'])
self.damage_states = ['no_damage'] + list(fragdict.limit_states)
for riskid, ffs_by_lt in fragdict.items():
self._riskmodels[riskid] = (
riskmodels.get_riskmodel(
riskid, oqparam, fragility_functions=ffs_by_lt,
vulnerability_functions=vulndict[riskid],
consequence_functions=consdict[riskid]))
self.kind = 'fragility'
elif oqparam.calculation_mode.endswith('_bcr'):
# classical_bcr calculator
for (riskid, vf_orig), (riskid_, vf_retro) in \
zip(sorted(vulndict.items()), sorted(retrodict.items())):
assert riskid == riskid_ # same IDs
self._riskmodels[riskid] = (
riskmodels.get_riskmodel(
riskid, oqparam,
vulnerability_functions_orig=vf_orig,
vulnerability_functions_retro=vf_retro))
self.kind = 'vulnerability'
else:
# classical, event based and scenario calculators
for riskid, vfs in vulndict.items():
for vf in vfs.values():
# set the seed; this is important for the case of
# VulnerabilityFunctionWithPMF
vf.seed = oqparam.random_seed
self._riskmodels[riskid] = (
riskmodels.get_riskmodel(
riskid, oqparam, fragility_functions=vulndict[riskid],
vulnerability_functions=vfs))
self.kind = 'vulnerability'
self.init(oqparam)

    def init(self, oqparam):
imti = {imt: i for i, imt in enumerate(oqparam.imtls)}
        self.lti = {}  # loss_type -> idx
        self.covs = 0  # number of risk functions with nonzero CoVs
# build a sorted list with all the loss_types contained in the model
ltypes = set()
for rm in self.values():
ltypes.update(rm.loss_types)
self.loss_types = sorted(ltypes)
taxonomies = set()
for riskid, riskmodel in self._riskmodels.items():
taxonomies.add(riskid)
riskmodel.compositemodel = self
for lt, vf in riskmodel.risk_functions.items():
                # count the functions with nonzero coefficients of variation
if hasattr(vf, 'covs') and vf.covs.any():
self.covs += 1
missing = set(self.loss_types) - set(
lt for lt, kind in riskmodel.risk_functions)
if missing:
raise ValidationError(
'Missing vulnerability function for taxonomy %s and loss'
' type %s' % (riskid, ', '.join(missing)))
riskmodel.imti = {lt: imti[riskmodel.risk_functions[lt, kind].imt]
for lt, kind in riskmodel.risk_functions}
self.taxonomies = sorted(taxonomies)
self.curve_params = self.make_curve_params(oqparam)
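        # compute the minimum IML for each IMT across all the risk functions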
iml = collections.defaultdict(list)
for riskid, rm in self._riskmodels.items():
for lt, rf in rm.risk_functions.items():
iml[rf.imt].append(rf.imls[0])
self.min_iml = {imt: min(iml[imt]) for imt in iml}

    @cached_property
def taxonomy_dict(self):
"""
:returns: a dict taxonomy string -> taxonomy index
"""
# .taxonomy must be set by the engine
tdict = {taxo: idx for idx, taxo in enumerate(self.taxonomy)}
return tdict

    def make_curve_params(self, oqparam):
# the CurveParams are used only in classical_risk, classical_bcr
# NB: populate the inner lists .loss_types too
cps = []
for l, loss_type in enumerate(self.loss_types):
if oqparam.calculation_mode in ('classical', 'classical_risk'):
curve_resolutions = set()
lines = []
allratios = []
for taxo in sorted(self):
rm = self[taxo]
if loss_type in rm.loss_ratios:
ratios = rm.loss_ratios[loss_type]
allratios.append(ratios)
curve_resolutions.add(len(ratios))
lines.append('%s %d' % (
rm.vulnerability_functions[
loss_type, 'vulnerability'], len(ratios)))
if len(curve_resolutions) > 1:
# number of loss ratios is not the same for all taxonomies:
# then use the longest array; see classical_risk case_5
allratios.sort(key=len)
for rm in self.values():
if rm.loss_ratios[loss_type] != allratios[-1]:
rm.loss_ratios[loss_type] = allratios[-1]
logging.debug('Redefining loss ratios for %s', rm)
cp = scientific.CurveParams(
l, loss_type, max(curve_resolutions), allratios[-1], True)
else: # used only to store the association l -> loss_type
cp = scientific.CurveParams(l, loss_type, 0, [], False)
cps.append(cp)
self.lti[loss_type] = l
return cps

    def get_loss_ratios(self):
"""
:returns: a 1-dimensional composite array with loss ratios by loss type
"""
lst = [('user_provided', numpy.bool)]
for cp in self.curve_params:
lst.append((cp.loss_type, F32, len(cp.ratios)))
loss_ratios = numpy.zeros(1, numpy.dtype(lst))
for cp in self.curve_params:
loss_ratios['user_provided'] = cp.user_provided
loss_ratios[cp.loss_type] = tuple(cp.ratios)
return loss_ratios

    def __getitem__(self, key):
        if isinstance(key, (int, U16)):  # a taxonomy index
            key = self.taxonomy[key]
        return self._riskmodels[key]

    def __iter__(self):
        return iter(sorted(self._riskmodels))

    def __len__(self):
        return len(self._riskmodels)

    # used in multi_risk
    def get_dmg_csq(self, assets_by_site, gmf):
"""
:returns:
an array of shape (A, L, 1, D + 1) with the number of buildings
in each damage state for each asset and loss type
"""
A = sum(len(assets) for assets in assets_by_site)
L = len(self.loss_types)
D = len(self.damage_states)
out = numpy.zeros((A, L, 1, D + 1), F32)
for assets, gmv in zip(assets_by_site, gmf):
group = group_array(assets, 'taxonomy')
for taxonomy, assets in group.items():
for l, loss_type in enumerate(self.loss_types):
fracs = self[taxonomy](loss_type, assets, [gmv])
for asset, frac in zip(assets, fracs):
dmg = asset['number'] * frac[0, :D]
csq = asset['value-' + loss_type] * frac[0, D]
out[asset['ordinal'], l, 0, :D] = dmg
out[asset['ordinal'], l, 0, D] = csq
return out

    def gen_outputs(self, riskinput, monitor, epspath=None, hazard=None):
"""
Group the assets per taxonomy and compute the outputs by using the
        underlying riskmodels. Yield one output per realization.

:param riskinput: a RiskInput instance
:param monitor: a monitor object used to measure the performance
"""
self.monitor = monitor
hazard_getter = riskinput.hazard_getter
if hazard is None:
with monitor('getting hazard'):
hazard_getter.init()
hazard = hazard_getter.get_hazard()
sids = hazard_getter.sids
assert len(sids) == 1
with monitor('computing risk', measuremem=False):
# this approach is slow for event_based_risk since a lot of
# small arrays are passed (one per realization) instead of
# a long array with all realizations; ebrisk does the right
# thing since it calls get_output directly
assets_by_taxo = get_assets_by_taxo(riskinput.assets, epspath)
for rlzi, haz in sorted(hazard[sids[0]].items()):
out = self.get_output(assets_by_taxo, haz, rlzi)
yield out

    def get_output(self, assets_by_taxo, haz, rlzi=None):
"""
:param assets_by_taxo: a dictionary taxonomy index -> assets on a site
:param haz: an array or a dictionary of hazard on that site
:param rlzi: if given, a realization index
"""
if isinstance(haz, numpy.ndarray):
# NB: in GMF-based calculations the order in which
# the gmfs are stored is random since it depends on
# which hazard task ends first; here we reorder
# the gmfs by event ID; this is convenient in
# general and mandatory for the case of
# VulnerabilityFunctionWithPMF, otherwise the
# sample method would receive the means in random
# order and produce random results even if the
# seed is set correctly; very tricky indeed! (MS)
haz.sort(order='eid')
eids = haz['eid']
data = haz['gmv'] # shape (E, M)
elif not haz: # no hazard for this site
eids = numpy.arange(1)
data = []
else: # classical
eids = []
data = haz # shape M
dic = dict(eids=eids)
if rlzi is not None:
dic['rlzi'] = rlzi
for l, lt in enumerate(self.loss_types):
ls = []
for taxonomy, assets_ in assets_by_taxo.items():
if len(assets_by_taxo.eps):
epsilons = assets_by_taxo.eps[taxonomy][:, eids]
else: # no CoVs
epsilons = ()
rm = self[taxonomy]
if len(data) == 0:
dat = [0]
elif len(eids): # gmfs
dat = data[:, rm.imti[lt]]
else: # hcurves
dat = data[rm.imti[lt]]
ls.append(rm(lt, assets_, dat, eids, epsilons))
arr = numpy.concatenate(ls)
dic[lt] = arr[assets_by_taxo.idxs] if len(arr) else arr
return hdf5.ArrayWrapper((), dic)

    def reduce(self, taxonomies):
"""
:param taxonomies: a set of taxonomies
:returns: a new CompositeRiskModel reduced to the given taxonomies
"""
new = copy.copy(self)
new.taxonomies = sorted(taxonomies)
new._riskmodels = {}
for riskid, rm in self._riskmodels.items():
if riskid in taxonomies:
new._riskmodels[riskid] = rm
rm.compositemodel = new
return new

    def __toh5__(self):
loss_types = hdf5.array_of_vstr(self.loss_types)
limit_states = hdf5.array_of_vstr(self.damage_states[1:]
if self.damage_states else [])
dic = dict(covs=self.covs, loss_types=loss_types,
limit_states=limit_states)
rf = next(iter(self.values()))
if hasattr(rf, 'loss_ratios'):
for lt in self.loss_types:
dic['loss_ratios_' + lt] = rf.loss_ratios[lt]
return self._riskmodels, dic

    def __repr__(self):
lines = ['%s: %s' % item for item in sorted(self.items())]
return '<%s(%d, %d)\n%s>' % (
self.__class__.__name__, len(lines), self.covs, '\n'.join(lines))


# used in scenario_risk
def make_eps(asset_array, num_samples, seed, correlation):
"""
:param asset_array: an array of assets
:param int num_samples: the number of ruptures
:param int seed: a random seed
:param float correlation: the correlation coefficient
:returns: epsilons matrix of shape (num_assets, num_samples)
"""
assets_by_taxo = group_array(asset_array, 'taxonomy')
eps = numpy.zeros((len(asset_array), num_samples), numpy.float32)
for taxonomy, assets in assets_by_taxo.items():
shape = (len(assets), num_samples)
logging.info('Building %s epsilons for taxonomy %s', shape, taxonomy)
zeros = numpy.zeros(shape)
epsilons = scientific.make_epsilons(zeros, seed, correlation)
for asset, epsrow in zip(assets, epsilons):
eps[asset['ordinal']] = epsrow
return eps


def cache_epsilons(dstore, oq, assetcol, riskmodel, E):
"""
    Do nothing if there are no coefficients of variation or if ignore_covs
    is set. Otherwise, generate an epsilon matrix of shape (A, E), save it
    in the cache file and return the path to it.
"""
if oq.ignore_covs or not riskmodel.covs:
return
A = len(assetcol)
hdf5path = dstore.hdf5cache()
logging.info('Storing the epsilon matrix in %s', hdf5path)
if oq.calculation_mode == 'scenario_risk':
eps = make_eps(assetcol.array, E, oq.master_seed, oq.asset_correlation)
else: # event based
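        # if asset_correlation is set, every asset shares the same E epsilons;
        # otherwise each event is seeded independently and fills its own column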
if oq.asset_correlation:
numpy.random.seed(oq.master_seed)
eps = numpy.array([numpy.random.normal(size=E)] * A)
else:
seeds = oq.master_seed + numpy.arange(E)
eps = numpy.zeros((A, E), F32)
for i, seed in enumerate(seeds):
numpy.random.seed(seed)
eps[:, i] = numpy.random.normal(size=A)
with hdf5.File(hdf5path) as cache:
cache['epsilon_matrix'] = eps
return hdf5path


def str2rsi(key):
"""
Convert a string of the form 'rlz-XXXX/sid-YYYY/ZZZ'
into a triple (XXXX, YYYY, ZZZ)
"""
rlzi, sid, imt = key.split('/')
return int(rlzi[4:]), int(sid[4:]), imt


def rsi2str(rlzi, sid, imt):
"""
Convert a triple (XXXX, YYYY, ZZZ) into a string of the form
'rlz-XXXX/sid-YYYY/ZZZ'
"""
return 'rlz-%04d/sid-%04d/%s' % (rlzi, sid, imt)
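

# For instance (illustrative values):
#   rsi2str(3, 10, 'PGA') -> 'rlz-0003/sid-0010/PGA'
#   str2rsi('rlz-0003/sid-0010/PGA') -> (3, 10, 'PGA')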