Source code for openquake.commonlib.calc

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import warnings
import operator
import numpy

from openquake.baselib import hdf5, general
from openquake.baselib.python3compat import decode
from openquake.hazardlib.source.rupture import BaseRupture
from openquake.hazardlib.geo.mesh import surface_to_array
from openquake.hazardlib.gsim.base import ContextMaker
from openquake.hazardlib.imt import from_string
from openquake.hazardlib import calc, probability_map

TWO16 = 2 ** 16
MAX_INT = 2 ** 31 - 1  # this is used in the random number generator
# in this way even on 32 bit machines Python will not have to convert
# the generated seed into a long integer

U8 = numpy.uint8
U16 = numpy.uint16
I32 = numpy.int32
U32 = numpy.uint32
F32 = numpy.float32
U64 = numpy.uint64
F64 = numpy.float64

# ############## utilities for the classical calculator ############### #


def convert_to_array(pmap, nsites, imtls, inner_idx=0):
    """
    Convert the probability map into a composite array with header
    of the form PGA-0.1, PGA-0.2 ...

    :param pmap: probability map
    :param nsites: total number of sites
    :param imtls: a DictArray with IMT and levels
    :returns: a composite array of length nsites
    """
    lst = []
    # build the export dtype, of the form PGA-0.1, PGA-0.2 ...
    for imt, imls in imtls.items():
        for iml in imls:
            lst.append(('%s-%s' % (imt, iml), F32))
    curves = numpy.zeros(nsites, numpy.dtype(lst))
    for sid, pcurve in pmap.items():
        curve = curves[sid]
        idx = 0
        for imt, imls in imtls.items():
            for iml in imls:
                curve['%s-%s' % (imt, iml)] = pcurve.array[idx, inner_idx]
                idx += 1
    return curves

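# A minimal sketch of how convert_to_array consumes its inputs (the name
# FakeCurve and the numbers are hypothetical; in the engine the pmap is a
# real ProbabilityMap and imtls a DictArray, but any mapping of site IDs
# to objects with an `.array` of shape (L, 1) will do):
def _demo_convert_to_array():  # illustration only, not part of the module
    class FakeCurve(object):
        def __init__(self, array):
            self.array = array
    imtls = {'PGA': [0.1, 0.2], 'PGV': [10., 20.]}  # 4 levels in total
    pmap = {0: FakeCurve(numpy.array([[.9], [.5], [.8], [.3]]))}
    return convert_to_array(pmap, 1, imtls)  # fields PGA-0.1 ... PGV-20.0
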
# ######################### hazard maps ################################### #

# cutoff value for the poe
EPSILON = 1E-30

def compute_hazard_maps(curves, imls, poes):
    """
    Given a set of hazard curve poes, interpolate hazard maps at the
    specified ``poes``.

    :param curves:
        2D array of floats. Each row represents a curve, where the values
        in the row are the PoEs (Probabilities of Exceedance) corresponding
        to ``imls``. Each curve corresponds to a geographical location.
    :param imls:
        Intensity Measure Levels associated with these hazard ``curves``.
        Type should be an array-like of floats.
    :param poes:
        Value(s) on which to interpolate a hazard map from the input
        ``curves``. Can be an array-like or scalar value (for a single PoE).
    :returns:
        An array of shape N x P, where N is the number of curves and P the
        number of poes.
    """
    poes = numpy.array(poes)

    if len(poes.shape) == 0:
        # `poes` was passed in as a scalar;
        # convert it to 1D array of 1 element
        poes = poes.reshape(1)

    if len(curves.shape) == 1:
        # `curves` was passed as 1 dimensional array, there is a single site
        curves = curves.reshape((1,) + curves.shape)  # 1 x L

    L = curves.shape[1]  # number of levels
    if L != len(imls):
        raise ValueError('The curves have %d levels, %d were passed' %
                         (L, len(imls)))
    result = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # avoid RuntimeWarning: divide by zero encountered in log
        # happening in the classical_tiling tests
        imls = numpy.log(numpy.array(imls[::-1]))
    for curve in curves:
        # the hazard curve, having replaced the too small poes with EPSILON
        curve_cutoff = [max(poe, EPSILON) for poe in curve[::-1]]
        hmap_val = []
        for poe in poes:
            # special case when the interpolation poe is bigger than the
            # maximum, i.e. the iml must be smaller than the minimum
            if poe > curve_cutoff[-1]:  # the greatest poes in the curve
                # extrapolate the iml to zero as per
                # https://bugs.launchpad.net/oq-engine/+bug/1292093
                # a consequence is that if all poes are zero any poe > 0
                # is big and the hmap goes automatically to zero
                hmap_val.append(0)
            else:
                # exp-log interpolation, to reduce numerical errors
                # see https://bugs.launchpad.net/oq-engine/+bug/1252770
                val = numpy.exp(
                    numpy.interp(
                        numpy.log(poe), numpy.log(curve_cutoff), imls))
                hmap_val.append(val)

        result.append(hmap_val)
    return numpy.array(result)

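# A minimal usage sketch for compute_hazard_maps (the numbers are
# hypothetical): one curve over four levels, interpolated at two poes;
# per the contract above, the result has shape (1, 2).
def _demo_compute_hazard_maps():  # illustration only, not part of the module
    curves = numpy.array([[0.98, 0.60, 0.12, 0.01]])  # PoEs for one site
    imls = [0.1, 0.2, 0.4, 0.8]  # the corresponding intensity levels
    return compute_hazard_maps(curves, imls, [0.5, 0.05])  # -> shape (1, 2)
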
# ######################### GMF->curves #################################### #

# NB (MS): the approach used here will not work for non-poissonian models
def _gmvs_to_haz_curve(gmvs, imls, invest_time, duration):
    """
    Given a set of ground motion values (``gmvs``) and intensity measure
    levels (``imls``), compute hazard curve probabilities of exceedance.

    :param gmvs:
        A list of ground motion values, as floats.
    :param imls:
        A list of intensity measure levels, as floats.
    :param float invest_time:
        Investigation time, in years. It is with this time span that we
        compute probabilities of exceedance.

        Another way to put it is the following. When computing a hazard
        curve, we want to answer the question: What is the probability of
        ground motion meeting or exceeding the specified levels (``imls``)
        in a given time span (``invest_time``)?
    :param float duration:
        Time window during which GMFs occur. Another way to say it is: the
        period of time over which we simulate ground motion occurrences.

        NOTE: Duration is computed as the calculation investigation time
        multiplied by the number of stochastic event sets.

    :returns:
        Numpy array of PoEs (probabilities of exceedance).
    """
    # convert to numpy array and redimension so that it can be broadcast with
    # the gmvs for computing PoE values; there is a gmv for each rupture
    # here is an example: imls = [0.03, 0.04, 0.05], gmvs=[0.04750576]
    # => num_exceeding = [1, 1, 0] coming from 0.04750576 > [0.03, 0.04, 0.05]
    imls = numpy.array(imls).reshape((len(imls), 1))
    num_exceeding = numpy.sum(numpy.array(gmvs) >= imls, axis=1)
    poes = 1 - numpy.exp(- (invest_time / duration) * num_exceeding)
    return poes

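# The Poissonian formula above, spelled out on the example from the
# comments (invest_time and duration values are hypothetical): with
# num_exceeding = [1, 1, 0], poes = 1 - exp(-(50/500) * [1, 1, 0]).
def _demo_gmvs_to_haz_curve():  # illustration only, not part of the module
    gmvs = [0.04750576]
    imls = numpy.array([0.03, 0.04, 0.05]).reshape((3, 1))
    num_exceeding = numpy.sum(numpy.array(gmvs) >= imls, axis=1)  # [1, 1, 0]
    return 1 - numpy.exp(-(50. / 500.) * num_exceeding)
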
# ################## utilities for classical calculators ################ #

def get_imts_periods(imtls):
    """
    Returns a list of IMTs and a list of periods. There is an element for
    each IMT of type Spectral Acceleration, including PGA which is
    considered an alias for SA(0.0). The lists are sorted by period.

    :param imtls: a set of intensity measure type strings
    :returns: a list of IMT objects and a list of periods
    """
    imts = []
    for im in imtls:
        imt = from_string(im)
        if hasattr(imt, 'period'):
            imts.append(imt)
    imts.sort(key=operator.attrgetter('period'))
    return imts, [imt.period for imt in imts]

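# A usage sketch (assuming an installed hazardlib): PGA sorts as SA(0.0)
# and IMTs without a `period` attribute (e.g. PGV) are filtered out by the
# hasattr check above.
def _demo_get_imts_periods():  # illustration only, not part of the module
    imts, periods = get_imts_periods(['SA(1.0)', 'PGA', 'SA(0.2)'])
    return periods  # expected [0.0, 0.2, 1.0]
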
def make_hmap(pmap, imtls, poes):
    """
    Compute the hazard maps associated to the passed probability map.

    :param pmap: hazard curves in the form of a ProbabilityMap
    :param imtls: DictArray with M intensity measure types
    :param poes: P PoEs where to compute the maps
    :returns: a ProbabilityMap with size (N, M * P, 1)
    """
    M, P = len(imtls), len(poes)
    hmap = probability_map.ProbabilityMap.build(M * P, 1, pmap, dtype=F32)
    if len(pmap) == 0:
        return hmap  # empty hazard map
    for i, imt in enumerate(imtls):
        curves = numpy.array([pmap[sid].array[imtls(imt), 0]
                              for sid in pmap.sids])
        data = compute_hazard_maps(curves, imtls[imt], poes)  # array (N, P)
        for sid, value in zip(pmap.sids, data):
            array = hmap[sid].array
            for j, val in enumerate(value):
                array[i * P + j] = val
    return hmap

def make_hmap_array(pmap, imtls, poes, nsites):
    """
    :returns: a compound array of hazard maps of shape nsites
    """
    if isinstance(pmap, probability_map.ProbabilityMap):
        # this is here for compatibility with the past,
        # it could be removed in the future
        hmap = make_hmap(pmap, imtls, poes)
        pdic = general.DictArray({imt: poes for imt in imtls})
        return convert_to_array(hmap, nsites, pdic)
    try:
        hcurves = pmap.value
    except AttributeError:
        hcurves = pmap
    dtlist = [('%s-%s' % (imt, poe), F32) for imt in imtls for poe in poes]
    array = numpy.zeros(len(pmap), dtlist)
    for imt, imls in imtls.items():
        curves = hcurves[:, imtls(imt)]
        for poe in poes:
            array['%s-%s' % (imt, poe)] = compute_hazard_maps(
                curves, imls, poe).flat
    return array  # array of shape N

def make_uhs(hcurves, imtls, poes, nsites):
    """
    Make Uniform Hazard Spectra curves for each location. It is assumed
    that the `lons` and `lats` for each of the `maps` are uniform.

    :param hcurves: a composite array of hazard curves
    :param imtls: a dictionary of intensity measure types and levels
    :param poes: a sequence of PoEs for the underlying hazard maps
    :returns: a composite array containing nsites uniform hazard maps
    """
    imts, _ = get_imts_periods(imtls)
    array = make_hmap_array(hcurves, imtls, poes, len(hcurves))
    imts_dt = numpy.dtype([(str(imt), F32) for imt in imts])
    uhs_dt = numpy.dtype([(str(poe), imts_dt) for poe in poes])
    uhs = numpy.zeros(nsites, uhs_dt)
    for field in array.dtype.names:
        imt, poe = field.split('-')
        if any(imt == str(i) for i in imts):
            uhs[poe][imt] = array[field]
    return uhs

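# A sketch of the nested dtype built by make_uhs (hypothetical IMTs and
# poes): each poe field holds a sub-record with one float per spectral
# IMT, so uhs['0.1']['PGA'] is the hazard map value for PGA at poe 0.1.
def _demo_uhs_dtype():  # illustration only, not part of the module
    imts_dt = numpy.dtype([('PGA', F32), ('SA(0.2)', F32)])
    uhs_dt = numpy.dtype([('0.1', imts_dt), ('0.02', imts_dt)])
    return numpy.zeros(2, uhs_dt)  # 2 sites, all-zero uniform hazard spectra
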
def fix_minimum_intensity(min_iml, imts):
    """
    :param min_iml: a dictionary, possibly with a 'default' key
    :param imts: an ordered list of IMTs
    :returns: a numpy array of intensities, one per IMT

    Make sure the dictionary minimum_intensity (provided by the user in the
    job.ini file) is filled for all intensity measure types and has no key
    named 'default'. Here is how it works:

    >>> min_iml = {'PGA': 0.1, 'default': 0.05}
    >>> fix_minimum_intensity(min_iml, ['PGA', 'PGV'])
    array([0.1 , 0.05], dtype=float32)
    >>> sorted(min_iml.items())
    [('PGA', 0.1), ('PGV', 0.05)]
    """
    if min_iml:
        for imt in imts:
            try:
                min_iml[imt] = calc.filters.getdefault(min_iml, imt)
            except KeyError:
                raise ValueError(
                    'The parameter `minimum_intensity` in the job.ini '
                    'file is missing the IMT %r' % imt)
    if 'default' in min_iml:
        del min_iml['default']
    return F32([min_iml.get(imt, 0) for imt in imts])

class RuptureData(object):
    """
    Container for information about the ruptures of a given
    tectonic region type.
    """
    def __init__(self, trt, gsims):
        self.trt = trt
        self.cmaker = ContextMaker(gsims)
        self.params = sorted(self.cmaker.REQUIRES_RUPTURE_PARAMETERS -
                             set('mag strike dip rake hypo_depth'.split()))
        self.dt = numpy.dtype([
            ('rup_id', U32), ('multiplicity', U16), ('eidx', U32),
            ('occurrence_rate', F64),
            ('mag', F32), ('lon', F32), ('lat', F32), ('depth', F32),
            ('strike', F32), ('dip', F32), ('rake', F32),
            ('boundary', hdf5.vstr)] + [(param, F32)
                                        for param in self.params])
    def to_array(self, ebruptures):
        """
        Convert a list of ebruptures into an array of dtype RuptureData.dt
        """
        data = []
        for ebr in ebruptures:
            rup = ebr.rupture
            self.cmaker.add_rup_params(rup)
            ruptparams = tuple(getattr(rup, param) for param in self.params)
            point = rup.surface.get_middle_point()
            multi_lons, multi_lats = rup.surface.get_surface_boundaries()
            bounds = ','.join('((%s))' % ','.join(
                '%.5f %.5f' % (lon, lat) for lon, lat in zip(lons, lats))
                for lons, lats in zip(multi_lons, multi_lats))
            try:
                rate = ebr.rupture.occurrence_rate
            except AttributeError:  # for nonparametric sources
                rate = numpy.nan
            data.append(
                (ebr.serial, ebr.multiplicity, ebr.eidx1, rate,
                 rup.mag, point.x, point.y, point.z,
                 rup.surface.get_strike(), rup.surface.get_dip(), rup.rake,
                 'MULTIPOLYGON(%s)' % decode(bounds)) + ruptparams)
        return numpy.array(data, self.dt)

class RuptureSerializer(object):
    """
    Serialize event based ruptures to an HDF5 file. Populate the datasets
    `ruptures` and `sids`.
    """
    rupture_dt = numpy.dtype([
        ('serial', U32), ('grp_id', U16), ('code', U8),
        ('eidx1', U32), ('eidx2', U32), ('pmfx', I32), ('seed', U32),
        ('mag', F32), ('rake', F32), ('occurrence_rate', F32),
        ('hypo', (F32, 3)), ('sy', U16), ('sz', U16)])

    pmfs_dt = numpy.dtype([('serial', U32), ('pmf', hdf5.vfloat32)])
    @classmethod
    def get_array_nbytes(cls, ebruptures):
        """
        Convert a list of EBRuptures into a numpy composite array, a list
        of geometries and the number of bytes occupied
        """
        lst = []
        geom = []
        nbytes = 0
        for ebrupture in ebruptures:
            rup = ebrupture.rupture
            mesh = surface_to_array(rup.surface)
            sy, sz = mesh.shape[1:]
            # sanity checks
            assert sy < TWO16, 'Too many multisurfaces: %d' % sy
            assert sz < TWO16, 'The rupture mesh spacing is too small'
            hypo = rup.hypocenter.x, rup.hypocenter.y, rup.hypocenter.z
            rate = getattr(rup, 'occurrence_rate', numpy.nan)
            tup = (ebrupture.serial, ebrupture.grp_id, rup.code,
                   ebrupture.eidx1, ebrupture.eidx2,
                   getattr(ebrupture, 'pmfx', -1),
                   rup.seed, rup.mag, rup.rake, rate, hypo, sy, sz)
            lst.append(tup)
            geom.append(mesh.reshape(3, -1))
            nbytes += cls.rupture_dt.itemsize + mesh.nbytes
        return numpy.array(lst, cls.rupture_dt), geom, nbytes
    def __init__(self, datastore):
        self.datastore = datastore
        self.nbytes = 0
        self.nruptures = 0
        if datastore['oqparam'].save_ruptures:
            datastore.create_dset('ruptures', self.rupture_dt,
                                  fillvalue=None, attrs={'nbytes': 0})
            datastore.create_dset('rupgeoms', hdf5.vfloat32,
                                  shape=(None, 3), fillvalue=None)
    def save(self, ebruptures, eidx=0):
        """
        Populate a dictionary of site IDs tuples and save the ruptures.

        :param ebruptures: a list of EBRupture objects to save
        :param eidx: the last event index saved
        """
        pmfbytes = 0
        self.nruptures += len(ebruptures)
        for ebr in ebruptures:
            mul = ebr.multiplicity
            ebr.eidx1 = eidx
            ebr.eidx2 = eidx + mul
            eidx += mul
            rup = ebr.rupture
            if hasattr(rup, 'pmf'):
                pmfs = numpy.array([(ebr.serial, rup.pmf)], self.pmfs_dt)
                dset = self.datastore.extend('pmfs', pmfs)
                ebr.pmfx = len(dset) - 1
                pmfbytes += self.pmfs_dt.itemsize + rup.pmf.nbytes

        # store the ruptures in a compact format
        array, geom, nbytes = self.get_array_nbytes(ebruptures)
        previous = self.datastore.get_attr('ruptures', 'nbytes', 0)
        dset = self.datastore.extend(
            'ruptures', array, nbytes=previous + nbytes)
        self.datastore.hdf5.save_vlen('rupgeoms', geom)

        # save nbytes occupied by the PMFs
        if pmfbytes:
            if 'nbytes' in dset.attrs:
                dset.attrs['nbytes'] += pmfbytes
            else:
                dset.attrs['nbytes'] = pmfbytes
        self.datastore.flush()
    def close(self):
        """
        Save information about the rupture codes as attributes of the
        'ruptures' dataset.
        """
        if 'ruptures' not in self.datastore:  # for UCERF
            return
        codes = numpy.unique(self.datastore['ruptures']['code'])
        attr = {'code_%d' % code:
                ' '.join(cls.__name__ for cls in BaseRupture.types[code])
                for code in codes}
        self.datastore.set_attrs('ruptures', **attr)