# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import operator
import collections
from functools import partial
import numpy
from openquake.baselib.general import AccumDict
from openquake.hazardlib.geo.utils import get_spherical_bounding_box
from openquake.hazardlib.geo.utils import get_longitudinal_extent
from openquake.hazardlib.geo.geodetic import npoints_between
from openquake.hazardlib.calc.filters import source_site_distance_filter
from openquake.hazardlib.calc.hazard_curve import (
hazard_curves_per_trt, zero_curves, zero_maps,
array_of_curves, ProbabilityMap)
from openquake.risklib import scientific
from openquake.commonlib import parallel, datastore, source, calc
from openquake.calculators import base

HazardCurve = collections.namedtuple('HazardCurve', 'location poes')


# this is needed for the disaggregation
class BoundingBox(object):
"""
    A class to store the bounding box in distances, longitudes and latitudes,
given a source model and a site. This is used for disaggregation
calculations. The goal is to determine the minimum and maximum
distances of the ruptures generated from the model from the site;
    moreover the minimum and maximum longitudes and latitudes are stored,
    taking into account the international date line.
"""
def __init__(self, lt_model_id, site_id):
self.lt_model_id = lt_model_id
self.site_id = site_id
self.min_dist = self.max_dist = None
self.east = self.west = self.south = self.north = None

    def update(self, dists, lons, lats):
"""
Compare the current bounding box with the value in the arrays
        dists, lons, lats and enlarge it if needed.

        :param dists:
a sequence of distances
:param lons:
a sequence of longitudes
:param lats:
a sequence of latitudes
"""
if self.min_dist is not None:
dists = [self.min_dist, self.max_dist] + dists
if self.west is not None:
lons = [self.west, self.east] + lons
if self.south is not None:
lats = [self.south, self.north] + lats
self.min_dist, self.max_dist = min(dists), max(dists)
self.west, self.east, self.north, self.south = \
get_spherical_bounding_box(lons, lats)

    def update_bb(self, bb):
"""
Compare the current bounding box with the given bounding box
        and enlarge it if needed.

        :param bb:
            an instance of :class:`BoundingBox`
"""
if bb: # the given bounding box must be non-empty
self.update([bb.min_dist, bb.max_dist], [bb.west, bb.east],
[bb.south, bb.north])

    def bins_edges(self, dist_bin_width, coord_bin_width):
"""
Define bin edges for disaggregation histograms, from the bin data
collected from the ruptures.

        :param dist_bin_width:
            distance_bin_width from job.ini
        :param coord_bin_width:
            coordinate_bin_width from job.ini
        :returns:
            a triple (dist_edges, lon_edges, lat_edges)
        """
dist_edges = dist_bin_width * numpy.arange(
int(self.min_dist / dist_bin_width),
int(numpy.ceil(self.max_dist / dist_bin_width) + 1))
west = numpy.floor(self.west / coord_bin_width) * coord_bin_width
east = numpy.ceil(self.east / coord_bin_width) * coord_bin_width
lon_extent = get_longitudinal_extent(west, east)
lon_edges, _, _ = npoints_between(
west, 0, 0, east, 0, 0,
numpy.round(lon_extent / coord_bin_width) + 1)
lat_edges = coord_bin_width * numpy.arange(
int(numpy.floor(self.south / coord_bin_width)),
int(numpy.ceil(self.north / coord_bin_width) + 1))
return dist_edges, lon_edges, lat_edges

    def __nonzero__(self):
        """
        True if the bounding box is non-empty, i.e. it has been updated
        at least once.
        """
        return (self.min_dist is not None and self.west is not None and
                self.south is not None)

    __bool__ = __nonzero__  # alias for Python 3 compatibility
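

# A rough usage sketch for BoundingBox (illustrative values, not taken from a
# real calculation): the box is enlarged incrementally with rupture data and
# then queried for the disaggregation bin edges, e.g.
#
#   bb = BoundingBox(lt_model_id=0, site_id=0)
#   bb.update(dists=[5., 42.], lons=[9.1, 9.6], lats=[45.0, 45.4])
#   if bb:  # non-empty only after at least one update
#       dist_edges, lon_edges, lat_edges = bb.bins_edges(
#           dist_bin_width=10., coord_bin_width=0.1)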


@parallel.litetask
def classical(sources, sitecol, siteidx, rlzs_assoc, monitor):
"""
:param sources:
a non-empty sequence of sources of homogeneous tectonic region type
:param sitecol:
a SiteCollection instance
:param siteidx:
index of the first site (0 if there is a single tile)
:param rlzs_assoc:
a RlzsAssoc instance
:param monitor:
a monitor instance
:returns:
        an AccumDict trt_model_id -> ProbabilityMap, with attributes
        .siteslice, .bbs, .calc_times and .eff_ruptures
"""
truncation_level = monitor.oqparam.truncation_level
imtls = monitor.oqparam.imtls
trt_model_id = sources[0].trt_model_id
# sanity check: the trt_model must be the same for all sources
for src in sources[1:]:
assert src.trt_model_id == trt_model_id
gsims = rlzs_assoc.gsims_by_trt_id[trt_model_id]
trt = sources[0].tectonic_region_type
max_dist = monitor.oqparam.maximum_distance[trt]
dic = AccumDict()
dic.siteslice = slice(siteidx, siteidx + len(sitecol))
if monitor.oqparam.poes_disagg:
sm_id = rlzs_assoc.sm_ids[trt_model_id]
dic.bbs = [BoundingBox(sm_id, sid) for sid in sitecol.sids]
else:
dic.bbs = []
# NB: the source_site_filter below is ESSENTIAL for performance inside
# hazard_curves_per_trt, since it reduces the full site collection
# to a filtered one *before* doing the rupture filtering
dic[trt_model_id] = hazard_curves_per_trt(
sources, sitecol, imtls, gsims, truncation_level,
source_site_filter=source_site_distance_filter(max_dist),
maximum_distance=max_dist, bbs=dic.bbs, monitor=monitor)
dic.calc_times = monitor.calc_times # added by hazard_curves_per_trt
dic.eff_ruptures = {trt_model_id: monitor.eff_ruptures} # idem
return dic
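

# A sketch of the accumulator returned by a single task (assuming one tile,
# one tectonic region type and ignoring the litetask wrapping); the keys and
# attributes are the ones set above:
#
#   dic = classical(sources, sitecol, 0, rlzs_assoc, monitor)
#   dic[trt_model_id]  # ProbabilityMap for this block of sources
#   dic.siteslice      # slice(0, len(sitecol))
#   dic.bbs            # list of BoundingBox, empty unless poes_disagg is set
#   dic.eff_ruptures   # {trt_model_id: number of ruptures after filtering}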


@base.calculators.add('classical')
class ClassicalCalculator(base.HazardCalculator):
"""
Classical PSHA calculator
"""
core_task = classical
source_info = datastore.persistent_attribute('source_info')

    def agg_dicts(self, acc, val):
"""
Aggregate dictionaries of hazard curves by updating the accumulator.

        :param acc: accumulator dictionary
        :param val: a dictionary trt_id -> ProbabilityMap returned by the task
"""
with self.monitor('aggregate curves', autoflush=True):
if hasattr(val, 'calc_times'):
acc.calc_times.extend(val.calc_times)
if hasattr(val, 'eff_ruptures'):
acc.eff_ruptures += val.eff_ruptures
for bb in getattr(val, 'bbs', []):
acc.bb_dict[bb.lt_model_id, bb.site_id].update_bb(bb)
acc |= val
self.datastore.flush()
return acc

    def count_eff_ruptures(self, result_dict, trt_model):
"""
Returns the number of ruptures in the trt_model (after filtering)
        or 0 if the trt_model has been filtered away.

        :param result_dict: an accumulator with an `eff_ruptures` attribute
            (a dictionary trt_id -> number of effective ruptures)
:param trt_model: a TrtModel instance
"""
return (result_dict.eff_ruptures.get(trt_model.id, 0) / self.num_tiles)

    def zerodict(self):
"""
Initial accumulator, an empty ProbabilityMap
"""
zd = ProbabilityMap()
zd.calc_times = []
zd.eff_ruptures = AccumDict() # trt_id -> eff_ruptures
zd.bb_dict = {
(smodel.ordinal, sid): BoundingBox(smodel.ordinal, sid)
for sid in self.sitecol.sids
for smodel in self.csm.source_models
} if self.oqparam.poes_disagg else {}
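        # e.g. with 2 source models, 3 sites and poes_disagg set, this builds
        # 6 empty bounding boxes keyed (source model ordinal, site id); they
        # are enlarged task by task in agg_dicts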
return zd

    def execute(self):
"""
Run in parallel `core_task(sources, sitecol, monitor)`, by
parallelizing on the sources according to their weight and
tectonic region type.
"""
monitor = self.monitor.new(self.core_task.__name__)
monitor.oqparam = self.oqparam
curves_by_trt_id = self.taskman.reduce(self.agg_dicts, self.zerodict())
self.save_data_transfer(self.taskman)
with self.monitor('store source_info', autoflush=True):
self.store_source_info(curves_by_trt_id)
self.rlzs_assoc = self.csm.info.get_rlzs_assoc(
partial(self.count_eff_ruptures, curves_by_trt_id))
self.datastore['csm_info'] = self.csm.info
return curves_by_trt_id

    def store_source_info(self, curves_by_trt_id):
# store the information about received data
received = self.taskman.received
if received:
tname = self.taskman.name
self.datastore.save('job_info', {
tname + '_max_received_per_task': max(received),
tname + '_tot_received': sum(received),
tname + '_num_tasks': len(received)})
        # then save the calculation times for each source
calc_times = getattr(curves_by_trt_id, 'calc_times', [])
if calc_times:
sources = self.csm.get_sources()
info_dict = {(rec['trt_model_id'], rec['source_id']): rec
for rec in self.source_info}
for src_idx, dt in calc_times:
src = sources[src_idx]
info = info_dict[src.trt_model_id, src.source_id]
info['calc_time'] += dt
self.source_info = numpy.array(
sorted(info_dict.values(), key=operator.itemgetter(7),
reverse=True), source.source_info_dt)
self.datastore.hdf5.flush()

    def post_execute(self, curves_by_trt_id):
"""
        Collect the hazard curves by realization and export them.

        :param curves_by_trt_id:
            a dictionary trt_id -> ProbabilityMap
"""
nsites = len(self.sitecol)
imtls = self.oqparam.imtls
curves_by_trt_gsim = {}
with self.monitor('saving probability maps', autoflush=True):
for trt_id, poes in curves_by_trt_id.items():
if poes:
key = 'poes/%04d' % trt_id
self.datastore[key] = curves_by_trt_id[trt_id]
self.datastore.set_attrs(
key, trt=self.csm.info.get_trt(trt_id))
gsims = self.rlzs_assoc.gsims_by_trt_id[trt_id]
for i, gsim in enumerate(gsims):
curves_by_trt_gsim[trt_id, gsim] = (
curves_by_trt_id[trt_id].extract(i))
self.datastore.set_nbytes('poes')
with self.monitor('combine curves_by_rlz', autoflush=True):
curves_by_rlz = self.rlzs_assoc.combine_curves(curves_by_trt_gsim)
self.save_curves({rlz: array_of_curves(curves, nsites, imtls)
for rlz, curves in curves_by_rlz.items()})

    def save_curves(self, curves_by_rlz):
"""
        Save the dictionary rlz -> curves and, if enabled, the statistical
        curves (mean and quantiles)
"""
oq = self.oqparam
rlzs = self.rlzs_assoc.realizations
nsites = len(self.sitecol)
if oq.individual_curves:
with self.monitor('save curves_by_rlz', autoflush=True):
for rlz, curves in curves_by_rlz.items():
self.store_curves('rlz-%03d' % rlz.ordinal, curves, rlz)
if len(rlzs) == 1: # cannot compute statistics
[self.mean_curves] = curves_by_rlz.values()
return
with self.monitor('compute and save statistics', autoflush=True):
weights = (None if oq.number_of_logic_tree_samples
else [rlz.weight for rlz in rlzs])
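            # with logic tree sampling the realizations are already extracted
            # proportionally to their weight, so equal weights (None) are used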
# mean curves are always computed but stored only on request
zc = zero_curves(nsites, oq.imtls)
self.mean_curves = numpy.array(zc)
for imt in oq.imtls:
self.mean_curves[imt] = scientific.mean_curve(
[curves_by_rlz.get(rlz, zc)[imt] for rlz in rlzs], weights)
self.quantile = {}
for q in oq.quantile_hazard_curves:
self.quantile[q] = qc = numpy.array(zc)
for imt in oq.imtls:
curves = [curves_by_rlz[rlz][imt] for rlz in rlzs]
qc[imt] = scientific.quantile_curve(
curves, q, weights).reshape((nsites, -1))
if oq.mean_hazard_curves:
self.store_curves('mean', self.mean_curves)
for q in self.quantile:
self.store_curves('quantile-%s' % q, self.quantile[q])

    def hazard_maps(self, curves):
"""
        Compute the hazard maps associated with the curves, for the poes
        given in the job.ini
"""
maps = zero_maps(
len(self.sitecol), self.oqparam.imtls, self.oqparam.poes)
for imt in curves.dtype.fields:
# build a matrix of size (N, P)
data = calc.compute_hazard_maps(
curves[imt], self.oqparam.imtls[imt], self.oqparam.poes)
for poe, hmap in zip(self.oqparam.poes, data.T):
maps['%s-%s' % (imt, poe)] = hmap
return maps

    def store_curves(self, kind, curves, rlz=None):
"""
        Store all kinds of curves, optionally computing maps and uhs curves.

        :param kind: the kind of curves to store
:param curves: an array of N curves to store
:param rlz: hazard realization, if any
"""
oq = self.oqparam
self._store('hcurves/' + kind, curves, rlz, nbytes=curves.nbytes)
self.datastore['hcurves'].attrs['imtls'] = [
(imt, len(imls)) for imt, imls in self.oqparam.imtls.items()]
if oq.hazard_maps or oq.uniform_hazard_spectra:
# hmaps is a composite array of shape (N, P)
hmaps = self.hazard_maps(curves)
self._store('hmaps/' + kind, hmaps, rlz,
poes=oq.poes, nbytes=hmaps.nbytes)

    def _store(self, name, curves, rlz, **kw):
self.datastore.hdf5[name] = curves
dset = self.datastore.hdf5[name]
if rlz is not None:
dset.attrs['uid'] = rlz.uid
for k, v in kw.items():
dset.attrs[k] = v


def nonzero(val):
"""
    :returns: the sum of all the values in the composite array `val`
"""
return sum(val[k].sum() for k in val.dtype.names)
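

# e.g. `if nonzero(curves): ...` can be used as a quick check that a composite
# array of curves contains at least one nonzero probability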