Source code for openquake.calculators.conditional_spectrum

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2021 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Conditional spectrum calculator, inspired by the disaggregation calculator
"""
import logging
import numpy

from openquake.baselib import general, performance
from openquake.commonlib.calc import compute_hazard_maps, get_mean_curve
from openquake.hazardlib.imt import from_string
from openquake.hazardlib.contexts import read_cmakers
from openquake.hazardlib.calc.cond_spectra import get_cs_out, outdict
from openquake.calculators import base

U16 = numpy.uint16
U32 = numpy.uint32


# helper function to be used when saving the spectra as an array
[docs]def to_spectra(outdic, n, p): """ :param outdic: dictionary rlz_id -> array MNOP :param n: site index in the range 0..N-1 :param p: IMLs index in the range 0..P-1 :returns: conditional spectra as an array of shape (R, M, 2) """ R = len(outdic) M = len(outdic[0]) out = numpy.zeros((R, M, 2)) for r in range(R): c = outdic[r] out[r, :, 0] = numpy.exp(c[:, n, 1, p]) # NB: seems wrong! out[r, :, 1] = numpy.sqrt(c[:, n, 2, p]) return out
[docs]@base.calculators.add('conditional_spectrum') class ConditionalSpectrumCalculator(base.HazardCalculator): """ Conditional spectrum calculator, to be used for few sites only """ precalc = 'classical' accept_precalc = ['classical', 'disaggregation']
[docs] def pre_execute(self): raise RuntimeError('Please use a more recent version of the engine ' 'to run conditional spectrum calculations')
[docs] def execute(self): """ Compute the conditional spectrum in a sequential way. NB: since conditional spectra are meant to be computed only for few sites, there is no point in parallelizing: the computation is dominated by the time spent reading the contexts, not by the CPU. """ assert self.N <= self.oqparam.max_sites_disagg, ( self.N, self.oqparam.max_sites_disagg) oq = self.oqparam self.full_lt = self.datastore['full_lt'] self.trts = list(self.full_lt.gsim_lt.values) self.imts = list(oq.imtls) imti = self.imts.index(oq.imt_ref) self.M = M = len(self.imts) dstore = (self.datastore.parent if self.datastore.parent else self.datastore) totrups = len(dstore['rup/mag']) logging.info('Reading {:_d} ruptures'.format(totrups)) rdt = [('grp_id', U32), ('idx', U32)] rdata = numpy.zeros(totrups, rdt) rdata['idx'] = numpy.arange(totrups) rdata['grp_id'] = dstore['rup/grp_id'][:] self.periods = [from_string(imt).period for imt in self.imts] # extract imls from the "mean" hazard map curve = get_mean_curve(self.datastore, oq.imt_ref) # there is 1 site [self.imls] = compute_hazard_maps(curve, oq.imtls[oq.imt_ref], oq.poes) self.P = P = len(self.imls) self.datastore.create_dset( 'cs-rlzs', float, (self.R, M, self.N, 2, self.P)) self.datastore.set_shape_descr( 'cs-rlzs', rlz_id=self.R, period=self.periods, sid=self.N, cs=2, poe_id=P) self.datastore.create_dset('cs-stats', float, (1, M, self.N, 2, P)) self.datastore.set_shape_descr( 'cs-stats', stat='mean', period=self.periods, sid=self.N, cs=['spec', 'std'], poe_id=P) self.cmakers = read_cmakers(self.datastore) # self.datastore.swmr_on() # IMPORTANT!! we rely on the fact that the classical part # of the calculation stores the ruptures in chunks of constant # grp_id, therefore it is possible to build (start, stop) slices out = general.AccumDict() # grp_id => dict # Computing CS for gid, start, stop in performance.idx_start_stop(rdata['grp_id']): cmaker = self.cmakers[gid] cmaker.poes = oq.poes ctxs = cmaker.read_ctxs(dstore, slice(start, stop)) for ctx in ctxs: out += get_cs_out(cmaker, ctx, imti, self.imls) # Apply weights and get two dictionaries with integer keys # (corresponding to the rlz ID) and array values # of shape (M, N, 3, P) where: # M is the number of IMTs # N is the number of sites # 3 is the number of statistical momenta # P is the the number of IMLs outdic, outmean = self._apply_weights(out) # Computing standard deviation for gid, start, stop in performance.idx_start_stop(rdata['grp_id']): cmaker = self.cmakers[gid] cmaker.poes = oq.poes ctxs = cmaker.read_ctxs(dstore, slice(start, stop)) for ctx in ctxs: res = get_cs_out(cmaker, ctx, imti, self.imls, outmean[0]) for g in res: out[g][:, :, 2] += res[g][:, :, 2] # STDDEV return out
[docs] def convert_and_save(self, dsetname, outdic): """ Save the conditional spectra """ for n in range(self.N): for p in range(self.P): # shape (R, M, N, 2, P) self.datastore[dsetname][:, :, n, :, p] = to_spectra( outdic, n, p) # shape (R, M, 2) attrs = dict(imls=self.imls, periods=self.periods) if self.oqparam.poes: attrs['poes'] = self.oqparam.poes self.datastore.set_attrs(dsetname, **attrs)
[docs] def post_execute(self, acc): # apply weights outdic, outmean = self._apply_weights(acc) # convert dictionaries into spectra and save them self.convert_and_save('cs-rlzs', outdic) self.convert_and_save('cs-stats', outmean)
def _apply_weights(self, acc): # build conditional spectra for each realization rlzs_by_g = self.datastore['rlzs_by_g'][()] outdic = outdict(self.M, self.N, self.P, 0, self.R) for _g, rlzs in enumerate(rlzs_by_g): for r in rlzs: outdic[r] += acc[_g] self.convert_and_save('cs-rlzs', outdic) # build final conditional mean and std weights = self.datastore['weights'][:] outmean = outdict(self.M, self.N, self.P, 0, 1) # dict with key 0 for r, weight in enumerate(weights): outmean[0] += outdic[r] * weight return outdic, outmean