# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import h5py
import numpy as np
import copy
import time
import logging
import functools
from openquake.baselib.performance import Monitor
from openquake.baselib.python3compat import raise_
from openquake.baselib.general import DictArray, AccumDict
from openquake.baselib import parallel
from openquake.hazardlib.geo import Point
from openquake.hazardlib.geo.geodetic import min_geodetic_distance
from openquake.hazardlib.source import PointSource
from openquake.hazardlib.mfd import EvenlyDiscretizedMFD
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.calc.hazard_curve import (
get_probability_no_exceedance, pmap_from_grp)
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.gsim.base import ContextMaker, FarAwayRupture
from openquake.hazardlib import valid, nrml
from openquake.commonlib import source, readinput
from openquake.hazardlib.sourceconverter import SourceConverter
from openquake.calculators import base, classical
from openquake.calculators.ucerf_event_based import (
UCERFSESControl, get_ucerf_rupture, DEFAULT_TRT)
class UCERFControl(UCERFSESControl):
    """
    General control file for a UCERF branch for the classical calculator.
    Here we add a new method to generate a set of background sources per
    branch and a method to filter sites by distance from a rupture set.
    """
    def get_background_sources(self, background_sids):
        """
        Turn the background model of a given branch into a set of point
        sources.

        :param background_sids:
            Site IDs affected by the background sources
        :returns:
            a list of PointSource instances, one per background cell
        """
        with h5py.File(self.source_file, "r") as hdf5:
            grid_loc = "/".join(["Grid", self.idx_set["grid_key"]])
            mags = hdf5[grid_loc + "/Magnitude"][:]
            mmax = hdf5[grid_loc + "/MMax"][background_sids]
            rates = hdf5[grid_loc + "/RateArray"][background_sids, :]
            locations = hdf5["Grid/Locations"][background_sids, :]
            sources = []
            for i, bg_idx in enumerate(background_sids):
                src_id = "_".join([self.idx_set["grid_key"], str(bg_idx)])
                src_name = "|".join([self.idx_set["total_key"], str(bg_idx)])
                # build the MFD from the rates of this cell, restricted to
                # the magnitude range [min_mag, mmax[i]); the bin width is
                # taken from the first two bins (assumes evenly spaced
                # magnitudes -- TODO confirm against the UCERF file format)
                mag_idx = np.logical_and(
                    mags >= self.min_mag, mags < mmax[i])
                src_mags = mags[mag_idx]
                src_rates = rates[i, :]
                src_mfd = EvenlyDiscretizedMFD(
                    src_mags[0], src_mags[1] - src_mags[0],
                    src_rates[mag_idx].tolist())
                sources.append(PointSource(
                    src_id, src_name, self.tectonic_region_type, src_mfd,
                    self.mesh_spacing, self.msr, self.aspect, self.tom,
                    self.usd, self.lsd,
                    Point(locations[i, 0], locations[i, 1]),
                    self.npd, self.hdd))
        return sources

    def get_rupture_indices(self, branch_id):
        """
        Returns the indices of all the ruptures in the source file.

        :param branch_id:
            ID of a UCERF branch (only logged, not used to select)
        :returns:
            a numpy array with the rupture indices
        """
        with h5py.File(self.source_file, "r") as hdf5:
            idxs = np.arange(len(hdf5[self.idx_set["rate_idx"]]))
        logging.info('Found %d ruptures in %s', len(idxs), self.source_file)
        logging.info(branch_id)
        return idxs

    def filter_sites_by_distance_from_rupture_set(
            self, rupset_idx, sites, max_dist):
        """
        Filter sites by distances from a set of ruptures.

        :param rupset_idx:
            indices of the ruptures in the set
        :param sites:
            a site collection with .lons, .lats and a .filter method
        :param max_dist:
            maximum distance in km
        :returns:
            the pair (rupset_idx, filtered sites), or ([], []) if all
            the sites are farther than `max_dist` or the set is empty
        """
        with h5py.File(self.source_file, "r") as hdf5:
            rup_index_key = "/".join([self.idx_set["geol_idx"],
                                      "RuptureIndex"])
            # find the combination of rupture sections used in this model
            rupture_set = set()
            # determine which of the rupture sections are used in this
            # set of indices
            rup_index = hdf5[rup_index_key]
            for i in rupset_idx:
                rupture_set.update(rup_index[i])
            # for each of the identified rupture sections, retrieve the
            # centroids; collect the arrays and stack them once at the
            # end instead of calling np.vstack inside the loop, which is
            # quadratic and required a dummy first row
            centroid_blocks = []
            for ridx in rupture_set:
                trace_idx = "{:s}/{:s}".format(self.idx_set["sec_idx"],
                                               str(ridx))
                centroid_blocks.append(
                    hdf5[trace_idx + "/Centroids"][:].astype("float64"))
            if not centroid_blocks:
                # no rupture sections: nothing can be within max_dist
                return [], []
            centroids = np.vstack(centroid_blocks)
            distance = min_geodetic_distance(centroids[:, 0],
                                             centroids[:, 1],
                                             sites.lons, sites.lats)
            idx = distance <= max_dist
            if np.any(idx):
                return rupset_idx, sites.filter(idx)
            else:
                return [], []
