# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import inspect
import functools
import numpy
from openquake.baselib.general import CallableDict, AccumDict
from openquake.risklib import utils, scientific, valid
U32 = numpy.uint32
F32 = numpy.float32
registry = CallableDict()
class CostCalculator(object):
"""
Return the value of an asset for the given loss type depending
on the cost types declared in the exposure, as follows:
case 1: cost type: aggregated:
cost = economic value
case 2: cost type: per asset:
cost * number (of assets) = economic value
case 3: cost type: per area and area type: aggregated:
cost * area = economic value
case 4: cost type: per area and area type: per asset:
cost * area * number = economic value
The same "formula" applies to retrofitting cost.
"""
def __init__(self, cost_types, area_types,
deduct_abs=True, limit_abs=True):
if set(cost_types) != set(area_types):
raise ValueError('cost_types has keys %s, area_types has keys %s'
% (sorted(cost_types), sorted(area_types)))
for ct in cost_types.values():
assert ct in ('aggregated', 'per_asset', 'per_area'), ct
for at in area_types.values():
assert at in ('aggregated', 'per_asset'), at
self.cost_types = cost_types
self.area_types = area_types
self.deduct_abs = deduct_abs
self.limit_abs = limit_abs
def __call__(self, loss_type, values, area, number):
cost = values[loss_type]
if cost is None:
return numpy.nan
cost_type = self.cost_types[loss_type]
if cost_type == "aggregated":
return cost
if cost_type == "per_asset":
return cost * number
if cost_type == "per_area":
area_type = self.area_types[loss_type]
if area_type == "aggregated":
return cost * area
elif area_type == "per_asset":
return cost * area * number
# this should never happen
raise RuntimeError('Unable to compute cost')
def __toh5__(self):
loss_types = sorted(self.cost_types)
dt = numpy.dtype([('cost_type', (bytes, 10)),
('area_type', (bytes, 10))])
array = numpy.zeros(len(loss_types), dt)
array['cost_type'] = [self.cost_types[lt] for lt in loss_types]
array['area_type'] = [self.area_types[lt] for lt in loss_types]
attrs = dict(deduct_abs=self.deduct_abs, limit_abs=self.limit_abs,
loss_types=loss_types)
return array, attrs
def __fromh5__(self, array, attrs):
vars(self).update(attrs)
self.cost_types = dict(zip(self.loss_types, array['cost_type']))
self.area_types = dict(zip(self.loss_types, array['area_type']))
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, vars(self))
costcalculator = CostCalculator(
cost_types=dict(structural='per_area'),
area_types=dict(structural='per_asset'))
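# A minimal usage sketch of CostCalculator (not part of the library API;
# the numbers below are hypothetical). With cost_type 'per_area' and
# area_type 'per_asset', the economic value is cost * area * number:
#
#     calc = CostCalculator(cost_types=dict(structural='per_area'),
#                           area_types=dict(structural='per_asset'))
#     calc('structural', dict(structural=1000.), area=2., number=3)
#     # -> 6000.0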
class Asset(object):
"""
Describe an Asset as a collection of several values. A value can
represent a replacement cost (e.g. structural cost, business
interruption cost) or another quantity that can be considered for
a risk analysis (e.g. occupants).
    Optionally, an Asset instance can also hold a collection of
    deductible values and insured limits used in insured loss
    calculations.
"""
def __init__(self,
asset_id,
taxonomy,
number,
location,
values,
area=1,
deductibles=None,
insurance_limits=None,
retrofitteds=None,
calc=costcalculator,
ordinal=None):
"""
:param asset_id:
            a unique identifier of the asset within the given exposure
:param taxonomy:
asset taxonomy
:param number:
            number of apartments or number of people in the given asset
:param location:
geographic location of the asset
:param dict values:
            asset values keyed by loss types
        :param area:
            area of the asset, used by the cost calculator when the cost
            type is per_area
        :param dict deductibles:
deductible values (expressed as a percentage relative to
the value of the asset) keyed by loss types
:param dict insurance_limits:
insured limits values (expressed as a percentage relative to
the value of the asset) keyed by loss types
:param dict retrofitteds:
asset retrofitting values keyed by loss types
:param calc:
cost calculator instance
:param ordinal:
asset collection ordinal
"""
self.id = asset_id
self.taxonomy = taxonomy
self.number = number
self.location = location
self.values = values
self.area = area
self.retrofitteds = retrofitteds
self.deductibles = deductibles
self.insurance_limits = insurance_limits
self.calc = calc
self.ordinal = ordinal
self._cost = {} # cache for the costs
    def value(self, loss_type, time_event=None):
"""
:returns: the total asset value for `loss_type`
"""
if loss_type == 'occupants':
return self.values['occupants_' + str(time_event)]
try:
val = self._cost[loss_type]
except KeyError:
val = self.calc(loss_type, self.values, self.area, self.number)
self._cost[loss_type] = val
return val
    def deductible(self, loss_type):
"""
:returns: the deductible fraction of the asset cost for `loss_type`
"""
val = self.calc(loss_type, self.deductibles, self.area, self.number)
if self.calc.deduct_abs: # convert to relative value
return val / self.calc(loss_type, self.values,
self.area, self.number)
else:
return val
    def insurance_limit(self, loss_type):
"""
:returns: the limit fraction of the asset cost for `loss_type`
"""
val = self.calc(loss_type, self.insurance_limits, self.area,
self.number)
if self.calc.limit_abs: # convert to relative value
return val / self.calc(loss_type, self.values,
self.area, self.number)
else:
return val
    def retrofitted(self, loss_type, time_event=None):
"""
:returns: the asset retrofitted value for `loss_type`
"""
if loss_type == 'occupants':
return self.values['occupants_' + str(time_event)]
return self.calc(loss_type, self.retrofitteds,
self.area, self.number)
def __repr__(self):
return '<Asset %s>' % self.id
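# A minimal sketch of how an Asset is typically built and queried (the
# identifiers and numbers below are hypothetical). With the default
# ``costcalculator`` ('per_area' cost, 'per_asset' area), the structural
# value is cost * area * number:
#
#     a = Asset('a1', 'RC', number=10, location=(9.15, 45.16),
#               values=dict(structural=100.), area=3.)
#     a.value('structural')  # -> 100. * 3. * 10 = 3000.0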
def get_values(loss_type, assets, time_event=None):
"""
:returns:
a numpy array with the values for the given assets, depending on the
loss_type.
"""
return numpy.array([a.value(loss_type, time_event) for a in assets])
class RiskModel(object):
"""
Base class. Can be used in the tests as a mock.
"""
time_event = None # used in scenario_risk
compositemodel = None # set by get_risk_model
kind = None # must be set in subclasses
def __init__(self, taxonomy, risk_functions):
self.taxonomy = taxonomy
self.risk_functions = risk_functions
@property
def loss_types(self):
"""
The list of loss types in the underlying vulnerability functions,
in lexicographic order
"""
return sorted(self.risk_functions)
    def get_loss_types(self, imt):
"""
:param imt: Intensity Measure Type string
:returns: loss types with risk functions of the given imt
"""
return [lt for lt in self.loss_types
if self.risk_functions[lt].imt == imt]
    def out_by_lr(self, imt, assets, hazard, epsgetter):
"""
:param imt: restrict the risk functions to this IMT
:param assets: an array of assets of homogeneous taxonomy
:param hazard: a dictionary rlz -> hazard
:param epsgetter: a callable returning epsilons for the given eids
:returns: a dictionary (l, r) -> output
"""
out_by_lr = AccumDict()
out_by_lr.assets = assets
loss_types = self.get_loss_types(imt)
for rlz in sorted(hazard):
haz = hazard[rlz]
if len(haz) == 0:
continue
r = rlz.ordinal
for loss_type in loss_types:
out = self(loss_type, assets, haz, epsgetter)
if out: # can be None in scenario_risk with no valid values
l = self.compositemodel.lti[loss_type]
out.hid = r
out.weight = rlz.weight
out_by_lr[l, r] = out
return out_by_lr
def __repr__(self):
return '<%s%s>' % (self.__class__.__name__, list(self.risk_functions))
def rescale(curves, values):
"""
Multiply the losses in each curve of kind (losses, poes) by the
corresponding value.
"""
n = len(curves)
assert n == len(values), (n, len(values))
losses = [curves[i, 0] * values[i] for i in range(n)]
poes = curves[:, 1]
return numpy.array([[losses[i], poes[i]] for i in range(n)])
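# A toy illustration of rescale (hypothetical numbers): two curves with
# resolution 3 and the same losses/poes, rescaled by asset values 10 and
# 100; the poes are left untouched while the losses are multiplied:
#
#     curves = numpy.array([[[.1, .2, .3], [.9, .5, .1]],
#                           [[.1, .2, .3], [.9, .5, .1]]])
#     rescale(curves, numpy.array([10., 100.]))
#     # losses become [1., 2., 3.] and [10., 20., 30.], poes unchanged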
@registry.add('classical_risk', 'classical', 'disaggregation')
class Classical(RiskModel):
"""
Classical PSHA-Based RiskModel.
1) Compute loss curves, loss maps for each realization.
    2) Compute (if more than one realization is given) mean and
       quantile loss curves and maps.
Per-realization Outputs contain the following fields:
:attr assets:
an iterable over N assets the outputs refer to
:attr loss_curves:
a numpy array of N loss curves. If the curve resolution is C, the final
shape of the array will be (N, 2, C), where the `two` accounts for
the losses/poes dimensions
:attr average_losses:
a numpy array of N average loss values
:attr insured_curves:
a numpy array of N insured loss curves, shaped (N, 2, C)
:attr average_insured_losses:
a numpy array of N average insured loss values
:attr loss_maps:
a numpy array of P elements holding N loss maps where P is the
number of `conditional_loss_poes` considered. Shape: (P, N)
The statistical outputs are stored into
:class:`openquake.risklib.scientific.Output`,
which holds the following fields:
:attr assets:
an iterable of N assets the outputs refer to
:attr mean_curves:
A numpy array with N mean loss curves. Shape: (N, 2)
:attr mean_average_losses:
A numpy array with N mean average loss values
:attr mean_maps:
A numpy array with P mean loss maps. Shape: (P, N)
:attr mean_fractions:
A numpy array with F mean fractions, where F is the number of PoEs
used for disaggregation. Shape: (F, N)
:attr quantile_curves:
A numpy array with Q quantile curves (Q = number of quantiles).
Shape: (Q, N, 2, C)
:attr quantile_average_losses:
A numpy array shaped (Q, N) with average losses
:attr quantile_maps:
A numpy array with Q quantile maps shaped (Q, P, N)
:attr quantile_fractions:
A numpy array with Q quantile maps shaped (Q, F, N)
:attr mean_insured_curves:
A numpy array with N mean insured loss curves. Shape: (N, 2)
:attr mean_average_insured_losses:
A numpy array with N mean average insured loss values
:attr quantile_insured_curves:
A numpy array with Q quantile insured curves (Q = number of quantiles).
Shape: (Q, N, 2, C)
:attr quantile_average_insured_losses:
A numpy array shaped (Q, N) with average insured losses
"""
kind = 'vulnerability'
def __init__(self, taxonomy, vulnerability_functions,
hazard_imtls, lrem_steps_per_interval,
conditional_loss_poes, poes_disagg,
insured_losses=False):
"""
        :param taxonomy:
            Taxonomy for this riskmodel
        :param vulnerability_functions:
            Dictionary of vulnerability functions by loss type
        :param hazard_imtls:
            The intensity measure types and levels of the hazard computation
        :param lrem_steps_per_interval:
            Configuration parameter
        :param conditional_loss_poes:
            Configuration parameter
        :param poes_disagg:
            Probability of Exceedance levels used to disaggregate losses by
            taxonomy
:param bool insured_losses:
True if insured loss curves should be computed
See :func:`openquake.risklib.scientific.classical` for a description
of the other parameters.
"""
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.hazard_imtls = hazard_imtls
self.lrem_steps_per_interval = lrem_steps_per_interval
self.conditional_loss_poes = conditional_loss_poes
self.poes_disagg = poes_disagg
self.insured_losses = insured_losses
self.loss_ratios = {
lt: vf.mean_loss_ratios_with_steps(lrem_steps_per_interval)
for lt, vf in vulnerability_functions.items()}
def __call__(self, loss_type, assets, hazard_curve, _eps=None):
"""
:param str loss_type:
the loss type considered
:param assets:
            an iterator over N
:class:`openquake.risklib.scientific.Asset` instances
:param hazard_curve:
an array of poes
:param _eps:
ignored, here only for API compatibility with other calculators
:returns:
a :class:`openquake.risklib.scientific.Classical.Output` instance.
"""
n = len(assets)
vf = self.risk_functions[loss_type]
imls = self.hazard_imtls[vf.imt]
curves = [scientific.classical(
vf, imls, hazard_curve, self.lrem_steps_per_interval)] * n
average_losses = utils.numpy_map(scientific.average_loss, curves)
maps = scientific.loss_map_matrix(self.conditional_loss_poes, curves)
values = get_values(loss_type, assets)
if self.insured_losses and loss_type != 'occupants':
deductibles = [a.deductible(loss_type) for a in assets]
limits = [a.insurance_limit(loss_type) for a in assets]
insured_curves = rescale(
utils.numpy_map(scientific.insured_loss_curve,
curves, deductibles, limits), values)
average_insured_losses = utils.numpy_map(
scientific.average_loss, insured_curves)
else:
insured_curves = None
average_insured_losses = None
return scientific.Output(
assets, loss_type,
loss_curves=rescale(numpy.array(curves), values),
average_losses=values * average_losses,
insured_curves=insured_curves,
average_insured_losses=average_insured_losses,
loss_maps=values * maps)
@registry.add('event_based_risk', 'event_based', 'event_based_rupture')
class ProbabilisticEventBased(RiskModel):
"""
Implements the Probabilistic Event Based riskmodel
    Per-realization Outputs are saved into
    :class:`openquake.risklib.scientific.ProbabilisticEventBased.Output`
    instances, which contain the following fields:
:attr assets:
an iterable over N assets the outputs refer to
:attr loss_matrix:
an array of losses shaped N x T (where T is the number of events)
:attr loss_curves:
a numpy array of N loss curves. If the curve resolution is C, the final
shape of the array will be (N, 2, C), where the `two` accounts for
the losses/poes dimensions
:attr average_losses:
a numpy array of N average loss values
:attr stddev_losses:
        a numpy array holding N standard deviations of losses
:attr insured_curves:
a numpy array of N insured loss curves, shaped (N, 2, C)
:attr average_insured_losses:
a numpy array of N average insured loss values
:attr stddev_insured_losses:
        a numpy array holding N standard deviations of insured losses
:attr loss_maps:
a numpy array of P elements holding N loss maps where P is the
number of `conditional_loss_poes` considered. Shape: (P, N)
:attr dict event_loss_table:
a dictionary mapping event ids to aggregate loss values
The statistical outputs are stored into
:class:`openquake.risklib.scientific.Output` objects.
"""
kind = 'vulnerability'
def __init__(
self, taxonomy, vulnerability_functions, loss_curve_resolution,
conditional_loss_poes, insured_losses=False):
"""
See :func:`openquake.risklib.scientific.event_based` for a description
of the input parameters.
"""
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.loss_curve_resolution = loss_curve_resolution
self.conditional_loss_poes = conditional_loss_poes
self.insured_losses = insured_losses
def __call__(self, loss_type, assets, gmvs_eids, epsgetter):
"""
:param str loss_type:
the loss type considered
:param assets:
a list of assets on the same site and with the same taxonomy
:param gmvs_eids:
a pair of arrays of E elements
:param epsgetter:
a callable returning the correct epsilons for the given gmvs
        :returns: an instance of
            :class:`openquake.risklib.scientific.ProbabilisticEventBased.Output`
"""
gmvs, eids = gmvs_eids
E = len(gmvs)
I = self.insured_losses + 1
N = len(assets)
loss_ratios = numpy.zeros((N, E, I), F32)
vf = self.risk_functions[loss_type]
means, covs, idxs = vf.interpolate(gmvs)
for i, asset in enumerate(assets):
epsilons = epsgetter(asset.ordinal, eids)
if epsilons is not None:
ratios = vf.sample(means, covs, idxs, epsilons)
else:
ratios = means
loss_ratios[i, idxs, 0] = ratios
if self.insured_losses and loss_type != 'occupants':
loss_ratios[i, idxs, 1] = scientific.insured_losses(
ratios, asset.deductible(loss_type),
asset.insurance_limit(loss_type))
return scientific.Output(
assets, loss_type, loss_ratios=loss_ratios, eids=eids)
@registry.add('classical_bcr')
class ClassicalBCR(RiskModel):
kind = 'vulnerability'
def __init__(self, taxonomy,
vulnerability_functions_orig,
vulnerability_functions_retro,
hazard_imtls,
lrem_steps_per_interval,
interest_rate, asset_life_expectancy):
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions_orig
self.retro_functions = vulnerability_functions_retro
        self.assets = None  # set at __call__ time
self.interest_rate = interest_rate
self.asset_life_expectancy = asset_life_expectancy
self.hazard_imtls = hazard_imtls
self.lrem_steps_per_interval = lrem_steps_per_interval
def __call__(self, loss_type, assets, hazard, _eps=None, _eids=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
        :param hazard: a hazard curve
:param _eps: dummy parameter, unused
:param _eids: dummy parameter, unused
:returns: a :class:`openquake.risklib.scientific.Output` instance
"""
n = len(assets)
self.assets = assets
vf = self.risk_functions[loss_type]
imls = self.hazard_imtls[vf.imt]
vf_retro = self.retro_functions[loss_type]
curves_orig = functools.partial(scientific.classical, vf, imls,
steps=self.lrem_steps_per_interval)
curves_retro = functools.partial(scientific.classical, vf_retro, imls,
steps=self.lrem_steps_per_interval)
original_loss_curves = utils.numpy_map(curves_orig, [hazard] * n)
retrofitted_loss_curves = utils.numpy_map(curves_retro, [hazard] * n)
eal_original = utils.numpy_map(
scientific.average_loss, original_loss_curves)
eal_retrofitted = utils.numpy_map(
scientific.average_loss, retrofitted_loss_curves)
bcr_results = [
scientific.bcr(
eal_original[i], eal_retrofitted[i],
self.interest_rate, self.asset_life_expectancy,
asset.value(loss_type), asset.retrofitted(loss_type))
for i, asset in enumerate(assets)]
return scientific.Output(
assets, loss_type,
data=list(zip(eal_original, eal_retrofitted, bcr_results)))
@registry.add('scenario_risk', 'scenario')
class Scenario(RiskModel):
"""
Implements the Scenario riskmodel
"""
kind = 'vulnerability'
def __init__(self, taxonomy, vulnerability_functions,
insured_losses, time_event=None):
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.insured_losses = insured_losses
self.time_event = time_event
def __call__(self, loss_type, assets, ground_motion_values, epsgetter):
epsilons = epsgetter()
values = get_values(loss_type, assets, self.time_event)
ok = ~numpy.isnan(values)
if not ok.any():
# there are no assets with a value
return
# there may be assets without a value
missing_value = not ok.all()
if missing_value:
assets = assets[ok]
epsilons = epsilons[ok]
# a matrix of N x E elements
vf = self.risk_functions[loss_type]
means, covs, idxs = vf.interpolate(ground_motion_values)
loss_ratio_matrix = numpy.zeros((len(assets), len(epsilons[0])))
for i, eps in enumerate(epsilons):
loss_ratio_matrix[i, idxs] = vf.sample(means, covs, idxs, eps)
# another matrix of N x E elements
loss_matrix = (loss_ratio_matrix.T * values).T
# an array of E elements
aggregate_losses = loss_matrix.sum(axis=0)
if self.insured_losses and loss_type != "occupants":
deductibles = [a.deductible(loss_type) for a in assets]
limits = [a.insurance_limit(loss_type) for a in assets]
insured_loss_ratio_matrix = utils.numpy_map(
scientific.insured_losses,
loss_ratio_matrix, deductibles, limits)
insured_loss_matrix = (insured_loss_ratio_matrix.T * values).T
else:
insured_loss_matrix = numpy.empty_like(loss_ratio_matrix)
insured_loss_matrix.fill(numpy.nan)
# aggregating per asset, getting a vector of E elements
insured_losses = insured_loss_matrix.sum(axis=0)
return scientific.Output(
assets, loss_type, loss_matrix=loss_matrix,
loss_ratio_matrix=loss_ratio_matrix,
aggregate_losses=aggregate_losses,
insured_loss_matrix=insured_loss_matrix,
insured_losses=insured_losses)
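# A plain-numpy sketch of the aggregation performed in Scenario.__call__
# above (hypothetical numbers): a 2-asset x 3-event loss ratio matrix is
# scaled by the asset values and summed over the assets to obtain the
# per-event aggregate losses:
#
#     ratios = numpy.array([[.1, .2, .3], [.0, .1, .2]])
#     values = numpy.array([1000., 2000.])
#     losses = (ratios.T * values).T  # N x E loss matrix
#     losses.sum(axis=0)              # -> array([ 100., 400., 700.])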
@registry.add('scenario_damage')
class Damage(RiskModel):
"""
Implements the ScenarioDamage riskmodel
"""
kind = 'fragility'
def __init__(self, taxonomy, fragility_functions):
self.taxonomy = taxonomy
self.risk_functions = fragility_functions
def __call__(self, loss_type, assets, gmvs, _eps=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
:param gmvs: an array of E elements
:param _eps: dummy parameter, unused
:returns: an array of N assets and an array of N x E x D elements
where N is the number of points, E the number of events
and D the number of damage states.
"""
n = len(assets)
ffs = self.risk_functions[loss_type]
damages = numpy.array(
[scientific.scenario_damage(ffs, gmv) for gmv in gmvs])
return scientific.Output(assets, loss_type, damages=[damages] * n)
@registry.add('classical_damage')
class ClassicalDamage(Damage):
"""
Implements the ClassicalDamage riskmodel
"""
kind = 'fragility'
def __init__(self, taxonomy, fragility_functions,
hazard_imtls, investigation_time,
risk_investigation_time):
self.taxonomy = taxonomy
self.risk_functions = fragility_functions
self.hazard_imtls = hazard_imtls
self.investigation_time = investigation_time
self.risk_investigation_time = risk_investigation_time
def __call__(self, loss_type, assets, hazard_curve, _eps=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
        :param hazard_curve: a hazard curve array
:returns: an array of N assets and an array of N x D elements
where N is the number of points and D the number of damage states.
"""
ffl = self.risk_functions[loss_type]
hazard_imls = self.hazard_imtls[ffl.imt]
damage = scientific.classical_damage(
ffl, hazard_imls, hazard_curve,
investigation_time=self.investigation_time,
risk_investigation_time=self.risk_investigation_time)
return scientific.Output(
assets, loss_type, damages=[a.number * damage for a in assets])
# NB: the approach used here relies on the convention that the names of
# the arguments of the riskmodel class are equal to the names of the
# parameters in the oqparam object. This is viewed as a feature, since
# it forces people to be consistent with the names, in the spirit of
# the 'convention over configuration' philosophy
def get_riskmodel(taxonomy, oqparam, **extra):
"""
Return an instance of the correct riskmodel class, depending on the
attribute `calculation_mode` of the object `oqparam`.
:param taxonomy:
a taxonomy string
:param oqparam:
an object containing the parameters needed by the riskmodel class
:param extra:
extra parameters to pass to the riskmodel class
"""
riskmodel_class = registry[oqparam.calculation_mode]
# arguments needed to instantiate the riskmodel class
argnames = inspect.getargspec(riskmodel_class.__init__).args[3:]
# arguments extracted from oqparam
known_args = set(name for name, value in
inspect.getmembers(oqparam.__class__)
if isinstance(value, valid.Param))
all_args = {}
for argname in argnames:
if argname in known_args:
all_args[argname] = getattr(oqparam, argname)
if 'hazard_imtls' in argnames: # special case
all_args['hazard_imtls'] = oqparam.imtls
all_args.update(extra)
missing = set(argnames) - set(all_args)
if missing:
raise TypeError('Missing parameter: %s' % ', '.join(missing))
return riskmodel_class(taxonomy, **all_args)
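# A small sketch of the introspection used in get_riskmodel: the constructor
# arguments after the first three (self, taxonomy and the risk functions) are
# looked up by name on the oqparam object, e.g. for the Scenario riskmodel
# defined above:
#
#     inspect.getargspec(Scenario.__init__).args[3:]
#     # -> ['insured_losses', 'time_event']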