# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import time
import os.path
import logging
import operator
from functools import partial
import numpy
import pandas
from scipy import sparse
from openquake.baselib import (
hdf5, performance, parallel, general, python3compat)
from openquake.hazardlib import stats, InvalidFile
from openquake.hazardlib.source.rupture import RuptureProxy
from openquake.commonlib.calc import starmap_from_gmfs, compactify3
from openquake.risklib.scientific import (
total_losses, insurance_losses, MultiEventRNG, LOSSID)
from openquake.calculators import base, event_based
from openquake.calculators.post_risk import (
PostRiskCalculator, post_aggregate, fix_dtypes)
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
F32 = numpy.float32
F64 = numpy.float64
TWO16 = 2 ** 16
TWO32 = U64(2 ** 32)
get_n_occ = operator.itemgetter(1)
def fast_agg(keys, values, correl, li, acc):
"""
:param keys: an array of N uint64 numbers encoding (event_id, agg_id)
:param values: an array of (N, D) floats
:param correl: True if there is asset correlation
:param li: loss type index
:param acc: dictionary unique key -> array(L, D)
"""
ukeys, avalues = general.fast_agg2(keys, values)
if correl: # restore the variances
avalues[:, 0] = avalues[:, 0] ** 2
for ukey, avalue in zip(ukeys, avalues):
acc[ukey][li] += avalue
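
# Explanatory sketch (illustrative, not used by the calculator): with full
# asset correlation the sigmas add up, not the variances, so aggreg stores
# sigma = sqrt(variance) before aggregating and fast_agg squares the summed
# sigmas back into a variance:
#
#   sigma_tot = numpy.sqrt(var_1) + numpy.sqrt(var_2)  # summed by fast_agg2
#   var_tot = sigma_tot ** 2                           # avalues[:, 0] ** 2
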
def average_losses(ln, alt, rlz_id, AR, collect_rlzs):
"""
:returns: a sparse coo matrix with the losses per asset and realization
"""
if collect_rlzs or len(numpy.unique(rlz_id)) == 1:
ldf = pandas.DataFrame(
dict(aid=alt.aid.to_numpy(), loss=alt.loss.to_numpy()))
tot = ldf.groupby('aid').loss.sum()
aids = tot.index.to_numpy()
rlzs = numpy.zeros_like(tot)
return sparse.coo_matrix((tot.to_numpy(), (aids, rlzs)), AR)
else:
        ldf = pandas.DataFrame(
            dict(aid=alt.aid.to_numpy(), loss=alt.loss.to_numpy(),
                 rlz=rlz_id[U32(alt.eid)]))  # NB: without the U32 cast
        # the SURA calculation would fail, since alt.eid may be F64
tot = ldf.groupby(['aid', 'rlz']).loss.sum()
aids, rlzs = zip(*tot.index)
return sparse.coo_matrix((tot.to_numpy(), (aids, rlzs)), AR)
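
# A minimal sketch (hypothetical values): the (aid, rlz) pairs produced by the
# groupby above are unique, so each COO matrix can be accumulated into the
# avg_losses array with a plain fancy-index +=, as done in agg_dicts below:
#
#   coo = sparse.coo_matrix(([1.5, 2.5], ([0, 2], [0, 1])), (3, 2))
#   avg = numpy.zeros((3, 2))
#   avg[coo.row, coo.col] += coo.data  # safe only because the pairs are unique
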
def debugprint(ln, asset_loss_table, adf):
"""
Print risk_by_event in a reasonable format. To be used with --nd
"""
if '+' in ln or ln == 'claim':
df = asset_loss_table.set_index('aid').rename(columns={'loss': ln})
df['asset_id'] = python3compat.decode(adf.id[df.index].to_numpy())
del df['variance']
print(df)
def aggreg(outputs, crmodel, ARK, aggids, rlz_id, ideduc, monitor):
"""
:returns: (avg_losses, agg_loss_table)
"""
mon_agg = monitor('aggregating losses', measuremem=False)
mon_avg = monitor('averaging losses', measuremem=False)
oq = crmodel.oqparam
xtypes = oq.ext_loss_types
if ideduc:
xtypes.append('claim')
loss_by_AR = {ln: [] for ln in xtypes}
correl = int(oq.asset_correlation)
(A, R, K), L = ARK, len(xtypes)
    acc = general.AccumDict(accum=numpy.zeros((L, 2)))  # u64 key -> array
value_cols = ['variance', 'loss']
for out in outputs:
for li, ln in enumerate(xtypes):
if ln not in out or len(out[ln]) == 0:
continue
alt = out[ln]
if oq.avg_losses:
with mon_avg:
coo = average_losses(
ln, alt, rlz_id, (A, R), oq.collect_rlzs)
loss_by_AR[ln].append(coo)
with mon_agg:
if correl: # use sigma^2 = (sum sigma_i)^2
alt['variance'] = numpy.sqrt(alt.variance)
eids = alt.eid.to_numpy() * TWO32 # U64
values = numpy.array([alt[col] for col in value_cols]).T
# aggregate all assets
fast_agg(eids + U64(K), values, correl, li, acc)
if len(aggids):
# aggregate assets for each tag combination
aids = alt.aid.to_numpy()
for kids in aggids[:, aids]:
fast_agg(eids + U64(kids), values, correl, li, acc)
lis = range(len(xtypes))
with monitor('building event loss table', measuremem=True):
dic = general.AccumDict(accum=[])
for ukey, arr in acc.items():
eid, kid = divmod(ukey, TWO32)
for li in lis:
if arr[li].any():
dic['event_id'].append(eid)
dic['agg_id'].append(kid)
dic['loss_id'].append(LOSSID[xtypes[li]])
for c, col in enumerate(['variance', 'loss']):
dic[col].append(arr[li, c])
fix_dtypes(dic)
return loss_by_AR, pandas.DataFrame(dic)
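
# Illustrative sketch (hypothetical values) of the uint64 key used above:
# event_id and agg_id are packed into a single integer, so that
# general.fast_agg2 can group on a flat array; divmod unpacks them:
#
#   key = U64(3) * TWO32 + U64(5)  # event_id=3, agg_id=5
#   eid, kid = divmod(key, TWO32)  # -> (3, 5)
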
def ebr_from_gmfs(sbe, oqparam, dstore, monitor):
    """
    :param sbe: a slice of the slice_by_event composite array,
        with fields 'start' and 'stop'
    :param oqparam: OqParam instance
    :param dstore: DataStore instance from which to read the GMFs
    :param monitor: a Monitor instance
    :yields: dictionaries of arrays, the outputs of event_based_risk
    """
if dstore.parent:
dstore.parent.open('r')
gmfcols = oqparam.gmf_data_dt().names
with dstore:
# this is fast compared to reading the GMFs
risk_sids = monitor.read('sids')
s0, s1 = sbe[0]['start'], sbe[-1]['stop']
t0 = time.time()
haz_sids = dstore['gmf_data/sid'][s0:s1]
dt = time.time() - t0
idx, = numpy.where(numpy.isin(haz_sids, risk_sids))
if len(idx) == 0:
return {}
    # throttle the reads: sleep for as long as the read took, so that many
    # concurrent tasks do not hammer the filesystem at the same time
    time.sleep(dt)
with dstore, monitor('reading GMFs', measuremem=True):
start, stop = idx.min(), idx.max() + 1
dic = {}
for col in gmfcols:
if col == 'sid':
dic[col] = haz_sids[idx]
else:
data = dstore['gmf_data/' + col][s0+start:s0+stop]
dic[col] = data[idx - start]
df = pandas.DataFrame(dic)
max_gmvs = oqparam.max_gmvs_per_task
if len(df) <= max_gmvs:
yield event_based_risk(df, oqparam, monitor)
else:
for s0, s1 in performance.split_slices(df.eid.to_numpy(), max_gmvs):
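            # NB: yielding a (function, *args) tuple instead of a result is
            # the openquake.baselib.parallel idiom to spawn a subtask; the
            # framework runs it with the monitor appended to the arguments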
yield event_based_risk, df[s0:s1], oqparam
def event_based_risk(df, oqparam, monitor):
"""
:param df: a DataFrame of GMFs with fields sid, eid, gmv_X, ...
:param oqparam: parameters coming from the job.ini
:param monitor: a Monitor instance
:returns: a dictionary of arrays
"""
with monitor('reading crmodel', measuremem=True):
crmodel = monitor.read('crmodel')
ideduc = monitor.read('assets/ideductible')
aggids = monitor.read('aggids')
rlz_id = monitor.read('rlz_id')
weights = [1] if oqparam.collect_rlzs else monitor.read('weights')
ARK = (oqparam.A, len(weights), oqparam.K)
if oqparam.ignore_master_seed or oqparam.ignore_covs:
rng = None
else:
rng = MultiEventRNG(oqparam.master_seed, df.eid.unique(),
int(oqparam.asset_correlation))
outs = gen_outputs(df, crmodel, rng, monitor)
avg, alt = aggreg(outs, crmodel, ARK, aggids, rlz_id, ideduc.any(),
monitor)
return dict(avg=avg, alt=alt)
def gen_outputs(df, crmodel, rng, monitor):
"""
:param df: GMF dataframe (a slice of events)
:param crmodel: CompositeRiskModel instance
:param rng: random number generator
:param monitor: Monitor instance
:yields: one output per taxonomy and slice of events
"""
mon_risk = monitor('computing risk', measuremem=False)
fil_mon = monitor('filtering GMFs', measuremem=False)
ass_mon = monitor('reading assets', measuremem=False)
slices = performance.split_slices(df.eid.to_numpy(), 1_000_000)
for s0, s1 in monitor.read('start-stop'):
with ass_mon:
assets = monitor.read('assets', slice(s0, s1)).set_index('ordinal')
taxos = assets.taxonomy.unique()
for taxo in taxos:
adf = assets[assets.taxonomy == taxo]
for start, stop in slices:
gdf = df[start:stop]
with fil_mon:
# *crucial* for the performance of the next step
gmf_df = gdf[numpy.isin(gdf.sid.to_numpy(),
adf.site_id.to_numpy())]
if len(gmf_df) == 0: # common enough
continue
with mon_risk:
out = crmodel.get_output(
adf, gmf_df, crmodel.oqparam._sec_losses, rng)
yield out
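
# Explanatory sketch (hypothetical values) of the chunking above:
# performance.split_slices keeps rows with the same event id in the same
# chunk, so each (start, stop) slice cuts the GMF dataframe on event
# boundaries and no event is split across risk computations:
#
#   eids = numpy.array([0, 0, 1, 1, 1, 2])
#   for s0, s1 in performance.split_slices(eids, 3):
#       process(eids[s0:s1])  # whole events only; process() is hypothetical
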
def ebrisk(proxies, full_lt, oqparam, dstore, monitor):
    """
    :param proxies: list of RuptureProxies with the same trt_smr
    :param full_lt: a FullLogicTree instance
    :param oqparam: input parameters
    :param dstore: a DataStore instance
    :param monitor: a Monitor instance
    :returns: a dictionary of arrays
    """
oqparam.ground_motion_fields = True
dic = event_based.event_based(proxies, full_lt, oqparam, dstore, monitor)
if len(dic['gmfdata']) == 0: # no GMFs
return {}
return event_based_risk(dic['gmfdata'], oqparam, monitor)
@base.calculators.add('ebrisk', 'scenario_risk', 'event_based_risk')
class EventBasedRiskCalculator(event_based.EventBasedCalculator):
"""
Event based risk calculator generating event loss tables
"""
core_task = ebrisk
is_stochastic = True
precalc = 'event_based'
accept_precalc = ['scenario', 'event_based', 'event_based_risk', 'ebrisk']
    def save_tmp(self, monitor, srcfilter=None):
"""
Save some useful data in the file calc_XXX_tmp.hdf5
"""
oq = self.oqparam
monitor.save('sids', self.sitecol.sids)
adf = self.assetcol.to_dframe().sort_values('taxonomy')
del adf['id']
monitor.save('assets', adf)
tss = performance.idx_start_stop(adf.taxonomy.to_numpy())
        # store the start-stop indices smartly, so that the workers read
        # the assets in chunks of at most 1 million elements
monitor.save('start-stop', compactify3(tss))
monitor.save('srcfilter', srcfilter)
monitor.save('crmodel', self.crmodel)
monitor.save('rlz_id', self.rlzs)
monitor.save('weights', self.datastore['weights'][:])
if oq.K:
aggids, _ = self.assetcol.build_aggids(
oq.aggregate_by, oq.max_aggregations)
else:
aggids = ()
monitor.save('aggids', aggids)
    def pre_execute(self):
oq = self.oqparam
if oq.calculation_mode == 'ebrisk':
oq.ground_motion_fields = False
logging.warning('You should be using the event_based_risk '
'calculator, not ebrisk!')
parent = self.datastore.parent
if parent:
self.datastore['full_lt'] = parent['full_lt']
self.parent_events = ne = len(parent['events'])
logging.info('There are %d ruptures and %d events',
len(parent['ruptures']), ne)
else:
self.parent_events = None
if oq.investigation_time and oq.return_periods != [0]:
            # setting return_periods = 0 disables the loss curves
eff_time = oq.investigation_time * oq.ses_per_logic_tree_path
if eff_time < 2:
logging.warning(
'eff_time=%s is too small to compute loss curves',
eff_time)
super().pre_execute()
parentdir = (os.path.dirname(self.datastore.ppath)
if self.datastore.ppath else None)
oq.hdf5path = self.datastore.filename
oq.parentdir = parentdir
logging.info(
'There are {:_d} ruptures and {:_d} events'.format(
len(self.datastore['ruptures']),
len(self.datastore['events'])))
self.events_per_sid = numpy.zeros(self.N, U32)
try:
K = len(self.datastore['agg_keys'])
except KeyError:
K = 0
self.datastore.swmr_on()
sec_losses = [] # one insured loss for each loss type with a policy
if hasattr(self, 'policy_df') and 'reinsurance' not in oq.inputs:
sec_losses.append(
partial(insurance_losses, policy_df=self.policy_df))
ideduc = self.assetcol['ideductible'].any()
if oq.total_losses:
sec_losses.append(
partial(total_losses, kind=oq.total_losses, ideduc=ideduc))
elif ideduc:
# subtract the insurance deductible for a single loss_type
[lt] = oq.loss_types
sec_losses.append(partial(total_losses, kind=lt, ideduc=ideduc))
oq._sec_losses = sec_losses
oq.M = len(oq.all_imts())
oq.N = self.N
oq.K = K
oq.A = self.assetcol['ordinal'].max() + 1
ct = oq.concurrent_tasks or 1
oq.maxweight = int(oq.ebrisk_maxsize / ct)
self.A = A = len(self.assetcol)
self.L = L = len(oq.loss_types)
if (oq.calculation_mode == 'event_based_risk' and
A * self.R > 1_000_000 and oq.avg_losses
and not oq.collect_rlzs):
raise ValueError('For large exposures you must set '
'collect_rlzs=true or avg_losses=false')
if (oq.aggregate_by and self.E * A > oq.max_potential_gmfs and
all(val == 0 for val in oq.minimum_asset_loss.values())):
logging.warning('The calculation is really big; consider setting '
'minimum_asset_loss')
base.create_risk_by_event(self)
self.rlzs = self.datastore['events']['rlz_id']
self.num_events = numpy.bincount(self.rlzs, minlength=self.R)
self.xtypes = oq.ext_loss_types
if self.assetcol['ideductible'].any():
self.xtypes.append('claim')
if oq.avg_losses:
self.create_avg_losses()
alt_nbytes = 4 * self.E * L
if alt_nbytes / (oq.concurrent_tasks or 1) > TWO32:
            raise RuntimeError('The risk_by_event is too big to be '
                               'transferred with %d tasks' %
                               oq.concurrent_tasks)
    def create_avg_losses(self):
oq = self.oqparam
ws = self.datastore['weights']
R = 1 if oq.collect_rlzs else len(ws)
if oq.collect_rlzs:
if oq.investigation_time: # event_based
self.avg_ratio = numpy.array([oq.time_ratio / len(ws)])
else: # scenario
self.avg_ratio = numpy.array([1. / self.num_events.sum()])
else:
if oq.investigation_time: # event_based
self.avg_ratio = numpy.array([oq.time_ratio] * len(ws))
else: # scenario
self.avg_ratio = 1. / self.num_events
self.avg_losses = {}
for lt in self.xtypes:
self.avg_losses[lt] = numpy.zeros((self.A, R), F32)
self.datastore.create_dset(
'avg_losses-rlzs/' + lt, F32, (self.A, R))
self.datastore.set_shape_descr(
'avg_losses-rlzs/' + lt, asset_id=self.assetcol['id'], rlz=R)
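
    # NB (explanatory note): agg_dicts accumulates plain sums of event losses
    # into self.avg_losses; the sums become averages only in post_execute,
    # where each column r is scaled by self.avg_ratio[r], e.g. for an
    # event_based calculation with collect_rlzs=true:
    #
    #   avg_loss = sum_over_events(loss) * time_ratio / len(weights)
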
    def execute(self):
"""
Compute risk from GMFs or ruptures depending on what is stored
"""
oq = self.oqparam
self.gmf_bytes = 0
if 'gmf_data' not in self.datastore: # start from ruptures
if (oq.ground_motion_fields and
'gsim_logic_tree' not in oq.inputs and
oq.gsim == '[FromFile]'):
raise InvalidFile('Missing gsim or gsim_logic_tree_file in %s'
% oq.inputs['job_ini'])
elif not hasattr(oq, 'maximum_distance'):
raise InvalidFile('Missing maximum_distance in %s'
% oq.inputs['job_ini'])
srcfilter = self.src_filter()
proxies = [RuptureProxy(rec)
for rec in self.datastore['ruptures'][:]]
full_lt = self.datastore['full_lt']
self.datastore.swmr_on() # must come before the Starmap
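            # (SWMR = single-writer/multiple-readers mode of HDF5: the
            # workers can read the datastore while the master writes to it)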
smap = parallel.Starmap.apply_split(
ebrisk, (proxies, full_lt, oq, self.datastore),
key=operator.itemgetter('trt_smr'),
weight=operator.itemgetter('n_occ'),
h5=self.datastore.hdf5,
duration=oq.time_per_task,
outs_per_task=5)
self.save_tmp(smap.monitor, srcfilter)
smap.reduce(self.agg_dicts)
if self.gmf_bytes == 0:
raise RuntimeError(
'No GMFs were generated, perhaps they were '
'all below the minimum_intensity threshold')
logging.info(
'Produced %s of GMFs', general.humansize(self.gmf_bytes))
else: # start from GMFs
logging.info('Preparing tasks')
smap = starmap_from_gmfs(ebr_from_gmfs, oq, self.datastore)
self.save_tmp(smap.monitor)
smap.reduce(self.agg_dicts)
if self.parent_events:
assert self.parent_events == len(self.datastore['events'])
return 1
    def log_info(self, eids):
        """
        Print some information about the risk calculation
        """
logging.info('Processing {:_d} rows of gmf_data'.format(len(eids)))
E = len(numpy.unique(eids))
K = self.oqparam.K
logging.info('Risk parameters (rel_E={:_d}, K={:_d}, L={})'.
format(E, K, self.L))
    def agg_dicts(self, dummy, dic):
"""
:param dummy: unused parameter
:param dic: dictionary with keys "avg", "alt"
"""
if not dic:
return
self.gmf_bytes += dic['alt'].memory_usage().sum()
self.oqparam.ground_motion_fields = False # hack
with self.monitor('saving risk_by_event'):
alt = dic.pop('alt')
if alt is not None:
for name in alt.columns:
dset = self.datastore['risk_by_event/' + name]
hdf5.extend(dset, alt[name].to_numpy())
with self.monitor('saving avg_losses'):
for ln, ls in dic.pop('avg').items():
for coo in ls:
self.avg_losses[ln][coo.row, coo.col] += coo.data
    def post_execute(self, dummy):
"""
Compute and store average losses from the risk_by_event dataset,
and then loss curves and maps.
"""
oq = self.oqparam
# sanity check on the risk_by_event
alt = self.datastore.read_df('risk_by_event')
K = self.datastore['risk_by_event'].attrs.get('K', 0)
upper_limit = self.E * (K + 1) * len(self.xtypes)
size = len(alt)
assert size <= upper_limit, (size, upper_limit)
# sanity check on uniqueness by (agg_id, loss_id, event_id)
arr = alt[['agg_id', 'loss_id', 'event_id']].to_numpy()
uni = numpy.unique(arr, axis=0)
if len(uni) < len(arr):
raise RuntimeError('risk_by_event contains %d duplicates!' %
(len(arr) - len(uni)))
if oq.avg_losses:
for lt in self.xtypes:
al = self.avg_losses[lt]
for r in range(self.R):
al[:, r] *= self.avg_ratio[r]
name = 'avg_losses-rlzs/' + lt
self.datastore[name][:] = al
stats.set_rlzs_stats(self.datastore, name,
asset_id=self.assetcol['id'])
self.build_aggcurves()
if oq.reaggregate_by:
post_aggregate(self.datastore.calc_id,
','.join(oq.reaggregate_by))
    def build_aggcurves(self):
prc = PostRiskCalculator(self.oqparam, self.datastore.calc_id)
prc.assetcol = self.assetcol
if hasattr(self, 'exported'):
prc.exported = self.exported
prc.run(exports='')