# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import copy
import logging
import numpy
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.risklib import scientific, riskinput
from openquake.calculators import base, post_risk, views
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64  # higher precision to avoid task order dependency
[docs]def run_sec_sims(out, loss_types, sec_sims, seed):
    """
    :param out: a dictionary of losses
    :param loss_types: the loss types
    :param sec_sims: pair (probability field, number of simulations)
    :param seed: random seed to use
    Run secondary simulations and update the losses
    """
    [(prob_field, num_sims)] = sec_sims
    numpy.random.seed(seed)
    probs = out.haz[prob_field].to_numpy()   # LiqProb
    affected = numpy.random.random((num_sims, 1)) < probs  # (N, E)
    # doing the mean on the secondary simulations for each event
    for lt in loss_types:
        out[lt][:] = numpy.mean(affected * out[lt], axis=0)  # shape E 
[docs]def event_based_risk(riskinputs, param, monitor):
    """
    Core function for a scenario_risk/event_based_risk computation.
    :param riskinputs:
        a list of :class:`openquake.risklib.riskinput.RiskInput` objects
    :param param:
        dictionary of extra parameters
    :param monitor:
        :class:`openquake.baselib.performance.Monitor` instance
    :returns:
        a dictionary {
        'alt': AggLoggTable instance
        'losses_by_asset': list of tuples
        (lt_idx, rlz_idx, asset_ordinal, totloss)}
    """
    crmodel = monitor.read('crmodel')
    alt = copy.copy(param['alt'])
    result = dict(losses_by_asset=[], alt=alt)
    sec_sims = param['secondary_simulations'].items()
    for ri in riskinputs:
        for out in ri.gen_outputs(crmodel, monitor, param['tempname']):
            if sec_sims:
                run_sec_sims(out, crmodel.loss_types, sec_sims,
                             param['master_seed'])
            alt.aggregate(
                out, param['minimum_asset_loss'], param['aggregate_by'])
            for lti, loss_type in enumerate(crmodel.loss_types):
                losses = out[loss_type]
                for a, asset in enumerate(ri.assets):
                    aid = asset['ordinal']
                    lba = losses[a].sum()
                    if lba:
                        result['losses_by_asset'].append(
                            (lti, out.rlzi, aid, lba))
    return result 
[docs]@base.calculators.add('scenario_risk', 'event_based_risk')
class EventBasedRiskCalculator(base.RiskCalculator):
    """
    Run a scenario/event_based risk calculation
    """
    core_task = event_based_risk
    is_stochastic = True
    precalc = 'event_based'
    accept_precalc = ['scenario', 'event_based', 'event_based_risk', 'ebrisk']
[docs]    def pre_execute(self):
        """
        Compute the GMFs, build the epsilons, the riskinputs, and a dictionary
        with the unit of measure, used in the export phase.
        """
        oq = self.oqparam
        parent = self.datastore.parent
        if parent:
            self.datastore['full_lt'] = parent['full_lt']
            ne = len(parent['events'])
            logging.info('There are %d ruptures and %d events',
                         len(parent['ruptures']), ne)
        if oq.investigation_time and oq.return_periods != [0]:
            # setting return_periods = 0 disable loss curves
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
        super().pre_execute()
        if not oq.ground_motion_fields:
            return  # this happens in the reportwriter
        self.assetcol = self.datastore['assetcol']
        self.riskinputs = self.build_riskinputs('gmf')
        self.param['tempname'] = riskinput.cache_epsilons(
            self.datastore, oq, self.assetcol, self.crmodel, self.E)
        self.param['aggregate_by'] = oq.aggregate_by
        self.param['secondary_simulations'] = oq.secondary_simulations
        self.param['master_seed'] = oq.master_seed
        self.rlzs = self.datastore['events']['rlz_id']
        self.num_events = numpy.bincount(self.rlzs)  # events by rlz
        aggkey = self.assetcol.tagcol.get_aggkey(oq.aggregate_by)
        self.param['alt'] = self.acc = scientific.AggLossTable.new(
            aggkey, oq.loss_names, sec_losses=[])
        L = len(oq.loss_names)
        self.avglosses = numpy.zeros((len(self.assetcol), self.R, L), F32)
        if oq.investigation_time:  # event_based
            self.avg_ratio = numpy.array([oq.ses_ratio] * self.R)
        else:  # scenario
            self.avg_ratio = 1. / self.num_events 
[docs]    def combine(self, acc, res):
        if res is None:
            raise MemoryError('You ran out of memory!')
        with self.monitor('aggregating losses', measuremem=False):
            self.acc += res['alt']
            for (l, r, aid, lba) in res['losses_by_asset']:
                self.avglosses[aid, r, l] = lba * self.avg_ratio[r]
        return acc 
[docs]    def post_execute(self, result):
        """
        Compute stats for the aggregated distributions and save
        the results on the datastore.
        """
        oq = self.oqparam
        L = len(oq.loss_names)
        # avg losses must be 32 bit otherwise export losses_by_asset will
        # break the QGIS test for ScenarioRisk
        self.datastore['avg_losses-rlzs'] = self.avglosses
        set_rlzs_stats(self.datastore, 'avg_losses',
                       asset_id=self.assetcol['id'],
                       loss_type=self.oqparam.loss_names)
        with self.monitor('saving agg_loss_table'):
            logging.info('Saving the agg_loss_table')
            K = len(result.aggkey) - 1
            alt = result.to_dframe()
            self.datastore.create_dframe('agg_loss_table', alt, K=K, L=L)
        # save agg_losses
        units = self.datastore['cost_calculator'].get_units(oq.loss_names)
        if oq.investigation_time is None:  # scenario, compute agg_losses
            alt['rlz_id'] = self.rlzs[alt.event_id.to_numpy()]
            agglosses = numpy.zeros((K+1, self.R, L), F32)
            for (agg_id, rlz_id), df in alt.groupby(['agg_id', 'rlz_id']):
                agglosses[agg_id, rlz_id] = numpy.array(
                    [df[ln].sum() for ln in oq.loss_names]
                ) * self.avg_ratio[rlz_id]
            self.datastore['agg_losses-rlzs'] = agglosses
            set_rlzs_stats(self.datastore, 'agg_losses',
                           agg_id=K, loss_types=oq.loss_names, units=units)
            logging.info('Total portfolio loss\n' +
                         views.view('portfolio_loss', self.datastore))
        else:  # event_based_risk, run post_risk
            prc = post_risk.PostRiskCalculator(oq, self.datastore.calc_id)
            if hasattr(self, 'exported'):
                prc.exported = self.exported
            prc.run(exports='')
            try:
                agglosses = self.datastore['agg_losses-rlzs'][K]
            except KeyError:  # not enough events in post_risk
                logging.error('No agg_losses-rlzs')
                return
        # sanity check on the agg_losses and sum_losses
        sumlosses = self.avglosses.sum(axis=0)
        if not numpy.allclose(agglosses, sumlosses, rtol=1E-6):
            url = ('https://docs.openquake.org/oq-engine/advanced/'
                   'addition-is-non-associative.html')
            logging.warning(
                'Due to rounding errors inherent in floating-point arithmetic,'
                ' agg_losses != sum(avg_losses): %s != %s\nsee %s',
                agglosses.mean(), sumlosses.mean(), url)