# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Disaggregation calculator core functionality
"""
from __future__ import division
import math
import logging
import numpy
from openquake.baselib import hdf5
from openquake.baselib.general import split_in_blocks
from openquake.hazardlib.calc import disagg
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.baselib import parallel
from openquake.hazardlib import sourceconverter
from openquake.commonlib import calc
from openquake.calculators import base, classical
DISAGG_RES_FMT = 'disagg/poe-%(poe)s-rlz-%(rlz)s-%(imt)s-%(lon)s-%(lat)s'
[docs]def compute_disagg(src_filter, sources, src_group_id, rlzs_assoc,
trt_names, curves_dict, bin_edges, oqparam, monitor):
# see https://bugs.launchpad.net/oq-engine/+bug/1279247 for an explanation
# of the algorithm used
"""
:param src_filter:
a :class:`openquake.hazardlib.calc.filter.SourceFilter` instance
:param sources:
list of hazardlib source objects
:param src_group_id:
numeric ID of a SourceGroup instance
:param rlzs_assoc:
a :class:`openquake.commonlib.source.RlzsAssoc` instance
:param dict trt_names:
a tuple of names for the given tectonic region type
:param curves_dict:
a dictionary with the hazard curves for sites, realizations and IMTs
:param bin_egdes:
a dictionary site_id -> edges
:param oqparam:
the parameters in the job.ini file
:param monitor:
monitor of the currently running job
:returns:
a dictionary of probability arrays, with composite key
(sid, rlz.id, poe, imt, iml, trt_names).
"""
sitecol = src_filter.sitecol
trt_num = dict((trt, i) for i, trt in enumerate(trt_names))
gsims = rlzs_assoc.gsims_by_grp_id[src_group_id]
rlzs_by_gsim = rlzs_assoc.rlzs_by_gsim[src_group_id]
result = {} # sid, rlz.id, poe, imt, iml, trt_names -> array
collecting_mon = monitor('collecting bins')
arranging_mon = monitor('arranging bins')
for site, sid in zip(sitecol, sitecol.sids):
# edges as wanted by disagg._arrange_data_in_bins
try:
edges = bin_edges[sid]
except KeyError:
# bin_edges for a given site are missing if the site is far away
continue
# generate source, rupture, sites once per site
with collecting_mon:
cmaker = ContextMaker(gsims, src_filter.integration_distance)
bdata = disagg._collect_bins_data(
trt_num, sources, site, curves_dict[sid],
src_group_id, rlzs_by_gsim, cmaker, oqparam.imtls,
oqparam.poes_disagg, oqparam.truncation_level,
oqparam.num_epsilon_bins, oqparam.iml_disagg,
monitor)
for (rlzi, poe, imt), iml_pne_pairs in bdata.pnes.items():
# extract the probabilities of non-exceedance for the
# given realization, disaggregation PoE, and IMT
iml = iml_pne_pairs[0][0]
probs = numpy.array([p for (i, p) in iml_pne_pairs], float)
# bins in a format handy for hazardlib
bins = [bdata.mags, bdata.dists,
bdata.lons, bdata.lats,
bdata.trts, None, probs]
# call disagg._arrange_data_in_bins
with arranging_mon:
key = (sid, rlzi, poe, imt, iml, trt_names)
matrix = disagg._arrange_data_in_bins(
bins, edges + (trt_names,))
result[key] = numpy.array(
[fn(matrix) for fn in disagg.pmf_map.values()])
return result
@base.calculators.add('disaggregation')
[docs]class DisaggregationCalculator(classical.ClassicalCalculator):
"""
Classical PSHA disaggregation calculator
"""
[docs] def post_execute(self, nbytes_by_kind):
"""Performs the disaggregation"""
self.full_disaggregation()
[docs] def agg_result(self, acc, result):
"""
Collect the results coming from compute_disagg into self.results,
a dictionary with key (sid, rlz.id, poe, imt, iml, trt_names)
and values which are probability arrays.
:param acc: dictionary accumulating the results
:param result: dictionary with the result coming from a task
"""
for key, val in result.items():
acc[key] = 1. - (1. - acc.get(key, 0)) * (1. - val)
return acc
[docs] def get_curves(self, sid):
"""
Get all the relevant hazard curves for the given site ordinal.
Returns a dictionary {(rlz_id, imt) -> curve}.
"""
dic = {}
imtls = self.oqparam.imtls
pgetter = calc.PmapGetter(self.datastore)
for rlz in self.rlzs_assoc.realizations:
try:
pmap = pgetter.get(numpy.array([sid]), rlz.ordinal)
except ValueError: # empty pmaps
logging.info(
'hazard curve contains all zero probabilities; '
'skipping site %d, rlz=%d', sid, rlz.ordinal)
continue
if sid not in pmap:
continue
poes = pmap[sid].convert(imtls)
for imt_str in imtls:
if all(x == 0.0 for x in poes[imt_str]):
logging.info(
'hazard curve contains all zero probabilities; '
'skipping site %d, rlz=%d, IMT=%s',
sid, rlz.ordinal, imt_str)
continue
dic[rlz.ordinal, imt_str] = poes[imt_str]
return dic
[docs] def full_disaggregation(self):
"""
Run the disaggregation phase after hazard curve finalization.
"""
oq = self.oqparam
tl = self.oqparam.truncation_level
bb_dict = self.datastore['bb_dict']
sitecol = self.sitecol
mag_bin_width = self.oqparam.mag_bin_width
eps_edges = numpy.linspace(-tl, tl, self.oqparam.num_epsilon_bins + 1)
logging.info('%d epsilon bins from %s to %s', len(eps_edges) - 1,
min(eps_edges), max(eps_edges))
self.bin_edges = {}
curves_dict = {sid: self.get_curves(sid) for sid in sitecol.sids}
# determine the number of effective source groups
sg_data = self.datastore['csm_info/sg_data']
num_grps = sum(1 for effrup in sg_data['effrup'] if effrup > 0)
nblocks = math.ceil(oq.concurrent_tasks / num_grps)
all_args = []
for smodel in self.csm.source_models:
sm_id = smodel.ordinal
trt_names = tuple(mod.trt for mod in smodel.src_groups)
max_mag = max(mod.max_mag for mod in smodel.src_groups)
min_mag = min(mod.min_mag for mod in smodel.src_groups)
mag_edges = mag_bin_width * numpy.arange(
int(numpy.floor(min_mag / mag_bin_width)),
int(numpy.ceil(max_mag / mag_bin_width) + 1))
logging.info('%d mag bins from %s to %s', len(mag_edges) - 1,
min_mag, max_mag)
for src_group in smodel.src_groups:
if src_group.id not in self.rlzs_assoc.rlzs_by_gsim:
continue # the group has been filtered away
for sid, site in zip(sitecol.sids, sitecol):
curves = curves_dict[sid]
if not curves:
continue # skip zero-valued hazard curves
bb = bb_dict[sm_id, sid]
if not bb:
logging.info(
'location %s was too far, skipping disaggregation',
site.location)
continue
dist_edges, lon_edges, lat_edges = bb.bins_edges(
oq.distance_bin_width, oq.coordinate_bin_width)
logging.info(
'[sid=%d] %d dist bins from %s to %s', sid,
len(dist_edges) - 1, min(dist_edges), max(dist_edges))
logging.info(
'[sid=%d] %d lon bins from %s to %s', sid,
len(lon_edges) - 1, bb.west, bb.east)
logging.info(
'[sid=%d] %d lat bins from %s to %s', sid,
len(lon_edges) - 1, bb.south, bb.north)
self.bin_edges[sm_id, sid] = (
mag_edges, dist_edges, lon_edges, lat_edges, eps_edges)
bin_edges = {}
for sid, site in zip(sitecol.sids, sitecol):
if (sm_id, sid) in self.bin_edges:
bin_edges[sid] = self.bin_edges[sm_id, sid]
src_filter = SourceFilter(sitecol, oq.maximum_distance)
split_sources = []
for src in src_group:
for split, _sites in src_filter(
sourceconverter.split_source(src), sitecol):
split_sources.append(split)
mon = self.monitor('disaggregation')
for srcs in split_in_blocks(split_sources, nblocks):
all_args.append(
(src_filter, srcs, src_group.id, self.rlzs_assoc,
trt_names, curves_dict, bin_edges, oq, mon))
results = parallel.Starmap(compute_disagg, all_args).reduce(
self.agg_result)
self.save_disagg_results(results)
[docs] def save_disagg_results(self, results):
"""
Save all the results of the disaggregation. NB: the number of results
to save is #sites * #rlzs * #disagg_poes * #IMTs.
:param results:
a dictionary of probability arrays
"""
# build a dictionary rlz.ordinal -> source_model.ordinal
sm_id = {}
for i, rlzs in self.rlzs_assoc.rlzs_by_smodel.items():
for rlz in rlzs:
sm_id[rlz.ordinal] = i
# since an extremely small subset of the full disaggregation matrix
# is saved this method can be run sequentially on the controller node
for key, probs in sorted(results.items()):
sid, rlz_id, poe, imt, iml, trt_names = key
edges = self.bin_edges[sm_id[rlz_id], sid]
self.save_disagg_result(
sid, edges, trt_names, probs, rlz_id,
self.oqparam.investigation_time, imt, iml, poe)
[docs] def save_disagg_result(self, site_id, bin_edges, trt_names, matrix,
rlz_id, investigation_time, imt_str, iml, poe):
"""
Save a computed disaggregation matrix to `hzrdr.disagg_result` (see
:class:`~openquake.engine.db.models.DisaggResult`).
:param site_id:
id of the current site
:param bin_edges:
The 5-uple mag, dist, lon, lat, eps
:param trt_names:
The list of Tectonic Region Types
:param matrix:
A probability array
:param rlz_id:
ordinal of the realization to which the results belong.
:param float investigation_time:
Investigation time (years) for the calculation.
:param imt_str:
Intensity measure type string (PGA, SA, etc.)
:param float iml:
Intensity measure level interpolated (using `poe`) from the hazard
curve at the `site`.
:param float poe:
Disaggregation probability of exceedance value for this result.
"""
lon = self.sitecol.lons[site_id]
lat = self.sitecol.lats[site_id]
mag, dist, lons, lats, eps = bin_edges
disp_name = DISAGG_RES_FMT % dict(
poe=poe, rlz=rlz_id, imt=imt_str, lon=lon, lat=lat)
self.datastore[disp_name] = {
'_'.join(key): mat for key, mat in zip(disagg.pmf_map, matrix)}
attrs = self.datastore.hdf5[disp_name].attrs
attrs['rlzi'] = rlz_id
attrs['imt'] = imt_str
attrs['iml'] = iml
attrs['poe'] = poe
attrs['trts'] = hdf5.array_of_vstr(trt_names)
attrs['mag_bin_edges'] = mag
attrs['dist_bin_edges'] = dist
attrs['lon_bin_edges'] = lons
attrs['lat_bin_edges'] = lats
attrs['eps_bin_edges'] = eps
attrs['location'] = (lon, lat)