def ucerf_poe_map(hdf5, ucerf_source, rupset_idx, s_sites, imtls, cmaker,
                  trunclevel, ctx_mon, pne_mon):
    """
    Compute a ProbabilityMap generated by the given set of indices.

    :param hdf5:
        UCERF file as instance of open h5py.File object
    :param ucerf_source:
        UCERFControl object
    :param list rupset_idx:
        List of rupture indices
    :param s_sites:
        the sites affected by the ruptures (must have a .sids attribute)
    :param imtls:
        intensity measure types and levels, with an .array attribute
    :param cmaker:
        a ContextMaker wrapping the GSIMs of interest
    :param trunclevel:
        truncation level passed to get_probability_no_exceedance
    :param ctx_mon:
        monitor timing the context generation
    :param pne_mon:
        monitor timing the probability computation
    :returns:
        the complement (~) of the accumulated probability map
    """
    # start from probability 1 and multiply in the no-exceedance
    # probability of each rupture; the map is complemented at the end
    pmap = ProbabilityMap.build(len(imtls.array), len(cmaker.gsims),
                                s_sites.sids, initvalue=1.)
    try:
        for ridx in rupset_idx:
            # Get the ucerf rupture
            if not hdf5[ucerf_source.idx_set["rate_idx"]][ridx]:
                # Ruptures seem to have a zero probability from time to time
                # If this happens, skip it
                continue
            rup, ridx_string = get_ucerf_rupture(
                hdf5, ridx,
                ucerf_source.idx_set,
                ucerf_source.tom, s_sites,
                ucerf_source.integration_distance,
                ucerf_source.mesh_spacing,
                ucerf_source.tectonic_region_type)
            if not rup:
                # rupture outside of integration distance
                continue
            with ctx_mon:  # compute distances
                try:
                    sctx, rctx, dctx = cmaker.make_contexts(s_sites, rup)
                except FarAwayRupture:
                    continue
            with pne_mon:  # compute probabilities and updates the pmap
                pnes = get_probability_no_exceedance(
                    rup, sctx, rctx, dctx, imtls, cmaker.gsims, trunclevel)
                for sid, pne in zip(sctx.sites.sids, pnes):
                    pmap[sid].array *= pne
    except Exception as err:
        # re-raise with the index of the offending rupture in the
        # message, preserving the original traceback; `ridx` is the loop
        # variable of the rupture being processed when the error occurred
        etype, err, tb = sys.exc_info()
        msg = 'An error occurred with rupture=%s. Error: %s'
        msg %= (ridx, str(err))
        raise_(etype, msg, tb)
    return ~pmap
