# Source code for openquake.calculators.classical_damage
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import numpy
from openquake.baselib.general import AccumDict
from openquake.hazardlib import stats
from openquake.calculators import base, classical_risk
def classical_damage(riskinputs, riskmodel, param, monitor):
    """
    Core function for a classical damage computation.

    :param riskinputs:
        :class:`openquake.risklib.riskinput.RiskInput` objects
    :param riskmodel:
        a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance
    :param param:
        dictionary of extra parameters (not read by this function)
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a nested dictionary (l, rlz_idx) -> asset ordinal -> <damage array>,
        where `l` is the position of the output inside each `outputs`
        object yielded by `riskmodel.gen_outputs`
    """
    result = AccumDict(accum=AccumDict())
    for ri in riskinputs:
        for outputs in riskmodel.gen_outputs(ri, monitor):
            # The asset ordinals do not depend on the output index, so they
            # are collected once per `outputs` instead of once per output.
            ordinals = [asset.ordinal for asset in outputs.assets]
            for lti, out in enumerate(outputs):
                # `out` is zipped with the ordinals, i.e. it is expected to
                # hold one entry per asset, in the same order as the assets.
                result[lti, outputs.rlzi] += dict(zip(ordinals, out))
    return result
@base.calculators.add('classical_damage')
class ClassicalDamageCalculator(classical_risk.ClassicalRiskCalculator):
    """
    Classical damage calculator
    """
    core_task = classical_damage

    def post_execute(self, result):
        """
        Collect the task results into a single `damages` array and pass it,
        together with the datastore, to `stats.set_rlzs_stats`.

        :param result:
            a dictionary (l, r) -> asset_ordinal -> fractions per damage state
        """
        # Structured dtype with one float32 field per damage state.
        damages_dt = numpy.dtype([(ds, numpy.float32)
                                  for ds in self.riskmodel.damage_states])
        # Zero-initialised, so (asset, r, l) entries missing from `result`
        # stay at zero for every damage state.
        damages = numpy.zeros((self.A, self.R, self.L * self.I), damages_dt)
        for (lti, r), by_asset in result.items():
            for aid, fractions in by_asset.items():
                # A tuple is required to assign all the fields of a
                # structured array element at once.
                damages[aid, r, lti] = tuple(fractions)
        stats.set_rlzs_stats(self.datastore, 'damages', damages)