# Copyright (c) 2010-2014, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Core functionality for the event based risk calculator.
"""
import collections
import itertools
import logging
import numpy
from openquake.hazardlib.geo import mesh
from openquake.risklib import scientific
from openquake.risklib.utils import numpy_map
from openquake.calculators import event_based_risk
from openquake.engine.calculators.risk import (
base, hazard_getters, writers)
from openquake.engine.db import models
from openquake.engine import engine, writer, logs
from openquake.engine.calculators import calculators
from openquake.engine.performance import EnginePerformanceMonitor


def _filter_loss_matrix_assets(loss_matrix, assets, specific_assets):
# reduce loss_matrix and assets to the specific_assets
mask = numpy.array([a.asset_ref in specific_assets for a in assets])
return loss_matrix[mask], numpy.array(assets)[mask]
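
# A usage sketch for _filter_loss_matrix_assets (the values are made up):
# with three assets whose asset_refs are 'a1', 'a2', 'a3' and
# specific_assets={'a2'}, the mask is [False, True, False], so only the
# row of the (asset x rupture) loss matrix belonging to 'a2' survives:
#
#   loss_matrix = numpy.array([[.1, .2], [.3, .4], [.5, .6]])
#   matrix, kept = _filter_loss_matrix_assets(loss_matrix, assets, {'a2'})
#   # matrix -> array([[0.3, 0.4]]); kept -> array([<asset 'a2'>])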


def event_based(workflow, getter, outputdict, params, monitor):
    """
    Celery task for the event based risk calculator.

:param workflow:
A :class:`openquake.risklib.workflows.Workflow` instance
:param getter:
A :class:`HazardGetter` instance
:param outputdict:
An instance of :class:`..writers.OutputDict` containing
output container instances (e.g. a LossCurve)
:param params:
An instance of :class:`..base.CalcParams` used to compute
derived outputs
:param monitor:
A monitor instance
:returns:
A dictionary {loss_type: event_loss_table}
"""
    # NB: event_loss_table maps pairs (loss_type, out_id) to dictionaries
    # rupture_id -> loss; out_id can be None, and in that case the entry
    # stores the statistics
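    # for instance, with a single hazard output with id 1 and statistics
    # enabled, the returned dictionary could look like this (rupture ids
    # and losses are made up):
    #   {('structural', 1): {101: 1500.0, 102: 420.0},
    #    ('structural', None): {101: 1500.0, 102: 420.0}}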
event_loss_table = {}
    # num_losses is a dictionary asset_ref -> array([not_zeros, total])
num_losses = collections.defaultdict(lambda: numpy.zeros(2, dtype=int))
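    # e.g. num_losses['a1'] == array([3, 10]) would mean that only 3 of
    # the 10 losses computed for the asset 'a1' were non-zero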
specific_assets = set(params.specific_assets)
statistics = params.statistics # enabled by default
# keep in memory the loss_matrix only when specific_assets are set
workflow.return_loss_matrix = bool(specific_assets)
    # the inserter below is actually used only when specific_assets is set
inserter = writer.CacheInserter(
models.EventLossAsset, max_cache_size=10000)
for loss_type in workflow.loss_types:
with monitor('computing individual risk', autoflush=True):
outputs = workflow.compute_all_outputs(getter, loss_type)
if statistics:
outputs = list(outputs) # expand the generator
            # this is needed, otherwise the call to workflow.statistics
            # below would find an empty iterable; notice that by disabling
            # the statistics we can save memory by keeping only one
            # hazard realization at a time
for out in outputs:
event_loss_table[loss_type, out.hid] = out.event_loss_table
disagg_outputs = None # changed if params.sites_disagg is set
if specific_assets:
loss_matrix, assets = _filter_loss_matrix_assets(
out.loss_matrix, out.assets, specific_assets)
if len(assets):
# compute the loss per rupture per asset
event_loss = models.EventLoss.objects.get(
output__oq_job=monitor.job_id,
output__output_type='event_loss_asset',
loss_type=loss_type, hazard_output=out.hid)
# losses is E x n matrix, where E is the number of ruptures
# and n the number of assets in the specific_assets set
losses = (loss_matrix.transpose() *
numpy_map(lambda a: a.value(loss_type), assets))
# save an EventLossAsset record for each specific asset
for rup_id, losses_per_rup in zip(
getter.rupture_ids, losses):
for asset, loss_per_rup in zip(assets, losses_per_rup):
if loss_per_rup: # save only non-zero losses
ela = models.EventLossAsset(
event_loss=event_loss, rupture_id=rup_id,
asset=asset, loss=loss_per_rup)
inserter.add(ela)
# update the counters: not_zeros is incremented
# only if loss_per_rup is nonzero, total always
num_losses[asset.asset_ref] += numpy.array(
[bool(loss_per_rup), 1])
if params.sites_disagg:
with monitor('disaggregating results', autoflush=True):
ruptures = [models.SESRupture.objects.get(pk=rid)
for rid in getter.rupture_ids]
disagg_outputs = disaggregate(
out, [r.rupture for r in ruptures], params)
with monitor('saving individual risk', autoflush=True):
save_individual_outputs(
outputdict.with_args(hazard_output_id=out.hid,
loss_type=loss_type),
out, disagg_outputs, params)
if statistics and len(outputs) > 1:
stats = workflow.statistics(
outputs, params.quantile_loss_curves)
with monitor('saving risk statistics', autoflush=True):
save_statistical_output(
outputdict.with_args(
hazard_output_id=None, loss_type=loss_type),
stats, params)
event_loss_table[loss_type, None] = stats.event_loss_table
inserter.flush()
# log info about the rows entered in the event_loss_asset table
for asset_ref in sorted(num_losses):
not_zeros, total = num_losses[asset_ref]
logs.LOG.info('Saved %d/%d losses for asset %s',
not_zeros, total, asset_ref)
return event_loss_table


def save_individual_outputs(outputdict, outputs, disagg_outputs, params):
    """
    Save loss curves, loss maps and loss fractions associated with a
    calculation unit.

:param outputdict:
a :class:`openquake.engine.calculators.risk.writers.OutputDict`
instance holding the reference to the output container objects
:param outputs:
a :class:`openquake.risklib.workflows.ProbabilisticEventBased.Output`
holding the output data for a calculation unit
:param disagg_outputs:
        a :class:`.DisaggregationOutputs` holding the disaggregation
output data for a calculation unit
:param params:
a :class:`openquake.engine.calculators.risk.base.CalcParams`
holding the parameters for this calculation
"""
outputdict.write(
outputs.assets,
(outputs.loss_curves, outputs.average_losses, outputs.stddev_losses),
output_type="event_loss_curve")
outputdict.write_all(
"poe", params.conditional_loss_poes,
outputs.loss_maps,
outputs.assets,
output_type="loss_map")
if disagg_outputs is not None:
        # FIXME: we should avoid expanding the generator into a list
assets = list(disagg_outputs.assets_disagg)
outputdict.write(
assets,
disagg_outputs.magnitude_distance,
disagg_outputs.fractions,
output_type="loss_fraction",
variable="magnitude_distance")
outputdict.write(
assets,
disagg_outputs.coordinate, disagg_outputs.fractions,
output_type="loss_fraction",
variable="coordinate")
if outputs.insured_curves is not None:
outputdict.write(
outputs.assets,
(outputs.insured_curves, outputs.average_insured_losses,
outputs.stddev_insured_losses),
output_type="event_loss_curve", insured=True)


def save_statistical_output(outputdict, stats, params):
    """
    Save statistical outputs (mean and quantile loss curves, mean and
    quantile loss maps) for the calculation.

:param outputdict:
a :class:`openquake.engine.calculators.risk.writers.OutputDict`
instance holding the reference to the output container objects
    :param stats:
        an instance of
        :class:`openquake.risklib.workflows.ProbabilisticEventBased.StatisticalOutput`
        holding the statistical output data
:param params:
a :class:`openquake.engine.calculators.risk.base.CalcParams`
holding the parameters for this calculation
"""
outputdict.write(
stats.assets, (stats.mean_curves, stats.mean_average_losses),
output_type="loss_curve", statistics="mean")
outputdict.write_all(
"poe", params.conditional_loss_poes, stats.mean_maps,
stats.assets, output_type="loss_map", statistics="mean")
# quantile curves and maps
outputdict.write_all(
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(stats.quantile_curves,
stats.quantile_average_losses)],
stats.assets, output_type="loss_curve", statistics="quantile")
if params.quantile_loss_curves:
for quantile, maps in zip(
params.quantile_loss_curves, stats.quantile_maps):
outputdict.write_all(
"poe", params.conditional_loss_poes, maps,
stats.assets, output_type="loss_map",
statistics="quantile", quantile=quantile)
# mean and quantile insured curves
if stats.mean_insured_curves is not None:
outputdict.write(
stats.assets, (stats.mean_insured_curves,
stats.mean_average_insured_losses),
output_type="loss_curve", statistics="mean", insured=True)
outputdict.write_all(
"quantile", params.quantile_loss_curves,
[(c, a) for c, a in itertools.izip(
stats.quantile_insured_curves,
stats.quantile_average_insured_losses)],
stats.assets,
output_type="loss_curve", statistics="quantile", insured=True)


class DisaggregationOutputs(object):
    """
    Container for the disaggregation outputs: the disaggregated assets,
    the magnitude-distance and coordinate bin labels and the corresponding
    loss fractions.
    """
def __init__(self, assets_disagg, magnitude_distance,
coordinate, fractions):
self.assets_disagg = assets_disagg
self.magnitude_distance = magnitude_distance
self.coordinate = coordinate
self.fractions = fractions
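
# A construction sketch with made-up values (`asset` stands for any asset
# object): `fractions` has one entry per (asset, rupture) pair, keyed by
# the "mag,dist" and "lon,lat" bin labels built by `disaggregate` below,
# and the assets are repeated accordingly:
#
#   DisaggregationOutputs(
#       assets_disagg=[asset, asset],         # one repetition per rupture
#       magnitude_distance=['13,4', '14,2'],  # "mag_bin,dist_bin" labels
#       coordinate=['9,4', '9,4'],            # "lon_bin,lat_bin" labels
#       fractions=[0.7, 0.3])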


def disaggregate(outputs, ruptures, params):
    """
    Compute the disaggregation outputs from the individual `outputs`,
    the `ruptures` and the `params`.

    :param outputs:
        an instance of
        :class:`openquake.risklib.workflows.ProbabilisticEventBased.Output`
    :param ruptures:
        a list of rupture objects (the ``.rupture`` attribute of the
        corresponding :class:`openquake.engine.db.models.SESRupture`
        records)
    :param params:
        an instance of :class:`..base.CalcParams`
:returns:
an instance of :class:`DisaggregationOutputs`
"""
def disaggregate_site(site, loss_ratios):
for fraction, rupture in zip(loss_ratios, ruptures):
s = rupture.surface
m = mesh.Mesh(numpy.array([site.x]), numpy.array([site.y]), None)
mag = numpy.floor(rupture.magnitude / params.mag_bin_width)
            dist = numpy.floor(
                s.get_joyner_boore_distance(m)[0] / params.distance_bin_width)
            closest_point = next(iter(s.get_closest_points(m)))
lon = closest_point.longitude / params.coordinate_bin_width
lat = closest_point.latitude / params.coordinate_bin_width
yield "%d,%d" % (mag, dist), "%d,%d" % (lon, lat), fraction
assets_disagg = []
disagg_matrix = []
for asset, losses in zip(outputs.assets, outputs.loss_matrix):
if (asset.site.x, asset.site.y) in params.sites_disagg:
disagg_matrix.extend(list(disaggregate_site(asset.site, losses)))
            # FIXME: the functions in
            # openquake.engine.calculators.risk.writers require one asset
            # for each row of the disaggregation matrix; to this aim we
            # repeat the assets that will be passed to those functions
assets_disagg = itertools.chain(
assets_disagg,
itertools.repeat(asset, len(ruptures)))
if assets_disagg:
magnitudes, coordinates, fractions = zip(*disagg_matrix)
else:
magnitudes, coordinates, fractions = [], [], []
return DisaggregationOutputs(
assets_disagg, magnitudes, coordinates, fractions)


@calculators.add('event_based_risk')
class EventBasedRiskCalculator(base.RiskCalculator):
"""
    Probabilistic event based risk calculator. Computes loss
curves, loss maps, aggregate losses and insured losses for a given
set of assets.
"""
core = staticmethod(event_based)
# FIXME(lp). Validate sites_disagg to ensure non-empty outputs
validators = base.RiskCalculator.validators
output_builders = [writers.EventLossCurveMapBuilder,
writers.LossFractionBuilder]
getter_class = hazard_getters.GroundMotionGetter

    def __init__(self, job):
logging.warn('This calculator is deprecated, use oq-engine --lite')
super(EventBasedRiskCalculator, self).__init__(job)
# accumulator for the event loss tables
self.acc = collections.defaultdict(collections.Counter)
self.sites_disagg = self.job.get_param('sites_disagg', [])
self.specific_assets = self.job.get_param('specific_assets', [])

    def pre_execute(self):
"""
Base pre_execute + build Event Loss Asset outputs if needed
"""
super(EventBasedRiskCalculator, self).pre_execute()
for hazard_output in self.get_hazard_outputs():
for loss_type in self.loss_types:
models.EventLoss.objects.create(
output=models.Output.objects.create_output(
self.job,
"Event Loss Table type=%s, hazard=%s" % (
loss_type, hazard_output.id),
"event_loss"),
loss_type=loss_type,
hazard_output=hazard_output)
if self.specific_assets:
models.EventLoss.objects.create(
output=models.Output.objects.create_output(
self.job,
"Event Loss Asset type=%s, hazard=%s" % (
loss_type, hazard_output.id),
"event_loss_asset"),
loss_type=loss_type,
hazard_output=hazard_output)

    @EnginePerformanceMonitor.monitor
def agg_result(self, acc, event_loss_table):
"""
        Update the event loss table and return the new accumulator
"""
newdict = acc.copy()
for (loss_type, out_id), counter in event_loss_table.iteritems():
try:
c = newdict[loss_type, out_id]
except KeyError:
pass
else:
newdict[loss_type, out_id] = c + counter
return newdict
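    # for example, merging acc = {('structural', 1): Counter({101: 1.5})}
    # with event_loss_table = {('structural', 1): Counter({101: .5, 102: 2})}
    # gives {('structural', 1): Counter({101: 2.0, 102: 2})}: Counter
    # addition sums the losses rupture by rupture (made-up numbers)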

    def post_process(self):
"""
Compute aggregate loss curves and event loss tables
"""
oq = self.oqparam
with self.monitor('post processing', autoflush=True):
inserter = writer.CacheInserter(models.EventLossData,
max_cache_size=10000)
for (loss_type, out_id), event_loss_table in self.acc.items():
if out_id: # values for individual realizations
hazard_output = models.Output.objects.get(pk=out_id)
event_loss = models.EventLoss.objects.get(
output__oq_job=self.job,
output__output_type='event_loss',
loss_type=loss_type, hazard_output=hazard_output)
if isinstance(hazard_output.output_container,
models.SESCollection):
ses_coll = hazard_output.output_container
rupture_ids = ses_coll.get_ruptures().values_list(
'id', flat=True)
else: # extract the SES collection from the Gmf
rupture_ids = models.SESRupture.objects.filter(
rupture__ses_collection__trt_model__lt_model=
hazard_output.output_container.
lt_realization.lt_model).values_list(
'id', flat=True)
for rupture_id in rupture_ids:
if rupture_id in event_loss_table:
inserter.add(
models.EventLossData(
event_loss_id=event_loss.id,
rupture_id=rupture_id,
aggregate_loss=event_loss_table[
rupture_id]))
inserter.flush()
aggregate_losses = [
event_loss_table[rupture_id]
for rupture_id in rupture_ids
if rupture_id in event_loss_table]
if aggregate_losses:
aggregate_loss = scientific.event_based(
aggregate_losses, ses_ratio=oq.ses_ratio,
curve_resolution=oq.loss_curve_resolution)
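                        # as used below, scientific.event_based returns a
                        # pair: aggregate_loss[0] holds the loss values of
                        # the aggregate loss curve and aggregate_loss[1]
                        # the corresponding poes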
models.AggregateLossCurveData.objects.create(
loss_curve=models.LossCurve.objects.create(
aggregate=True, insured=False,
hazard_output=hazard_output,
loss_type=loss_type,
output=models.Output.objects.create_output(
self.job,
"aggregate loss curves. "
"loss_type=%s hazard=%s" % (
loss_type, hazard_output),
"agg_loss_curve")),
losses=aggregate_loss[0],
poes=aggregate_loss[1],
average_loss=sum(aggregate_losses),
stddev_loss=None)


@calculators.add('ebr')
class EBRCalculator(event_based_risk.EventBasedRiskCalculator):
"""
    Smaller event based risk calculator producing the event loss table.
"""

    def __init__(self, job):
self.job = job
oq = job.get_oqparam()
monitor = EnginePerformanceMonitor('ebr', job.id)
calc_id = engine.get_calc_id(job.id)
super(EBRCalculator, self).__init__(oq, monitor, calc_id)
job.ds_calc_dir = self.datastore.calc_dir
job.save()

    def clean_up(self):
        """
        Expose the datastore outputs to the engine database and perform
        the usual clean up
        """
engine.expose_outputs(self.datastore, self.job)
super(EBRCalculator, self).clean_up()