def convert_UCERFSource(self, node):
    """
    Build a UCERFControl object from a UCERF source node.

    :param node: a node with filename, id, minMag, tectonicRegion
        attributes plus the point-source sub-nodes
    :returns: a :class:`UCERFControl` instance
    """
    # the UCERF source file path is relative to the source model file
    fname = os.path.join(os.path.dirname(self.fname), node["filename"])
    geom = node.pointGeometry
    return UCERFControl(
        fname, node["id"], self.tom.time_span, float(node["minMag"]),
        npd=self.convert_npdist(node),
        hdd=self.convert_hpdist(node),
        aspect=~node.ruptAspectRatio,
        upper_seismogenic_depth=~geom.upperSeismoDepth,
        lower_seismogenic_depth=~geom.lowerSeismoDepth,
        msr=valid.SCALEREL[~node.magScaleRel](),
        mesh_spacing=self.rupture_mesh_spacing,
        trt=node["tectonicRegion"])

# install the converter as a method of SourceConverter (monkey patch)
SourceConverter.convert_UCERFSource = convert_UCERFSource
def hazard_curves_per_rupture_subset(
        rupset_idx, ucerf_source, src_filter, imtls, cmaker,
        truncation_level=None, monitor=Monitor()):
    """
    Calculate the probabilities of exceedence from a set of rupture
    indices.

    :param rupset_idx: indices of the ruptures to consider
    :param ucerf_source: object with the UCERF source data
    :param src_filter: a filter carrying the site collection
    :param imtls: intensity measure types and levels
    :param cmaker: a ContextMaker wrapping the GSIMs
    :param truncation_level: truncation level of the distribution
    :param monitor: a monitor instance
    :returns: a ProbabilityMap with .calc_times, .eff_ruptures, .grp_id
    """
    imtls = DictArray(imtls)
    ctx_mon = monitor('making contexts', measuremem=False)
    pne_mon = monitor('computing poes', measuremem=False)
    pmap = ProbabilityMap(len(imtls.array), len(cmaker.gsims))
    pmap.calc_times = []
    pmap.eff_ruptures = 0
    pmap.grp_id = ucerf_source.src_group_id
    sitecol = src_filter.sitecol
    with h5py.File(ucerf_source.source_file, "r") as hdf:
        start = time.time()
        pmap |= ucerf_poe_map(hdf, ucerf_source, rupset_idx, sitecol,
                              imtls, cmaker, truncation_level,
                              ctx_mon, pne_mon)
        elapsed = time.time() - start
    pmap.calc_times.append(
        (ucerf_source.source_id, len(sitecol), elapsed))
    pmap.eff_ruptures += pne_mon.counts
    return pmap
def ucerf_classical_hazard_by_rupture_set(
        rupset_idx, branchname, ucerf_source, src_group_id, src_filter,
        gsims, monitor):
    """
    :param rupset_idx:
        indices of the rupture sets
    :param branchname:
        name of the branch (unused here, kept for task signature)
    :param ucerf_source:
        an object taking the place of a source for UCERF
    :param src_group_id:
        source group index
    :param src_filter:
        a source filter returning the sites affected by the source
    :param gsims:
        a list of GSIMs
    :param monitor:
        a monitor instance carrying .oqparam
    :returns:
        a ProbabilityMap with .calc_times, .eff_ruptures, .grp_id
    """
    oq = monitor.oqparam
    max_dist = oq.maximum_distance[DEFAULT_TRT]
    # initial rupture-to-site filtering
    rupset_idx, s_sites = (
        ucerf_source.filter_sites_by_distance_from_rupture_set(
            rupset_idx, src_filter.sitecol, max_dist))
    if not len(s_sites):
        # no sites within max_dist: return an empty map with the
        # attributes expected by the aggregation step
        pmap = ProbabilityMap(len(oq.imtls.array), len(gsims))
        pmap.calc_times = []
        pmap.eff_ruptures = {src_group_id: 0}
        pmap.grp_id = ucerf_source.src_group_id
        return pmap
    cmaker = ContextMaker(gsims, max_dist)
    return hazard_curves_per_rupture_subset(
        rupset_idx, ucerf_source, src_filter, oq.imtls, cmaker,
        oq.truncation_level, monitor=monitor)
