# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import time
import zlib
import pickle
import psutil
import logging
import operator
import numpy
import pandas
from PIL import Image
from openquake.baselib import parallel, hdf5, config, python3compat
from openquake.baselib.general import (
AccumDict, DictArray, groupby, humansize, block_splitter)
from openquake.hazardlib import valid, InvalidFile
from openquake.hazardlib.contexts import read_cmakers
from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.map_array import RateMap, MapArray, rates_dt
from openquake.commonlib import calc
from openquake.calculators import base, getters, preclassical, views
get_weight = operator.attrgetter('weight')
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
I64 = numpy.int64
TWO24 = 2 ** 24
TWO32 = 2 ** 32
GZIP = 'gzip'
BUFFER = 1.5 # enlarge the pointsource_distance sphere to fix the weight;
# with BUFFER = 1 we would have lots of apparently light sources
# collected together in an extra-slow task, as it happens in SHARE
# with ps_grid_spacing=50
def _store(rates, num_chunks, h5, mon=None, gzip=GZIP):
if len(rates) == 0:
return
newh5 = h5 is None
if newh5:
scratch = parallel.scratch_dir(mon.calc_id)
h5 = hdf5.File(f'{scratch}/{mon.task_no}.hdf5', 'a')
chunks = rates['sid'] % num_chunks
idx_start_stop = []
for chunk in numpy.unique(chunks):
ch_rates = rates[chunks == chunk]
try:
h5.create_df(
'_rates', [(n, rates_dt[n]) for n in rates_dt.names], gzip)
hdf5.create(
h5, '_rates/slice_by_idx', getters.slice_dt, fillvalue=None)
except ValueError: # already created
offset = len(h5['_rates/sid'])
else:
offset = 0
idx_start_stop.append((chunk, offset, offset + len(ch_rates)))
hdf5.extend(h5['_rates/sid'], ch_rates['sid'])
hdf5.extend(h5['_rates/gid'], ch_rates['gid'])
hdf5.extend(h5['_rates/lid'], ch_rates['lid'])
hdf5.extend(h5['_rates/rate'], ch_rates['rate'])
iss = numpy.array(idx_start_stop, getters.slice_dt)
hdf5.extend(h5['_rates/slice_by_idx'], iss)
if newh5:
fname = h5.filename
h5.flush()
h5.close()
return fname
[docs]class Set(set):
__iadd__ = set.__ior__
[docs]def get_heavy_gids(source_groups, cmakers):
"""
:returns: the g-indices associated to the heavy groups
"""
if source_groups.attrs['tiling']:
return []
elif cmakers[0].oq.disagg_by_src:
grp_ids = source_groups['grp_id'] # all groups
else:
grp_ids = source_groups['grp_id'][source_groups['blocks'] > 1]
gids = []
for grp_id in grp_ids:
gids.extend(cmakers[grp_id].gid)
return gids
[docs]def store_ctxs(dstore, rupdata_list, grp_id):
"""
Store contexts in the datastore
"""
for rupdata in rupdata_list:
nr = len(rupdata)
known = set(rupdata.dtype.names)
for par in dstore['rup']:
if par == 'grp_id':
hdf5.extend(dstore['rup/grp_id'], numpy.full(nr, grp_id))
elif par == 'probs_occur':
dstore.hdf5.save_vlen('rup/probs_occur', rupdata[par])
elif par in known:
hdf5.extend(dstore['rup/' + par], rupdata[par])
else:
hdf5.extend(dstore['rup/' + par], numpy.full(nr, numpy.nan))
# ########################### task functions ############################ #
[docs]def save_rates(g, N, jid, num_chunks, mon):
"""
Store the rates for the given g on a file scratch/calc_id/task_no.hdf5
"""
with mon.shared['rates'] as rates:
rates_g = rates[:, :, jid[g]]
sids = numpy.arange(N)
for chunk in range(num_chunks):
ch = sids % num_chunks == chunk
rmap = MapArray(sids[ch], rates.shape[1], 1)
rmap.array = rates_g[ch, :, None]
rats = rmap.to_array([g])
_store(rats, num_chunks, None, mon)
[docs]def classical(sources, tilegetters, cmaker, dstore, monitor):
"""
Call the classical calculator in hazardlib
"""
# NB: removing the yield would cause terrible slow tasks
cmaker.init_monitoring(monitor)
with dstore:
if sources is None: # read the full group from the datastore
arr = dstore.getitem('_csm')[cmaker.grp_id]
sources = pickle.loads(zlib.decompress(arr.tobytes()))
sitecol = dstore['sitecol'].complete # super-fast
if cmaker.disagg_by_src and not cmaker.atomic:
# in case_27 (Japan) we do NOT enter here;
# disagg_by_src still works since the atomic group contains a single
# source 'case' (mutex combination of case:01, case:02)
for srcs in groupby(sources, valid.basename).values():
result = hazclassical(srcs, sitecol, cmaker)
result['rmap'].gid = cmaker.gid
yield result
return
for tileget in tilegetters:
result = hazclassical(sources, tileget(sitecol), cmaker)
if cmaker.disagg_by_src:
# do not remove zeros, otherwise AELO for JPN will break
# since there are 4 sites out of 18 with zeros
rmap = result.pop('rmap')
else:
rmap = result.pop('rmap').remove_zeros()
# print(f"{monitor.task_no=} {rmap=}")
if rmap.size_mb and cmaker.blocks == 1 and not cmaker.disagg_by_src:
if len(tilegetters) > 1:
del result['source_data']
if config.directory.custom_tmp:
rates = rmap.to_array(cmaker.gid)
_store(rates, cmaker.num_chunks, None, monitor)
else:
result['rmap'] = rmap.to_array(cmaker.gid)
elif rmap.size_mb:
result['rmap'] = rmap
result['rmap'].gid = cmaker.gid
yield result
[docs]def tiling(tilegetter, cmaker, dstore, monitor):
"""
Tiling calculator
"""
cmaker.init_monitoring(monitor)
with dstore:
arr = dstore.getitem('_csm')[cmaker.grp_id]
sources = pickle.loads(zlib.decompress(arr.tobytes()))
sitecol = dstore['sitecol'].complete # super-fast
result = hazclassical(sources, tilegetter(sitecol), cmaker)
rmap = result.pop('rmap').remove_zeros()
if config.directory.custom_tmp:
rates = rmap.to_array(cmaker.gid)
_store(rates, cmaker.num_chunks, None, monitor)
else:
result['rmap'] = rmap.to_array(cmaker.gid)
return result
# for instance for New Zealand G~1000 while R[full_enum]~1_000_000
# i.e. passing the gweights reduces the data transfer by 1000 times
[docs]def fast_mean(pgetter, monitor):
"""
:param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
:param gweights: an array of G weights
:returns: a dictionary kind -> MapArray
"""
with monitor('reading rates', measuremem=True):
pgetter.init()
if not pgetter.sids: # can happen with tiling
return {}
with monitor('compute stats', measuremem=True):
hcurves = pgetter.get_fast_mean(pgetter.weights)
pmap_by_kind = {'hcurves-stats': [hcurves]}
if pgetter.poes:
with monitor('make_hmaps', measuremem=False):
pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
pmap_by_kind['hcurves-stats'], pgetter.imtls, pgetter.poes)
return pmap_by_kind
[docs]def postclassical(pgetter, wget, hstats, individual_rlzs,
max_sites_disagg, amplifier, monitor):
"""
:param pgetter: a :class:`openquake.commonlib.getters.MapGetter`
:param wget: function (weights[:, :], imt) -> weights[:]
:param hstats: a list of pairs (statname, statfunc)
:param individual_rlzs: if True, also build the individual curves
:param max_sites_disagg: if there are less sites than this, store rup info
:param amplifier: instance of Amplifier or None
:param monitor: instance of Monitor
:returns: a dictionary kind -> MapArray
The "kind" is a string of the form 'rlz-XXX' or 'mean' of 'quantile-XXX'
used to specify the kind of output.
"""
with monitor('reading rates', measuremem=True):
pgetter.init()
if not pgetter.sids: # can happen with tiling
return {}
if amplifier:
# amplification is meant for few sites, i.e. no tiling
with hdf5.File(pgetter.filenames[0], 'r') as f:
ampcode = f['sitecol'].ampcode
imtls = DictArray({imt: amplifier.amplevels
for imt in pgetter.imtls})
else:
imtls = pgetter.imtls
poes, sids = pgetter.poes, U32(pgetter.sids)
M = len(imtls)
L = imtls.size
L1 = L // M
R = pgetter.R
S = len(hstats)
pmap_by_kind = {}
if R == 1 or individual_rlzs:
pmap_by_kind['hcurves-rlzs'] = [
MapArray(sids, M, L1).fill(0) for r in range(R)]
if hstats:
pmap_by_kind['hcurves-stats'] = [
MapArray(sids, M, L1).fill(0) for r in range(S)]
combine_mon = monitor('combine pmaps', measuremem=False)
compute_mon = monitor('compute stats', measuremem=False)
hmaps_mon = monitor('make_hmaps', measuremem=False)
sidx = MapArray(sids, 1, 1).fill(0).sidx
for sid in sids:
idx = sidx[sid]
with combine_mon:
pc = pgetter.get_hcurve(sid) # shape (L, R)
if amplifier:
pc = amplifier.amplify(ampcode[sid], pc)
# NB: the hcurve have soil levels != IMT levels
if pc.sum() == 0: # no data
continue
with compute_mon:
if R == 1 or individual_rlzs:
for r in range(R):
pmap_by_kind['hcurves-rlzs'][r].array[idx] = (
pc[:, r].reshape(M, L1))
if hstats:
for s, (statname, stat) in enumerate(hstats.items()):
sc = getters.build_stat_curve(
pc, imtls, stat, pgetter.weights, wget,
pgetter.use_rates)
arr = sc.reshape(M, L1)
pmap_by_kind['hcurves-stats'][s].array[idx] = arr
if poes and (R == 1 or individual_rlzs):
with hmaps_mon:
pmap_by_kind['hmaps-rlzs'] = calc.make_hmaps(
pmap_by_kind['hcurves-rlzs'], imtls, poes)
if poes and hstats:
with hmaps_mon:
pmap_by_kind['hmaps-stats'] = calc.make_hmaps(
pmap_by_kind['hcurves-stats'], imtls, poes)
return pmap_by_kind
[docs]def make_hmap_png(hmap, lons, lats):
"""
:param hmap:
a dictionary with keys calc_id, m, p, imt, poe, inv_time, array
:param lons: an array of longitudes
:param lats: an array of latitudes
:returns: an Image object containing the hazard map
"""
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.grid(True)
ax.set_title('hmap for IMT=%(imt)s, poe=%(poe)s\ncalculation %(calc_id)d,'
'inv_time=%(inv_time)dy' % hmap)
ax.set_ylabel('Longitude')
coll = ax.scatter(lons, lats, c=hmap['array'], cmap='jet')
plt.colorbar(coll)
bio = io.BytesIO()
plt.savefig(bio, format='png')
return dict(img=Image.open(bio), m=hmap['m'], p=hmap['p'])
[docs]class Hazard:
"""
Helper class for storing the rates
"""
def __init__(self, dstore, srcidx, gids):
self.datastore = dstore
oq = dstore['oqparam']
self.itime = oq.investigation_time
self.weig = dstore['gweights'][:]
self.imtls = oq.imtls
self.sids = dstore['sitecol/sids'][:]
self.srcidx = srcidx
self.gids = gids
self.N = len(dstore['sitecol/sids'])
self.M = len(oq.imtls)
self.L = oq.imtls.size
self.L1 = self.L // self.M
self.acc = AccumDict(accum={})
# used in in disagg_by_src
[docs] def get_rates(self, pmap, grp_id):
"""
:param pmap: a MapArray
:returns: an array of rates of shape (N, M, L1)
"""
gids = self.gids[grp_id]
rates = pmap.array @ self.weig[gids] / self.itime
return rates.reshape((self.N, self.M, self.L1))
[docs] def store_mean_rates_by_src(self, dic):
"""
Store data inside mean_rates_by_src with shape (N, M, L1, Ns)
"""
mean_rates_by_src = self.datastore['mean_rates_by_src/array'][()]
for key, rates in dic.items():
if isinstance(key, str):
# in case of mean_rates_by_src key is a source ID
idx = self.srcidx[valid.corename(key)]
mean_rates_by_src[..., idx] += rates
self.datastore['mean_rates_by_src/array'][:] = mean_rates_by_src
return mean_rates_by_src
[docs]@base.calculators.add('classical', 'ucerf_classical')
class ClassicalCalculator(base.HazardCalculator):
"""
Classical PSHA calculator
"""
core_task = classical
precalc = 'preclassical'
accept_precalc = ['preclassical', 'classical']
SLOW_TASK_ERROR = False
[docs] def agg_dicts(self, acc, dic):
"""
Aggregate dictionaries of hazard curves by updating the accumulator.
:param acc: accumulator dictionary
:param dic: dict with keys pmap, source_data, rup_data
"""
# NB: dic should be a dictionary, but when the calculation dies
# for an OOM it can become None, thus giving a very confusing error
if dic is None:
raise MemoryError('You ran out of memory!')
grp_id = dic.pop('grp_id')
sdata = dic.pop('source_data', None)
if sdata is not None:
self.source_data += sdata
self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
cfactor = dic.pop('cfactor')
if cfactor[1] != cfactor[0]:
print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format(
cfactor[1] / cfactor[2], cfactor[1] / cfactor[0]))
self.cfactor += cfactor
# store rup_data if there are few sites
if self.few_sites and len(dic['rup_data']):
with self.monitor('saving rup_data'):
store_ctxs(self.datastore, dic['rup_data'], grp_id)
rmap = dic.pop('rmap', None)
source_id = dic.pop('basename', '') # non-empty for disagg_by_src
if source_id:
# accumulate the rates for the given source
acc[source_id] += self.haz.get_rates(rmap, grp_id)
if rmap is None:
# already stored in the workers, case_22
pass
elif isinstance(rmap, numpy.ndarray):
# store the rates directly, case_03 or tiling without custom_tmp
with self.monitor('storing rates', measuremem=True):
_store(rmap, self.num_chunks, self.datastore)
else:
# aggregating rates is ultra-fast compared to storing
self.rmap += rmap
return acc
[docs] def create_rup(self):
"""
Create the rup datasets *before* starting the calculation
"""
params = {'grp_id', 'occurrence_rate', 'clon', 'clat', 'rrup',
'probs_occur', 'sids', 'src_id', 'rup_id', 'weight'}
for cm in self.cmakers:
params.update(cm.REQUIRES_RUPTURE_PARAMETERS)
params.update(cm.REQUIRES_DISTANCES)
if self.few_sites:
descr = [] # (param, dt)
for param in sorted(params):
if param == 'sids':
dt = U16 # storing only for few sites
elif param == 'probs_occur':
dt = hdf5.vfloat64
elif param in ('src_id', 'rup_id'):
dt = U32
elif param == 'grp_id':
dt = U16
else:
dt = F32
descr.append((param, dt))
self.datastore.create_df('rup', descr, 'gzip')
# NB: the relevant ruptures are less than the effective ruptures,
# which are a preclassical concept
[docs] def init_poes(self):
oq = self.oqparam
self.cmakers = read_cmakers(self.datastore, self.csm)
parent = self.datastore.parent
if parent:
# tested in case_43
self.req_gb, self.max_weight, self.trt_rlzs, self.gids = (
preclassical.store_tiles(
self.datastore, self.csm, self.sitecol, self.cmakers))
self.cfactor = numpy.zeros(3)
self.rel_ruptures = AccumDict(accum=0) # grp_id -> rel_ruptures
if oq.disagg_by_src:
M = len(oq.imtls)
L1 = oq.imtls.size // M
sources = self.csm.get_basenames()
mean_rates_by_src = numpy.zeros((self.N, M, L1, len(sources)))
dic = dict(shape_descr=['site_id', 'imt', 'lvl', 'src_id'],
site_id=self.N, imt=list(oq.imtls),
lvl=L1, src_id=numpy.array(sources))
self.datastore['mean_rates_by_src'] = hdf5.ArrayWrapper(
mean_rates_by_src, dic)
# create empty dataframes
self.num_chunks = getters.get_num_chunks(self.datastore)
# create empty dataframes
self.datastore.create_df(
'_rates', [(n, rates_dt[n]) for n in rates_dt.names])
self.datastore.create_dset('_rates/slice_by_idx', getters.slice_dt)
[docs] def check_memory(self, N, L, maxw):
"""
Log the memory required to receive the largest MapArray,
assuming all sites are affected (upper limit)
"""
num_gs = [len(cm.gsims) for cm in self.cmakers]
size = max(num_gs) * N * L * 4
avail = min(psutil.virtual_memory().available, config.memory.limit)
if avail < size:
raise MemoryError(
'You have only %s of free RAM' % humansize(avail))
[docs] def execute(self):
"""
Run in parallel `core_task(sources, sitecol, monitor)`, by
parallelizing on the sources according to their weight and
tectonic region type.
"""
oq = self.oqparam
if oq.hazard_calculation_id:
logging.info('Reading from parent calculation')
parent = self.datastore.parent
self.full_lt = parent['full_lt'].init()
self.csm = parent['_csm']
self.csm.init(self.full_lt)
self.datastore['source_info'] = parent['source_info'][:]
oq.mags_by_trt = {
trt: python3compat.decode(dset[:])
for trt, dset in parent['source_mags'].items()}
if '_rates' in parent:
self.build_curves_maps() # repeat post-processing
return {}
self.init_poes()
if oq.fastmean:
logging.info('Will use the fast_mean algorithm')
if not hasattr(self, 'trt_rlzs'):
self.max_gb, self.trt_rlzs, self.gids = getters.get_pmaps_gb(
self.datastore, self.full_lt)
srcidx = {name: i for i, name in enumerate(self.csm.get_basenames())}
self.haz = Hazard(self.datastore, srcidx, self.gids)
rlzs = self.R == 1 or oq.individual_rlzs
if not rlzs and not oq.hazard_stats():
raise InvalidFile('%(job_ini)s: you disabled all statistics',
oq.inputs)
self.source_data = AccumDict(accum=[])
sgs, ds = self._pre_execute()
if self.tiling:
self._execute_tiling(sgs, ds)
else:
self._execute_regular(sgs, ds)
if self.cfactor[0] == 0:
if self.N == 1:
logging.error('The site is far from all seismic sources'
' included in the hazard model')
else:
raise RuntimeError('The sites are far from all seismic sources'
' included in the hazard model')
else:
logging.info('cfactor = {:_d}/{:_d} = {:.1f}'.format(
int(self.cfactor[1]), int(self.cfactor[0]),
self.cfactor[1] / self.cfactor[0]))
self.store_info()
self.build_curves_maps()
return True
def _pre_execute(self):
oq = self.oqparam
sgs = self.datastore['source_groups']
self.tiling = sgs.attrs['tiling']
if 'sitecol' in self.datastore.parent:
ds = self.datastore.parent
else:
ds = self.datastore
if config.directory.custom_tmp:
scratch = parallel.scratch_dir(self.datastore.calc_id)
logging.info('Storing the rates in %s', scratch)
self.datastore.hdf5.attrs['scratch_dir'] = scratch
if self.tiling:
assert not oq.disagg_by_src
assert self.N > self.oqparam.max_sites_disagg, self.N
else: # regular calculator
self.create_rup() # create the rup/ datasets BEFORE swmr_on()
return sgs, ds
def _execute_regular(self, sgs, ds):
allargs = []
n_out = []
for cmaker, tilegetters, blocks, splits in self.csm.split(
self.cmakers, self.sitecol, self.max_weight, self.num_chunks):
for block in blocks:
for tgetters in block_splitter(tilegetters, splits):
allargs.append((block, tgetters, cmaker, ds))
n_out.append(len(tilegetters))
logging.warning('This is a regular calculation with %d outputs, '
'%d tasks, min_tiles=%d, max_tiles=%d',
sum(n_out), len(allargs), min(n_out), max(n_out))
# log info about the heavy sources
srcs = self.csm.get_sources()
maxsrc = max(srcs, key=lambda s: s.weight / self.csm.splits[s.grp_id])
logging.info('Heaviest: %s', maxsrc)
L = self.oqparam.imtls.size
gids = get_heavy_gids(sgs, self.cmakers)
self.rmap = RateMap(self.sitecol.sids, L, gids)
self.datastore.swmr_on() # must come before the Starmap
smap = parallel.Starmap(classical, allargs, h5=self.datastore.hdf5)
if not self.oqparam.disagg_by_src:
smap.expected_outputs = sum(n_out)
acc = smap.reduce(self.agg_dicts, AccumDict(accum=0.))
self._post_regular(acc)
def _execute_tiling(self, sgs, ds):
allargs = []
n_out = []
for cmaker, tilegetters, blocks, splits in self.csm.split(
self.cmakers, self.sitecol, self.max_weight, self.num_chunks, True):
for block in blocks:
for tgetter in tilegetters:
allargs.append((tgetter, cmaker, ds))
n_out.append(len(tilegetters))
logging.warning('This is a tiling calculation with '
'%d tasks, min_tiles=%d, max_tiles=%d',
len(allargs), min(n_out), max(n_out))
t0 = time.time()
self.datastore.swmr_on() # must come before the Starmap
smap = parallel.Starmap(tiling, allargs, h5=self.datastore.hdf5)
smap.reduce(self.agg_dicts, AccumDict(accum=0.))
fraction = os.environ.get('OQ_SAMPLE_SOURCES')
if fraction:
est_time = time.time() - t0 / float(fraction)
logging.info('Estimated time for the classical part: %.1f hours '
'(upper limit)', est_time / 3600)
def _post_regular(self, acc):
# save the rates and performs some checks
if self.rmap.size_mb:
logging.info('Processing %s', self.rmap)
def genargs():
for g, j in self.rmap.jid.items():
yield g, self.N, self.rmap.jid, self.num_chunks
if (self.rmap.size_mb > 200 and config.directory.custom_tmp and
parallel.oq_distribute() != 'no'):
# tested in the oq-risk-tests
self.datastore.swmr_on() # must come before the Starmap
savemap = parallel.Starmap(save_rates, genargs(),
h5=self.datastore,
distribute='processpool')
savemap.share(rates=self.rmap.array)
savemap.reduce()
elif self.rmap.size_mb:
for g, N, jid, num_chunks in genargs():
rates = self.rmap.to_array(g)
_store(rates, self.num_chunks, self.datastore)
del self.rmap
if self.oqparam.disagg_by_src:
mrs = self.haz.store_mean_rates_by_src(acc)
if self.oqparam.use_rates and self.N == 1: # sanity check
self.check_mean_rates(mrs)
# NB: the largest mean_rates_by_src is SUPER-SENSITIVE to numerics!
# in particular disaggregation/case_15 is sensitive to num_cores
# with very different values between 2 and 16 cores(!)
[docs] def check_mean_rates(self, mean_rates_by_src):
"""
The sum of the mean_rates_by_src must correspond to the mean_rates
"""
try:
exp = disagg.to_rates(self.datastore['hcurves-stats'][0, 0])
except KeyError: # if there are no ruptures close to the site
return
got = mean_rates_by_src[0].sum(axis=2) # sum over the sources
for m in range(len(got)):
# skipping large rates which can be wrong due to numerics
# (it happens in logictree/case_05 and in Japan)
ok = got[m] < 2.
numpy.testing.assert_allclose(got[m, ok], exp[m, ok], atol=1E-5)
[docs] def store_info(self):
"""
Store full_lt, source_info and source_data
"""
self.store_rlz_info(self.rel_ruptures)
self.store_source_info(self.source_data)
df = pandas.DataFrame(self.source_data)
# NB: the impact factor is the number of effective ruptures;
# consider for instance a point source producing 200 ruptures
# for points within the pointsource_distance (n points) and
# producing 20 effective ruptures for the N-n points outside;
# then impact = (200 * n + 20 * (N-n)) / N; for n=1 and N=10
# it gives impact = 38, i.e. there are 38 effective ruptures
df['impact'] = df.nsites / self.N
self.datastore.create_df('source_data', df)
self.source_data.clear() # save a bit of memory
[docs] def collect_hazard(self, acc, pmap_by_kind):
"""
Populate hcurves and hmaps in the .hazard dictionary
:param acc: ignored
:param pmap_by_kind: a dictionary of MapArrays
"""
# this is practically instantaneous
if pmap_by_kind is None: # instead of a dict
raise MemoryError('You ran out of memory!')
for kind in pmap_by_kind: # hmaps-XXX, hcurves-XXX
pmaps = pmap_by_kind[kind]
if kind in self.hazard:
array = self.hazard[kind]
else:
dset = self.datastore.getitem(kind)
array = self.hazard[kind] = numpy.zeros(dset.shape, dset.dtype)
for r, pmap in enumerate(pmaps):
for idx, sid in enumerate(pmap.sids):
array[sid, r] = pmap.array[idx] # shape (M, P)
[docs] def post_execute(self, dummy):
"""
Check for slow tasks
"""
oq = self.oqparam
task_info = self.datastore.read_df('task_info', 'taskname')
try:
dur = views.discard_small(task_info.loc[b'classical'].duration)
except KeyError: # no data
pass
else:
slow_tasks = len(dur[dur > 3 * dur.mean()]) and dur.max() > 180
msg = 'There were %d slow task(s)' % slow_tasks
if slow_tasks and self.SLOW_TASK_ERROR and not oq.disagg_by_src:
raise RuntimeError('%s in #%d' % (msg, self.datastore.calc_id))
elif slow_tasks:
logging.info(msg)
def _create_hcurves_maps(self):
oq = self.oqparam
N = len(self.sitecol)
R = len(self.datastore['weights'])
if oq.individual_rlzs is None: # not specified in the job.ini
individual_rlzs = (N == 1) * (R > 1)
else:
individual_rlzs = oq.individual_rlzs
hstats = oq.hazard_stats()
# initialize datasets
P = len(oq.poes)
M = self.M = len(oq.imtls)
imts = list(oq.imtls)
if oq.soil_intensities is not None:
L = M * len(oq.soil_intensities)
else:
L = oq.imtls.size
L1 = self.L1 = L // M
S = len(hstats)
if R == 1 or individual_rlzs:
self.datastore.create_dset('hcurves-rlzs', F32, (N, R, M, L1))
self.datastore.set_shape_descr(
'hcurves-rlzs', site_id=N, rlz_id=R, imt=imts, lvl=L1)
if oq.poes:
self.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
self.datastore.set_shape_descr(
'hmaps-rlzs', site_id=N, rlz_id=R,
imt=list(oq.imtls), poe=oq.poes)
if hstats:
self.datastore.create_dset('hcurves-stats', F32, (N, S, M, L1))
self.datastore.set_shape_descr(
'hcurves-stats', site_id=N, stat=list(hstats),
imt=imts, lvl=numpy.arange(L1))
if oq.poes:
self.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
self.datastore.set_shape_descr(
'hmaps-stats', site_id=N, stat=list(hstats),
imt=list(oq.imtls), poe=oq.poes)
return N, S, M, P, L1, individual_rlzs
# called by execute before post_execute
[docs] def build_curves_maps(self):
"""
Compute and store hcurves-rlzs, hcurves-stats, hmaps-rlzs, hmaps-stats
"""
oq = self.oqparam
hstats = oq.hazard_stats()
N, S, M, P, L1, individual = self._create_hcurves_maps()
if '_rates' in set(self.datastore) or not self.datastore.parent:
dstore = self.datastore
else:
dstore = self.datastore.parent
wget = self.full_lt.wget
allargs = [(getter, wget, hstats, individual, oq.max_sites_disagg,
self.amplifier) for getter in getters.map_getters(
dstore, self.full_lt)]
if not config.directory.custom_tmp and not allargs: # case_60
logging.warning('No rates were generated')
return
self.hazard = {} # kind -> array
hcbytes = 8 * N * S * M * L1
hmbytes = 8 * N * S * M * P if oq.poes else 0
if hcbytes:
logging.info('Producing %s of hazard curves', humansize(hcbytes))
if hmbytes:
logging.info('Producing %s of hazard maps', humansize(hmbytes))
if 'delta_rates' in oq.inputs:
pass # avoid an HDF5 error
else: # in all the other cases
self.datastore.swmr_on()
if oq.fastmean:
parallel.Starmap(
fast_mean, [args[0:1] for args in allargs],
distribute='no' if self.few_sites else None,
h5=self.datastore.hdf5,
).reduce(self.collect_hazard)
else:
parallel.Starmap(
postclassical, allargs,
distribute='no' if self.few_sites else None,
h5=self.datastore.hdf5,
).reduce(self.collect_hazard)
for kind in sorted(self.hazard):
logging.info('Saving %s', kind) # very fast
self.datastore[kind][:] = self.hazard.pop(kind)
if 'hmaps-stats' in self.datastore and not oq.tile_spec:
self.plot_hmaps()
[docs] def plot_hmaps(self):
"""
Generate hazard map plots if there are more than 1000 sites
"""
hmaps = self.datastore.sel('hmaps-stats', stat='mean') # NSMP
maxhaz = hmaps.max(axis=(0, 1, 3))
mh = dict(zip(self.oqparam.imtls, maxhaz))
logging.info('The maximum hazard map values are %s', mh)
if Image is None or not self.from_engine: # missing PIL
return
if self.N < 1000: # few sites, don't plot
return
M, P = hmaps.shape[2:]
logging.info('Saving %dx%d mean hazard maps', M, P)
inv_time = self.oqparam.investigation_time
allargs = []
for m, imt in enumerate(self.oqparam.imtls):
for p, poe in enumerate(self.oqparam.poes):
dic = dict(m=m, p=p, imt=imt, poe=poe, inv_time=inv_time,
calc_id=self.datastore.calc_id,
array=hmaps[:, 0, m, p])
allargs.append((dic, self.sitecol.lons, self.sitecol.lats))
smap = parallel.Starmap(make_hmap_png, allargs, h5=self.datastore)
for dic in smap:
self.datastore['png/hmap_%(m)d_%(p)d' % dic] = dic['img']