Source code for openquake.hazardlib.contexts

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2018-2022 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import re
import abc
import copy
import time
import warnings
import itertools
import collections
from unittest.mock import patch
import numpy
import shapely
from scipy.interpolate import interp1d

from openquake.baselib.general import (
    AccumDict, DictArray, RecordBuilder, gen_slices, kmean)
from openquake.baselib.performance import Monitor, split_array, compile, numba
from openquake.baselib.python3compat import decode
from openquake.hazardlib import valid, imt as imt_module
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.tom import (
    registry, get_pnes, FatedTOM, NegativeBinomialTOM)
from openquake.hazardlib.site import site_param_dt
from openquake.hazardlib.stats import _truncnorm_sf
from openquake.hazardlib.calc.filters import (
    SourceFilter, IntegrationDistance, magdepdist, get_distances, getdefault,
    MINMAG, MAXMAG)
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.geo.surface.planar import (
    project, project_back, get_distances_planar)

U32 = numpy.uint32
F64 = numpy.float64
TWO20 = 2**20  # used when collapsing
TWO16 = 2**16
TWO24 = 2**24
TWO32 = 2**32
STD_TYPES = (StdDev.TOTAL, StdDev.INTER_EVENT, StdDev.INTRA_EVENT)
MAX_MB = 512
KNOWN_DISTANCES = frozenset(
    'rrup rx ry0 rjb rhypo repi rcdpp azimuth azimuth_cp rvolc closest_point'
    .split())
# the following is used in the collapse method
IGNORE_PARAMS = {'mag', 'rrup', 'vs30', 'occurrence_rate', 'sids', 'mdvbin'}

# These coordinates were provided by M Gerstenberger (personal
# communication, 10 August 2018)
cshm_polygon = shapely.geometry.Polygon([(171.6, -43.3), (173.2, -43.3),
                                         (173.2, -43.9), (171.6, -43.9)])


