Source code for openquake.risklib.riskmodels

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import re
import ast
import copy
import functools
import collections
import logging
from urllib.parse import unquote_plus
import numpy

from openquake.baselib import hdf5
from openquake.baselib.node import Node
from openquake.baselib.general import AccumDict, cached_property
from openquake.hazardlib import valid, nrml, InvalidFile
from openquake.hazardlib.sourcewriter import obj_to_node
from openquake.risklib import scientific

U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64

COST_TYPE_REGEX = '|'.join(valid.cost_type.choices)
RISK_TYPE_REGEX = re.compile(
    r'(%s|occupants|fragility)_([\w_]+)' % COST_TYPE_REGEX)


[docs]def get_risk_files(inputs): """ :param inputs: a dictionary key -> path name :returns: a pair (file_type, {risk_type: path}) """ rfs = {} job_ini = inputs['job_ini'] for key in inputs: if key == 'fragility': # backward compatibily for .ini files with key fragility_file # instead of structural_fragility_file rfs['fragility/structural'] = inputs[ 'structural_fragility'] = inputs[key] del inputs['fragility'] elif key.endswith(('_fragility', '_vulnerability', '_consequence')): match = RISK_TYPE_REGEX.match(key) if match and 'retrofitted' not in key and 'consequence' not in key: rfs['%s/%s' % (match.group(2), match.group(1))] = inputs[key] elif match is None: raise ValueError('Invalid key in %s: %s_file' % (job_ini, key)) return rfs
# ########################### vulnerability ############################## #
[docs]def filter_vset(elem): return elem.tag.endswith('discreteVulnerabilitySet')
[docs]@obj_to_node.add('VulnerabilityFunction') def build_vf_node(vf): """ Convert a VulnerabilityFunction object into a Node suitable for XML conversion. """ nodes = [Node('imls', {'imt': vf.imt}, vf.imls), Node('meanLRs', {}, vf.mean_loss_ratios), Node('covLRs', {}, vf.covs)] return Node( 'vulnerabilityFunction', {'id': vf.id, 'dist': vf.distribution_name}, nodes=nodes)
[docs]def get_risk_models(oqparam, kind='vulnerability fragility consequence ' 'vulnerability_retrofitted'): """ :param oqparam: an OqParam instance :param kind: a space-separated string with the kinds of risk models to read :returns: a dictionary riskid -> loss_type, kind -> function """ kinds = kind.split() rmodels = AccumDict() for kind in kinds: for key in sorted(oqparam.inputs): mo = re.match('(occupants|%s)_%s$' % (COST_TYPE_REGEX, kind), key) if mo: loss_type = mo.group(1) # the cost_type in the key # can be occupants, structural, nonstructural, ... rmodel = nrml.to_python(oqparam.inputs[key]) if len(rmodel) == 0: raise InvalidFile('%s is empty!' % oqparam.inputs[key]) rmodels[loss_type, kind] = rmodel if rmodel.lossCategory is None: # NRML 0.4 continue cost_type = str(rmodel.lossCategory) rmodel_kind = rmodel.__class__.__name__ kind_ = kind.replace('_retrofitted', '') # strip retrofitted if not rmodel_kind.lower().startswith(kind_): raise ValueError( 'Error in the file "%s_file=%s": is ' 'of kind %s, expected %s' % ( key, oqparam.inputs[key], rmodel_kind, kind.capitalize() + 'Model')) if cost_type != loss_type: raise ValueError( 'Error in the file "%s_file=%s": lossCategory is of ' 'type "%s", expected "%s"' % (key, oqparam.inputs[key], rmodel.lossCategory, loss_type)) rdict = AccumDict(accum={}) rdict.limit_states = [] for (loss_type, kind), rm in sorted(rmodels.items()): if kind == 'fragility': # build a copy of the FragilityModel with different IM levels newfm = rm.build(oqparam.continuous_fragility_discretization, oqparam.steps_per_interval) for (imt, riskid), ffl in sorted(newfm.items()): if not rdict.limit_states: rdict.limit_states.extend(rm.limitStates) # we are rejecting the case of loss types with different # limit states; this may change in the future assert rdict.limit_states == rm.limitStates, ( rdict.limit_states, rm.limitStates) rdict[riskid][loss_type, kind] = ffl # TODO: see if it is possible to remove the attribute # below, used in classical_damage ffl.steps_per_interval = oqparam.steps_per_interval elif kind == 'consequence': for riskid, cf in sorted(rm.items()): rdict[riskid][loss_type, kind] = cf else: # vulnerability, vulnerability_retrofitted cl_risk = oqparam.calculation_mode in ( 'classical', 'classical_risk') # only for classical_risk reduce the loss_ratios # to make sure they are strictly increasing for (imt, riskid), rf in sorted(rm.items()): rdict[riskid][loss_type, kind] = ( rf.strictly_increasing() if cl_risk else rf) return rdict
[docs]def get_values(loss_type, assets, time_event=None): """ :returns: a numpy array with the values for the given assets, depending on the loss_type. """ if loss_type == 'occupants': return assets['occupants_%s' % time_event] else: return assets['value-' + loss_type]
loss_poe_dt = numpy.dtype([('loss', F64), ('poe', F64)])
[docs]def rescale(curves, values): """ Multiply the losses in each curve of kind (losses, poes) by the corresponding value. :param curves: an array of shape (A, 2, C) :param values: an array of shape (A,) """ A, _, C = curves.shape assert A == len(values), (A, len(values)) array = numpy.zeros((A, C), loss_poe_dt) array['loss'] = [c * v for c, v in zip(curves[:, 0], values)] array['poe'] = curves[:, 1] return array
[docs]class RiskModel(object): """ Base class. Can be used in the tests as a mock. :param taxonomy: a taxonomy string :param risk_functions: a dict (loss_type, kind) -> risk_function """ time_event = None # used in scenario_risk compositemodel = None # set by get_crmodel def __init__(self, calcmode, taxonomy, risk_functions, **kw): self.calcmode = calcmode self.taxonomy = taxonomy self.risk_functions = risk_functions vars(self).update(kw) steps = kw.get('lrem_steps_per_interval') if calcmode in 'classical_risk': self.loss_ratios = { lt: tuple(vf.mean_loss_ratios_with_steps(steps)) for (lt, kind), vf in risk_functions.items()} if calcmode == 'classical_bcr': self.loss_ratios_orig = { lt: tuple(vf.mean_loss_ratios_with_steps(steps)) for (lt, kind), vf in risk_functions.items() if kind == 'vulnerability'} self.loss_ratios_retro = { lt: tuple(vf.mean_loss_ratios_with_steps(steps)) for (lt, kind), vf in risk_functions.items() if kind == 'vulnerability_retrofitted'} @property def loss_types(self): """ The list of loss types in the underlying vulnerability functions, in lexicographic order """ return sorted(lt for (lt, kind) in self.risk_functions) def __call__(self, loss_type, assets, gmvs, eids, epsilons): meth = getattr(self, self.calcmode) res = meth(loss_type, assets, gmvs, eids, epsilons) return res def __toh5__(self): return self.risk_functions, {'taxonomy': self.taxonomy} def __fromh5__(self, dic, attrs): vars(self).update(attrs) self.risk_functions = dic def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.taxonomy) # ######################## calculation methods ######################### #
[docs] def classical_risk( self, loss_type, assets, hazard_curve, eids=None, eps=None): """ :param str loss_type: the loss type considered :param assets: assets is an iterator over A :class:`openquake.risklib.scientific.Asset` instances :param hazard_curve: an array of poes :param eids: ignored, here only for API compatibility with other calculators :param eps: ignored, here only for API compatibility with other calculators :returns: a composite array (loss, poe) of shape (A, C) """ n = len(assets) vf = self.risk_functions[loss_type, 'vulnerability'] lratios = self.loss_ratios[loss_type] imls = self.hazard_imtls[vf.imt] values = get_values(loss_type, assets) lrcurves = numpy.array( [scientific.classical(vf, imls, hazard_curve, lratios)] * n) return rescale(lrcurves, values)
[docs] def event_based_risk(self, loss_type, assets, gmvs, eids, epsilons): """ :param str loss_type: the loss type considered :param assets: a list of assets on the same site and with the same taxonomy :param gmvs_eids: a pair (gmvs, eids) with E values each :param epsilons: a matrix of epsilons of shape (A, E) (or an empty tuple) :returns: an array of loss ratios of shape (A, E) """ E = len(gmvs) A = len(assets) loss_ratios = numpy.zeros((A, E), F32) vf = self.risk_functions[loss_type, 'vulnerability'] means, covs, idxs = vf.interpolate(gmvs) if len(means) == 0: # all gmvs are below the minimum imls, 0 ratios pass elif self.ignore_covs or covs.sum() == 0 or len(epsilons) == 0: # the ratios are equal for all assets ratios = vf.sample(means, covs, idxs, None) # right shape for a in range(A): loss_ratios[a, idxs] = ratios else: # take into account the epsilons for a, asset in enumerate(assets): loss_ratios[a, idxs] = vf.sample( means, covs, idxs, epsilons[a]) return loss_ratios
ebrisk = event_based_risk
[docs] def classical_bcr(self, loss_type, assets, hazard, eids=None, eps=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard: an hazard curve :param _eps: dummy parameter, unused :param _eids: dummy parameter, unused :returns: a list of triples (eal_orig, eal_retro, bcr_result) """ if loss_type != 'structural': raise NotImplementedError( 'retrofitted is not defined for ' + loss_type) n = len(assets) self.assets = assets vf = self.risk_functions[loss_type, 'vulnerability'] imls = self.hazard_imtls[vf.imt] vf_retro = self.risk_functions[loss_type, 'vulnerability_retrofitted'] curves_orig = functools.partial( scientific.classical, vf, imls, loss_ratios=self.loss_ratios_orig[loss_type]) curves_retro = functools.partial( scientific.classical, vf_retro, imls, loss_ratios=self.loss_ratios_retro[loss_type]) original_loss_curves = numpy.array([curves_orig(hazard)] * n) retrofitted_loss_curves = numpy.array([curves_retro(hazard)] * n) eal_original = numpy.array([scientific.average_loss(lc) for lc in original_loss_curves]) eal_retrofitted = numpy.array([scientific.average_loss(lc) for lc in retrofitted_loss_curves]) bcr_results = [ scientific.bcr( eal_original[i], eal_retrofitted[i], self.interest_rate, self.asset_life_expectancy, asset['value-' + loss_type], asset['retrofitted']) for i, asset in enumerate(assets)] return list(zip(eal_original, eal_retrofitted, bcr_results))
[docs] def scenario_risk(self, loss_type, assets, gmvs, eids, epsilons): """ :returns: an array of shape (A, E) """ values = get_values(loss_type, assets, self.time_event) ok = ~numpy.isnan(values) if not ok.any(): # there are no assets with a value return numpy.zeros(0) # there may be assets without a value missing_value = not ok.all() if missing_value: assets = assets[ok] epsilons = epsilons[ok] E = len(eids) # a matrix of A x E elements loss_matrix = numpy.empty((len(assets), E)) loss_matrix.fill(numpy.nan) vf = self.risk_functions[loss_type, 'vulnerability'] means, covs, idxs = vf.interpolate(gmvs) loss_ratio_matrix = numpy.zeros((len(assets), E)) if len(epsilons): for a, eps in enumerate(epsilons): loss_ratio_matrix[a, idxs] = vf.sample(means, covs, idxs, eps) else: ratios = vf.sample(means, covs, idxs, numpy.zeros(len(means), F32)) for a in range(len(assets)): loss_ratio_matrix[a, idxs] = ratios loss_matrix[:, :] = (loss_ratio_matrix.T * values).T return loss_matrix
scenario = scenario_risk
[docs] def scenario_damage(self, loss_type, assets, gmvs, eids=None, eps=None): """ :param loss_type: the loss type :param assets: a list of A assets of the same taxonomy :param gmvs_eids: pairs (gmvs, eids), each one with E elements :param _eps: dummy parameter, unused :returns: an array of shape (A, E, D + 1) elements where N is the number of points, E the number of events and D the number of damage states. """ ffs = self.risk_functions[loss_type, 'fragility'] damages = scientific.scenario_damage(ffs, gmvs).T E, D = damages.shape dmg_csq = numpy.zeros((E, D + 1)) dmg_csq[:, :D] = damages c_model = self.risk_functions.get((loss_type, 'consequence')) if c_model: # compute consequences means = [0] + [par[0] for par in c_model.params] # NB: we add a 0 in front for nodamage state
[docs] dmg_csq[:, D] = damages @ means # consequence ratio return numpy.array([dmg_csq] * len(assets))
def classical_damage( self, loss_type, assets, hazard_curve, eids=None, eps=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard_curve: an hazard curve array :returns: an array of N assets and an array of N x D elements where N is the number of points and D the number of damage states. """ ffl = self.risk_functions[loss_type, 'fragility'] hazard_imls = self.hazard_imtls[ffl.imt] damage = scientific.classical_damage( ffl, hazard_imls, hazard_curve, investigation_time=self.investigation_time, risk_investigation_time=self.risk_investigation_time) return [a['number'] * damage for a in assets]
# NB: the approach used here relies on the convention of having the # names of the arguments of the RiskModel class to be equal to the # names of the parameter in the oqparam object. This is seen as a # feature, since it forces people to be consistent with the names, # in the spirit of the 'convention over configuration' philosophy
[docs]def get_riskmodel(taxonomy, oqparam, **extra): """ Return an instance of the correct risk model class, depending on the attribute `calculation_mode` of the object `oqparam`. :param taxonomy: a taxonomy string :param oqparam: an object containing the parameters needed by the RiskModel class :param extra: extra parameters to pass to the RiskModel class """ extra['hazard_imtls'] = oqparam.imtls extra['investigation_time'] = oqparam.investigation_time extra['risk_investigation_time'] = oqparam.risk_investigation_time extra['lrem_steps_per_interval'] = oqparam.lrem_steps_per_interval extra['ignore_covs'] = oqparam.ignore_covs extra['time_event'] = oqparam.time_event if oqparam.calculation_mode == 'classical_bcr': extra['interest_rate'] = oqparam.interest_rate extra['asset_life_expectancy'] = oqparam.asset_life_expectancy return RiskModel(oqparam.calculation_mode, taxonomy, **extra)
# ######################## CompositeRiskModel #########################
[docs]class ValidationError(Exception): pass
def _extract(rmdict, kind): lst = [] for riskid, rm in rmdict.items(): risk_functions = getattr(rm, 'risk_functions', rm) for (lt, k), rf in risk_functions.items(): if k == kind: lst.append((riskid, rf)) return lst
[docs]class CompositeRiskModel(collections.abc.Mapping): """ A container (riskid, kind) -> riskmodel :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param fragdict: a dictionary riskid -> loss_type -> fragility functions :param vulndict: a dictionary riskid -> loss_type -> vulnerability function :param consdict: a dictionary riskid -> loss_type -> consequence functions """
[docs] @classmethod def read(cls, dstore): """ :param dstore: a DataStore instance :returns: a :class:`CompositeRiskModel` instance """ oqparam = dstore['oqparam'] crm = dstore.getitem('risk_model') riskdict = AccumDict(accum={}) riskdict.limit_states = crm.attrs['limit_states'] for quoted_id, rm in crm.items(): riskid = unquote_plus(quoted_id) for lt_kind in rm: lt, kind = lt_kind.rsplit('-', 1) rf = dstore['risk_model/%s/%s' % (quoted_id, lt_kind)] if kind == 'consequence': riskdict[riskid][lt, kind] = rf elif kind == 'fragility': # rf is a FragilityFunctionList try: rf = rf.build( riskdict.limit_states, oqparam.continuous_fragility_discretization, oqparam.steps_per_interval) except ValueError as err: raise ValueError('%s: %s' % (riskid, err)) riskdict[riskid][lt, kind] = rf else: # rf is a vulnerability function rf.seed = oqparam.master_seed rf.init() if lt.endswith('_retrofitted'): # strip _retrofitted, since len('_retrofitted') = 12 riskdict[riskid][ lt[:-12], 'vulnerability_retrofitted'] = rf else: riskdict[riskid][lt, 'vulnerability'] = rf crm = CompositeRiskModel(oqparam, riskdict) crm.tmap = ast.literal_eval(dstore.get_attr('risk_model', 'tmap')) return crm
def __init__(self, oqparam, riskdict): self.damage_states = [] self._riskmodels = {} # riskid -> crmodel if oqparam.calculation_mode.endswith('_bcr'): # classical_bcr calculator for riskid, risk_functions in sorted(riskdict.items()): self._riskmodels[riskid] = get_riskmodel( riskid, oqparam, risk_functions=risk_functions) elif (_extract(riskdict, 'fragility') or 'damage' in oqparam.calculation_mode): # classical_damage/scenario_damage calculator if oqparam.calculation_mode in ('classical', 'scenario'): # case when the risk files are in the job_hazard.ini file oqparam.calculation_mode += '_damage' if 'exposure' not in oqparam.inputs: raise RuntimeError( 'There are risk files in %r but not ' 'an exposure' % oqparam.inputs['job_ini']) self.damage_states = ['no_damage'] + list(riskdict.limit_states) for riskid, ffs_by_lt in sorted(riskdict.items()): self._riskmodels[riskid] = get_riskmodel( riskid, oqparam, risk_functions=ffs_by_lt) else: # classical, event based and scenario calculators for riskid, vfs in sorted(riskdict.items()): for vf in vfs.values(): # set the seed; this is important for the case of # VulnerabilityFunctionWithPMF vf.seed = oqparam.random_seed self._riskmodels[riskid] = get_riskmodel( riskid, oqparam, risk_functions=vfs) self.init(oqparam)
[docs] def has(self, kind): return _extract(self._riskmodels, kind)
[docs] def init(self, oqparam): self.imtls = oqparam.imtls imti = {imt: i for i, imt in enumerate(oqparam.imtls)} self.lti = {} # loss_type -> idx self.covs = 0 # number of coefficients of variation # build a sorted list with all the loss_types contained in the model ltypes = set() for rm in self.values(): ltypes.update(rm.loss_types) self.loss_types = sorted(ltypes) self.taxonomies = set() self.distributions = set() for riskid, rm in self._riskmodels.items(): self.taxonomies.add(riskid) rm.compositemodel = self for lt, rf in rm.risk_functions.items(): if hasattr(rf, 'distribution_name'): self.distributions.add(rf.distribution_name) if hasattr(rf, 'init'): # vulnerability function rf.seed = oqparam.master_seed # setting the seed rf.init() # save the number of nonzero coefficients of variation if hasattr(rf, 'covs') and rf.covs.any(): self.covs += 1 missing = set(self.loss_types) - set( lt for lt, kind in rm.risk_functions) if missing: raise ValidationError( 'Missing vulnerability function for taxonomy %s and loss' ' type %s' % (riskid, ', '.join(missing))) rm.imti = {lt: imti[rm.risk_functions[lt, kind].imt] for lt, kind in rm.risk_functions if kind in 'vulnerability fragility'} self.curve_params = self.make_curve_params(oqparam) iml = collections.defaultdict(list) for riskid, rm in self._riskmodels.items(): for lt, rf in rm.risk_functions.items(): if hasattr(rf, 'imt'): iml[rf.imt].append(rf.imls[0]) self.min_iml = {imt: min(iml[imt]) for imt in iml}
@cached_property def taxonomy_dict(self): """ :returns: a dict taxonomy string -> taxonomy index """ # .taxonomy must be set by the engine tdict = {taxo: idx for idx, taxo in enumerate(self.taxonomy)} return tdict
[docs] def make_curve_params(self, oqparam): # the CurveParams are used only in classical_risk, classical_bcr # NB: populate the inner lists .loss_types too cps = [] for l, loss_type in enumerate(self.loss_types): if oqparam.calculation_mode in ('classical', 'classical_risk'): curve_resolutions = set() lines = [] allratios = [] for taxo in sorted(self): rm = self[taxo] rf = rm.risk_functions.get((loss_type, 'vulnerability')) if rf and loss_type in rm.loss_ratios: ratios = rm.loss_ratios[loss_type] allratios.append(ratios) curve_resolutions.add(len(ratios)) lines.append('%s %d' % (rf, len(ratios))) if len(curve_resolutions) > 1: # number of loss ratios is not the same for all taxonomies: # then use the longest array; see classical_risk case_5 allratios.sort(key=len) for rm in self.values(): if rm.loss_ratios[loss_type] != allratios[-1]: rm.loss_ratios[loss_type] = allratios[-1] logging.debug('Redefining loss ratios for %s', rm) cp = scientific.CurveParams( l, loss_type, max(curve_resolutions), allratios[-1], True ) if curve_resolutions else scientific.CurveParams( l, loss_type, 0, [], False) else: # used only to store the association l -> loss_type cp = scientific.CurveParams(l, loss_type, 0, [], False) cps.append(cp) self.lti[loss_type] = l return cps
[docs] def get_loss_ratios(self): """ :returns: a 1-dimensional composite array with loss ratios by loss type """ lst = [('user_provided', numpy.bool)] for cp in self.curve_params: lst.append((cp.loss_type, F32, len(cp.ratios))) loss_ratios = numpy.zeros(1, numpy.dtype(lst)) for cp in self.curve_params: loss_ratios['user_provided'] = cp.user_provided loss_ratios[cp.loss_type] = tuple(cp.ratios) return loss_ratios
def __getitem__(self, taxo): return self._riskmodels[taxo]
[docs] def get_rmodels_weights(self, taxidx): """ :returns: a list of weighted risk models for the given taxonomy index """ rmodels, weights = [], [] for key, weight in self.tmap[taxidx]: rmodels.append(self._riskmodels[key]) weights.append(weight) return rmodels, weights
def __iter__(self): return iter(sorted(self._riskmodels)) def __len__(self): return len(self._riskmodels)
[docs] def reduce(self, taxonomies): """ :param taxonomies: a set of taxonomies :returns: a new CompositeRiskModel reduced to the given taxonomies """ new = copy.copy(self) new._riskmodels = {} for riskid, rm in self._riskmodels.items(): if riskid in taxonomies: new._riskmodels[riskid] = rm rm.compositemodel = new return new
def __toh5__(self): loss_types = hdf5.array_of_vstr(self.loss_types) limit_states = hdf5.array_of_vstr(self.damage_states[1:] if self.damage_states else []) dic = dict(covs=self.covs, loss_types=loss_types, limit_states=limit_states, tmap=repr(getattr(self, 'tmap', []))) rf = next(iter(self.values())) if hasattr(rf, 'loss_ratios'): for lt in self.loss_types: dic['loss_ratios_' + lt] = rf.loss_ratios[lt] return self._riskmodels, dic def __repr__(self): lines = ['%s: %s' % item for item in sorted(self.items())] return '<%s(%d, %d)\n%s>' % ( self.__class__.__name__, len(lines), self.covs, '\n'.join(lines))