Source code for openquake.risklib.riskmodels

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
import re
import inspect
import functools
import numpy

from openquake.baselib.node import Node
from openquake.baselib.general import CallableDict, AccumDict
from openquake.baselib.hdf5 import ArrayWrapper
from openquake.hazardlib import valid, nrml
from openquake.hazardlib.sourcewriter import obj_to_node
from openquake.risklib import utils, scientific

U32 = numpy.uint32
F32 = numpy.float32
registry = CallableDict()


F64 = numpy.float64

COST_TYPE_REGEX = '|'.join(valid.cost_type.choices)

LOSS_TYPE_KEY = re.compile(
    '(%s|occupants|fragility)_([\w_]+)' % COST_TYPE_REGEX)


[docs]def get_risk_files(inputs): """ :param inputs: a dictionary key -> path name :returns: a pair (file_type, {cost_type: path}) """ vfs = {} names = set() for key in inputs: if key == 'fragility': # backward compatibily for .ini files with key fragility_file # instead of structural_fragility_file vfs['structural'] = inputs['structural_fragility'] = inputs[key] names.add('fragility') del inputs['fragility'] continue match = LOSS_TYPE_KEY.match(key) if match and 'retrofitted' not in key and 'consequence' not in key: vfs[match.group(1)] = inputs[key] names.add(match.group(2)) if not names: return None, {} elif len(names) > 1: raise ValueError('Found inconsistent keys %s in the .ini file' % ', '.join(names)) return names.pop(), vfs
# ########################### vulnerability ############################## #
[docs]def filter_vset(elem): return elem.tag.endswith('discreteVulnerabilitySet')
[docs]@obj_to_node.add('VulnerabilityFunction') def build_vf_node(vf): """ Convert a VulnerabilityFunction object into a Node suitable for XML conversion. """ nodes = [Node('imls', {'imt': vf.imt}, vf.imls), Node('meanLRs', {}, vf.mean_loss_ratios), Node('covLRs', {}, vf.covs)] return Node( 'vulnerabilityFunction', {'id': vf.id, 'dist': vf.distribution_name}, nodes=nodes)
[docs]def get_risk_models(oqparam, kind=None): """ :param oqparam: an OqParam instance :param kind: vulnerability|vulnerability_retrofitted|fragility|consequence; if None it is extracted from the oqparam.file_type attribute :returns: a dictionary taxonomy -> loss_type -> function """ kind = kind or oqparam.file_type rmodels = AccumDict() rmodels.limit_states = [] for key in sorted(oqparam.inputs): mo = re.match('(occupants|%s)_%s$' % (COST_TYPE_REGEX, kind), key) if mo: key_type = mo.group(1) # the cost_type in the key # can be occupants, structural, nonstructural, ... rmodel = nrml.to_python(oqparam.inputs[key]) rmodels[key_type] = rmodel if rmodel.lossCategory is None: # NRML 0.4 continue cost_type = str(rmodel.lossCategory) rmodel_kind = rmodel.__class__.__name__ kind_ = kind.replace('_retrofitted', '') # strip retrofitted if not rmodel_kind.lower().startswith(kind_): raise ValueError( 'Error in the file "%s_file=%s": is ' 'of kind %s, expected %s' % ( key, oqparam.inputs[key], rmodel_kind, kind.capitalize() + 'Model')) if cost_type != key_type: raise ValueError( 'Error in the file "%s_file=%s": lossCategory is of type ' '"%s", expected "%s"' % (key, oqparam.inputs[key], rmodel.lossCategory, key_type)) rdict = AccumDict(accum={}) rdict.limit_states = [] if kind == 'fragility': limit_states = [] for loss_type, fm in sorted(rmodels.items()): # build a copy of the FragilityModel with different IM levels newfm = fm.build(oqparam.continuous_fragility_discretization, oqparam.steps_per_interval) for (imt, taxo), ffl in newfm.items(): if not limit_states: limit_states.extend(fm.limitStates) # we are rejecting the case of loss types with different # limit states; this may change in the future assert limit_states == fm.limitStates, ( limit_states, fm.limitStates) rdict[taxo][loss_type] = ffl # TODO: see if it is possible to remove the attribute # below, used in classical_damage ffl.steps_per_interval = oqparam.steps_per_interval rdict.limit_states = [str(ls) for ls in limit_states] elif kind == 'consequence': rdict = rmodels else: # vulnerability cl_risk = oqparam.calculation_mode in ('classical', 'classical_risk') # only for classical_risk reduce the loss_ratios # to make sure they are strictly increasing for loss_type, rm in rmodels.items(): for (imt, taxo), rf in rm.items(): rdict[taxo][loss_type] = ( rf.strictly_increasing() if cl_risk else rf) return rdict
[docs]def get_values(loss_type, assets, time_event=None): """ :returns: a numpy array with the values for the given assets, depending on the loss_type. """ return numpy.array([a.value(loss_type, time_event) for a in assets])
[docs]class RiskModel(object): """ Base class. Can be used in the tests as a mock. """ time_event = None # used in scenario_risk compositemodel = None # set by get_risk_model kind = None # must be set in subclasses def __init__(self, taxonomy, risk_functions, insured_losses): self.taxonomy = taxonomy self.risk_functions = risk_functions self.insured_losses = insured_losses @property def loss_types(self): """ The list of loss types in the underlying vulnerability functions, in lexicographic order """ return sorted(self.risk_functions)
[docs] def get_loss_types(self, imt): """ :param imt: Intensity Measure Type string :returns: loss types with risk functions of the given imt """ return [lt for lt in self.loss_types if self.risk_functions[lt].imt == imt]
[docs] def get_output(self, assets, data_by_lt, epsgetter): """ :param assets: a list of assets with the same taxonomy :param data_by_lt: hazards for each loss type :param epsgetter: an epsilon getter function :returns: an ArrayWrapper of shape (L, ...) """ out = [self(lt, assets, data, epsgetter) for lt, data in zip(self.loss_types, data_by_lt)] return ArrayWrapper(numpy.array(out), dict(assets=assets))
def __toh5__(self): risk_functions = {lt: func for lt, func in self.risk_functions.items()} if hasattr(self, 'retro_functions'): for lt, func in self.retro_functions.items(): risk_functions[lt + '_retrofitted'] = func return risk_functions, {'taxonomy': self.taxonomy} def __fromh5__(self, dic, attrs): vars(self).update(attrs) self.risk_functions = dic def __repr__(self): return '<%s%s>' % (self.__class__.__name__, list(self.risk_functions))
[docs]def rescale(curves, values): """ Multiply the losses in each curve of kind (losses, poes) by the corresponding value. """ n = len(curves) assert n == len(values), (n, len(values)) losses = [curves[i, 0] * values[i] for i in range(n)] poes = curves[:, 1] return numpy.array([[losses[i], poes[i]] for i in range(n)])
[docs]@registry.add('classical_risk', 'classical', 'disaggregation') class Classical(RiskModel): """ Classical PSHA-Based RiskModel. Computes loss curves and insured curves. """ kind = 'vulnerability' def __init__(self, taxonomy, vulnerability_functions, hazard_imtls, lrem_steps_per_interval, conditional_loss_poes, poes_disagg, insured_losses=False): """ :param imt: Intensity Measure Type for this riskmodel :param taxonomy: Taxonomy for this riskmodel :param vulnerability_functions: Dictionary of vulnerability functions by loss type :param hazard_imtls: The intensity measure types and levels of the hazard computation :param lrem_steps_per_interval: Configuration parameter :param poes_disagg: Probability of Exceedance levels used for disaggregate losses by taxonomy. :param bool insured_losses: ignored since insured loss curves are not implemented See :func:`openquake.risklib.scientific.classical` for a description of the other parameters. """ self.taxonomy = taxonomy self.risk_functions = vulnerability_functions self.hazard_imtls = hazard_imtls self.lrem_steps_per_interval = lrem_steps_per_interval self.conditional_loss_poes = conditional_loss_poes self.poes_disagg = poes_disagg self.insured_losses = insured_losses self.loss_ratios = { lt: vf.mean_loss_ratios_with_steps(self.lrem_steps_per_interval) for lt, vf in self.risk_functions.items()} def __call__(self, loss_type, assets, hazard_curve, _eps=None): """ :param str loss_type: the loss type considered :param assets: assets is an iterator over N :class:`openquake.risklib.scientific.Asset` instances :param hazard_curve: an array of poes :param _eps: ignored, here only for API compatibility with other calculators :returns: an array of shape (C, N, 2) """ n = len(assets) vf = self.risk_functions[loss_type] imls = self.hazard_imtls[vf.imt] values = get_values(loss_type, assets) lrcurves = numpy.array( [scientific.classical( vf, imls, hazard_curve, self.lrem_steps_per_interval)] * n) # if in the future we wanted to implement insured_losses the # following lines could be useful # deductibles = [a.deductible(loss_type) for a in assets] # limits = [a.insurance_limit(loss_type) for a in assets] # insured_curves = rescale( # utils.numpy_map(scientific.insured_loss_curve, # lrcurves, deductibles, limits), values) return rescale(lrcurves, values).transpose(2, 0, 1)
# transpose array from shape (N, 2, C) -> (C, N, 2) # otherwise .get_output would fail
[docs]@registry.add('event_based_risk', 'event_based', 'event_based_rupture', 'ucerf_rupture', 'ucerf_hazard', 'ucerf_risk') class ProbabilisticEventBased(RiskModel): """ Implements the Probabilistic Event Based riskmodel. Computes loss ratios and event IDs. """ kind = 'vulnerability' def __init__( self, taxonomy, vulnerability_functions, conditional_loss_poes, insured_losses=False): """ See :func:`openquake.risklib.scientific.event_based` for a description of the input parameters. """ self.taxonomy = taxonomy self.risk_functions = vulnerability_functions self.conditional_loss_poes = conditional_loss_poes self.insured_losses = insured_losses def __call__(self, loss_type, assets, gmvs_eids, epsgetter): """ :param str loss_type: the loss type considered :param assets: a list of assets on the same site and with the same taxonomy :param gmvs_eids: a pair (gmvs, eids) with E values each :param epsgetter: a callable returning the correct epsilons for the given gmvs :returns: a :class: `openquake.risklib.scientific.ProbabilisticEventBased.Output` instance. """ gmvs, eids = gmvs_eids E = len(gmvs) I = self.insured_losses + 1 A = len(assets) loss_ratios = numpy.zeros((A, E, I), F32) vf = self.risk_functions[loss_type] means, covs, idxs = vf.interpolate(gmvs) for i, asset in enumerate(assets): epsilons = epsgetter(asset.ordinal, eids) ratios = vf.sample(means, covs, idxs, epsilons) loss_ratios[i, idxs, 0] = ratios if self.insured_losses and loss_type != 'occupants': loss_ratios[i, idxs, 1] = scientific.insured_losses( ratios, asset.deductible(loss_type), asset.insurance_limit(loss_type)) return loss_ratios
[docs]@registry.add('classical_bcr') class ClassicalBCR(RiskModel): kind = 'vulnerability' def __init__(self, taxonomy, vulnerability_functions_orig, vulnerability_functions_retro, hazard_imtls, lrem_steps_per_interval, interest_rate, asset_life_expectancy): self.taxonomy = taxonomy self.risk_functions = vulnerability_functions_orig self.insured_losses = False # not implemented self.retro_functions = vulnerability_functions_retro self.assets = [] # set a __call__ time self.interest_rate = interest_rate self.asset_life_expectancy = asset_life_expectancy self.hazard_imtls = hazard_imtls self.lrem_steps_per_interval = lrem_steps_per_interval def __call__(self, loss_type, assets, hazard, _eps=None, _eids=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard: an hazard curve :param _eps: dummy parameter, unused :param _eids: dummy parameter, unused :returns: a list of triples (eal_orig, eal_retro, bcr_result) """ if loss_type != 'structural': raise NotImplemented('retrofitted is not defined for ' + loss_type) n = len(assets) self.assets = assets vf = self.risk_functions[loss_type] imls = self.hazard_imtls[vf.imt] vf_retro = self.retro_functions[loss_type] curves_orig = functools.partial(scientific.classical, vf, imls, steps=self.lrem_steps_per_interval) curves_retro = functools.partial(scientific.classical, vf_retro, imls, steps=self.lrem_steps_per_interval) original_loss_curves = utils.numpy_map(curves_orig, [hazard] * n) retrofitted_loss_curves = utils.numpy_map(curves_retro, [hazard] * n) eal_original = utils.numpy_map( scientific.average_loss, original_loss_curves) eal_retrofitted = utils.numpy_map( scientific.average_loss, retrofitted_loss_curves) bcr_results = [ scientific.bcr( eal_original[i], eal_retrofitted[i], self.interest_rate, self.asset_life_expectancy, asset.value(loss_type), asset.retrofitted()) for i, asset in enumerate(assets)] return list(zip(eal_original, eal_retrofitted, bcr_results))
[docs]@registry.add('scenario_risk', 'scenario') class Scenario(RiskModel): """ Implements the Scenario riskmodel. Computes the loss matrix. """ kind = 'vulnerability' def __init__(self, taxonomy, vulnerability_functions, insured_losses, time_event=None): self.taxonomy = taxonomy self.risk_functions = vulnerability_functions self.insured_losses = insured_losses self.time_event = time_event def __call__(self, loss_type, assets, gmvs_eids, epsgetter): gmvs, eids = gmvs_eids epsilons = [epsgetter(asset.ordinal, eids) for asset in assets] values = get_values(loss_type, assets, self.time_event) ok = ~numpy.isnan(values) if not ok.any(): # there are no assets with a value return # there may be assets without a value missing_value = not ok.all() if missing_value: assets = assets[ok] epsilons = epsilons[ok] E = len(epsilons[0]) I = self.insured_losses + 1 # a matrix of A x E x I elements loss_matrix = numpy.empty((len(assets), E, I)) loss_matrix.fill(numpy.nan) vf = self.risk_functions[loss_type] means, covs, idxs = vf.interpolate(gmvs) loss_ratio_matrix = numpy.zeros((len(assets), E)) for i, eps in enumerate(epsilons): loss_ratio_matrix[i, idxs] = vf.sample(means, covs, idxs, eps) loss_matrix[:, :, 0] = (loss_ratio_matrix.T * values).T if self.insured_losses and loss_type != "occupants": deductibles = [a.deductible(loss_type) for a in assets] limits = [a.insurance_limit(loss_type) for a in assets] insured_loss_ratio_matrix = utils.numpy_map( scientific.insured_losses, loss_ratio_matrix, deductibles, limits) loss_matrix[:, :, 1] = (insured_loss_ratio_matrix.T * values).T return loss_matrix
[docs]@registry.add('scenario_damage') class Damage(RiskModel): """ Implements the ScenarioDamage riskmodel. Computes the damages. """ kind = 'fragility' def __init__(self, taxonomy, fragility_functions): self.taxonomy = taxonomy self.risk_functions = fragility_functions self.insured_losses = False # not implemented def __call__(self, loss_type, assets, gmvs_eids, _eps=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param gmvs_eids: pairs (gmvs, eids), each one with E elements :param _eps: dummy parameter, unused :returns: N arrays of E x D elements where N is the number of points, E the number of events and D the number of damage states. """ ffs = self.risk_functions[loss_type] damages = scientific.scenario_damage(ffs, gmvs_eids[0]) # shape (D, E) damages[damages < 1E-7] = 0 # sanity check return [damages.T] * len(assets)
[docs]@registry.add('classical_damage') class ClassicalDamage(Damage): """ Implements the ClassicalDamage riskmodel. Computes the damages. """ kind = 'fragility' def __init__(self, taxonomy, fragility_functions, hazard_imtls, investigation_time, risk_investigation_time): self.taxonomy = taxonomy self.risk_functions = fragility_functions self.insured_losses = False # not implemented self.hazard_imtls = hazard_imtls self.investigation_time = investigation_time self.risk_investigation_time = risk_investigation_time assert risk_investigation_time, risk_investigation_time def __call__(self, loss_type, assets, hazard_curve, _eps=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard_curve: an hazard curve array :returns: an array of N assets and an array of N x D elements where N is the number of points and D the number of damage states. """ ffl = self.risk_functions[loss_type] hazard_imls = self.hazard_imtls[ffl.imt] damage = scientific.classical_damage( ffl, hazard_imls, hazard_curve, investigation_time=self.investigation_time, risk_investigation_time=self.risk_investigation_time) return [a.number * damage for a in assets]
# NB: the approach used here relies on the convention of having the # names of the arguments of the riskmodel class to be equal to the # names of the parameter in the oqparam object. This is view as a # feature, since it forces people to be consistent with the names, # in the spirit of the 'convention over configuration' philosophy
[docs]def get_riskmodel(taxonomy, oqparam, **extra): """ Return an instance of the correct riskmodel class, depending on the attribute `calculation_mode` of the object `oqparam`. :param taxonomy: a taxonomy string :param oqparam: an object containing the parameters needed by the riskmodel class :param extra: extra parameters to pass to the riskmodel class """ riskmodel_class = registry[oqparam.calculation_mode] # arguments needed to instantiate the riskmodel class argnames = inspect.getargspec(riskmodel_class.__init__).args[3:] # arguments extracted from oqparam known_args = set(name for name, value in inspect.getmembers(oqparam.__class__) if isinstance(value, valid.Param)) all_args = {} for argname in argnames: if argname in known_args: all_args[argname] = getattr(oqparam, argname) if 'hazard_imtls' in argnames: # special case all_args['hazard_imtls'] = oqparam.imtls all_args.update(extra) missing = set(argnames) - set(all_args) if missing: raise TypeError('Missing parameter: %s' % ', '.join(missing)) return riskmodel_class(taxonomy, **all_args)