# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import time
import psutil
import random
import logging
import operator
import functools
import numpy
import pandas
try:
from PIL import Image
except ImportError:
Image = None
from openquake.baselib import (
performance, parallel, hdf5, config, python3compat, workerpool as w)
from openquake.baselib.general import (
AccumDict, DictArray, block_splitter, groupby, humansize,
get_nbytes_msg, agg_probs, pprod)
from openquake.hazardlib.contexts import (
ContextMaker, read_cmakers, basename, get_maxsize)
from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
from openquake.hazardlib.probability_map import (
ProbabilityMap, poes_dt, combine_probs)
from openquake.commonlib import calc
from openquake.calculators import base, getters, extract
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
I64 = numpy.int64
TWO32 = 2 ** 32
BUFFER = 1.5 # enlarge the pointsource_distance sphere to fix the weight;
# with BUFFER = 1 we would have lots of apparently light sources
# collected together in an extra-slow task, as it happens in SHARE
# with ps_grid_spacing=50
get_weight = operator.attrgetter('weight')
disagg_grp_dt = numpy.dtype([
('grp_trt', hdf5.vstr), ('avg_poe', F32), ('nsites', U32)])
slice_dt = numpy.dtype([('sid', U32), ('start', int), ('stop', int)])
[docs]def get_pmaps_gb(dstore):
"""
:returns: memory required on the master node to keep the pmaps
"""
N = len(dstore['sitecol'])
L = dstore['oqparam'].imtls.size
cmakers = read_cmakers(dstore)
num_gs = [len(cm.gsims) for cm in cmakers]
return sum(num_gs) * N * L * 8 / 1024**3
[docs]def build_slice_by_sid(sids, offset=0):
"""
Convert an array of site IDs (with repetitions) into an array slice_dt
"""
arr = performance.idx_start_stop(sids)
sbs = numpy.zeros(len(arr), slice_dt)
sbs['sid'] = arr[:, 0]
sbs['start'] = arr[:, 1] + offset
sbs['stop'] = arr[:, 2] + offset
return sbs
def _concat(acc, slc2):
if len(acc) == 0:
return [slc2]
slc1 = acc[-1] # last slice
if slc2[0] == slc1[1]:
new = numpy.array([slc1[0], slc2[1]])
return acc[:-1] + [new]
return acc + [slc2]
[docs]def compactify(arrayN2):
"""
:param arrayN2: an array with columns (start, stop)
:returns: a shorter array with the same structure
Here is how it works in an example where the first three slices
are compactified into one while the last slice stays as it is:
>>> arr = numpy.array([[84384702, 84385520],
... [84385520, 84385770],
... [84385770, 84386062],
... [84387636, 84388028]])
>>> compactify(arr)
array([[84384702, 84386062],
[84387636, 84388028]])
"""
if len(arrayN2) == 1:
# nothing to compactify
return arrayN2
out = numpy.array(functools.reduce(_concat, arrayN2, []))
return out
[docs]class Set(set):
__iadd__ = set.__ior__
[docs]def store_ctxs(dstore, rupdata_list, grp_id):
"""
Store contexts in the datastore
"""
for rupdata in rupdata_list:
nr = len(rupdata)
known = set(rupdata.dtype.names)
for par in dstore['rup']:
if par == 'grp_id':
hdf5.extend(dstore['rup/grp_id'], numpy.full(nr, grp_id))
elif par == 'probs_occur':
dstore.hdf5.save_vlen('rup/probs_occur', rupdata[par])
elif par in known:
hdf5.extend(dstore['rup/' + par], rupdata[par])
else:
hdf5.extend(dstore['rup/' + par], numpy.full(nr, numpy.nan))
[docs]def semicolon_aggregate(probs, source_ids):
"""
:param probs: array of shape (..., Ns)
:param source_ids: list of source IDs (some with semicolons) of length Ns
:returns: array of shape (..., Ns) and list of length N with N < Ns
This is used to aggregate array of probabilities in the case of sources
which are variations of a base source. Here is an example with Ns=7
sources reducible to N=4 base sources:
>>> source_ids = ['A;0', 'A;1', 'A;2', 'B', 'C', 'D;0', 'D;1']
>>> probs = numpy.array([[.01, .02, .03, .04, .05, .06, .07],
... [.00, .01, .02, .03, .04, .05, .06]])
`semicolon_aggregate` effectively reduces the array of probabilities
from 7 to 4 components, however for storage convenience it does not
change the shape, so the missing components are zeros:
>>> semicolon_aggregate(probs, source_ids) # (2, 7) => (2, 4)
(array([[0.058906, 0.04 , 0.05 , 0.1258 , 0. , 0. ,
0. ],
[0.0298 , 0.03 , 0.04 , 0.107 , 0. , 0. ,
0. ]]), array(['A', 'B', 'C', 'D'], dtype='<U1'))
It is assumed that the semicolon sources are independent, i.e. not mutex.
"""
srcids = [srcid.split(';')[0] for srcid in source_ids]
unique, indices = numpy.unique(srcids, return_inverse=True)
new = numpy.zeros_like(probs)
for i, s1, s2 in performance.idx_start_stop(indices):
new[..., i] = pprod(probs[..., s1:s2], axis=-1)
return new, unique
[docs]def check_disagg_by_src(dstore):
"""
Make sure that by composing disagg_by_src one gets the hazard curves
"""
info = dstore['source_info'][:]
mutex = info['mutex_weight'] > 0
mean = dstore.sel('hcurves-stats', stat='mean')[:, 0] # N, M, L
dbs = dstore.sel('disagg_by_src') # N, R, M, L1, Ns
if mutex.sum():
dbs_indep = dbs[:, :, :, :, ~mutex]
dbs_mutex = dbs[:, :, :, :, mutex]
poes_indep = pprod(dbs_indep, axis=4) # N, R, M, L1
poes_mutex = dbs_mutex.sum(axis=4) # N, R, M, L1
poes = poes_indep + poes_mutex - poes_indep * poes_mutex
else:
poes = pprod(dbs, axis=4) # N, R, M, L1
rlz_weights = dstore['weights'][:]
mean2 = numpy.einsum('sr...,r->s...', poes, rlz_weights) # N, M, L1
if not numpy.allclose(mean, mean2, atol=1E-6):
logging.error('check_disagg_src fails: %s =! %s', mean[0], mean2[0])
# check the extract call is not broken
for imt in dstore['oqparam'].imtls:
aw = extract.extract(dstore, f'disagg_by_src?imt={imt}&poe=1E-4')
assert aw.array.dtype.names == ('src_id', 'poe')
# ########################### task functions ############################ #
[docs]def classical(srcs, sitecol, cmaker, monitor):
"""
Call the classical calculator in hazardlib
"""
cmaker.init_monitoring(monitor)
rup_indep = getattr(srcs, 'rup_interdep', None) != 'mutex'
for sites in sitecol.split_in_tiles(cmaker.ntiles):
pmap = ProbabilityMap(
sites.sids, cmaker.imtls.size, len(cmaker.gsims)).fill(rup_indep)
result = hazclassical(srcs, sites, cmaker, pmap)
result['pnemap'] = ~pmap.remove_zeros()
result['pnemap'].gidx = cmaker.gidx
yield result
[docs]def postclassical(pgetter, N, hstats, individual_rlzs,
max_sites_disagg, amplifier, monitor):
"""
:param pgetter: an :class:`openquake.commonlib.getters.PmapGetter`
:param N: the total number of sites
:param hstats: a list of pairs (statname, statfunc)
:param individual_rlzs: if True, also build the individual curves
:param max_sites_disagg: if there are less sites than this, store rup info
:param amplifier: instance of Amplifier or None
:param monitor: instance of Monitor
:returns: a dictionary kind -> ProbabilityMap
The "kind" is a string of the form 'rlz-XXX' or 'mean' of 'quantile-XXX'
used to specify the kind of output.
"""
if 20 < monitor.task_no < pgetter.ntasks - 20:
# give time to the other tasks
time.sleep(pgetter.ntasks * random.random())
with monitor('read PoEs', measuremem=True):
pgetter.init()
if amplifier:
with hdf5.File(pgetter.filename, 'r') as f:
ampcode = f['sitecol'].ampcode
imtls = DictArray({imt: amplifier.amplevels
for imt in pgetter.imtls})
else:
imtls = pgetter.imtls
poes, weights, sids = pgetter.poes, pgetter.weights, U32(pgetter.sids)
M = len(imtls)
L = imtls.size
L1 = L // M
R = len(weights)
S = len(hstats)
pmap_by_kind = {}
if R > 1 and individual_rlzs or not hstats:
pmap_by_kind['hcurves-rlzs'] = [
ProbabilityMap(sids, M, L1).fill(0) for r in range(R)]
if hstats:
pmap_by_kind['hcurves-stats'] = [
ProbabilityMap(sids, M, L1).fill(0) for r in range(S)]
combine_mon = monitor('combine pmaps', measuremem=False)
compute_mon = monitor('compute stats', measuremem=False)
sidx = ProbabilityMap(sids, 1, 1).fill(0).sidx
for sid in sids:
idx = sidx[sid]
with combine_mon:
pc = pgetter.get_pcurve(sid) # shape (L, R)
if amplifier:
pc = amplifier.amplify(ampcode[sid], pc)
# NB: the pcurve have soil levels != IMT levels
if pc.array.sum() == 0: # no data
continue
with compute_mon:
if R > 1 and individual_rlzs or not hstats:
for r in range(R):
pmap_by_kind['hcurves-rlzs'][r].array[idx] = (
pc.array[:, r].reshape(M, L1))
if hstats:
for s, (statname, stat) in enumerate(hstats.items()):
sc = getters.build_stat_curve(pc, imtls, stat, weights)
arr = sc.array.reshape(M, L1)
pmap_by_kind['hcurves-stats'][s].array[idx] = arr
if poes and (R > 1 and individual_rlzs or not hstats):
pmap_by_kind['hmaps-rlzs'] = calc.make_hmaps(
pmap_by_kind['hcurves-rlzs'], imtls, poes)
if poes and hstats:
pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
pmap_by_kind['hcurves-stats'], imtls, poes)
return pmap_by_kind
[docs]def make_hmap_png(hmap, lons, lats):
"""
:param hmap:
a dictionary with keys calc_id, m, p, imt, poe, inv_time, array
:param lons: an array of longitudes
:param lats: an array of latitudes
:returns: an Image object containing the hazard map
"""
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.grid(True)
ax.set_title('hmap for IMT=%(imt)s, poe=%(poe)s\ncalculation %(calc_id)d,'
'inv_time=%(inv_time)dy' % hmap)
ax.set_ylabel('Longitude')
coll = ax.scatter(lons, lats, c=hmap['array'], cmap='jet')
plt.colorbar(coll)
bio = io.BytesIO()
plt.savefig(bio, format='png')
return dict(img=Image.open(bio), m=hmap['m'], p=hmap['p'])
[docs]class Hazard:
"""
Helper class for storing the PoEs
"""
def __init__(self, dstore, full_lt, srcidx):
self.datastore = dstore
oq = dstore['oqparam']
self.full_lt = full_lt
self.weights = numpy.array(
[r.weight['weight'] for r in full_lt.get_realizations()])
self.cmakers = read_cmakers(dstore, full_lt)
self.collect_rlzs = oq.collect_rlzs
self.totgsims = sum(len(cm.gsims) for cm in self.cmakers)
self.imtls = oq.imtls
self.level_weights = oq.imtls.array.flatten() / oq.imtls.array.sum()
self.sids = dstore['sitecol/sids'][:]
self.srcidx = srcidx
self.N = len(dstore['sitecol/sids'])
self.L = oq.imtls.size
self.R = full_lt.get_num_paths()
self.acc = AccumDict(accum={})
self.offset = 0
[docs] def get_poes(self, pmap, cmaker): # used in in disagg_by_src
"""
:param pmap: a ProbabilityMap
:param cmaker: a ContextMaker
:returns: an array of PoEs of shape (N, R, M, L1)
"""
R = 1 if self.collect_rlzs else self.R
M = len(self.imtls)
L1 = self.L // M
out = numpy.zeros((self.N, R, M, L1))
dic = dict(zip(cmaker.gidx, cmaker.gsims.values()))
for lvl in range(self.L):
m, l = divmod(lvl, L1)
res = numpy.zeros((len(pmap.sids), self.R))
for i, g in enumerate(pmap.gidx):
combine_probs(res, pmap.array[:, lvl, i], U32(dic[g]))
if self.collect_rlzs:
out[:, 0, m, l] = res @ self.weights
else:
out[:, :, m, l] = res
return out
[docs] def store_poes(self, g, pnes, pnes_sids):
"""
Store 1-pnes inside the _poes dataset
"""
# Physically, an extremely small intensity measure level can have an
# extremely large probability of exceedence, however that probability
# cannot be exactly 1 unless the level is exactly 0. Numerically, the
# PoE can be 1 and this give issues when calculating the damage (there
# is a log(0) in
# :class:`openquake.risklib.scientific.annual_frequency_of_exceedence`).
# Here we solve the issue by replacing the unphysical probabilities 1
# with .9999999999999999 (the float64 closest to 1).
poes = 1. - pnes
poes[poes == 1.] = .9999999999999999
# poes[poes < 1E-5] = 0. # minimum_poe
idxs, lids = poes.nonzero()
gids = numpy.repeat(g, len(idxs))
if len(idxs):
sids = pnes_sids[idxs]
hdf5.extend(self.datastore['_poes/sid'], sids)
hdf5.extend(self.datastore['_poes/gid'], gids)
hdf5.extend(self.datastore['_poes/lid'], lids)
hdf5.extend(self.datastore['_poes/poe'], poes[idxs, lids])
sbs = build_slice_by_sid(sids, self.offset)
hdf5.extend(self.datastore['_poes/slice_by_sid'], sbs)
self.offset += len(sids)
self.acc[g]['avg_poe'] = poes.mean(axis=0) @ self.level_weights
self.acc[g]['nsites'] = len(pnes_sids)
[docs] def store_disagg(self, pmaps=None):
"""
Store data inside disagg_by_grp (optionally disagg_by_src)
"""
n = len(self.full_lt.sm_rlzs)
lst = []
for grp_id, indices in enumerate(self.datastore['trt_smrs']):
dic = self.acc[grp_id]
if dic:
trti, smrs = numpy.divmod(indices, n)
trt = self.full_lt.trts[trti[0]]
lst.append((trt, dic['avg_poe'], dic['nsites']))
self.datastore['disagg_by_grp'] = numpy.array(lst, disagg_grp_dt)
if pmaps: # called inside a loop
disagg_by_src = self.datastore['disagg_by_src'][()]
for key, pmap in pmaps.items():
if isinstance(key, str):
# in case of disagg_by_src key is a source ID
disagg_by_src[..., self.srcidx[key]] = (
self.get_poes(pmap, self.cmakers[pmap.grp_id]))
self.datastore['disagg_by_src'][:] = disagg_by_src
[docs]@base.calculators.add('classical', 'ucerf_classical')
class ClassicalCalculator(base.HazardCalculator):
"""
Classical PSHA calculator
"""
core_task = classical
precalc = 'preclassical'
accept_precalc = ['preclassical', 'classical', 'aftershock']
SLOW_TASK_ERROR = False
[docs] def agg_dicts(self, acc, dic):
"""
Aggregate dictionaries of hazard curves by updating the accumulator.
:param acc: accumulator dictionary
:param dic: dict with keys pmap, source_data, rup_data
"""
# NB: dic should be a dictionary, but when the calculation dies
# for an OOM it can become None, thus giving a very confusing error
if dic is None:
raise MemoryError('You ran out of memory!')
sdata = dic['source_data']
self.source_data += sdata
grp_id = dic.pop('grp_id')
self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
cfactor = dic.pop('cfactor')
if cfactor[1] != cfactor[0]:
print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format(
cfactor[1] / cfactor[2], cfactor[1] / cfactor[0]))
self.cfactor += cfactor
# store rup_data if there are few sites
if self.few_sites and len(dic['rup_data']):
with self.monitor('saving rup_data'):
store_ctxs(self.datastore, dic['rup_data'], grp_id)
pnemap = dic['pnemap'] # probabilities of no exceedence
for i, g in enumerate(pnemap.gidx):
acc[g].update(pnemap, i)
pmap_by_src = dic.pop('pmap_by_src', {}) # non-empty for disagg_by_src
# len(pmap_by_src) > 1 only for mutex sources, see contexts.py
for source_id, pm in pmap_by_src.items():
# store the poes for the given source
acc[source_id] = pm
pm.grp_id = grp_id
pm.gidx = pnemap.gidx
return acc
[docs] def create_dsets(self):
"""
Store some empty datasets in the datastore
"""
params = {'grp_id', 'occurrence_rate', 'clon', 'clat', 'rrup',
'probs_occur', 'sids', 'src_id', 'rup_id', 'weight'}
gsims_by_trt = self.full_lt.get_gsims_by_trt()
for trt, gsims in gsims_by_trt.items():
cm = ContextMaker(trt, gsims, self.oqparam)
params.update(cm.REQUIRES_RUPTURE_PARAMETERS)
params.update(cm.REQUIRES_DISTANCES)
if self.few_sites:
# self.oqparam.time_per_task = 1_000_000 # disable task splitting
descr = [] # (param, dt)
for param in sorted(params):
if param == 'sids':
dt = U16 # storing only for few sites
elif param == 'probs_occur':
dt = hdf5.vfloat64
elif param in ('src_id', 'rup_id'):
dt = U32
elif param == 'grp_id':
dt = U16
else:
dt = F32
descr.append((param, dt))
self.datastore.create_df('rup', descr, 'gzip')
# NB: the relevant ruptures are less than the effective ruptures,
# which are a preclassical concept
if self.oqparam.disagg_by_src:
self.create_disagg_by_src()
[docs] def create_disagg_by_src(self):
"""
:returns: the unique source IDs contained in the composite model
"""
oq = self.oqparam
self.M = len(oq.imtls)
self.L1 = oq.imtls.size // self.M
sources = list(self.csm.source_info)
R = 1 if oq.collect_rlzs else self.R
size, msg = get_nbytes_msg(
dict(N=self.N, R=R, M=self.M, L1=self.L1, Ns=len(sources)))
if size > TWO32:
raise RuntimeError('The matrix disagg_by_src is too large, '
'use collect_rlzs=true\n%s' % msg)
size = self.N * R * self.M * self.L1 * len(sources) * 8
logging.info('Creating disagg_by_src of size %s', humansize(size))
self.datastore.create_dset(
'disagg_by_src', F32,
(self.N, R, self.M, self.L1, len(sources)))
self.datastore.set_shape_descr(
'disagg_by_src', site_id=self.N, rlz_id=R,
imt=list(self.oqparam.imtls), lvl=self.L1, src_id=sources)
return sources
[docs] def init_poes(self):
self.cfactor = numpy.zeros(3)
self.rel_ruptures = AccumDict(accum=0) # grp_id -> rel_ruptures
if self.oqparam.hazard_calculation_id:
full_lt = self.datastore.parent['full_lt']
trt_smrs = self.datastore.parent['trt_smrs'][:]
else:
full_lt = self.csm.full_lt
trt_smrs = self.csm.get_trt_smrs()
self.grp_ids = numpy.arange(len(trt_smrs))
rlzs_by_gsim_list = full_lt.get_rlzs_by_gsim_list(trt_smrs)
rlzs_by_g = []
for rlzs_by_gsim in rlzs_by_gsim_list:
for rlzs in rlzs_by_gsim.values():
rlzs_by_g.append(rlzs)
self.datastore.create_df('_poes', poes_dt.items())
self.datastore.create_dset('_poes/slice_by_sid', slice_dt)
# NB: compressing the dataset causes a big slowdown in writing :-(
[docs] def check_memory(self, N, L, maxw):
"""
Log the memory required to receive the largest ProbabilityMap,
assuming all sites are affected (upper limit)
"""
num_gs = [len(cm.gsims) for cm in self.cmakers]
max_gs = max(num_gs)
maxsize = get_maxsize(len(self.oqparam.imtls), max_gs)
logging.info('Considering {:_d} contexts at once'.format(maxsize))
size = max_gs * N * L * 8
logging.info('ProbabilityMap(G=%d,N=%d,L=%d) %s per core',
max_gs, N, L, humansize(size))
avail = min(psutil.virtual_memory().available, config.memory.limit)
if avail < size:
raise MemoryError(
'You have only %s of free RAM' % humansize(avail))
[docs] def execute(self):
"""
Run in parallel `core_task(sources, sitecol, monitor)`, by
parallelizing on the sources according to their weight and
tectonic region type.
"""
oq = self.oqparam
if oq.hazard_calculation_id:
parent = self.datastore.parent
if '_poes' in parent:
self.build_curves_maps() # repeat post-processing
return {}
else: # after preclassical, like in case_36
logging.info('Reading from parent calculation')
self.csm = parent['_csm']
oq.mags_by_trt = {
trt: python3compat.decode(dset[:])
for trt, dset in parent['source_mags'].items()}
self.full_lt = parent['full_lt']
self.datastore['source_info'] = parent['source_info'][:]
maxw = self.csm.get_max_weight(oq)
else:
maxw = self.max_weight
self.init_poes()
srcidx = {
rec[0]: i for i, rec in enumerate(self.csm.source_info.values())}
self.haz = Hazard(self.datastore, self.full_lt, srcidx)
self.source_data = AccumDict(accum=[])
if not performance.numba:
logging.warning('numba is not installed: using the slow algorithm')
self.cmakers = self.haz.cmakers
t0 = time.time()
req = get_pmaps_gb(self.datastore)
ntiles = 1 + int(req / (oq.pmap_max_gb * 100)) # 50 GB
if ntiles > 1:
self.execute_seq(maxw, ntiles)
else: # regular case
self.check_memory(len(self.sitecol), oq.imtls.size, maxw)
self.execute_par(maxw)
self.store_info()
if self.cfactor[0] == 0:
raise RuntimeError('Filtered away all ruptures??')
logging.info('cfactor = {:_d}/{:_d} = {:.1f}'.format(
int(self.cfactor[1]), int(self.cfactor[0]),
self.cfactor[1] / self.cfactor[0]))
if '_poes' in self.datastore:
self.build_curves_maps()
if not oq.hazard_calculation_id:
self.classical_time = time.time() - t0
return True
[docs] def execute_par(self, maxw):
"""
Regular case
"""
self.create_dsets() # create the rup/ datasets BEFORE swmr_on()
acc = self.run_one(self.sitecol, maxw)
self.haz.store_disagg(acc)
[docs] def execute_seq(self, maxw, ntiles):
"""
Use sequential tiling
"""
assert self.N > self.oqparam.max_sites_disagg, self.N
logging.info('Running %d tiles', ntiles)
for n, tile in enumerate(self.sitecol.split(ntiles)):
if n == 0:
self.check_memory(len(tile), self.oqparam.imtls.size, maxw)
self.run_one(tile, maxw)
logging.info('Finished tile %d of %d', n+1, ntiles)
if parallel.oq_distribute() == 'zmq':
w.WorkerMaster(config.zworkers).restart()
else:
parallel.Starmap.shutdown()
[docs] def run_one(self, sitecol, maxw):
"""
Run a subset of sites and update the accumulator
"""
acc = {} # g -> pmap
oq = self.oqparam
L = oq.imtls.size
allargs = []
for cm in self.cmakers:
G = len(cm.gsims)
sg = self.csm.src_groups[cm.grp_id]
# maximum size of the pmap array in GB
size_gb = G * L * len(sitecol) * 8 / 1024**3
ntiles = int(numpy.ceil(10 * size_gb / oq.pmap_max_gb))
# NB: disagg_by_src is disabled in case of tiling
assert not (ntiles > 1 and oq.disagg_by_src)
cm.ntiles = ntiles
if ntiles > 1:
logging.debug('Producing %d inner tiles', ntiles)
if sg.atomic or sg.weight <= maxw:
allargs.append((sg, sitecol, cm))
else:
if oq.disagg_by_src: # possible only with a single tile
blks = groupby(sg, basename).values()
else:
blks = block_splitter(sg, maxw, get_weight, sort=True)
for block in blks:
logging.debug('Sending %d source(s) with weight %d',
len(block), sg.weight)
allargs.append((block, sitecol, cm))
# allocate memory
for g in cm.gidx:
acc[g] = ProbabilityMap(sitecol.sids, L, 1).fill(1)
totsize = sum(pmap.array.nbytes for pmap in acc.values())
logging.info('Global pmap size %s', humansize(totsize))
self.datastore.swmr_on() # must come before the Starmap
smap = parallel.Starmap(classical, h5=self.datastore.hdf5)
# using submit avoids the .task_queue and thus core starvation
for args in allargs:
smap.submit(args)
acc = smap.reduce(self.agg_dicts, acc)
for g in list(acc):
# NOTE: on Windows g is a int32; on Linux it is a int64
if isinstance(g, (numpy.int64, numpy.int32)):
with self.monitor('storing PoEs', measuremem=True):
pne = acc.pop(g)
self.haz.store_poes(g, pne.array[:, :, 0], pne.sids)
return acc
[docs] def store_info(self):
"""
Store full_lt, source_info and source_data
"""
self.store_rlz_info(self.rel_ruptures)
self.store_source_info(self.source_data)
df = pandas.DataFrame(self.source_data)
# NB: the impact factor is the number of effective ruptures;
# consider for instance a point source producing 200 ruptures
# for points within the pointsource_distance (n points) and
# producing 20 effective ruptures for the N-n points outside;
# then impact = (200 * n + 20 * (N-n)) / N; for n=1 and N=10
# it gives impact = 38, i.e. there are 38 effective ruptures
df['impact'] = df.nsites / self.N
self.datastore.create_df('source_data', df)
self.source_data.clear() # save a bit of memory
[docs] def collect_hazard(self, acc, pmap_by_kind):
"""
Populate hcurves and hmaps in the .hazard dictionary
:param acc: ignored
:param pmap_by_kind: a dictionary of ProbabilityMaps
"""
# this is practically instantaneous
if pmap_by_kind is None: # instead of a dict
raise MemoryError('You ran out of memory!')
for kind in pmap_by_kind: # hmaps-XXX, hcurves-XXX
pmaps = pmap_by_kind[kind]
if kind in self.hazard:
array = self.hazard[kind]
else:
dset = self.datastore.getitem(kind)
array = self.hazard[kind] = numpy.zeros(dset.shape, dset.dtype)
for r, pmap in enumerate(pmaps):
for idx, sid in enumerate(pmap.sids):
array[sid, r] = pmap.array[idx] # shape (M, P)
[docs] def post_execute(self, dummy):
"""
Check for slow tasks and fix disagg_by_src if needed
"""
task_info = self.datastore.read_df('task_info', 'taskname')
try:
dur = task_info.loc[b'classical'].duration
except KeyError: # no data
pass
else:
slow_tasks = len(dur[dur > 3 * dur.mean()]) and dur.max() > 180
msg = 'There were %d slow task(s)' % slow_tasks
if (slow_tasks and self.SLOW_TASK_ERROR and
not self.oqparam.disagg_by_src):
raise RuntimeError('%s in #%d' % (msg, self.datastore.calc_id))
elif slow_tasks:
logging.info(msg)
if 'disagg_by_src' in list(self.datastore):
srcids = python3compat.decode(
self.datastore['source_info']['source_id'])
if any(';' in srcid for srcid in srcids):
# enable reduction of the array disagg_by_src
arr = self.disagg_by_src = self.datastore['disagg_by_src'][:]
arr, srcids = semicolon_aggregate(arr, srcids)
self.datastore['disagg_by_src'][:] = arr
R = 1 if self.oqparam.collect_rlzs else self.R
self.datastore.set_shape_descr(
'disagg_by_src', site_id=self.N, rlz_id=R,
imt=list(self.oqparam.imtls), lvl=self.L1, src_id=srcids)
if 'disagg_by_src' in self.datastore and not self.oqparam.collect_rlzs:
logging.info('Comparing disagg_by_src vs mean curves')
check_disagg_by_src(self.datastore)
def _create_hcurves_maps(self):
oq = self.oqparam
N = len(self.sitecol)
R = len(self.realizations)
if oq.individual_rlzs is None: # not specified in the job.ini
individual_rlzs = (N == 1) * (R > 1)
else:
individual_rlzs = oq.individual_rlzs
hstats = oq.hazard_stats()
# initialize datasets
P = len(oq.poes)
M = self.M = len(oq.imtls)
imts = list(oq.imtls)
if oq.soil_intensities is not None:
L = M * len(oq.soil_intensities)
else:
L = oq.imtls.size
L1 = self.L1 = L // M
S = len(hstats)
if R > 1 and individual_rlzs or not hstats:
self.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
self.datastore.set_shape_descr(
'hcurves-rlzs', site_id=N, rlz_id=R, imt=imts, lvl=L1)
if oq.poes:
self.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
self.datastore.set_shape_descr(
'hmaps-rlzs', site_id=N, rlz_id=R,
imt=list(oq.imtls), poe=oq.poes)
if hstats:
self.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
self.datastore.set_shape_descr(
'hcurves-stats', site_id=N, stat=list(hstats),
imt=imts, lvl=numpy.arange(L1))
if oq.poes:
self.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
self.datastore.set_shape_descr(
'hmaps-stats', site_id=N, stat=list(hstats),
imt=list(oq.imtls), poe=oq.poes)
return N, S, M, P, L1, individual_rlzs
# called by execute before post_execute
[docs] def build_curves_maps(self):
"""
Compute and store hcurves-rlzs, hcurves-stats, hmaps-rlzs, hmaps-stats
"""
oq = self.oqparam
hstats = oq.hazard_stats()
if not oq.hazard_curves: # do nothing
return
N, S, M, P, L1, individual = self._create_hcurves_maps()
poes_gb = self.datastore.getsize('_poes') / 1024**3
if poes_gb < .1:
ct = 1
elif poes_gb < 2:
ct = int(poes_gb * 10)
else:
ct = int(poes_gb) + 18 # number of tasks > number of GB
if ct > 1:
logging.info('Producing %d postclassical tasks', ct)
self.weights = ws = [rlz.weight for rlz in self.realizations]
if '_poes' in set(self.datastore):
dstore = self.datastore
else:
dstore = self.datastore.parent
sites_per_task = int(numpy.ceil(self.N / ct))
sbs = dstore['_poes/slice_by_sid'][:]
sbs['sid'] //= sites_per_task
# NB: there is a genious idea here, to split in tasks by using
# the formula ``taskno = sites_ids // sites_per_task`` and then
# extracting a dictionary of slices for each taskno. This works
# since by construction the site_ids are sequential and there are
# at most G slices per task. For instance if there are 6 sites
# disposed in 2 groups and we want to produce 2 tasks we can use
# 012345012345 // 3 = 000111000111 and the slices are
# {0: [(0, 3), (6, 9)], 1: [(3, 6), (9, 12)]}
slicedic = AccumDict(accum=[])
for idx, start, stop in sbs:
slicedic[idx].append((start, stop))
if not slicedic:
# no hazard, nothing to do, happens in case_60
return
# using compactify improves the performance of `read PoEs`;
# I have measured a 3.5x in the AUS model with 1 rlz
allslices = [compactify(slices) for slices in slicedic.values()]
nslices = sum(len(slices) for slices in allslices)
logging.info('There are %d slices of poes [%.1f per task]',
nslices, nslices / len(slicedic))
allargs = [
(getters.PmapGetter(dstore, ws, slices, oq.imtls, oq.poes, ct),
N, hstats, individual, oq.max_sites_disagg, self.amplifier)
for slices in allslices]
self.hazard = {} # kind -> array
hcbytes = 8 * N * S * M * L1
hmbytes = 8 * N * S * M * P if oq.poes else 0
logging.info('Producing %s of hazard curves and %s of hazard maps',
humansize(hcbytes), humansize(hmbytes))
if not performance.numba:
logging.warning('numba is not installed: using the slow algorithm')
if 'delta_rates' in self.datastore.parent:
pass # do nothing for the aftershock calculator, avoids an error
else: # in all the other cases
self.datastore.swmr_on()
parallel.Starmap(
postclassical, allargs,
distribute='no' if self.few_sites else None,
h5=self.datastore.hdf5,
).reduce(self.collect_hazard)
for kind in sorted(self.hazard):
logging.info('Saving %s', kind) # very fast
self.datastore[kind][:] = self.hazard.pop(kind)
fraction = os.environ.get('OQ_SAMPLE_SOURCES')
if fraction and hasattr(self, 'classical_time'):
total_time = time.time() - self.t0
delta = total_time - self.classical_time
est_time = self.classical_time / float(fraction) + delta
logging.info('Estimated time: %.1f hours', est_time / 3600)
# generate plots
if 'hmaps-stats' in self.datastore:
hmaps = self.datastore.sel('hmaps-stats', stat='mean') # NSMP
maxhaz = hmaps.max(axis=(0, 1, 3))
mh = dict(zip(self.oqparam.imtls, maxhaz))
logging.info('The maximum hazard map values are %s', mh)
if Image is None or not self.from_engine: # missing PIL
return
M, P = hmaps.shape[2:]
logging.info('Saving %dx%d mean hazard maps', M, P)
inv_time = oq.investigation_time
allargs = []
for m, imt in enumerate(self.oqparam.imtls):
for p, poe in enumerate(self.oqparam.poes):
dic = dict(m=m, p=p, imt=imt, poe=poe, inv_time=inv_time,
calc_id=self.datastore.calc_id,
array=hmaps[:, 0, m, p])
allargs.append((dic, self.sitecol.lons, self.sitecol.lats))
smap = parallel.Starmap(make_hmap_png, allargs)
for dic in smap:
self.datastore['png/hmap_%(m)d_%(p)d' % dic] = dic['img']