# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import inspect
import functools
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import CallableDict
from openquake.risklib import utils, scientific, valid
U32 = numpy.uint32
F32 = numpy.float32
registry = CallableDict()
[docs]class CostCalculator(object):
"""
Return the value of an asset for the given loss type depending
on the cost types declared in the exposure, as follows:
case 1: cost type: aggregated:
cost = economic value
case 2: cost type: per asset:
cost * number (of assets) = economic value
case 3: cost type: per area and area type: aggregated:
cost * area = economic value
case 4: cost type: per area and area type: per asset:
cost * area * number = economic value
The same "formula" applies to retrofitting cost.
"""
def __init__(self, cost_types, area_types, units,
deduct_abs=True, limit_abs=True):
if set(cost_types) != set(area_types):
raise ValueError('cost_types has keys %s, area_types has keys %s'
% (sorted(cost_types), sorted(area_types)))
for ct in cost_types.values():
assert ct in ('aggregated', 'per_asset', 'per_area'), ct
for at in area_types.values():
assert at in ('aggregated', 'per_asset'), at
self.cost_types = cost_types
self.area_types = area_types
self.units = units
self.deduct_abs = deduct_abs
self.limit_abs = limit_abs
def __call__(self, loss_type, values, area, number):
cost = values[loss_type]
if cost is None:
return numpy.nan
cost_type = self.cost_types[loss_type]
if cost_type == "aggregated":
return cost
if cost_type == "per_asset":
return cost * number
if cost_type == "per_area":
area_type = self.area_types[loss_type]
if area_type == "aggregated":
return cost * area
elif area_type == "per_asset":
return cost * area * number
# this should never happen
raise RuntimeError('Unable to compute cost')
def __toh5__(self):
loss_types = sorted(self.cost_types)
dt = numpy.dtype([('cost_type', hdf5.vstr),
('area_type', hdf5.vstr),
('unit', hdf5.vstr)])
array = numpy.zeros(len(loss_types), dt)
array['cost_type'] = [self.cost_types[lt] for lt in loss_types]
array['area_type'] = [self.area_types[lt] for lt in loss_types]
array['unit'] = [self.units[lt] for lt in loss_types]
attrs = dict(deduct_abs=self.deduct_abs, limit_abs=self.limit_abs,
loss_types=hdf5.array_of_vstr(loss_types))
return array, attrs
def __fromh5__(self, array, attrs):
vars(self).update(attrs)
self.cost_types = dict(zip(self.loss_types, array['cost_type']))
self.area_types = dict(zip(self.loss_types, array['area_type']))
self.units = dict(zip(self.loss_types, array['unit']))
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, vars(self))
costcalculator = CostCalculator(
cost_types=dict(structural='per_area'),
area_types=dict(structural='per_asset'),
units=dict(structural='EUR'))
[docs]class Asset(object):
"""
Describe an Asset as a collection of several values. A value can
represent a replacement cost (e.g. structural cost, business
interruption cost) or another quantity that can be considered for
a risk analysis (e.g. occupants).
Optionally, a Asset instance can hold also a collection of
deductible values and insured limits considered for insured losses
calculations.
"""
def __init__(self,
asset_id,
taxonomy,
number,
location,
values,
area=1,
deductibles=None,
insurance_limits=None,
retrofitteds=None,
calc=costcalculator,
ordinal=None):
"""
:param asset_id:
an unique identifier of the assets within the given exposure
:param taxonomy:
asset taxonomy
:param number:
number of apartments of number of people in the given asset
:param location:
geographic location of the asset
:param dict values:
asset values keyed by loss types
:param dict deductible:
deductible values (expressed as a percentage relative to
the value of the asset) keyed by loss types
:param dict insurance_limits:
insured limits values (expressed as a percentage relative to
the value of the asset) keyed by loss types
:param dict retrofitteds:
asset retrofitting values keyed by loss types
:param calc:
cost calculator instance
:param ordinal:
asset collection ordinal
"""
self.id = asset_id
self.taxonomy = taxonomy
self.number = number
self.location = location
self.values = values
self.area = area
self.retrofitteds = retrofitteds
self.deductibles = deductibles
self.insurance_limits = insurance_limits
self.calc = calc
self.ordinal = ordinal
self._cost = {} # cache for the costs
[docs] def value(self, loss_type, time_event=None):
"""
:returns: the total asset value for `loss_type`
"""
if loss_type == 'occupants':
return self.values['occupants_' + str(time_event)]
try:
val = self._cost[loss_type]
except KeyError:
val = self.calc(loss_type, self.values, self.area, self.number)
self._cost[loss_type] = val
return val
[docs] def deductible(self, loss_type):
"""
:returns: the deductible fraction of the asset cost for `loss_type`
"""
val = self.calc(loss_type, self.deductibles, self.area, self.number)
if self.calc.deduct_abs: # convert to relative value
return val / self.calc(loss_type, self.values,
self.area, self.number)
else:
return val
[docs] def insurance_limit(self, loss_type):
"""
:returns: the limit fraction of the asset cost for `loss_type`
"""
val = self.calc(loss_type, self.insurance_limits, self.area,
self.number)
if self.calc.limit_abs: # convert to relative value
return val / self.calc(loss_type, self.values,
self.area, self.number)
else:
return val
[docs] def retrofitted(self, loss_type, time_event=None):
"""
:returns: the asset retrofitted value for `loss_type`
"""
if loss_type == 'occupants':
return self.values['occupants_' + str(time_event)]
return self.calc(loss_type, self.retrofitteds,
self.area, self.number)
def __lt__(self, other):
return self.id < other.id
def __repr__(self):
return '<Asset %s>' % self.id
[docs]def get_values(loss_type, assets, time_event=None):
"""
:returns:
a numpy array with the values for the given assets, depending on the
loss_type.
"""
return numpy.array([a.value(loss_type, time_event) for a in assets])
[docs]class RiskModel(object):
"""
Base class. Can be used in the tests as a mock.
"""
time_event = None # used in scenario_risk
compositemodel = None # set by get_risk_model
kind = None # must be set in subclasses
def __init__(self, taxonomy, risk_functions):
self.taxonomy = taxonomy
self.risk_functions = risk_functions
@property
def loss_types(self):
"""
The list of loss types in the underlying vulnerability functions,
in lexicographic order
"""
return sorted(self.risk_functions)
[docs] def get_loss_types(self, imt):
"""
:param imt: Intensity Measure Type string
:returns: loss types with risk functions of the given imt
"""
return [lt for lt in self.loss_types
if self.risk_functions[lt].imt == imt]
def __repr__(self):
return '<%s%s>' % (self.__class__.__name__, list(self.risk_functions))
[docs]def rescale(curves, values):
"""
Multiply the losses in each curve of kind (losses, poes) by the
corresponding value.
"""
n = len(curves)
assert n == len(values), (n, len(values))
losses = [curves[i, 0] * values[i] for i in range(n)]
poes = curves[:, 1]
return numpy.array([[losses[i], poes[i]] for i in range(n)])
@registry.add('classical_risk', 'classical', 'disaggregation')
[docs]class Classical(RiskModel):
"""
Classical PSHA-Based RiskModel.
1) Compute loss curves, loss maps for each realization.
2) Compute (if more than one realization is given) mean and
quantiles loss curves and maps.
Per-realization Outputs contain the following fields:
:attr assets:
an iterable over N assets the outputs refer to
:attr loss_curves:
a numpy array of N loss curves. If the curve resolution is C, the final
shape of the array will be (N, 2, C), where the `two` accounts for
the losses/poes dimensions
:attr average_losses:
a numpy array of N average loss values
:attr insured_curves:
a numpy array of N insured loss curves, shaped (N, 2, C)
:attr average_insured_losses:
a numpy array of N average insured loss values
:attr loss_maps:
a numpy array of P elements holding N loss maps where P is the
number of `conditional_loss_poes` considered. Shape: (P, N)
The statistical outputs are stored into
:class:`openquake.risklib.scientific.Output`,
which holds the following fields:
:attr assets:
an iterable of N assets the outputs refer to
:attr mean_curves:
A numpy array with N mean loss curves. Shape: (N, 2)
:attr mean_average_losses:
A numpy array with N mean average loss values
:attr mean_maps:
A numpy array with P mean loss maps. Shape: (P, N)
:attr mean_fractions:
A numpy array with F mean fractions, where F is the number of PoEs
used for disaggregation. Shape: (F, N)
:attr quantile_curves:
A numpy array with Q quantile curves (Q = number of quantiles).
Shape: (Q, N, 2, C)
:attr quantile_average_losses:
A numpy array shaped (Q, N) with average losses
:attr quantile_maps:
A numpy array with Q quantile maps shaped (Q, P, N)
:attr quantile_fractions:
A numpy array with Q quantile maps shaped (Q, F, N)
:attr mean_insured_curves:
A numpy array with N mean insured loss curves. Shape: (N, 2)
:attr mean_average_insured_losses:
A numpy array with N mean average insured loss values
:attr quantile_insured_curves:
A numpy array with Q quantile insured curves (Q = number of quantiles).
Shape: (Q, N, 2, C)
:attr quantile_average_insured_losses:
A numpy array shaped (Q, N) with average insured losses
"""
kind = 'vulnerability'
def __init__(self, taxonomy, vulnerability_functions,
hazard_imtls, lrem_steps_per_interval,
conditional_loss_poes, poes_disagg,
insured_losses=False):
"""
:param imt:
Intensity Measure Type for this riskmodel
:param taxonomy:
Taxonomy for this riskmodel
:param vulnerability_functions:
Dictionary of vulnerability functions by loss type
:param hazard_imtls:
The intensity measure types and levels of the hazard computation
:param lrem_steps_per_interval:
Configuration parameter
:param poes_disagg:
Configuration parameter
:param poes_disagg:
Probability of Exceedance levels used for disaggregate losses by
taxonomy.
:param bool insured_losses:
True if insured loss curves should be computed
See :func:`openquake.risklib.scientific.classical` for a description
of the other parameters.
"""
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.hazard_imtls = hazard_imtls
self.lrem_steps_per_interval = lrem_steps_per_interval
self.conditional_loss_poes = conditional_loss_poes
self.poes_disagg = poes_disagg
self.insured_losses = insured_losses
self.loss_ratios = {
lt: vf.mean_loss_ratios_with_steps(lrem_steps_per_interval)
for lt, vf in vulnerability_functions.items()}
def __call__(self, loss_type, assets, hazard_curve, _eps=None):
"""
:param str loss_type:
the loss type considered
:param assets:
assets is an iterator over N
:class:`openquake.risklib.scientific.Asset` instances
:param hazard_curve:
an array of poes
:param _eps:
ignored, here only for API compatibility with other calculators
:returns:
a :class:`openquake.risklib.scientific.Classical.Output` instance.
"""
n = len(assets)
vf = self.risk_functions[loss_type]
imls = self.hazard_imtls[vf.imt]
curves = [scientific.classical(
vf, imls, hazard_curve, self.lrem_steps_per_interval)] * n
average_losses = utils.numpy_map(scientific.average_loss, curves)
maps = scientific.loss_map_matrix(self.conditional_loss_poes, curves)
values = get_values(loss_type, assets)
if self.insured_losses and loss_type != 'occupants':
deductibles = [a.deductible(loss_type) for a in assets]
limits = [a.insurance_limit(loss_type) for a in assets]
insured_curves = rescale(
utils.numpy_map(scientific.insured_loss_curve,
curves, deductibles, limits), values)
average_insured_losses = utils.numpy_map(
scientific.average_loss, insured_curves)
else:
insured_curves = None
average_insured_losses = None
return scientific.Output(
assets, loss_type,
loss_curves=rescale(numpy.array(curves), values),
average_losses=values * average_losses,
insured_curves=insured_curves,
average_insured_losses=average_insured_losses,
loss_maps=values * maps)
@registry.add('event_based_risk', 'event_based', 'event_based_rupture')
[docs]class ProbabilisticEventBased(RiskModel):
"""
Implements the Probabilistic Event Based riskmodel
Per-realization Output are saved into
:class:`openquake.risklib.scientific.ProbabilisticEventBased.Output`
which contains the several fields:
:attr assets:
an iterable over N assets the outputs refer to
:attr loss_matrix:
an array of losses shaped N x T (where T is the number of events)
:attr loss_curves:
a numpy array of N loss curves. If the curve resolution is C, the final
shape of the array will be (N, 2, C), where the `two` accounts for
the losses/poes dimensions
:attr average_losses:
a numpy array of N average loss values
:attr stddev_losses:
a numpy array holding N standard deviation of losses
:attr insured_curves:
a numpy array of N insured loss curves, shaped (N, 2, C)
:attr average_insured_losses:
a numpy array of N average insured loss values
:attr stddev_insured_losses:
a numpy array holding N standard deviation of losses
:attr loss_maps:
a numpy array of P elements holding N loss maps where P is the
number of `conditional_loss_poes` considered. Shape: (P, N)
:attr dict event_loss_table:
a dictionary mapping event ids to aggregate loss values
The statistical outputs are stored into
:class:`openquake.risklib.scientific.Output` objects.
"""
kind = 'vulnerability'
def __init__(
self, taxonomy, vulnerability_functions, loss_curve_resolution,
conditional_loss_poes, insured_losses=False):
"""
See :func:`openquake.risklib.scientific.event_based` for a description
of the input parameters.
"""
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.loss_curve_resolution = loss_curve_resolution
self.conditional_loss_poes = conditional_loss_poes
self.insured_losses = insured_losses
def __call__(self, loss_type, assets, gmvs_eids, epsgetter):
"""
:param str loss_type:
the loss type considered
:param assets:
a list of assets on the same site and with the same taxonomy
:param gmvs_eids:
a composite array of E elements with fields 'gmv' and 'eid'
:param epsgetter:
a callable returning the correct epsilons for the given gmvs
:returns:
a :class:
`openquake.risklib.scientific.ProbabilisticEventBased.Output`
instance.
"""
gmvs, eids = gmvs_eids['gmv'], gmvs_eids['eid']
E = len(gmvs)
I = self.insured_losses + 1
N = len(assets)
loss_ratios = numpy.zeros((N, E, I), F32)
vf = self.risk_functions[loss_type]
means, covs, idxs = vf.interpolate(gmvs)
for i, asset in enumerate(assets):
epsilons = epsgetter(asset.ordinal, eids)
if epsilons is not None:
ratios = vf.sample(means, covs, idxs, epsilons)
else:
ratios = means
loss_ratios[i, idxs, 0] = ratios
if self.insured_losses and loss_type != 'occupants':
loss_ratios[i, idxs, 1] = scientific.insured_losses(
ratios, asset.deductible(loss_type),
asset.insurance_limit(loss_type))
return scientific.Output(
assets, loss_type, loss_ratios=loss_ratios, eids=eids)
@registry.add('ebrisk', 'ucerf_risk', 'ucerf_risk_fast')
[docs]class EventBasedReduced(RiskModel):
"""
Implements the reduced event based riskmodel. This is used by the
EbrCalculator, a much simplified calculator that ignores asset correlation
and insured losses, cannot compute the event loss table, nor the average
losses, nor the loss curves. The only output returned is the total loss.
"""
kind = 'vulnerability'
def __init__(self, taxonomy, vulnerability_functions, time_event):
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.time_event = time_event
def __call__(self, loss_type, assets, gmvs_eids, epsgetter=None):
"""
:param str loss_type:
the loss type considered
:param assets:
a list of assets on the same site and with the same taxonomy
:param gmvs_eids:
a composite array of E elements with fields 'gmv' and 'eid'
:param epsgetter:
ignored
:returns:
a :class:`openquake.risklib.scientific.Output`
instance with the total losses
"""
vf = self.risk_functions[loss_type]
gmvs, eids = gmvs_eids['gmv'], gmvs_eids['eid']
alosses = numpy.zeros(len(assets))
elosses = numpy.zeros(len(gmvs))
for i, asset in enumerate(assets):
ratios, _covs, idxs = vf.interpolate(gmvs)
losses = ratios * asset.value(loss_type, self.time_event)
alosses[i] = losses.sum()
elosses[idxs] += losses
return scientific.Output(
assets, loss_type, alosses=alosses, elosses=elosses, eids=eids)
@registry.add('classical_bcr')
[docs]class ClassicalBCR(RiskModel):
kind = 'vulnerability'
def __init__(self, taxonomy,
vulnerability_functions_orig,
vulnerability_functions_retro,
hazard_imtls,
lrem_steps_per_interval,
interest_rate, asset_life_expectancy):
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions_orig
self.retro_functions = vulnerability_functions_retro
self.assets = None # set a __call__ time
self.interest_rate = interest_rate
self.asset_life_expectancy = asset_life_expectancy
self.hazard_imtls = hazard_imtls
self.lrem_steps_per_interval = lrem_steps_per_interval
def __call__(self, loss_type, assets, hazard, _eps=None, _eids=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
:param hazard: an hazard curve
:param _eps: dummy parameter, unused
:param _eids: dummy parameter, unused
:returns: a :class:`openquake.risklib.scientific.Output` instance
"""
n = len(assets)
self.assets = assets
vf = self.risk_functions[loss_type]
imls = self.hazard_imtls[vf.imt]
vf_retro = self.retro_functions[loss_type]
curves_orig = functools.partial(scientific.classical, vf, imls,
steps=self.lrem_steps_per_interval)
curves_retro = functools.partial(scientific.classical, vf_retro, imls,
steps=self.lrem_steps_per_interval)
original_loss_curves = utils.numpy_map(curves_orig, [hazard] * n)
retrofitted_loss_curves = utils.numpy_map(curves_retro, [hazard] * n)
eal_original = utils.numpy_map(
scientific.average_loss, original_loss_curves)
eal_retrofitted = utils.numpy_map(
scientific.average_loss, retrofitted_loss_curves)
bcr_results = [
scientific.bcr(
eal_original[i], eal_retrofitted[i],
self.interest_rate, self.asset_life_expectancy,
asset.value(loss_type), asset.retrofitted(loss_type))
for i, asset in enumerate(assets)]
return scientific.Output(
assets, loss_type,
data=list(zip(eal_original, eal_retrofitted, bcr_results)))
@registry.add('scenario_risk', 'scenario')
[docs]class Scenario(RiskModel):
"""
Implements the Scenario riskmodel
"""
kind = 'vulnerability'
def __init__(self, taxonomy, vulnerability_functions,
insured_losses, time_event=None):
self.taxonomy = taxonomy
self.risk_functions = vulnerability_functions
self.insured_losses = insured_losses
self.time_event = time_event
def __call__(self, loss_type, assets, ground_motion_values, epsgetter):
epsilons = epsgetter(None, None)
values = get_values(loss_type, assets, self.time_event)
ok = ~numpy.isnan(values)
if not ok.any():
# there are no assets with a value
return
# there may be assets without a value
missing_value = not ok.all()
if missing_value:
assets = assets[ok]
epsilons = epsilons[ok]
# a matrix of N x E elements
vf = self.risk_functions[loss_type]
means, covs, idxs = vf.interpolate(ground_motion_values)
loss_ratio_matrix = numpy.zeros((len(assets), len(epsilons[0])))
for i, eps in enumerate(epsilons):
loss_ratio_matrix[i, idxs] = vf.sample(means, covs, idxs, eps)
# another matrix of N x E elements
loss_matrix = (loss_ratio_matrix.T * values).T
# an array of E elements
aggregate_losses = loss_matrix.sum(axis=0)
if self.insured_losses and loss_type != "occupants":
deductibles = [a.deductible(loss_type) for a in assets]
limits = [a.insurance_limit(loss_type) for a in assets]
insured_loss_ratio_matrix = utils.numpy_map(
scientific.insured_losses,
loss_ratio_matrix, deductibles, limits)
insured_loss_matrix = (insured_loss_ratio_matrix.T * values).T
else:
insured_loss_matrix = numpy.empty_like(loss_ratio_matrix)
insured_loss_matrix.fill(numpy.nan)
# aggregating per asset, getting a vector of E elements
insured_losses = insured_loss_matrix.sum(axis=0)
return scientific.Output(
assets, loss_type, loss_matrix=loss_matrix,
loss_ratio_matrix=loss_ratio_matrix,
aggregate_losses=aggregate_losses,
insured_loss_matrix=insured_loss_matrix,
insured_losses=insured_losses)
@registry.add('scenario_damage')
[docs]class Damage(RiskModel):
"""
Implements the ScenarioDamage riskmodel
"""
kind = 'fragility'
def __init__(self, taxonomy, fragility_functions):
self.taxonomy = taxonomy
self.risk_functions = fragility_functions
def __call__(self, loss_type, assets, gmvs, _eps=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
:param gmvs: an array of E elements
:param _eps: dummy parameter, unused
:returns: an array of N assets and an array of N x E x D elements
where N is the number of points, E the number of events
and D the number of damage states.
"""
n = len(assets)
ffs = self.risk_functions[loss_type]
damages = numpy.array(
[scientific.scenario_damage(ffs, gmv) for gmv in gmvs])
return scientific.Output(assets, loss_type, damages=[damages] * n)
@registry.add('classical_damage')
[docs]class ClassicalDamage(Damage):
"""
Implements the ClassicalDamage riskmodel
"""
kind = 'fragility'
def __init__(self, taxonomy, fragility_functions,
hazard_imtls, investigation_time,
risk_investigation_time):
self.taxonomy = taxonomy
self.risk_functions = fragility_functions
self.hazard_imtls = hazard_imtls
self.investigation_time = investigation_time
self.risk_investigation_time = risk_investigation_time
def __call__(self, loss_type, assets, hazard_curve, _eps=None):
"""
:param loss_type: the loss type
:param assets: a list of N assets of the same taxonomy
:param hazard_curve: an hazard curve array
:returns: an array of N assets and an array of N x D elements
where N is the number of points and D the number of damage states.
"""
ffl = self.risk_functions[loss_type]
hazard_imls = self.hazard_imtls[ffl.imt]
damage = scientific.classical_damage(
ffl, hazard_imls, hazard_curve,
investigation_time=self.investigation_time,
risk_investigation_time=self.risk_investigation_time)
return scientific.Output(
assets, loss_type, damages=[a.number * damage for a in assets])
# NB: the approach used here relies on the convention of having the
# names of the arguments of the riskmodel class to be equal to the
# names of the parameter in the oqparam object. This is view as a
# feature, since it forces people to be consistent with the names,
# in the spirit of the 'convention over configuration' philosophy
[docs]def get_riskmodel(taxonomy, oqparam, **extra):
"""
Return an instance of the correct riskmodel class, depending on the
attribute `calculation_mode` of the object `oqparam`.
:param taxonomy:
a taxonomy string
:param oqparam:
an object containing the parameters needed by the riskmodel class
:param extra:
extra parameters to pass to the riskmodel class
"""
riskmodel_class = registry[oqparam.calculation_mode]
# arguments needed to instantiate the riskmodel class
argnames = inspect.getargspec(riskmodel_class.__init__).args[3:]
# arguments extracted from oqparam
known_args = set(name for name, value in
inspect.getmembers(oqparam.__class__)
if isinstance(value, valid.Param))
all_args = {}
for argname in argnames:
if argname in known_args:
all_args[argname] = getattr(oqparam, argname)
if 'hazard_imtls' in argnames: # special case
all_args['hazard_imtls'] = oqparam.imtls
all_args.update(extra)
missing = set(argnames) - set(all_args)
if missing:
raise TypeError('Missing parameter: %s' % ', '.join(missing))
return riskmodel_class(taxonomy, **all_args)