# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import logging
import numpy
import pandas
from openquake.baselib import hdf5, general, parallel
from openquake.hazardlib.stats import set_rlzs_stats
from openquake.commonlib import datastore
from openquake.calculators import base
from openquake.calculators.event_based_risk import EventBasedRiskCalculator
from openquake.calculators.post_risk import get_loss_builder
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
[docs]def fix_dtype(dic, dtype, names):
for name in names:
dic[name] = dtype(dic[name])
[docs]def fix_dic(dic, columns):
fix_dtype(dic, U16, ['agg_id'])
fix_dtype(dic, U8, ['loss_id'])
if 'event_id' in dic:
fix_dtype(dic, U32, ['event_id'])
if 'return_period' in dic:
fix_dtype(dic, U32, ['return_period'])
fix_dtype(dic, F32, columns)
[docs]def agg_damages(dstore, slc, monitor):
"""
:returns: dict (agg_id, loss_id) -> [dmg1, dmg2, ...]
"""
with dstore:
df = dstore.read_df('risk_by_event', ['agg_id', 'loss_id'], slc=slc)
del df['event_id']
agg = df.groupby(df.index).sum()
dic = dict(zip(agg.index, agg.to_numpy()))
return dic
[docs]def zero_dmgcsq(assetcol, crmodel):
"""
:returns: an array of zeros of shape (A, L, Dc)
"""
dmg_csq = crmodel.get_dmg_csq()
A = len(assetcol)
L = len(crmodel.loss_types)
Dc = len(dmg_csq) + 1 # damages + consequences
return numpy.zeros((A, L, Dc), F32)
[docs]def event_based_damage(df, param, monitor):
"""
:param df: a DataFrame of GMFs with fields sid, eid, gmv_X, ...
:param param: a dictionary of parameters coming from the job.ini
:param monitor: a Monitor instance
:returns: (damages (eid, kid) -> LDc plus damages (A, Dc))
"""
mon_risk = monitor('computing risk', measuremem=False)
dstore = datastore.read(param['hdf5path'])
K = param['K']
with monitor('reading data'):
if hasattr(df, 'start'): # it is actually a slice
df = dstore.read_df('gmf_data', slc=df)
assets_df = dstore.read_df('assetcol/array', 'ordinal')
kids = (dstore['assetcol/kids'][:] if K
else numpy.zeros(len(assets_df), U16))
crmodel = monitor.read('crmodel')
dmg_csq = crmodel.get_dmg_csq()
ci = {dc: i + 1 for i, dc in enumerate(dmg_csq)}
dmgcsq = zero_dmgcsq(assets_df, crmodel) # shape (A, L, Dc)
A, L, Dc = dmgcsq.shape
D = len(crmodel.damage_states)
loss_names = crmodel.oqparam.loss_names
with mon_risk:
dddict = general.AccumDict(accum=numpy.zeros((L, Dc), F32)) # eid, kid
for sid, asset_df in assets_df.groupby('site_id'):
# working one site at the time
gmf_df = df[df.sid == sid]
if len(gmf_df) == 0:
continue
eids = gmf_df.eid.to_numpy()
for taxo, adf in asset_df.groupby('taxonomy'):
out = crmodel.get_output(taxo, adf, gmf_df)
aids = adf.index.to_numpy()
for lti, lt in enumerate(loss_names):
fractions = out[lt]
Asid, E, D = fractions.shape
ddd = numpy.zeros((Asid, E, Dc), F32)
ddd[:, :, :D] = fractions
for a, asset in enumerate(adf.to_records()):
# NB: uncomment the lines below to see the performance
# disaster of scenario_damage.bin_ddd; for instance
# the Messina test in oq-risk-tests becomes 10x
# slower even if it has only 25_736 assets:
# scenario_damage.bin_ddd(
# fractions[a], asset['number'],
# param['master_seed'] + a)
ddd[a] *= asset['number']
csq = crmodel.compute_csq(asset, fractions[a], lt)
for name, values in csq.items():
ddd[a, :, ci[name]] = values
dmgcsq[aids, lti] += ddd.sum(axis=1) # sum on the events
tot = ddd.sum(axis=0) # sum on the assets
for e, eid in enumerate(eids):
dddict[eid, K][lti] += tot[e]
if K:
for a, aid in enumerate(aids):
dddict[eid, kids[aid]][lti] += ddd[a, e]
return to_dframe(dddict, ci, L), dmgcsq
[docs]def to_dframe(adic, ci, L):
dic = general.AccumDict(accum=[])
for (eid, kid), dd in sorted(adic.items()):
for lti in range(L):
dic['event_id'].append(eid)
dic['agg_id'].append(kid)
dic['loss_id'].append(lti)
for sname, si in ci.items():
dic[sname].append(dd[lti, si])
fix_dic(dic, ci)
return pandas.DataFrame(dic)
[docs]def worst_dmgdist(df, agg_id, loss_id, dic):
cols = [col for col in df.columns if col.startswith('dmg_')]
event_id = df[cols[-1]].to_numpy().argmax()
dic['event_id'].append(event_id)
dic['loss_id'].append(loss_id)
dic['agg_id'].append(agg_id)
for col in cols:
dic[col].append(df[col].to_numpy()[event_id])
[docs]@base.calculators.add('event_based_damage')
class DamageCalculator(EventBasedRiskCalculator):
"""
Damage calculator
"""
core_task = event_based_damage
is_stochastic = True
precalc = 'event_based'
accept_precalc = ['scenario', 'event_based',
'event_based_risk', 'event_based_damage']
[docs] def save_avg_losses(self):
"""
Do nothing: there are no losses in the DamageCalculator
"""
[docs] def execute(self):
"""
Compute risk from GMFs or ruptures depending on what is stored
"""
self.builder = get_loss_builder(self.datastore) # check
eids = self.datastore['gmf_data/eid'][:]
logging.info('Processing {:_d} rows of gmf_data'.format(len(eids)))
self.dmgcsq = zero_dmgcsq(self.assetcol, self.crmodel)
self.datastore.swmr_on()
smap = parallel.Starmap(
event_based_damage, self.gen_args(eids), h5=self.datastore.hdf5)
smap.monitor.save('assets', self.assetcol.to_dframe())
smap.monitor.save('crmodel', self.crmodel)
return smap.reduce(self.combine)
[docs] def combine(self, acc, res):
"""
:param acc:
unused
:param res:
DataFrame with fields (event_id, agg_id, loss_id, dmg1 ...)
plus array with damages and consequences of shape (A, Dc)
Combine the results and grows risk_by_event with fields
(event_id, agg_id, loss_id) and (dmg_0, dmg_1, dmg_2, ...)
"""
df, dmgcsq = res
self.dmgcsq += dmgcsq
with self.monitor('saving risk_by_event', measuremem=True):
for name in df.columns:
dset = self.datastore['risk_by_event/' + name]
hdf5.extend(dset, df[name].to_numpy())
return 1
[docs] def sanity_check(self):
"""
Compare agglosses with aggregate avglosses and check that
damaged buildings < total buildings
"""
ac_df = self.datastore.read_df(
'aggcurves', sel=dict(agg_id=self.param['K']))
number = self.assetcol['number'].sum()
for (loss_id, period), df in ac_df.groupby(
['loss_id', 'return_period']):
tot = sum(df[col].sum() for col in df.columns
if col.startswith('dmg_'))
if tot > number:
logging.info('For loss type %s, return_period=%d the '
'damaged buildings are %d > %d, but it is okay',
self.oqparam.loss_names[loss_id],
period, tot, number)
[docs] def post_execute(self, dummy):
oq = self.oqparam
A, L, Dc = self.dmgcsq.shape
D = len(self.crmodel.damage_states)
# fix no_damage distribution for events with zero damage
number = self.assetcol['number']
for li in range(L):
self.dmgcsq[:, li, 0] = (
number * self.E - self.dmgcsq[:, li, 1:D].sum(axis=1))
self.dmgcsq /= self.E
self.datastore['damages-rlzs'] = self.dmgcsq.reshape((A, 1, L, Dc))
set_rlzs_stats(self.datastore,
'damages',
asset_id=self.assetcol['id'],
rlz=[0],
loss_type=oq.loss_names,
dmg_state=['no_damage'] + self.crmodel.get_dmg_csq())
size = self.datastore.getsize('risk_by_event')
logging.info('Building aggregated curves from %s of risk_by_event',
general.humansize(size))
alt_df = self.datastore.read_df('risk_by_event')
del alt_df['event_id']
dic = general.AccumDict(accum=[])
columns = [col for col in alt_df.columns
if col not in {'agg_id', 'loss_id', 'variance'}]
csqs = [col for col in columns if not col.startswith('dmg_')]
periods = list(self.builder.return_periods)
wdd = {'agg_id': [], 'loss_id': [], 'event_id': []}
for col in columns[:D-1]:
wdd[col] = []
for (agg_id, loss_id), df in alt_df.groupby(
[alt_df.agg_id, alt_df.loss_id]):
worst_dmgdist(df, agg_id, loss_id, wdd)
curves = [self.builder.build_curve(df[csq].to_numpy())
for csq in csqs]
for p, period in enumerate(periods):
dic['agg_id'].append(agg_id)
dic['loss_id'].append(loss_id)
dic['return_period'].append(period)
for col, curve in zip(csqs, curves):
dic[col].append(curve[p])
fix_dic(dic, csqs)
fix_dic(wdd, columns[:D-1])
ls = ' '.join(self.crmodel.damage_states[1:])
self.datastore.create_df('worst_dmgdist', wdd.items(), limit_states=ls)
if csqs:
self.datastore.create_df('aggcurves', dic.items(), limit_states=ls)
self.sanity_check()