Source code for openquake.calculators.ucerf_classical

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import logging
from datetime import datetime
import numpy

from openquake.baselib.general import DictArray, AccumDict
from openquake.baselib import parallel
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.calc.hazard_curve import classical
from openquake.hazardlib.calc.filters import SourceFilter
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib import valid
from openquake.commonlib import source, readinput, util
from openquake.hazardlib.sourceconverter import SourceConverter

from openquake.calculators import base
from openquake.calculators.classical import ClassicalCalculator, PSHACalculator
from openquake.calculators.ucerf_event_based import (
    UCERFSource, get_composite_source_model)
# FIXME: the counting of effective ruptures has to be revised


[docs]def convert_UCERFSource(self, node): """ Converts the Ucerf Source node into an SES Control object """ dirname = os.path.dirname(self.fname) # where the source_model_file is source_file = os.path.join(dirname, node["filename"]) if "startDate" in node.attrib and "investigationTime" in node.attrib: # Is a time-dependent model - even if rates were originally # poissonian # Verify that the source time span is the same as the TOM time span inv_time = float(node["investigationTime"]) if inv_time != self.tom.time_span: raise ValueError("Source investigation time (%s) is not " "equal to configuration investigation time " "(%s)" % (inv_time, self.tom.time_span)) start_date = datetime.strptime(node["startDate"], "%d/%m/%Y") else: start_date = None return UCERFSource( source_file, node["id"], self.tom.time_span, start_date, float(node["minMag"]), npd=self.convert_npdist(node), hdd=self.convert_hpdist(node), aspect=~node.ruptAspectRatio, upper_seismogenic_depth=~node.pointGeometry.upperSeismoDepth, lower_seismogenic_depth=~node.pointGeometry.lowerSeismoDepth, msr=valid.SCALEREL[~node.magScaleRel](), mesh_spacing=self.rupture_mesh_spacing, trt=node["tectonicRegion"])
SourceConverter.convert_UCERFSource = convert_UCERFSource
[docs]@util.reader def ucerf_classical(rupset_idx, ucerf_source, src_filter, gsims, monitor): """ :param rupset_idx: indices of the rupture sets :param ucerf_source: an object taking the place of a source for UCERF :param src_filter: a source filter returning the sites affected by the source :param gsims: a list of GSIMs :param monitor: a monitor instance :returns: a ProbabilityMap """ t0 = time.time() truncation_level = monitor.oqparam.truncation_level imtls = monitor.oqparam.imtls ucerf_source.src_filter = src_filter # so that .iter_ruptures() work grp_id = ucerf_source.src_group_id mag = ucerf_source.mags[rupset_idx].max() ridx = set() for idx in rupset_idx: ridx.update(ucerf_source.get_ridx(idx)) ucerf_source.rupset_idx = rupset_idx ucerf_source.num_ruptures = nruptures = len(rupset_idx) # prefilter the sites close to the rupture set s_sites = ucerf_source.get_rupture_sites(ridx, src_filter, mag) if s_sites is None: # return an empty probability map pm = ProbabilityMap(len(imtls.array), len(gsims)) acc = AccumDict({grp_id: pm}) acc.calc_times = { ucerf_source.source_id: numpy.array([nruptures, 0, time.time() - t0, 1])} acc.eff_ruptures = {grp_id: 0} return acc # compute the ProbabilityMap cmaker = ContextMaker(gsims, src_filter.integration_distance) imtls = DictArray(imtls) ctx_mon = monitor('make_contexts', measuremem=False) poe_mon = monitor('get_poes', measuremem=False) pmap = cmaker.poe_map(ucerf_source, s_sites, imtls, truncation_level, ctx_mon, poe_mon) nsites = len(s_sites) acc = AccumDict({grp_id: pmap}) acc.calc_times = { ucerf_source.source_id: numpy.array([nruptures * nsites, nsites, time.time() - t0, 1])} acc.eff_ruptures = {grp_id: ucerf_source.num_ruptures} return acc
[docs]@base.calculators.add('ucerf_psha') class UcerfPSHACalculator(PSHACalculator): """ UCERF classical calculator. """ core_task = ucerf_classical is_stochastic = False
[docs] def pre_execute(self): """ parse the logic tree and source model input """ logging.warn('%s is still experimental', self.__class__.__name__) self.sitecol = readinput.get_site_collection(self.oqparam) self.csm = get_composite_source_model(self.oqparam) self.gsims_by_grp = {grp.id: self.csm.info.get_gsims(grp.id) for sm in self.csm.source_models for grp in sm.src_groups} self.rup_data = {}
[docs] def execute(self): """ Run in parallel `core_task(sources, sitecol, monitor)`, by parallelizing on the sources according to their weight and tectonic region type. """ monitor = self.monitor(self.core_task.__name__) monitor.oqparam = oq = self.oqparam self.src_filter = SourceFilter(self.sitecol, oq.maximum_distance) self.nsites = [] acc = AccumDict({ grp_id: ProbabilityMap(len(oq.imtls.array), len(gsims)) for grp_id, gsims in self.gsims_by_grp.items()}) acc.calc_times = {} acc.eff_ruptures = AccumDict() # grp_id -> eff_ruptures acc.bb_dict = {} # just for API compatibility param = dict(imtls=oq.imtls, truncation_level=oq.truncation_level) for sm in self.csm.source_models: # one branch at the time grp_id = sm.ordinal gsims = self.gsims_by_grp[grp_id] [[ucerf_source]] = sm.src_groups ucerf_source.nsites = len(self.sitecol) self.csm.infos[ucerf_source.source_id] = source.SourceInfo( ucerf_source) logging.info('Getting the background point sources') bckgnd_sources = ucerf_source.get_background_sources( self.src_filter) # since there are two kinds of tasks (background and rupture_set) # we divide the concurrent_tasks parameter by 2; # notice the "or 1" below, to avoid issues when # self.oqparam.concurrent_tasks is 0 or 1 ct2 = (self.oqparam.concurrent_tasks // 2) or 1 # parallelize on the background sources, small tasks args = (bckgnd_sources, self.src_filter, gsims, param, monitor) bg_res = parallel.Starmap.apply( classical, args, name='background_sources_%d' % grp_id, concurrent_tasks=ct2) # parallelize by rupture subsets rup_sets = numpy.arange(ucerf_source.num_ruptures) taskname = 'ucerf_classical_%d' % grp_id acc = parallel.Starmap.apply( ucerf_classical, (rup_sets, ucerf_source, self.src_filter, gsims, monitor), concurrent_tasks=ct2, name=taskname ).reduce(self.agg_dicts, acc) # compose probabilities from background sources for pmap in bg_res: acc[grp_id] |= pmap[grp_id] with self.monitor('store source_info', autoflush=True): self.store_source_info(self.csm.infos, acc) return acc # {grp_id: pmap}
[docs]@base.calculators.add('ucerf_classical') class UCERFClassicalCalculator(ClassicalCalculator): pre_calculator = 'ucerf_psha'