Source code for openquake.calculators.ucerf_classical

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2015-2017 GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <>.
import os
import time
import socket
import logging
import functools
from datetime import datetime
import numpy
import h5py

from openquake.baselib.general import DictArray, AccumDict
from openquake.baselib import parallel
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.calc.hazard_curve import pmap_from_grp, poe_map
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib import valid
from openquake.commonlib import source, readinput, util
from openquake.hazardlib.sourceconverter import SourceConverter

from openquake.calculators import base, classical
from openquake.calculators.ucerf_event_based import (
    UCERFControl, get_composite_source_model)
# FIXME: the counting of effective ruptures has to be revised completely

[docs]def convert_UCERFSource(self, node): """ Converts the Ucerf Source node into an SES Control object """ dirname = os.path.dirname(self.fname) # where the source_model_file is source_file = os.path.join(dirname, node["filename"]) if "startDate" in node.attrib and "investigationTime" in node.attrib: # Is a time-dependent model - even if rates were originally # poissonian # Verify that the source time span is the same as the TOM time span inv_time = float(node["investigationTime"]) if inv_time != self.tom.time_span: raise ValueError("Source investigation time (%s) is not " "equal to configuration investigation time " "(%s)" % (inv_time, self.tom.time_span)) start_date = datetime.strptime(node["startDate"], "%d/%m/%Y") else: start_date = None return UCERFControl( source_file, node["id"], self.tom.time_span, start_date, float(node["minMag"]), npd=self.convert_npdist(node), hdd=self.convert_hpdist(node), aspect=~node.ruptAspectRatio, upper_seismogenic_depth=~node.pointGeometry.upperSeismoDepth, lower_seismogenic_depth=~node.pointGeometry.lowerSeismoDepth, msr=valid.SCALEREL[~node.magScaleRel](), mesh_spacing=self.rupture_mesh_spacing, trt=node["tectonicRegion"])
SourceConverter.convert_UCERFSource = convert_UCERFSource @util.reader
[docs]def ucerf_classical( rupset_idx, ucerf_source, src_filter, gsims, monitor): """ :param rupset_idx: indices of the rupture sets :param ucerf_source: an object taking the place of a source for UCERF :param src_filter: a source filter returning the sites affected by the source :param gsims: a list of GSIMs :param monitor: a monitor instance :returns: a ProbabilityMap """ t0 = time.time() truncation_level = monitor.oqparam.truncation_level imtls = monitor.oqparam.imtls ucerf_source.src_filter = src_filter # so that .iter_ruptures() work # prefilter the sites close to the rupture set with h5py.File(ucerf_source.control.source_file, "r") as hdf5: mag = hdf5[ucerf_source.idx_set["mag_idx"]][rupset_idx].max() ridx = set() # find the combination of rupture sections used in this model rup_index_key = "/".join( [ucerf_source.idx_set["geol_idx"], "RuptureIndex"]) # determine which of the rupture sections used in this set of indices rup_index = hdf5[rup_index_key] for i in rupset_idx: ridx.update(rup_index[i]) s_sites = ucerf_source.get_rupture_sites(hdf5, ridx, src_filter, mag) if s_sites is None: # return an empty probability map pm = ProbabilityMap(len(imtls.array), len(gsims)) pm.calc_times = [] # TODO: fix .calc_times pm.eff_ruptures = {ucerf_source.src_group_id: 0} pm.grp_id = ucerf_source.src_group_id return pm # compute the ProbabilityMap by using hazardlib.calc.hazard_curve.poe_map ucerf_source.rupset_idx = rupset_idx ucerf_source.num_ruptures = len(rupset_idx) cmaker = ContextMaker(gsims, src_filter.integration_distance) imtls = DictArray(imtls) ctx_mon = monitor('making contexts', measuremem=False) pne_mons = [monitor('%s.get_poes' % gsim, measuremem=False) for gsim in gsims] pmap = poe_map(ucerf_source, s_sites, imtls, cmaker, truncation_level, ctx_mon, pne_mons) nsites = len(s_sites) pmap.calc_times = [(ucerf_source.source_id, nsites, time.time() - t0)] pmap.grp_id = ucerf_source.src_group_id pmap.eff_ruptures = {pmap.grp_id: ucerf_source.num_ruptures} return pmap
[docs]class UcerfPSHACalculator(classical.PSHACalculator): """ UCERF classical calculator. """ core_task = ucerf_classical is_stochastic = False
[docs] def pre_execute(self): """ parse the logic tree and source model input """ logging.warn('%s is still experimental', self.__class__.__name__) self.sitecol = readinput.get_site_collection(self.oqparam) self.csm = get_composite_source_model(self.oqparam) self.rlzs_assoc = self.rup_data = {}
[docs] def execute(self): """ Run in parallel `core_task(sources, sitecol, monitor)`, by parallelizing on the sources according to their weight and tectonic region type. """ monitor = self.monitor(self.core_task.__name__) monitor.oqparam = oq = self.oqparam self.src_filter = SourceFilter(self.sitecol, oq.maximum_distance) acc = AccumDict({ grp_id: ProbabilityMap(len(oq.imtls.array), len(gsims)) for grp_id, gsims in self.rlzs_assoc.gsims_by_grp_id.items()}) acc.calc_times = [] acc.eff_ruptures = AccumDict() # grp_id -> eff_ruptures acc.bb_dict = {} # just for API compatibility for sm in self.csm.source_models: # one branch at the time grp_id = sm.ordinal gsims = self.rlzs_assoc.gsims_by_grp_id[grp_id] [[ucerf_source]] = sm.src_groups ucerf_source.nsites = len(self.sitecol) self.csm.infos[grp_id, ucerf_source.source_id] = source.SourceInfo( ucerf_source)'Getting the background point sources') bckgnd_sources = ucerf_source.get_background_sources( self.src_filter) # since there are two kinds of tasks (background and rupture_set) # we divide the concurrent_tasks parameter by 2; # notice the "or 1" below, to avoid issues when # self.oqparam.concurrent_tasks is 0 or 1 ct2 = (self.oqparam.concurrent_tasks // 2) or 1 # parallelize on the background sources, small tasks args = (bckgnd_sources, self.src_filter, oq.imtls, gsims, self.oqparam.truncation_level, (), monitor) bg_res = parallel.Starmap.apply( pmap_from_grp, args, name='background_sources_%d' % grp_id, concurrent_tasks=ct2).submit_all() # parallelize by rupture subsets rup_sets = numpy.arange(ucerf_source.num_ruptures) taskname = 'ucerf_classical_%d' % grp_id acc = parallel.Starmap.apply( ucerf_classical, (rup_sets, ucerf_source, self.src_filter, gsims, monitor), concurrent_tasks=ct2, name=taskname ).reduce(self.agg_dicts, acc) # compose probabilities from background sources for pmap in bg_res: acc[grp_id] |= pmap with self.monitor('store source_info', autoflush=True): self.store_source_info(self.csm.infos, acc) return acc # {grp_id: pmap}
[docs]class UCERFClassicalCalculator(classical.ClassicalCalculator): pre_calculator = 'ucerf_psha'