Source code for openquake.calculators.classical_damage
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import numpy
from openquake.baselib.general import AccumDict
from openquake.calculators import base, classical_risk
[docs]def classical_damage(riskinput, riskmodel, param, monitor):
"""
Core function for a classical damage computation.
:param riskinput:
a :class:`openquake.risklib.riskinput.RiskInput` object
:param riskmodel:
a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
:param param:
dictionary of extra parameters
:param monitor:
:class:`openquake.baselib.performance.Monitor` instance
:returns:
a nested dictionary rlz_idx -> asset -> <damage array>
"""
with monitor:
result = {i: AccumDict() for i in range(len(riskinput.rlzs))}
for outputs in riskmodel.gen_outputs(riskinput, monitor):
for l, out in enumerate(outputs):
ordinals = [a.ordinal for a in outputs.assets]
result[outputs.r] += dict(zip(ordinals, out))
return result
@base.calculators.add('classical_damage')
[docs]class ClassicalDamageCalculator(classical_risk.ClassicalRiskCalculator):
"""
Scenario damage calculator
"""
core_task = classical_damage
[docs] def check_poes(self, curves_by_rlz):
"""
Raise an error if one PoE = 1, since it would produce a log(0) in
:class:`openquake.risklib.scientific.annual_frequency_of_exceedence`
"""
for rlz, curves in curves_by_rlz.items():
for imt in self.oqparam.imtls:
for sid, poes in enumerate(curves[imt]):
if (poes == 1).any():
raise ValueError('Found a PoE=1 for site_id=%d, %s'
% (sid, imt))
[docs] def post_execute(self, result):
"""
Export the result in CSV format.
:param result:
a dictionary asset -> fractions per damage state
"""
damages_dt = numpy.dtype([(ds, numpy.float32)
for ds in self.riskmodel.damage_states])
damages = numpy.zeros((self.N, self.R), damages_dt)
for r in result:
for aid, fractions in result[r].items():
damages[aid, r] = tuple(fractions)
self.datastore['damages-rlzs'] = damages