# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010-2014, GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Core functionality for the scenario_damage risk calculator.
"""
import numpy
from openquake.commonlib.readinput import get_risk_model
from openquake.engine.calculators.risk import (
base, hazard_getters, writers, validation)
from openquake.engine.performance import EnginePerformanceMonitor
from openquake.engine.db import models
from openquake.engine.calculators import calculators
[docs]def scenario_damage(workflow, getter, outputdict, params, monitor):
"""
Celery task for the scenario damage risk calculator.
:param workflow:
A :class:`openquake.risklib.workflows.Workflow` instance
:param getter:
A HazardGetter instance
:param outputdict:
An instance of :class:`..writers.OutputDict` containing
output container instances (in this case only `LossMap`)
:param params:
An instance of :class:`..base.CalcParams` used to compute
derived outputs
:param monitor:
A monitor instance
:returns:
A matrix of fractions and a taxonomy string
"""
[ffs] = workflow.risk_functions
# and no output containers
assert len(outputdict) == 0, outputdict
with monitor('computing risk', autoflush=True):
out = workflow(
'damage', getter.assets, getter.get_data(), None)
aggfractions = sum(out.damages[i] * asset.number_of_units
for i, asset in enumerate(out.assets))
with monitor('saving damage per assets', autoflush=True):
writers.damage_distribution(
getter.assets, out.damages, params.damage_state_ids)
return {getter.assets[0].taxonomy: aggfractions}
@calculators.add('scenario_damage')
[docs]class ScenarioDamageRiskCalculator(base.RiskCalculator):
"""
Scenario Damage Risk Calculator. Computes four kinds of damage
distributions: per asset, per taxonomy, total and collapse map.
:attr dict fragility_functions:
A dictionary of dictionary mapping taxonomy ->
(limit state -> fragility function) where a fragility function is an
instance of
:class:`openquake.risklib.scientific.FragilityFunctionContinuous` or
:class:`openquake.risklib.scientific.FragilityFunctionDiscrete`.
"""
#: The core calculation celery task function
core = staticmethod(scenario_damage)
validators = [validation.HazardIMT, validation.EmptyExposure,
validation.OrphanTaxonomies,
validation.NoRiskModels]
# FIXME. scenario damage calculator does not use output builders
output_builders = []
getter_class = hazard_getters.GroundMotionGetter
def __init__(self, job):
super(ScenarioDamageRiskCalculator, self).__init__(job)
self.acc = {} # taxonomy -> fractions
@EnginePerformanceMonitor.monitor
def agg_result(self, acc, task_result):
"""
Update the dictionary acc, i.e. aggregate the damage distribution
by taxonomy; called every time a block of assets is computed for each
taxonomy. Fractions and taxonomy are extracted from task_result
:param task_result:
A pair (fractions, taxonomy)
"""
acc = acc.copy()
for taxonomy, fractions in task_result.iteritems():
if fractions is not None:
if taxonomy not in acc:
acc[taxonomy] = numpy.zeros(fractions.shape)
acc[taxonomy] += fractions
return acc
[docs] def post_process(self):
"""
Save the damage distributions by taxonomy and total on the db.
"""
damage_state_ids = self.oqparam.damage_state_ids
models.Output.objects.create_output(
self.job, "Damage Distribution per Asset",
"dmg_dist_per_asset")
models.Output.objects.create_output(
self.job, "Collapse Map per Asset",
"collapse_map")
if self.acc:
models.Output.objects.create_output(
self.job, "Damage Distribution per Taxonomy",
"dmg_dist_per_taxonomy")
tot = None
for taxonomy, fractions in self.acc.iteritems():
writers.damage_distribution_per_taxonomy(
fractions, damage_state_ids, taxonomy)
if tot is None: # only the first time
tot = numpy.zeros(fractions.shape)
tot += fractions
if tot is not None:
models.Output.objects.create_output(
self.job, "Damage Distribution Total",
"dmg_dist_total")
writers.total_damage_distribution(tot, damage_state_ids)
[docs] def get_risk_model(self):
"""
Load fragility model and store damage states
"""
risk_model = get_risk_model(self.oqparam)
for lsi, dstate in enumerate(risk_model.damage_states):
models.DmgState.objects.get_or_create(
risk_calculation=self.job, dmg_state=dstate, lsi=lsi)
damage_state_ids = [d.id for d in models.DmgState.objects.filter(
risk_calculation=self.job).order_by('lsi')]
self.oqparam.damage_state_ids = damage_state_ids
self.loss_types.add('damage') # single loss_type
return risk_model