def ucerf_classical_hazard_by_branch(branchname, ucerf_source, src_group_id,
                                     src_filter, gsims, monitor):
    """
    Compute the hazard of a UCERF branch in two steps: first from the
    fault rupture sets, then from the background point sources.

    :param branchname:
        a branch name
    :param ucerf_source:
        a source-like object for the UCERF model
    :param src_group_id:
        an ordinal number for the source
    :param src_filter:
        a filter returning the sites affected by the source
    :param gsims:
        a list of GSIMs
    :param monitor:
        a monitor instance carrying .oqparam
    :returns:
        an AccumDict src_group_id -> ProbabilityMap, with extra
        attributes .calc_times and .eff_ruptures
    """
    truncation_level = monitor.oqparam.truncation_level
    imtls = monitor.oqparam.imtls
    trt = ucerf_source.tectonic_region_type
    max_dist = monitor.oqparam.maximum_distance[trt]
    dic = AccumDict()
    dic.calc_times = []
    # Two step process here - the first generates the hazard curves from
    # the rupture sets
    monitor.eff_ruptures = 0
    # Apply the initial rupture to site filtering
    rupset_idx = ucerf_source.get_rupture_indices(branchname)
    rupset_idx, s_sites = \
        ucerf_source.filter_sites_by_distance_from_rupture_set(
            rupset_idx, src_filter.sitecol, max_dist)
    if len(s_sites):
        cmaker = ContextMaker(gsims, max_dist)
        dic[src_group_id] = pm = hazard_curves_per_rupture_subset(
            rupset_idx, ucerf_source, src_filter, imtls, cmaker,
            truncation_level, monitor=monitor)
        dic.calc_times.extend(pm.calc_times)
    else:
        # no sites within the integration distance: empty map
        dic[src_group_id] = ProbabilityMap(len(imtls.array), len(gsims))
    # snapshot of the effective ruptures seen so far; incremented again
    # below after the background sources are processed
    dic.eff_ruptures = {src_group_id: monitor.eff_ruptures}  # idem
    logging.info('Branch %s', branchname)
    # Get the background point sources
    background_sids = ucerf_source.get_background_sids(
        src_filter.sitecol, max_dist)
    bckgnd_sources = ucerf_source.get_background_sources(background_sids)
    if bckgnd_sources:
        # second step: compose the contribution of the background
        # point sources into the same probability map
        pmap = pmap_from_grp(
            bckgnd_sources, src_filter, imtls, gsims, truncation_level,
            (), monitor=monitor)
        dic[src_group_id] |= pmap
        dic.eff_ruptures[src_group_id] += monitor.eff_ruptures
        dic.calc_times.extend(pmap.calc_times)
    return dic