[docs]def is_modifiable(gsim): """ :returns: True if it is a ModifiableGMPE """ return hasattr(gsim, 'gmpe') and hasattr(gsim, 'params')
[docs]def concat(ctxs): """ Concatenate context arrays. :returns: list with 0, 1 or 2 elements """ out, poisson, nonpoisson, nonparam = [], [], [], [] for ctx in ctxs: if numpy.isnan(ctx.occurrence_rate).all(): nonparam.append(ctx) # If ctx has probs_occur and occur_rate is parametric non-poisson elif hasattr(ctx, 'probs_occur') and ctx.probs_occur.shape[1] >= 1: nonpoisson.append(ctx) else: poisson.append(ctx) if poisson: out.append(numpy.concatenate(poisson).view(numpy.recarray)) if nonpoisson: # Ctxs with the same shape of prob_occur are concatenated # and different shape sets are appended separately for shp in set(ctx.probs_occur.shape[1] for ctx in nonpoisson): p_array = [p for p in nonpoisson if p.probs_occur.shape[1] == shp] out.append(numpy.concatenate(p_array).view(numpy.recarray)) if nonparam: out.append(numpy.concatenate(nonparam).view(numpy.recarray)) return out
[docs]def get_maxsize(M, G): """ :returns: an integer N such that arrays N*M*G fit in the CPU cache """ maxs = TWO20 // (16*M*G) assert maxs > 1, maxs return maxs
# numbified below
[docs]def update_pmap_n(dic, poes, rates, probs_occur, sids, itime): for poe, rate, probs, sid in zip(poes, rates, probs_occur, sids): dic[sid] *= get_pnes(rate, probs, poe, itime)
# numbified below
[docs]def update_pmap_c(dic, poes, rates, probs_occur, allsids, sizes, itime): start = 0 for poe, rate, probs, size in zip(poes, rates, probs_occur, sizes): pne = get_pnes(rate, probs, poe, itime) for sid in allsids[start:start + size]: dic[sid] *= pne start += size
if numba: t = numba.types sig = t.void(t.DictType(t.uint32, t.float64[:, :]), # dic t.float64[:, :, :], # poes t.float64[:], # rates t.float64[:, :], # probs_occur t.uint32[:], # sids t.float64) # itime update_pmap_n = compile(sig)(update_pmap_n) sig = t.void(t.DictType(t.uint32, t.float64[:, :]), # dic t.float64[:, :, :], # poes t.float64[:], # rates t.float64[:, :], # probs_occur t.uint32[:], # allsids t.uint32[:], # sizes t.float64) # itime update_pmap_c = compile(sig)(update_pmap_c)
[docs]def size(imtls): """ :returns: size of the dictionary of arrays imtls """ imls = imtls[next(iter(imtls))] return len(imls) * len(imtls)
[docs]def trivial(ctx, name): """ :param ctx: a recarray :param name: name of a parameter :returns: True if the parameter is missing or single valued """ if name not in ctx.dtype.names: return True return len(numpy.unique(numpy.float32(ctx[name]))) == 1
[docs]def expand_mdvbin(mdvbin): """ :returns: a triple of integers (magbin, distbin, vs30bin) """ magbin, rest = numpy.divmod(mdvbin, TWO24) distbin, vs30bin = numpy.divmod(rest, TWO16) return magbin, distbin, vs30bin
[docs]def sqrscale(x_min, x_max, n): """ :param x_min: minumum value :param x_max: maximum value :param n: number of steps :returns: an array of n values from x_min to x_max in a quadratic scale """ if not (isinstance(n, int) and n > 0): raise ValueError('n must be a positive integer, got %s' % n) if x_min < 0: raise ValueError('x_min must be positive, got %s' % x_min) if x_max <= x_min: raise ValueError('x_max (%s) must be bigger than x_min (%s)' % (x_max, x_min)) delta = numpy.sqrt(x_max - x_min) / (n - 1) return x_min + (delta * numpy.arange(n))**2
[docs]class Collapser(object): """ Class managing the collapsing logic. """ mag_bins = numpy.linspace(MINMAG, MAXMAG, 256) dist_bins = sqrscale(1, 600, 255) vs30_bins = numpy.linspace(0, 32767, 65536) def __init__(self, collapse_level, dist_types, has_vs30=False): self.collapse_level = collapse_level self.dist_types = dist_types self.has_vs30 = has_vs30 self.cfactor = numpy.zeros(2) self.npartial = 0 self.nfull = 0
[docs] def calc_mdvbin(self, ctx): """ :param ctx: a RuptureContext or a context array :return: an array of dtype numpy.uint32 """ dist = numpy.mean([getattr(ctx, dt) for dt in self.dist_types], axis=0) magbin = numpy.searchsorted(self.mag_bins, ctx.mag) distbin = numpy.searchsorted(self.dist_bins, dist) if self.has_vs30: vs30bin = numpy.searchsorted(self.vs30_bins, ctx.vs30) return magbin * TWO24 + distbin * TWO16 + vs30bin else: # in test_collapse_area return magbin * TWO24 + distbin * TWO16
[docs] def expand(self, mdvbin): """ :returns: mag, dist and vs30 corresponding to mdvbin """ mbin, dbin, vbin = expand_mdvbin(mdvbin) return self.mag_bins[mbin], self.dist_bins[dbin], self.vs30bins[vbin]
[docs] def collapse(self, ctx, rup_indep, collapse_level=None): """ Collapse a context recarray if possible. :param ctx: a recarray with fields "mdvbin" and "sids" :param rup_indep: False if the ruptures are mutually exclusive :param collapse_level: if None, use .collapse_level :returns: the collapsed array and a list of arrays with site IDs """ ctx['mdvbin'] = self.calc_mdvbin(ctx) clevel = (collapse_level if collapse_level is not None else self.collapse_level) if not rup_indep or clevel < 0: # no collapse self.cfactor[0] += len(numpy.unique(ctx.mdvbin)) self.cfactor[1] += len(ctx) return ctx, ctx.sids.reshape(-1, 1) # names are mag, rake, vs30, rjb, mdvbin, sids, ... relevant = set(ctx.dtype.names) - IGNORE_PARAMS if all(trivial(ctx, param) for param in relevant): # collapse all far = ctx close = numpy.zeros(0, ctx.dtype) self.nfull += 1 else: # collapse far away ruptures dst = ctx.mag * 10 * self.collapse_level far = ctx[ctx.rrup >= dst] close = ctx[ctx.rrup < dst] self.npartial += 1 C = len(close) if len(far): uic = numpy.unique( # this is fast far['mdvbin'], return_inverse=True, return_counts=True) mean = kmean(far, 'mdvbin', uic) else: mean = numpy.zeros(0, ctx.dtype) self.cfactor[0] += len(close) + len(mean) self.cfactor[1] += len(ctx) out = numpy.zeros(len(close) + len(mean), ctx.dtype) out[:C] = close out[C:] = mean allsids = [[sid] for sid in close['sids']] if len(far): # this is slow allsids.extend(split_array(far['sids'], uic[1], uic[2])) # print(len(out), len(ctx)) return out.view(numpy.recarray), allsids
[docs]class FarAwayRupture(Exception): """Raised if the rupture is outside the maximum distance for all sites"""
[docs]def basename(src): """ :returns: the base name of a split source """ src_id = src if isinstance(src, str) else src.source_id splits = re.split('[.:]', src_id, 1) if len(splits) == 2 and ';' in splits[1]: return splits[0] + ';' + splits[1].split(';')[1] return splits[0]
[docs]def get_num_distances(gsims): """ :returns: the number of distances required for the given GSIMs """ dists = set() for gsim in gsims: dists.update(gsim.REQUIRES_DISTANCES) return len(dists)
[docs]def csdict(M, N, P, start, stop): """ :param M: number of IMTs :param N: number of sites :param P: number of IMLs :param start: index :param stop: index > start """ ddic = {} for _g in range(start, stop): ddic[_g] = AccumDict({'_c': numpy.zeros((M, N, 2, P)), '_s': numpy.zeros((N, P))}) return ddic
def _interp(param, name, trt): try: mdd = param[name] except KeyError: return magdepdist([(MINMAG, 1000), (MAXMAG, 1000)]) if isinstance(mdd, IntegrationDistance): return mdd(trt) elif isinstance(mdd, dict): return magdepdist(getdefault(mdd, trt)) return mdd
[docs]class ContextMaker(object): """ A class to manage the creation of contexts and to compute mean/stddevs and possibly PoEs. :param trt: tectonic region type string :param gsims: list of GSIMs or a dictionary gsim -> rlz indices :param oq: dictionary of parameters like the maximum_distance, the IMTLs, the investigation time, etc, or an OqParam instance :param extraparams: additional site parameters to consider, used only in the tests NB: the trt can be different from the tectonic region type for which the underlying GSIMs are defined. This is intentional. """ REQUIRES = ['DISTANCES', 'SITES_PARAMETERS', 'RUPTURE_PARAMETERS'] fewsites = False rup_indep = True tom = None def __init__(self, trt, gsims, oq, monitor=Monitor(), extraparams=()): if isinstance(oq, dict): param = oq self.mags = param.get('mags', ()) self.cross_correl = param.get('cross_correl') # cond_spectra_test else: # OqParam param = vars(oq) param['split_sources'] = oq.split_sources param['min_iml'] = oq.min_iml param['reqv'] = oq.get_reqv() param['af'] = getattr(oq, 'af', None) self.cross_correl = oq.cross_correl self.imtls = oq.imtls try: self.mags = oq.mags_by_trt[trt] except AttributeError: self.mags = () except KeyError: # missing TRT but there is only one [(_, self.mags)] = oq.mags_by_trt.items() if 'imtls' in param: for imt in param['imtls']: if not isinstance(imt, str): raise TypeError('Expected string, got %s' % type(imt)) self.imtls = param['imtls'] elif 'hazard_imtls' in param: self.imtls = DictArray(param['hazard_imtls']) elif not hasattr(self, 'imtls'): raise KeyError('Missing imtls in ContextMaker!') self.cache_distances = param.get('cache_distances', False) if self.cache_distances: # use a cache (surface ID, dist_type) for MultiFaultSources self.dcache = AccumDict() self.dcache.hit = 0 else: self.dcache = None # disabled self.af = param.get('af') self.max_sites_disagg = param.get('max_sites_disagg', 10) self.max_sites_per_tile = param.get('max_sites_per_tile', 50_000) self.time_per_task = param.get('time_per_task', 60) self.disagg_by_src = param.get('disagg_by_src') self.collapse_level = int(param.get('collapse_level', -1)) self.disagg_by_src = param.get('disagg_by_src', False) self.trt = trt self.gsims = gsims for gsim in gsims: if hasattr(gsim, 'set_tables'): if not self.mags and not is_modifiable(gsim): raise ValueError( 'You must supply a list of magnitudes as 2-digit ' 'strings, like mags=["6.00", "6.10", "6.20"]') gsim.set_tables(self.mags, self.imtls) self.maximum_distance = _interp(param, 'maximum_distance', trt) if 'pointsource_distance' not in param: self.pointsource_distance = 1000. else: self.pointsource_distance = getdefault( param['pointsource_distance'], trt) self.minimum_distance = param.get('minimum_distance', 0) self.investigation_time = param.get('investigation_time') if self.investigation_time: self.tom = registry['PoissonTOM'](self.investigation_time) self.ses_seed = param.get('ses_seed', 42) self.ses_per_logic_tree_path = param.get('ses_per_logic_tree_path', 1) self.truncation_level = param.get('truncation_level', 99.) self.num_epsilon_bins = param.get('num_epsilon_bins', 1) self.ps_grid_spacing = param.get('ps_grid_spacing') self.split_sources = param.get('split_sources') self.effect = param.get('effect') for req in self.REQUIRES: reqset = set() for gsim in gsims: reqset.update(getattr(gsim, 'REQUIRES_' + req)) if self.af and req == 'SITES_PARAMETERS': reqset.add('ampcode') if (is_modifiable(gsim) and req == 'SITES_PARAMETERS' and 'apply_swiss_amplification' in gsim.params): reqset.add('amplfactor') setattr(self, 'REQUIRES_' + req, reqset) try: self.min_iml = param['min_iml'] except KeyError: self.min_iml = [0. for imt in self.imtls] self.reqv = param.get('reqv') if self.reqv is not None: self.REQUIRES_DISTANCES.add('repi') # NB: REQUIRES_DISTANCES is empty when gsims = [FromFile] REQUIRES_DISTANCES = self.REQUIRES_DISTANCES | {'rrup'} reqs = (sorted(self.REQUIRES_RUPTURE_PARAMETERS) + sorted(self.REQUIRES_SITES_PARAMETERS | set(extraparams)) + sorted(REQUIRES_DISTANCES)) dic = {} for req in reqs: if req in site_param_dt: dt = site_param_dt[req] if isinstance(dt, tuple): # (string_, size) dic[req] = b'X' * dt[1] else: dic[req] = dt(0) else: dic[req] = 0. dic['src_id'] = U32(0) dic['rup_id'] = U32(0) dic['mdvbin'] = U32(0) # velocity-magnitude-distance bin dic['sids'] = U32(0) dic['rrup'] = numpy.float64(0) dic['weight'] = numpy.float64(0) dic['occurrence_rate'] = numpy.float64(0) self.defaultdict = dic self.collapser = Collapser( self.collapse_level, REQUIRES_DISTANCES, 'vs30' in dic) self.loglevels = DictArray(self.imtls) if self.imtls else {} self.shift_hypo = param.get('shift_hypo') with warnings.catch_warnings(): # avoid RuntimeWarning: divide by zero encountered in log warnings.simplefilter("ignore") for imt, imls in self.imtls.items(): if imt != 'MMI': self.loglevels[imt] = numpy.log(imls) self.init_monitoring(monitor)
[docs] def init_monitoring(self, monitor): # instantiating child monitors, may be called in the workers self.ctx_mon = monitor('make_contexts', measuremem=False) self.col_mon = monitor('collapsing contexts', measuremem=False) self.gmf_mon = monitor('computing mean_std', measuremem=False) self.poe_mon = monitor('get_poes', measuremem=False) self.pne_mon = monitor('composing pnes', measuremem=False) self.ir_mon = monitor('iter_ruptures', measuremem=False) self.task_no = getattr(monitor, 'task_no', 0) self.out_no = getattr(monitor, 'out_no', self.task_no)
[docs] def dcache_size(self): """ :returns: the size in bytes of the distance cache """ if not self.dcache: return 0 nbytes = 0 for arr in self.dcache.values(): if isinstance(arr, numpy.ndarray): nbytes += arr.nbytes return nbytes
[docs] def read_ctxs(self, dstore, slc=None, magi=None): """ :param dstore: a DataStore instance :param slice: a slice of contexts with the same grp_id :returns: a list of contexts """ self.src_mutex, self.rup_mutex = dstore['mutex_by_grp'][self.grp_id] sitecol = dstore['sitecol'].complete.array if slc is None: slc = dstore['rup/grp_id'][:] == self.grp_id params = {n: dstore['rup/' + n][slc] for n in dstore['rup']} dtlist = [] for par, val in params.items(): if len(val) == 0: return [] elif par == 'probs_occur': item = (par, object) elif par == 'occurrence_rate': item = (par, F64) else: item = (par, val[0].dtype) dtlist.append(item) for par in sitecol.dtype.names: if par != 'sids': dtlist.append((par, sitecol.dtype[par])) if magi is not None: dtlist.append(('magi', numpy.uint8)) ctx = numpy.zeros(len(params['grp_id']), dtlist).view(numpy.recarray) for par, val in params.items(): ctx[par] = val for par in sitecol.dtype.names: if par != 'sids': ctx[par] = sitecol[par][ctx['sids']] if magi is not None: ctx['magi'] = magi # NB: sorting the contexts break the disaggregation! (see case_1) # split parametric vs nonparametric contexts nans = numpy.isnan(ctx.occurrence_rate) if nans.sum() in (0, len(ctx)): # no nans or all nans ctxs = [ctx] else: # happens in the oq-risk-tests for NZ ctxs = [ctx[nans], ctx[~nans]] return ctxs
[docs] def new_ctx(self, size): """ :returns: a recarray of the given size full of zeros """ return RecordBuilder(**self.defaultdict).zeros(size)
[docs] def recarray(self, ctxs, magi=None): """ :params ctxs: a non-empty list of homogeneous contexts :returns: a recarray, possibly collapsed """ assert ctxs dd = self.defaultdict.copy() if magi is not None: # magnitude bin used in disaggregation dd['magi'] = numpy.uint8(0) if not hasattr(ctxs[0], 'probs_occur'): for ctx in ctxs: ctx.probs_occur = numpy.zeros(0) np = 0 else: shps = [ctx.probs_occur.shape for ctx in ctxs] np = max(i[1] if len(i) > 1 else i[0] for i in shps) dd['probs_occur'] = numpy.zeros(np) if self.fewsites: # must be at the end dd['clon'] = numpy.float64(0.) dd['clat'] = numpy.float64(0.) C = sum(len(ctx) for ctx in ctxs) ra = RecordBuilder(**dd).zeros(C) start = 0 for ctx in ctxs: ctx = ctx.roundup(self.minimum_distance) slc = slice(start, start + len(ctx)) for par in dd: if par == 'rup_id': val = getattr(ctx, par) elif par == 'magi': # in disaggregation val = magi elif par == 'mdvbin': val = 0 # overridden later elif par == 'weight': val = getattr(ctx, par, 0.) else: val = getattr(ctx, par, numpy.nan) getattr(ra, par)[slc] = val ra.sids[slc] = ctx.sids start = slc.stop return ra
[docs] def get_ctx_params(self): """ :returns: the interesting attributes of the context """ params = {'occurrence_rate', 'sids', 'src_id', 'probs_occur', 'clon', 'clat', 'rrup'} params.update(self.REQUIRES_RUPTURE_PARAMETERS) for dparam in self.REQUIRES_DISTANCES: params.add(dparam + '_') return params
[docs] def from_srcs(self, srcs, sitecol): # used in disagg.disaggregation """ :param srcs: a list of Source objects :param sitecol: a SiteCollection instance :returns: a list of context arrays """ ctxs = [] srcfilter = SourceFilter(sitecol, self.maximum_distance) for i, src in enumerate(srcs): src.id = i sites = srcfilter.get_close_sites(src) if sites is not None: ctxs.extend(self.get_ctxs(src, sites)) return concat(ctxs)
[docs] def make_rctx(self, rup): """ Add .REQUIRES_RUPTURE_PARAMETERS to the rupture """ ctx = RuptureContext() vars(ctx).update(vars(rup)) for param in self.REQUIRES_RUPTURE_PARAMETERS: if param == 'mag': value = numpy.round(rup.mag, 6) elif param == 'strike': value = rup.surface.get_strike() elif param == 'dip': value = rup.surface.get_dip() elif param == 'rake': value = rup.rake elif param == 'ztor': value = rup.surface.get_top_edge_depth() elif param == 'hypo_lon': value = rup.hypocenter.longitude elif param == 'hypo_lat': value = rup.hypocenter.latitude elif param == 'hypo_depth': value = rup.hypocenter.depth elif param == 'width': value = rup.surface.get_width() elif param == 'in_cshm': # used in McVerry and Bradley GMPEs if rup.surface: lons = rup.surface.mesh.lons.flatten() lats = rup.surface.mesh.lats.flatten() points_in_polygon = ( shapely.geometry.Point(lon, lat).within(cshm_polygon) for lon, lat in zip(lons, lats)) value = any(points_in_polygon) else: value = False elif param == 'zbot': # needed for width estimation in CampbellBozorgnia2014 if rup.surface: value = rup.surface.mesh.depths.max() else: value = rup.hypocenter.depth else: raise ValueError('%s requires unknown rupture parameter %r' % (type(self).__name__, param)) setattr(ctx, param, value) if not hasattr(ctx, 'occurrence_rate'): ctx.occurrence_rate = numpy.nan if hasattr(ctx, 'temporal_occurrence_model'): if isinstance(ctx.temporal_occurrence_model, NegativeBinomialTOM): ctx.probs_occur = ctx.temporal_occurrence_model.get_pmf( ctx.occurrence_rate) return ctx
[docs] def get_ctx(self, rup, sites, distances=None): """ :returns: a RuptureContext (or None if filtered away) """ with self.ctx_mon: ctx = self.make_rctx(rup) for name in sites.array.dtype.names: setattr(ctx, name, sites[name]) if distances is None: distances = rup.surface.get_min_distance(sites.mesh) ctx.rrup = distances ctx.sites = sites for param in self.REQUIRES_DISTANCES - {'rrup'}: dists = get_distances(rup, sites, param, self.dcache) setattr(ctx, param, dists) # Equivalent distances reqv_obj = (self.reqv.get(self.trt) if self.reqv else None) if reqv_obj and not rup.surface: # PointRuptures have no surface reqv = reqv_obj.get(ctx.repi, rup.mag) if 'rjb' in self.REQUIRES_DISTANCES: ctx.rjb = reqv if 'rrup' in self.REQUIRES_DISTANCES: ctx.rrup = numpy.sqrt(reqv**2 + rup.hypocenter.depth**2) return ctx
def _get_ctx_planar(self, mag, planar, sites, src_id, start_stop, tom): with self.ctx_mon: # computing distances rrup, xx, yy = project(planar, sites.xyz) # (3, U, N) if self.fewsites: # get the closest points on the surface closest = project_back(planar, xx, yy) # (3, U, N) dists = {'rrup': rrup} for par in self.REQUIRES_DISTANCES - {'rrup'}: dists[par] = get_distances_planar(planar, sites, par) for par in dists: dst = dists[par] if self.minimum_distance: dst[dst < self.minimum_distance] = self.minimum_distance # building contexts; ctx has shape (U, N), ctxt (N, U) ctx = self.build_ctx((len(planar), len(sites))) ctxt = ctx.T # smart trick taking advantage of numpy magic ctxt['src_id'] = src_id if self.fewsites: # the loop below is a bit slow for u, rup_id in enumerate(range(*start_stop)): ctx[u]['rup_id'] = rup_id # setting rupture parameters for par in self.ruptparams: if par == 'mag': ctxt[par] = mag elif par == 'occurrence_rate': ctxt[par] = planar.wlr[:, 2] # shape U-> (N, U) elif par == 'width': ctxt[par] = planar.wlr[:, 0] elif par == 'strike': ctxt[par] = planar.sdr[:, 0] elif par == 'dip': ctxt[par] = planar.sdr[:, 1] elif par == 'rake': ctxt[par] = planar.sdr[:, 2] elif par == 'ztor': # top edge depth ctxt[par] = planar.corners[:, 2, 0] elif par == 'zbot': # bottom edge depth ctxt[par] = planar.corners[:, 2, 3] elif par == 'hypo_lon': ctxt[par] = planar.hypo[:, 0] elif par == 'hypo_lat': ctxt[par] = planar.hypo[:, 1] elif par == 'hypo_depth': ctxt[par] = planar.hypo[:, 2] # setting distance parameters for par in dists: ctx[par] = dists[par] if self.fewsites: ctx['clon'] = closest[0] ctx['clat'] = closest[1] # setting site parameters for par in self.siteparams: ctx[par] = sites.array[par] # shape N-> (U, N) if hasattr(tom, 'get_pmf'): # NegativeBinomialTOM # read Probability Mass Function from model and reshape it # into predetermined shape of probs_occur pmf = tom.get_pmf(planar.wlr[:, 2], n_max=ctx['probs_occur'].shape[2]) ctx['probs_occur'] = pmf[:, numpy.newaxis, :] return ctx
[docs] def get_ctxs_planar(self, src, sitecol): """ :param src: a (Collapsed)PointSource :param sitecol: a filtered SiteCollection :returns: a list with 0 or 1 context array """ dd = self.defaultdict.copy() tom = src.temporal_occurrence_model if isinstance(tom, NegativeBinomialTOM): if hasattr(src, 'pointsources'): # CollapsedPointSource maxrate = max(max(ps.mfd.occurrence_rates) for ps in src.pointsources) else: # regular source maxrate = max(src.mfd.occurrence_rates) p_size = tom.get_pmf(maxrate).shape[1] dd['probs_occur'] = numpy.zeros(p_size) else: dd['probs_occur'] = numpy.zeros(0) if self.fewsites: dd['clon'] = numpy.float64(0.) dd['clat'] = numpy.float64(0.) self.build_ctx = RecordBuilder(**dd).zeros self.siteparams = [par for par in sitecol.array.dtype.names if par in dd] self.ruptparams = ( self.REQUIRES_RUPTURE_PARAMETERS | {'occurrence_rate'}) with self.ir_mon: # building planar geometries planardict = src.get_planar(self.shift_hypo) magdist = {mag: self.maximum_distance(mag) for mag, rate in src.get_annual_occurrence_rates()} maxmag = max(magdist) ctxs = [] max_radius = src.max_radius() cdist = sitecol.get_cdist(src.location) mask = cdist <= magdist[maxmag] + max_radius sitecol = sitecol.filter(mask) if sitecol is None: return [] for magi, mag, planarlist, sites in self._quartets( src, sitecol, cdist[mask], planardict): if not planarlist: continue elif len(planarlist) > 1: # when using ps_grid_spacing pla = numpy.concatenate(planarlist).view(numpy.recarray) else: pla = planarlist[0] offset = src.offset + magi * len(pla) start_stop = offset, offset + len(pla) ctx = self._get_ctx_planar( mag, pla, sites, src.id, start_stop, tom).flatten() ctxt = ctx[ctx.rrup < magdist[mag]] if len(ctxt): ctxs.append(ctxt) return concat(ctxs)
def _quartets(self, src, sitecol, cdist, planardict): # splitting by magnitude quartets = [] if src.count_nphc() == 1: # one rupture per magnitude for m, (mag, pla) in enumerate(planardict.items()): quartets.append((m, mag, pla, sitecol)) else: for m, rup in enumerate(src.iruptures()): mag = rup.mag arr = [rup.surface.array.reshape(-1, 3)] pla = planardict[mag] psdist = (self.pointsource_distance + src.ps_grid_spacing + src.radius[m]) close = sitecol.filter(cdist <= psdist) far = sitecol.filter(cdist > psdist) if self.fewsites: if close is None: # all is far, common for small mag quartets.append((m, mag, arr, sitecol)) else: # something is close quartets.append((m, mag, pla, sitecol)) else: # many sites if close is None: # all is far quartets.append((m, mag, arr, far)) elif far is None: # all is close quartets.append((m, mag, pla, close)) else: # some sites are far, some are close quartets.append((m, mag, arr, far)) quartets.append((m, mag, pla, close)) return quartets
[docs] def get_ctxs(self, src, sitecol, src_id=0, step=1): """ :param src: a source object (already split) or a list of ruptures :param sitecol: a (filtered) SiteCollection :param src_id: integer source ID used where src is actually a list :param step: > 1 only in preclassical :returns: fat RuptureContexts sorted by mag """ self.fewsites = len(sitecol.complete) <= self.max_sites_disagg ctxs = [] if getattr(src, 'location', None) and step == 1: return self.get_ctxs_planar(src, sitecol) elif hasattr(src, 'source_id'): # other source with self.ir_mon: allrups = numpy.array(list(src.iter_ruptures( shift_hypo=self.shift_hypo, step=step))) for i, rup in enumerate(allrups): rup.rup_id = src.offset + i self.num_rups = len(allrups) # sorted by mag by construction u32mags = U32([rup.mag * 100 for rup in allrups]) rups_sites = [(rups, sitecol) for rups in split_array(allrups, u32mags)] src_id = src.id else: # in event based we get a list with a single rupture rups_sites = [(src, sitecol)] src_id = 0 for rups, sites in rups_sites: # ruptures with the same magnitude if len(rups) == 0: # may happen in case of min_mag/max_mag continue magdist = self.maximum_distance(rups[0].mag) dists = [get_distances(rup, sites, 'rrup', self.dcache) for rup in rups] for u, rup in enumerate(rups): mask = dists[u] <= magdist if mask.any(): r_sites = sites.filter(mask) ctx = self.get_ctx(rup, r_sites, dists[u][mask]) ctx.src_id = src_id ctx.rup_id = rup.rup_id ctxs.append(ctx) if self.fewsites: c = rup.surface.get_closest_points(sites.complete) ctx.clon = c.lons[ctx.sids] ctx.clat = c.lats[ctx.sids] return [] if not ctxs else [self.recarray(ctxs)]
[docs] def max_intensity(self, sitecol1, mags, dists): """ :param sitecol1: a SiteCollection instance with a single site :param mags: a sequence of magnitudes :param dists: a sequence of distances :returns: an array of GMVs of shape (#mags, #dists) """ assert len(sitecol1) == 1, sitecol1 nmags, ndists = len(mags), len(dists) gmv = numpy.zeros((nmags, ndists)) for m, d in itertools.product(range(nmags), range(ndists)): mag, dist = mags[m], dists[d] ctx = RuptureContext() for par in self.REQUIRES_RUPTURE_PARAMETERS: setattr(ctx, par, 0) for dst in self.REQUIRES_DISTANCES: setattr(ctx, dst, numpy.array([dist])) for par in self.REQUIRES_SITES_PARAMETERS: setattr(ctx, par, getattr(sitecol1, par)) ctx.sids = sitecol1.sids ctx.mag = mag ctx.width = .01 # 10 meters to avoid warnings in abrahamson_2014 try: maxmean = self.get_mean_stds([ctx])[0].max() # shape NM except ValueError: # magnitude outside of supported range continue else: gmv[m, d] = numpy.exp(maxmean) return gmv
# not used by the engine, is is meant for notebooks
[docs] def get_poes(self, srcs, sitecol, collapse_level=-1): """ :param srcs: a list of sources with the same TRT :param sitecol: a SiteCollection instance with N sites :returns: an array of PoEs of shape (N, L, G) """ self.collapser.cfactor = numpy.zeros(2) ctxs = self.from_srcs(srcs, sitecol) with patch.object(self.collapser, 'collapse_level', collapse_level): return self.get_pmap(ctxs).array(len(sitecol))
[docs] def get_pmap(self, ctxs, probmap=None): """ :param ctxs: a list of context arrays :param probmap: if not None, update it :returns: a new ProbabilityMap if probmap is None """ if probmap is None: # create new pmap pmap = ProbabilityMap(size(self.imtls), len(self.gsims)) else: # update passed probmap pmap = probmap if self.tom is None: itime = -1. elif isinstance(self.tom, FatedTOM): itime = 0. else: itime = self.tom.time_span if numba: dic = numba.typed.Dict.empty( key_type=t.uint32, value_type=t.float64[:, :]) else: dic = {} # sid -> array of shape (L, G) for ctx in ctxs: # allocating pmap in advance for sid in numpy.unique(ctx.sids): dic[sid] = pmap.setdefault(sid, self.rup_indep).array for poes, ctxt, slcsids in self.gen_poes(ctx): probs_occur = getattr(ctxt, 'probs_occur', numpy.zeros((len(ctxt), 0))) rates = ctxt.occurrence_rate with self.pne_mon: if isinstance(slcsids, numpy.ndarray): # no collapse: avoiding an inner loop can give a 25% if self.rup_indep: update_pmap_n(dic, poes, rates, probs_occur, ctxt.sids, itime) else: # USAmodel, New Madrid cluster z = zip(poes, rates, probs_occur, ctxt.weight, ctxt.sids) for poe, rate, probs, wei, sid in z: pne = get_pnes(rate, probs, poe, itime) dic[sid] += (1. - pne) * wei else: # collapse is possible only for rup_indep allsids = [] sizes = [] for sids in slcsids: allsids.extend(sids) sizes.append(len(sids)) update_pmap_c(dic, poes, rates, probs_occur, U32(allsids), U32(sizes), itime) if probmap is None: # return the new pmap if self.rup_indep: for arr in dic.values(): arr[:] = 1. - arr return pmap
# called by gen_poes and by the GmfComputer
[docs] def get_mean_stds(self, ctxs): """ :param ctxs: a list of contexts with N=sum(len(ctx) for ctx in ctxs) :returns: an array of shape (4, G, M, N) with mean and stddevs """ if not hasattr(self, 'imts'): self.imts = tuple(imt_module.from_string(im) for im in self.imtls) N = sum(len(ctx) for ctx in ctxs) M = len(self.imtls) G = len(self.gsims) out = numpy.zeros((4, G, M, N)) if all(isinstance(ctx, numpy.recarray) for ctx in ctxs): # contexts already vectorized recarrays = ctxs else: # vectorize the contexts recarrays = [self.recarray(ctxs)] if any(hasattr(gsim, 'gmpe_table') for gsim in self.gsims): assert len(recarrays) == 1, len(recarrays) recarrays = split_array(recarrays[0], U32(recarrays[0].mag*100)) self.adj = {gsim: [] for gsim in self.gsims} # NSHM2014 adjustments for g, gsim in enumerate(self.gsims): compute = gsim.__class__.compute start = 0 for ctx in recarrays: slc = slice(start, start + len(ctx)) adj = compute(gsim, ctx, self.imts, *out[:, g, :, slc]) if adj is not None: self.adj[gsim].append(adj) start = slc.stop if self.adj[gsim]: self.adj[gsim] = numpy.concatenate(self.adj[gsim]) if self.truncation_level not in (0, 99.) and ( out[1, g] == 0.).any(): raise ValueError('Total StdDev is zero for %s' % gsim) return out
# http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.845.163&rep=rep1&type=pdf
[docs] def get_cs_contrib(self, ctx, imti, imls): """ :param ctx: a context array :param imti: IMT index in the range 0..M-1 :param imls: P intensity measure levels for the IMT specified by the index :returns: a dictionary g_ -> key -> array where g_ is an index, key is the string '_c' or '_s', and the arrays have shape (M, N, 2, P) or (N, P) respectively. Compute the contributions to the conditional spectra, in a form suitable for later composition. NB: at the present if works only for poissonian contexts """ assert self.tom sids, counts = numpy.unique(ctx.sids, return_counts=True) assert len(set(counts)) == 1, counts # must be all equal N = len(sids) U = len(ctx) // N G = len(self.gsims) M = len(self.imtls) P = len(imls) out = csdict(M, N, P, self.start, self.start + G) mean_stds = self.get_mean_stds([ctx]) # (4, G, M, N*C) imt_ref = self.imts[imti] rho = numpy.array([self.cross_correl.get_correlation(imt_ref, imt) for imt in self.imts]) m_range = range(len(self.imts)) # probs = 1 - exp(-occurrence_rates*time_span) probs = self.tom.get_probability_one_or_more_occurrences( ctx.occurrence_rate) # shape N * U for n in range(N): # NB: to understand the code below, consider the case with # N=3 sites and C=2 contexts; then the indices N*C are # 0: first site # 1: second site # 2: third site # 3: first site # 4: second site # 5: third site # i.e. idxs = [0, 3], [1, 4], [2, 5] for sites 0, 1, 2 slc = slice(n, N * U, N) # U indices for g in range(G): mu = mean_stds[0, g, :, slc] # shape (M, U) sig = mean_stds[1, g, :, slc] # shape (M, U) c = out[self.start + g]['_c'] s = out[self.start + g]['_s'] for p in range(P): eps = (imls[p] - mu[imti]) / sig[imti] # shape U poes = _truncnorm_sf(self.truncation_level, eps) # shape U ws = -numpy.log( (1. - probs[slc]) ** poes) / self.investigation_time s[n, p] = ws.sum() # weights not summing up to 1 for m in m_range: c[m, n, 0, p] = ws @ (mu[m] + rho[m] * eps * sig[m]) c[m, n, 1, p] = ws @ (sig[m]**2 * (1. - rho[m]**2)) return out
[docs] def gen_poes(self, ctx): """ :param ctx: a vectorized context (recarray) of size N :yields: poes, ctxt, slcsids with poes of shape (N, L, G) """ from openquake.hazardlib.site_amplification import get_poes_site (M, L1), G = self.loglevels.array.shape, len(self.gsims) maxsize = get_maxsize(M, G) # L1 is the reduction factor such that the NLG arrays have # the same size as the GMN array and fit in the CPU cache # collapse if possible with self.col_mon: ctx, allsids = self.collapser.collapse(ctx, self.rup_indep) # split large context arrays to avoid filling the CPU cache if ctx.nbytes > maxsize: slices = gen_slices(0, len(ctx), maxsize) else: slices = [slice(0, len(ctx))] for bigslc in slices: s = bigslc.start with self.gmf_mon: mean_stdt = self.get_mean_stds([ctx[bigslc]]) for slc in gen_slices(bigslc.start, bigslc.stop, maxsize // L1): slcsids = allsids[slc] ctxt = ctx[slc] self.slc = slice(slc.start - s, slc.stop - s) # in get_poes with self.poe_mon: poes = numpy.zeros((len(ctxt), M*L1, G)) for g, gsim in enumerate(self.gsims): ms = mean_stdt[:2, g, :, self.slc] # builds poes of shape (n, L, G) if self.af: # kernel amplification method poes[:, :, g] = get_poes_site(ms, self, ctxt) else: # regular case poes[:, :, g] = gsim.get_poes(ms, self, ctxt) yield poes, ctxt, slcsids
[docs] def estimate_sites(self, src, sites): """ :returns: how many sites are impacted overall """ nphc = src.count_nphc() dists = sites.get_cdist(src.location) planardict = src.get_planar(iruptures=True) esites = 0 for m, (mag, [planar]) in enumerate(planardict.items()): rrup = dists[dists < self.maximum_distance(mag) + src.radius[m]] nclose = (rrup < self.pointsource_distance + src.ps_grid_spacing + src.radius[m]).sum() nfar = len(rrup) - nclose esites += nclose * nphc + nfar return esites
# tested in test_collapse_small
[docs] def estimate_weight(self, src, srcfilter, multiplier=1): """ :param src: a source object :param srcfilter: a SourceFilter instance :returns: the weight of the source (num_ruptures * <num_sites/N>) """ sites = srcfilter.get_close_sites(src) if sites is None: # may happen for CollapsedPointSources return 0 src.nsites = len(sites) N = len(srcfilter.sitecol.complete) # total sites if (hasattr(src, 'location') and src.count_nphc() > 1 and self.pointsource_distance < 1000): esites = self.estimate_sites(src, sites) * multiplier else: ctxs = self.get_ctxs(src, sites, step=10) # reduced number if not ctxs: return src.num_ruptures if N == 1 else 0 esites = (len(ctxs[0]) * src.num_ruptures / self.num_rups * multiplier) weight = esites / N # the weight is the effective number of ruptures src.esites = int(esites) return weight
[docs] def set_weight(self, sources, srcfilter, multiplier=1, mon=Monitor()): """ Set the weight attribute on each prefiltered source """ if hasattr(srcfilter, 'array'): # a SiteCollection was passed srcfilter = SourceFilter(srcfilter, self.maximum_distance) N = len(srcfilter.sitecol) G = len(self.gsims) for src in sources: src.num_ruptures = src.count_ruptures() if src.nsites == 0: # was discarded by the prefiltering src.weight = .001 src.esites = 0 else: with mon: src.esites = 0 # overridden inside estimate_weight src.weight = .1 + self.estimate_weight( src, srcfilter, multiplier) * G if src.code == b'F' and N <= self.max_sites_disagg: src.weight *= 20 # test ucerf elif src.code == b'S': src.weight += .9 elif src.code == b'C': src.weight += 9.9
# see contexts_tests.py for examples of collapse # probs_occur = functools.reduce(combine_pmf, (r.probs_occur for r in rups))
[docs]def combine_pmf(o1, o2): """ Combine probabilities of occurrence; used to collapse nonparametric ruptures. :param o1: probability distribution of length n1 :param o2: probability distribution of length n2 :returns: probability distribution of length n1 + n2 - 1 >>> combine_pmf([.99, .01], [.98, .02]) array([9.702e-01, 2.960e-02, 2.000e-04]) """ n1 = len(o1) n2 = len(o2) o = numpy.zeros(n1 + n2 - 1) for i in range(n1): for j in range(n2): o[i + j] += o1[i] * o2[j] return o
[docs]class PmapMaker(object): """ A class to compute the PoEs from a given source """ def __init__(self, cmaker, srcfilter, group): vars(self).update(vars(cmaker)) self.cmaker = cmaker self.srcfilter = srcfilter self.N = len(self.srcfilter.sitecol.complete) try: self.sources = group.sources except AttributeError: # already a list of sources self.sources = group self.src_mutex = getattr(group, 'src_interdep', None) == 'mutex' self.cmaker.rup_indep = getattr(group, 'rup_interdep', None) != 'mutex' self.fewsites = self.N <= cmaker.max_sites_disagg
[docs] def count_bytes(self, ctxs): # # usuful for debugging memory issues rparams = len(self.cmaker.REQUIRES_RUPTURE_PARAMETERS) sparams = len(self.cmaker.REQUIRES_SITES_PARAMETERS) + 1 dparams = len(self.cmaker.REQUIRES_DISTANCES) nbytes = 0 for ctx in ctxs: nsites = len(ctx) nbytes += 8 * rparams nbytes += 8 * sparams * nsites nbytes += 8 * dparams * nsites return nbytes
def _get_ctxs(self, src): sites = self.srcfilter.get_close_sites(src) if sites is None: return [] ctxs = self.cmaker.get_ctxs(src, sites) if hasattr(src, 'mutex_weight'): for ctx in ctxs: if ctx.weight.any(): ctx['weight'] *= src.mutex_weight else: ctx['weight'] = src.mutex_weight if self.fewsites: # keep rupdata in memory (before collapse) for ctx in ctxs: self.rupdata.append(ctx) return ctxs def _make_src_indep(self): # sources with the same ID pmap = ProbabilityMap(size(self.imtls), len(self.gsims)) cm = self.cmaker allctxs = [] ctxs_mb = 0 totlen = 0 t0 = time.time() for src in self.sources: ctxs = self._get_ctxs(src) ctxs_mb += sum(ctx.nbytes for ctx in ctxs) / TWO20 # TWO20=1MB src.nsites = sum(len(ctx) for ctx in ctxs) totlen += src.nsites allctxs.extend(ctxs) if ctxs_mb > MAX_MB: cm.get_pmap(concat(allctxs), pmap) allctxs.clear() ctxs_mb = 0 if allctxs: cm.get_pmap(concat(allctxs), pmap) allctxs.clear() dt = time.time() - t0 nsrcs = len(self.sources) for src in self.sources: self.source_data['src_id'].append(src.source_id) self.source_data['grp_id'].append(src.grp_id) self.source_data['nsites'].append(src.nsites) self.source_data['esites'].append(src.esites) self.source_data['nrupts'].append(src.num_ruptures) self.source_data['weight'].append(src.weight) self.source_data['ctimes'].append( dt * src.nsites / totlen if totlen else dt / nsrcs) self.source_data['taskno'].append(cm.task_no) return ~pmap if cm.rup_indep else pmap def _make_src_mutex(self): # used in the Japan model, test case_27 pmap_by_src = {} cm = self.cmaker for src in self.sources: t0 = time.time() pm = ProbabilityMap(cm.imtls.size, len(cm.gsims)) ctxs = self._get_ctxs(src) nctxs = len(ctxs) nsites = sum(len(ctx) for ctx in ctxs) if nsites: cm.get_pmap(ctxs, pm) p = (~pm if cm.rup_indep else pm) * src.mutex_weight if ':' in src.source_id: srcid = basename(src) if srcid in pmap_by_src: pmap_by_src[srcid] += p else: pmap_by_src[srcid] = p else: pmap_by_src[src.source_id] = p dt = time.time() - t0 self.source_data['src_id'].append(src.source_id) self.source_data['grp_id'].append(src.grp_id) self.source_data['nsites'].append(nsites) self.source_data['esites'].append(src.esites) self.source_data['nrupts'].append(nctxs) self.source_data['weight'].append(src.weight) self.source_data['ctimes'].append(dt) self.source_data['taskno'].append(cm.task_no) return pmap_by_src
[docs] def make(self): dic = {} self.rupdata = [] self.source_data = AccumDict(accum=[]) grp_id = self.sources[0].grp_id if self.src_mutex: pmap = ProbabilityMap(size(self.imtls), len(self.gsims)) pmap_by_src = self._make_src_mutex() for source_id, pm in pmap_by_src.items(): pmap += pm else: pmap = self._make_src_indep() dic['pmap'] = pmap dic['cfactor'] = self.cmaker.collapser.cfactor dic['rup_data'] = concat(self.rupdata) dic['source_data'] = self.source_data dic['task_no'] = self.task_no dic['grp_id'] = grp_id if self.disagg_by_src and self.src_mutex: dic['pmap_by_src'] = pmap_by_src elif self.disagg_by_src: # all the sources in the group have the same source_id because # of the groupby(group, get_source_id) in classical.py srcid = basename(self.sources[0]) dic['pmap_by_src'] = {srcid: pmap} return dic
[docs]class BaseContext(metaclass=abc.ABCMeta): """ Base class for context object. """ def __eq__(self, other): """ Return True if ``other`` has same attributes with same values. """ if isinstance(other, self.__class__): if self._slots_ == other._slots_: oks = [] for s in self._slots_: a, b = getattr(self, s, None), getattr(other, s, None) if a is None and b is None: ok = True elif a is None and b is not None: ok = False elif a is not None and b is None: ok = False elif hasattr(a, 'shape') and hasattr(b, 'shape'): if a.shape == b.shape: ok = numpy.allclose(a, b) else: ok = False else: ok = a == b oks.append(ok) return numpy.all(oks) return False
# mock of a site collection used in the tests and in the SMTK
[docs]class SitesContext(BaseContext): """ Sites calculation context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant features of the sites collection. Every GSIM class is required to declare what :attr:`sites parameters <GroundShakingIntensityModel.REQUIRES_SITES_PARAMETERS>` does it need. Only those required parameters are made available in a result context object. """ # _slots_ is used in hazardlib check_gsim and in the SMTK def __init__(self, slots='vs30 vs30measured z1pt0 z2pt5'.split(), sitecol=None): self._slots_ = slots if sitecol is not None: self.sids = sitecol.sids for slot in slots: setattr(self, slot, getattr(sitecol, slot)) # used in the SMTK def __len__(self): return len(self.sids)
[docs]class DistancesContext(BaseContext): """ Distances context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant distances between sites from the collection and the rupture. Every GSIM class is required to declare what :attr:`distance measures <GroundShakingIntensityModel.REQUIRES_DISTANCES>` does it need. Only those required values are calculated and made available in a result context object. """ _slots_ = ('rrup', 'rx', 'rjb', 'rhypo', 'repi', 'ry0', 'rcdpp', 'azimuth', 'hanging_wall', 'rvolc') def __init__(self, param_dist_pairs=()): for param, dist in param_dist_pairs: setattr(self, param, dist)
[docs] def roundup(self, minimum_distance): """ If the minimum_distance is nonzero, returns a copy of the DistancesContext with updated distances, i.e. the ones below minimum_distance are rounded up to the minimum_distance. Otherwise, returns the original DistancesContext unchanged. """ if not minimum_distance: return self ctx = DistancesContext() for dist, array in vars(self).items(): small_distances = array < minimum_distance if small_distances.any(): array = numpy.array(array) # make a copy first array[small_distances] = minimum_distance array.flags.writeable = False setattr(ctx, dist, array) return ctx
[docs]def get_dists(ctx): """ Extract the distance parameters from a context. :returns: a dictionary dist_name -> distances """ return {par: dist for par, dist in vars(ctx).items() if par in KNOWN_DISTANCES}
# used to produce a RuptureContext suitable for legacy code, i.e. for calls # to .get_mean_and_stddevs, like for instance in the SMTK
[docs]def full_context(sites, rup, dctx=None): """ :returns: a full RuptureContext with all the relevant attributes """ self = RuptureContext() self.src_id = 0 for par, val in vars(rup).items(): setattr(self, par, val) if not hasattr(self, 'occurrence_rate'): self.occurrence_rate = numpy.nan if hasattr(sites, 'array'): # is a SiteCollection for par in sites.array.dtype.names: setattr(self, par, sites[par]) else: # sites is a SitesContext for par, val in vars(sites).items(): setattr(self, par, val) if dctx: for par, val in vars(dctx).items(): setattr(self, par, val) return self
[docs]def get_mean_stds(gsim, ctx, imts, **kw): """ :param gsim: a single GSIM or a a list of GSIMs :param ctx: a RuptureContext or a recarray of size N :param imts: a list of M IMTs :param kw: additional keyword arguments :returns: an array of shape (4, M, N) obtained by applying the given GSIM, ctx amd imts, or an array of shape (G, 4, M, N) """ imtls = {imt.string: [0] for imt in imts} single = hasattr(gsim, 'compute') kw['imtls'] = imtls cmaker = ContextMaker('*', [gsim] if single else gsim, kw) out = cmaker.get_mean_stds([ctx]) # (4, G, M, N) return out[:, 0] if single else out
# mock of a rupture used in the tests and in the SMTK
[docs]class RuptureContext(BaseContext): """ Rupture calculation context for ground shaking intensity models. Instances of this class are passed into :meth:`GroundShakingIntensityModel.get_mean_and_stddevs`. They are intended to represent relevant features of a single rupture. Every GSIM class is required to declare what :attr:`rupture parameters <GroundShakingIntensityModel.REQUIRES_RUPTURE_PARAMETERS>` does it need. Only those required parameters are made available in a result context object. """ src_id = 0 rup_id = 0 _slots_ = ( 'mag', 'strike', 'dip', 'rake', 'ztor', 'hypo_lon', 'hypo_lat', 'hypo_depth', 'width', 'hypo_loc', 'src_id', 'rup_id') def __init__(self, param_pairs=()): for param, value in param_pairs: setattr(self, param, value)
[docs] def size(self): """ If the context is a multi rupture context, i.e. it contains an array of magnitudes and it refers to a single site, returns the size of the array, otherwise returns 1. """ nsites = len(self.sids) if nsites == 1 and isinstance(self.mag, numpy.ndarray): return len(self.mag) return nsites
# used in acme_2019 def __len__(self): return len(self.sids)
[docs] def roundup(self, minimum_distance): """ If the minimum_distance is nonzero, returns a copy of the RuptureContext with updated distances, i.e. the ones below minimum_distance are rounded up to the minimum_distance. Otherwise, returns the original. """ if not minimum_distance: return self ctx = copy.copy(self) for dist, array in vars(self).items(): if dist in KNOWN_DISTANCES: small_distances = array < minimum_distance if small_distances.any(): array = numpy.array(array) # make a copy first array[small_distances] = minimum_distance array.flags.writeable = False setattr(ctx, dist, array) return ctx
[docs]class Effect(object): """ Compute the effect of a rupture of a given magnitude and distance. :param effect_by_mag: a dictionary magstring -> intensities :param dists: array of distances, one per each intensity :param cdist: collapse distance """ def __init__(self, effect_by_mag, dists, collapse_dist=None): self.effect_by_mag = effect_by_mag self.dists = dists self.nbins = len(dists)
[docs] def collapse_value(self, collapse_dist): """ :returns: intensity at collapse distance """ # get the maximum magnitude with a cutoff at 7 for mag in self.effect_by_mag: if mag > '7.00': break effect = self.effect_by_mag[mag] idx = numpy.searchsorted(self.dists, collapse_dist) return effect[idx-1 if idx == self.nbins else idx]
def __call__(self, mag, dist): di = numpy.searchsorted(self.dists, dist) if di == self.nbins: di = self.nbins eff = self.effect_by_mag['%.2f' % mag][di] return eff # this is used to compute the magnitude-dependent pointsource_distance
[docs] def dist_by_mag(self, intensity): """ :returns: a dict magstring -> distance """ dst = {} # magnitude -> distance for mag, intensities in self.effect_by_mag.items(): if intensity < intensities.min(): dst[mag] = self.dists[-1] # largest distance elif intensity > intensities.max(): dst[mag] = self.dists[0] # smallest distance else: dst[mag] = interp1d(intensities, self.dists)(intensity) return dst
[docs]def get_effect_by_mag(mags, sitecol1, gsims_by_trt, maximum_distance, imtls): """ :param mags: an ordered list of magnitude strings with format %.2f :param sitecol1: a SiteCollection with a single site :param gsims_by_trt: a dictionary trt -> gsims :param maximum_distance: an IntegrationDistance object :param imtls: a DictArray with intensity measure types and levels :returns: a dict magnitude-string -> array(#dists, #trts) """ trts = list(gsims_by_trt) ndists = 51 gmv = numpy.zeros((len(mags), ndists, len(trts))) param = dict(maximum_distance=maximum_distance, imtls=imtls) for t, trt in enumerate(trts): dist_bins = maximum_distance.get_dist_bins(trt, ndists) cmaker = ContextMaker(trt, gsims_by_trt[trt], param) gmv[:, :, t] = cmaker.max_intensity(sitecol1, F64(mags), dist_bins) return dict(zip(mags, gmv))
[docs]def read_cmakers(dstore, full_lt=None): """ :param dstore: a DataStore-like object :param full_lt: a FullLogicTree instance, if given :returns: a list of ContextMaker instance, one per source group """ from openquake.hazardlib.site_amplification import AmplFunction cmakers = [] oq = dstore['oqparam'] full_lt = full_lt or dstore['full_lt'] trt_smrs = dstore['trt_smrs'][:] toms = decode(dstore['toms'][:]) rlzs_by_gsim_list = full_lt.get_rlzs_by_gsim_list(trt_smrs) trts = list(full_lt.gsim_lt.values) num_eff_rlzs = len(full_lt.sm_rlzs) start = 0 for grp_id, rlzs_by_gsim in enumerate(rlzs_by_gsim_list): trti = trt_smrs[grp_id][0] // num_eff_rlzs trt = trts[trti] if ('amplification' in oq.inputs and oq.amplification_method == 'kernel'): df = AmplFunction.read_df(oq.inputs['amplification']) oq.af = AmplFunction.from_dframe(df) else: oq.af = None oq.mags_by_trt = {k: decode(v[:]) for k, v in dstore['source_mags'].items()} cmaker = ContextMaker(trt, rlzs_by_gsim, oq) cmaker.tom = valid.occurrence_model(toms[grp_id]) cmaker.trti = trti cmaker.start = start cmaker.grp_id = grp_id start += len(rlzs_by_gsim) cmakers.append(cmaker) return cmakers
# used in event_based
[docs]def read_cmaker(dstore, trt_smr): """ :param dstore: a DataStore-like object :returns: a ContextMaker instance """ oq = dstore['oqparam'] full_lt = dstore['full_lt'] trts = list(full_lt.gsim_lt.values) trt = trts[trt_smr // len(full_lt.sm_rlzs)] rlzs_by_gsim = full_lt._rlzs_by_gsim(trt_smr) return ContextMaker(trt, rlzs_by_gsim, oq)