Source code for openquake.commonlib.calc

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import warnings
import numpy

from openquake.baselib import hdf5
from openquake.baselib.python3compat import decode
from openquake.hazardlib.source.rupture import BaseRupture
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib import calc, probability_map

TWO16 = 2 ** 16
MAX_INT = 2 ** 31 - 1  # this is used in the random number generator
# in this way even on 32 bit machines Python will not have to convert
# the generated seed into a long integer

U8 = numpy.uint8
U16 = numpy.uint16
I32 = numpy.int32
U32 = numpy.uint32
F32 = numpy.float32
U64 = numpy.uint64
F64 = numpy.float64

code2cls = BaseRupture.init()

# ############## utilities for the classical calculator ############### #


[docs]def convert_to_array(pmap, nsites, imtls, inner_idx=0): """ Convert the probability map into a composite array with header of the form PGA-0.1, PGA-0.2 ... :param pmap: probability map :param nsites: total number of sites :param imtls: a DictArray with IMT and levels :returns: a composite array of lenght nsites """ lst = [] # build the export dtype, of the form PGA-0.1, PGA-0.2 ... for imt, imls in imtls.items(): for iml in imls: lst.append(('%s-%s' % (imt, iml), F32)) curves = numpy.zeros(nsites, numpy.dtype(lst)) for sid, pcurve in pmap.items(): curve = curves[sid] idx = 0 for imt, imls in imtls.items(): for iml in imls: curve['%s-%s' % (imt, iml)] = pcurve.array[idx, inner_idx] idx += 1 return curves
# ######################### hazard maps ################################### # # cutoff value for the poe EPSILON = 1E-30
[docs]def compute_hazard_maps(curves, imls, poes): """ Given a set of hazard curve poes, interpolate a hazard map at the specified ``poe``. :param curves: 2D array of floats. Each row represents a curve, where the values in the row are the PoEs (Probabilities of Exceedance) corresponding to ``imls``. Each curve corresponds to a geographical location. :param imls: Intensity Measure Levels associated with these hazard ``curves``. Type should be an array-like of floats. :param poes: Value(s) on which to interpolate a hazard map from the input ``curves``. Can be an array-like or scalar value (for a single PoE). :returns: An array of shape N x P, where N is the number of curves and P the number of poes. """ poes = numpy.array(poes) if len(poes.shape) == 0: # `poes` was passed in as a scalar; # convert it to 1D array of 1 element poes = poes.reshape(1) if len(curves.shape) == 1: # `curves` was passed as 1 dimensional array, there is a single site curves = curves.reshape((1,) + curves.shape) # 1 x L L = curves.shape[1] # number of levels if L != len(imls): raise ValueError('The curves have %d levels, %d were passed' % (L, len(imls))) result = [] with warnings.catch_warnings(): warnings.simplefilter("ignore") # avoid RuntimeWarning: divide by zero encountered in log # happening in the classical_tiling tests imls = numpy.log(numpy.array(imls[::-1])) for curve in curves: # the hazard curve, having replaced the too small poes with EPSILON curve_cutoff = [max(poe, EPSILON) for poe in curve[::-1]] hmap_val = [] for poe in poes: # special case when the interpolation poe is bigger than the # maximum, i.e the iml must be smaller than the minumum if poe > curve_cutoff[-1]: # the greatest poes in the curve # extrapolate the iml to zero as per # https://bugs.launchpad.net/oq-engine/+bug/1292093 # a consequence is that if all poes are zero any poe > 0 # is big and the hmap goes automatically to zero hmap_val.append(0) else: # exp-log interpolation, to reduce numerical errors # see https://bugs.launchpad.net/oq-engine/+bug/1252770 val = numpy.exp( numpy.interp( numpy.log(poe), numpy.log(curve_cutoff), imls)) hmap_val.append(val) result.append(hmap_val) return numpy.array(result)
# ######################### GMF->curves #################################### # # NB (MS): the approach used here will not work for non-poissonian models def _gmvs_to_haz_curve(gmvs, imls, ses_per_logic_tree_path): """ Given a set of ground motion values (``gmvs``) and intensity measure levels (``imls``), compute hazard curve probabilities of exceedance. :param gmvs: A list of ground motion values, as floats. :param imls: A list of intensity measure levels, as floats. :param ses_per_logic_tree_path: Number of stochastic event sets: the larger, the best convergency :returns: Numpy array of PoEs (probabilities of exceedance). """ # convert to numpy array and redimension so that it can be broadcast with # the gmvs for computing PoE values; there is a gmv for each rupture # here is an example: imls = [0.03, 0.04, 0.05], gmvs=[0.04750576] # => num_exceeding = [1, 1, 0] coming from 0.04750576 > [0.03, 0.04, 0.05] imls = numpy.array(imls).reshape((len(imls), 1)) num_exceeding = numpy.sum(numpy.array(gmvs) >= imls, axis=1) poes = 1 - numpy.exp(- num_exceeding / ses_per_logic_tree_path) return poes # ################## utilities for classical calculators ################ #
[docs]def make_hmap(pmap, imtls, poes, sid=None): """ Compute the hazard maps associated to the passed probability map. :param pmap: hazard curves in the form of a ProbabilityMap :param imtls: DictArray with M intensity measure types :param poes: P PoEs where to compute the maps :param sid: not None when pmap is actually a ProbabilityCurve :returns: a ProbabilityMap with size (N, M, P) """ if sid is None: sids = pmap.sids else: # passed a probability curve pmap = {sid: pmap} sids = [sid] M, P = len(imtls), len(poes) hmap = probability_map.ProbabilityMap.build(M, P, sids, dtype=F32) if len(pmap) == 0: return hmap # empty hazard map for i, imt in enumerate(imtls): curves = numpy.array([pmap[sid].array[imtls(imt), 0] for sid in sids]) data = compute_hazard_maps(curves, imtls[imt], poes) # array (N, P) for sid, value in zip(sids, data): array = hmap[sid].array for j, val in enumerate(value): array[i, j] = val return hmap
[docs]def make_hmap_array(pmap, imtls, poes, nsites): """ :returns: a compound array of hazard maps of shape nsites """ hcurves = pmap[()] dtlist = [('%s-%s' % (imt, poe), F32) for imt in imtls for poe in poes] array = numpy.zeros(len(pmap), dtlist) for imt, imls in imtls.items(): curves = hcurves[:, imtls(imt)] for poe in poes: array['%s-%s' % (imt, poe)] = compute_hazard_maps( curves, imls, poe).flat return array # array of shape N
[docs]def make_uhs(hmap, info): """ Make Uniform Hazard Spectra curves for each location. :param hmap: array of shape (N, M, P) :param info: a dictionary with keys poes, imtls, uhs_dt :returns: a composite array containing uniform hazard spectra """ uhs = numpy.zeros(len(hmap), info['uhs_dt']) for p, poe in enumerate(info['poes']): for m, imt in enumerate(info['imtls']): if imt.startswith(('PGA', 'SA')): uhs[str(poe)][imt] = hmap[:, m, p] return uhs
[docs]class RuptureData(object): """ Container for information about the ruptures of a given tectonic region type. """ def __init__(self, trt, gsims): self.trt = trt self.cmaker = ContextMaker(trt, gsims) self.params = sorted(self.cmaker.REQUIRES_RUPTURE_PARAMETERS - set('mag strike dip rake hypo_depth'.split())) self.dt = numpy.dtype([ ('rup_id', U32), ('srcidx', U32), ('multiplicity', U16), ('occurrence_rate', F64), ('mag', F32), ('lon', F32), ('lat', F32), ('depth', F32), ('strike', F32), ('dip', F32), ('rake', F32), ('boundary', hdf5.vstr)] + [(param, F32) for param in self.params])
[docs] def to_array(self, ebruptures): """ Convert a list of ebruptures into an array of dtype RuptureRata.dt """ data = [] for ebr in ebruptures: rup = ebr.rupture self.cmaker.add_rup_params(rup) ruptparams = tuple(getattr(rup, param) for param in self.params) point = rup.surface.get_middle_point() mlons, mlats, mdeps = rup.surface.get_surface_boundaries_3d() bounds = ','.join('((%s))' % ','.join( '%.5f %.5f %.3f' % coords for coords in zip(lons, lats, deps)) for lons, lats, deps in zip(mlons, mlats, mdeps)) try: rate = ebr.rupture.occurrence_rate except AttributeError: # for nonparametric sources rate = numpy.nan data.append( (ebr.id, ebr.srcidx, ebr.n_occ, rate, rup.mag, point.x, point.y, point.z, rup.surface.get_strike(), rup.surface.get_dip(), rup.rake, 'MULTIPOLYGON(%s)' % decode(bounds)) + ruptparams) return numpy.array(data, self.dt)
[docs]class RuptureSerializer(object): """ Serialize event based ruptures on an HDF5 files. Populate the datasets `ruptures` and `sids`. """ def __init__(self, datastore): self.datastore = datastore self.nbytes = 0 self.nruptures = 0 datastore.create_dset('ruptures', calc.stochastic.rupture_dt, attrs={'nbytes': 0}) datastore.create_dset('rupgeoms', calc.stochastic.point3d)
[docs] def save(self, rup_array): """ Store the ruptures in array format. """ self.nruptures += len(rup_array) offset = len(self.datastore['rupgeoms']) rup_array.array['gidx1'] += offset rup_array.array['gidx2'] += offset hdf5.extend(self.datastore['ruptures'], rup_array) hdf5.extend(self.datastore['rupgeoms'], rup_array.geom) # TODO: PMFs for nonparametric ruptures are not stored self.datastore.flush()
[docs] def close(self): """ Save information about the rupture codes as attributes of the 'ruptures' dataset. """ if 'ruptures' not in self.datastore: # for UCERF return codes = numpy.unique(self.datastore['ruptures']['code']) attr = {'code_%d' % code: ' '.join( cls.__name__ for cls in code2cls[code]) for code in codes} self.datastore.set_attrs('ruptures', **attr)