@base.calculators.add('ucerf_psha')
class UcerfPSHACalculator(classical.PSHACalculator):
    """
    UCERF classical calculator.
    """
    # task run by `execute`, one call per branch in the multi-branch case
    core_task = ucerf_classical_hazard_by_branch
    is_stochastic = False

    def pre_execute(self):
        """
        Parse the logic tree and source model input; build the composite
        source model with one source group per source model branch.
        """
        self.sitecol = readinput.get_site_collection(self.oqparam)
        self.gsim_lt = readinput.get_gsim_lt(self.oqparam, [DEFAULT_TRT])
        self.smlt = readinput.get_source_model_lt(self.oqparam)
        parser = nrml.SourceModelParser(
            SourceConverter(self.oqparam.investigation_time,
                            self.oqparam.rupture_mesh_spacing))
        # the UCERF source model is expected to contain a single group
        [self.src_group] = parser.parse_src_groups(
            self.oqparam.inputs["source_model"])
        source_models = []
        for sm in self.smlt.gen_source_models(self.gsim_lt):
            # shallow copy, so each source model branch gets its own group
            sg = copy.copy(self.src_group)
            sm.src_groups = [sg]
            # the group is expected to contain a single source
            [src] = sg
            # Update the event set
            src.src_group_id = sg.id = sm.ordinal
            src.nsites = len(self.sitecol)
            src.branch_id = sm.name
            src.idx_set = src.build_idx_set()
            source_models.append(sm)
        self.csm = source.CompositeSourceModel(
            self.gsim_lt, self.smlt, source_models, set_weight=False)
        self.rlzs_assoc = self.csm.info.get_rlzs_assoc()
        self.rup_data = {}
        self.num_tiles = 1
        # (grp_id, source_id) -> SourceInfo, stored by `execute`
        self.infos = {}

    def gen_args(self, branches, ucerf_source, monitor):
        """
        :yields: (branch, ucerf_source, grp_id, self.sitecol, gsims, monitor)
        """
        for grp_id, branch in enumerate(branches):
            gsims = self.rlzs_assoc.gsims_by_grp_id[grp_id]
            self.infos[grp_id, ucerf_source.source_id] = source.SourceInfo(
                ucerf_source)
            yield branch, ucerf_source, grp_id, self.src_filter, gsims, monitor

    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the sources according to their weight and
        tectonic region type.
        """
        monitor = self.monitor.new(self.core_task.__name__)
        monitor.oqparam = oq = self.oqparam
        ucerf_source = self.src_group.sources[0]
        self.src_filter = SourceFilter(self.sitecol, oq.maximum_distance)
        # one empty ProbabilityMap per source group, used as accumulator
        acc = AccumDict({
            grp_id: ProbabilityMap(len(oq.imtls.array), len(gsims))
            for grp_id, gsims in self.rlzs_assoc.gsims_by_grp_id.items()})
        acc.calc_times = []
        acc.eff_ruptures = AccumDict()  # grp_id -> eff_ruptures
        acc.bb_dict = {}  # just for API compatibility
        if len(self.csm) > 1:
            # when multiple branches, parallelise by branch
            branches = [br.value for br in self.smlt.branches.values()]
            rup_res = parallel.Starmap(
                ucerf_classical_hazard_by_branch,
                self.gen_args(branches, ucerf_source, monitor)).submit_all()
        else:
            # single branch
            gsims = self.rlzs_assoc.gsims_by_grp_id[0]
            [(branch_id, branch)] = self.smlt.branches.items()
            branchname = branch.value
            ucerf_source.src_group_id = 0
            ucerf_source.weight = 1
            ucerf_source.nsites = len(self.sitecol)
            self.infos[0, ucerf_source.source_id] = source.SourceInfo(
                ucerf_source)
            logging.info('Getting the background point sources')
            with self.monitor('getting background sources', autoflush=True):
                background_sids = ucerf_source.get_background_sids(
                    self.sitecol, oq.maximum_distance[DEFAULT_TRT])
                bckgnd_sources = ucerf_source.get_background_sources(
                    background_sids)
            # parallelize on the background sources, small tasks
            args = (bckgnd_sources, self.src_filter, oq.imtls,
                    gsims, self.oqparam.truncation_level, (), monitor)
            bg_res = parallel.Starmap.apply(
                pmap_from_grp, args,
                concurrent_tasks=self.oqparam.concurrent_tasks).submit_all()
            # parallelize by rupture subsets
            tasks = self.oqparam.concurrent_tasks * 2  # they are big tasks
            rup_sets = ucerf_source.get_rupture_indices(branchname)
            rup_res = parallel.Starmap.apply(
                ucerf_classical_hazard_by_rupture_set,
                (rup_sets, branchname, ucerf_source, self.src_group.id,
                 self.src_filter, gsims, monitor),
                concurrent_tasks=tasks).submit_all()
            # compose probabilities from background sources
            for pmap in bg_res:
                acc[0] |= pmap
            self.save_data_transfer(bg_res)
        # merge the per-task maps from the fault ruptures into `acc`
        pmap_by_grp_id = functools.reduce(self.agg_dicts, rup_res, acc)
        with self.monitor('store source_info', autoflush=True):
            self.store_source_info(self.infos)
        self.save_data_transfer(rup_res)
        self.datastore['csm_info'] = self.csm.info
        self.rlzs_assoc = self.csm.info.get_rlzs_assoc(
            functools.partial(self.count_eff_ruptures, pmap_by_grp_id))
        return pmap_by_grp_id
@base.calculators.add('ucerf_classical')
class UCERFClassicalCalculator(classical.ClassicalCalculator):
    """
    UCERF flavour of the ClassicalCalculator: runs on top of the
    probability maps produced by the 'ucerf_psha' pre-calculator.
    """
    pre_calculator = 'ucerf_psha'