Source code for openquake.hazardlib.map_array

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2016-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import copy
import logging
import warnings
import numpy
import pandas
import numba
from openquake.baselib.general import cached_property, humansize
from openquake.baselib.performance import compile
from openquake.hazardlib.tom import get_pnes

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
BYTES_PER_FLOAT = 8
TWO20 = 2 ** 20  # 1 MB
TWO24 = 2 ** 24
rates_dt = numpy.dtype([('sid', U32), ('lid', U16), ('gid', U16),
                        ('rate', F32)])


[docs]@compile("(float64[:, :], float64[:], uint32[:])") def combine_probs(array, other, rlzs): for li in range(len(array)): for ri in rlzs: if other[li] != 0.: array[li, ri] = ( 1. - (1. - array[li, ri]) * (1. - other[li]))
[docs]def get_mean_curve(dstore, imt, site_id=0): """ Extract the mean hazard curve from the datastore for the first site. """ if 'hcurves-stats' in dstore: # shape (N, S, M, L1) arr = dstore.sel('hcurves-stats', stat='mean', imt=imt) else: # there is only 1 realization arr = dstore.sel('hcurves-rlzs', rlz_id=0, imt=imt) return arr[site_id, 0, 0]
[docs]def get_poe_from_mean_curve(dstore, imt, iml, site_id=0): """ Extract the poe corresponding to the given iml by looking at the mean curve for the given imt. `iml` can also be an array. """ imls = dstore['oqparam'].imtls[imt] mean_curve = get_mean_curve(dstore, imt, site_id) return numpy.interp(imls, mean_curve)[iml]
# ######################### hazard maps ################################### # # cutoff value for the poe EPSILON = 1E-30
[docs]def compute_hazard_maps(curves, imls, poes): """ Given a set of hazard curve poes, interpolate hazard maps at the specified ``poes``. :param curves: Array of floats of shape N x L. Each row represents a curve, where the values in the row are the PoEs (Probabilities of Exceedance) corresponding to the ``imls``. Each curve corresponds to a geographical location. :param imls: Intensity Measure Levels associated with these hazard ``curves``. Type should be an array-like of floats. :param poes: Value(s) on which to interpolate a hazard map from the input ``curves``. :returns: An array of shape N x P, where N is the number of curves and P the number of poes. """ P = len(poes) N, L = curves.shape # number of levels if L != len(imls): raise ValueError('The curves have %d levels, %d were passed' % (L, len(imls))) log_poes = numpy.log(poes) hmap = numpy.zeros((N, P)) with warnings.catch_warnings(): warnings.simplefilter("ignore") # avoid RuntimeWarning: divide by zero for zero levels imls = numpy.log(numpy.array(imls[::-1])) for n, curve in enumerate(curves): # the hazard curve, having replaced the too small poes with EPSILON log_curve = numpy.log([max(poe, EPSILON) for poe in curve[::-1]]) for p, log_poe in enumerate(log_poes): if log_poe > log_curve[-1]: # special case when the interpolation poe is bigger than the # maximum, i.e the iml must be smaller than the minimum; # extrapolate the iml to zero as per # https://bugs.launchpad.net/oq-engine/+bug/1292093; # then the hmap goes automatically to zero pass else: # exp-log interpolation, to reduce numerical errors # see https://bugs.launchpad.net/oq-engine/+bug/1252770 hmap[n, p] = numpy.exp(numpy.interp(log_poe, log_curve, imls)) return hmap
[docs]def compute_hmaps(curvesNML, imtls, poes): """ :param curvesNML: an array of shape (N, M, L1) :param imlts: a DictArray with M keys :param poes: a sequence of P poes :returns: array of shape (N, M, P) with the hazard maps """ N = len(curvesNML) M = len(imtls) P = len(poes) assert M == len(imtls) iml3 = numpy.zeros((N, M, P)) for m, imls in enumerate(imtls.values()): curves = curvesNML[:, m] iml3[:, m] = compute_hazard_maps(curves, imls, poes) return iml3
[docs]def check_hmaps(hcurves, imtls, poes): """ :param hcurves: hazard curves of shape (N, M, L1) :param imtls: a dictionary imt -> imls :param poes: a list of poes :param poes: P poes """ N, M, _L1 = hcurves.shape assert M == len(imtls), (M, len(imtls)) all_poes = [] for poe in poes: all_poes.extend([poe, poe * .99]) for m, (imt, imls) in enumerate(imtls.items()): hmaps = compute_hazard_maps(hcurves[:, m], imls, all_poes) # (N, 2*P) for site_id in range(N): for p, poe in enumerate(poes): iml = hmaps[site_id, p*2] iml99 = hmaps[site_id, p*2+1] if iml + iml99 == 0: # zero curve logging.error(f'The {imt} hazard curve for {site_id=} cannot ' f'be inverted around {poe=}') continue rel_err = abs(iml - iml99) / abs(iml + iml99) if rel_err > .05: raise ValueError(f'The {imt} hazard curve for {site_id=} cannot ' f'be inverted reliably around {poe=}') elif rel_err > .01: logging.warning( f'The {imt} hazard curve for {site_id=} cannot be ' f'inverted reliably around {poe=}: {iml=}, {iml99=}')
# not used right now
[docs]def get_lvl(hcurve, imls, poe): """ :param hcurve: a hazard curve, i.e. array of L1 PoEs :param imls: L1 intensity measure levels :returns: index of the intensity measure level associated to the poe >>> imls = numpy.array([.1, .2, .3, .4]) >>> hcurve = numpy.array([1., .99, .90, .8]) >>> get_lvl(hcurve, imls, 1) 0 >>> get_lvl(hcurve, imls, .99) 1 >>> get_lvl(hcurve, imls, .91) 2 >>> get_lvl(hcurve, imls, .8) 3 """ [[iml]] = compute_hazard_maps(hcurve.reshape(1, -1), imls, [poe]) iml -= 1E-10 # small buffer return numpy.searchsorted(imls, iml)
# ############################# probability maps ############################## t = numba.types sig_i = t.void(t.float32[:, :, :], # pmap t.float32[:, :, :], # poes t.float64[:], # rates t.float64[:, :], # probs_occur t.uint32[:], # sids t.float64) # itime sig_m = t.void(t.float32[:, :, :], # pmap t.float32[:, :, :], # poes t.float64[:], # rates t.float64[:, :], # probs_occur t.float64[:], # weights t.uint32[:], # sids t.float64) # itime
[docs]@compile(sig_i) def update_pmap_i(arr, poes, rates, probs_occur, sidxs, itime): G = arr.shape[2] for poe, rate, probs, sidx in zip(poes, rates, probs_occur, sidxs): no_probs = len(probs) == 0 for g in range(G): if no_probs: arr[sidx, :, g] *= numpy.exp(-rate * poe[:, g] * itime) else: # nonparametric rupture arr[sidx, :, g] *= get_pnes(rate, probs, poe[:, g], itime) # shape L
[docs]@compile(sig_i) def update_pmap_r(arr, poes, rates, probs_occur, sidxs, itime): G = arr.shape[2] for poe, rate, probs, sidx in zip(poes, rates, probs_occur, sidxs): if len(probs) == 0: for g in range(G): arr[sidx, :, g] += rate * poe[:, g] * itime else: # nonparametric rupture for g in range(G): arr[sidx, :, g] += -numpy.log(get_pnes(rate, probs, poe[:, g], itime))
[docs]@compile(sig_m) def update_pmap_m(arr, poes, rates, probs_occur, weights, sidxs, itime): G = arr.shape[2] for poe, rate, probs, w, sidx in zip(poes, rates, probs_occur, weights, sidxs): for g in range(G): arr[sidx, :, g] += (1. - get_pnes(rate, probs, poe[:, g], itime)) * w
[docs]def fix_probs_occur(probs_occur): """ Try to convert object arrays into regular arrays """ if probs_occur.dtype.name == 'object': n = len(probs_occur) p = len(probs_occur[0]) po = numpy.zeros((n, p)) for p, probs in enumerate(probs_occur): po[p] = probs_occur[p] return po return probs_occur
[docs]class MapArray(object): """ Thin wrapper over a 3D-array of probabilities. """ def __init__(self, sids, shape_y, shape_z, rates=False): self.sids = sids self.shape = (len(sids), shape_y, shape_z) self.rates = rates @cached_property def sidx(self): """ :returns: an array of length N site_id -> index """ idxs = numpy.zeros(self.sids.max() + 1, numpy.uint32) for idx, sid in enumerate(self.sids): idxs[sid] = idx return idxs @property def size_mb(self): return self.array.nbytes / TWO20
[docs] def new(self, arr): new = copy.copy(self) new.array = arr return new
[docs] def split(self): """ :yields: G MapArrays of shape (N, L, 1) """ _N, L, G = self.shape for g in range(G): new = self.__class__(self.sids, L, 1).new(self.array[:, :, [g]]) new.gids = [g] yield new
[docs] def fill(self, value): """ :param value: a scalar probability Fill the MapArray underlying array with the given scalar and build the .sidx array """ assert 0 <= value <= 1, value self.array = numpy.empty(self.shape, F32) self.array.fill(value) return self
[docs] def reshape(self, N, M, P): """ :returns: a new Pmap associated to a reshaped array """ return self.new(self.array.reshape(N, M, P))
# used in calc/disagg_test.py
[docs] def expand(self, full_lt, trt_rlzs): """ Convert a MapArray with shape (N, L, Gt) into a MapArray with shape (N, L, R): works only for rates """ N, L, Gt = self.array.shape assert Gt == len(trt_rlzs), (Gt, len(trt_rlzs)) R = full_lt.get_num_paths() out = MapArray(range(N), L, R).fill(0., F32) for g, trs in enumerate(trt_rlzs): for sid in range(N): for rlz in trs % TWO24: out.array[sid, :, rlz] += self.array[sid, :, g] # NB: for probabilities use # combine_probs(out.array[sid], self.array[sid, :, g], rlzs) return out
# used in calc_hazard_curves
[docs] def convert(self, imtls, nsites, idx=0): """ Convert a probability map into a composite array of length `nsites` and dtype `imtls.dt`. :param imtls: DictArray instance :param nsites: the total number of sites :param idx: index on the z-axis (default 0) """ curves = numpy.zeros(nsites, imtls.dt) for imt in curves.dtype.names: curves[imt][self.sids] = self.array[:, imtls(imt), idx] return curves
[docs] def to_rates(self, itime=1.): """ Convert into a map containing rates unless the map already contains rates """ if self.rates: return self pnes = self.array # Physically, an extremely small intensity measure level can have an # extremely large probability of exceedence,however that probability # cannot be exactly 1 unless the level is exactly 0. Numerically, # the PoE can be 1 and this give issues when calculating the damage: # there is a log(0) in scientific.annual_frequency_of_exceedence. # Here we solve the issue by replacing the unphysical probabilities # 1 with .9999999999999999 (the float64 closest to 1). pnes[pnes == 0.] = 1.11E-16 return self.new(-numpy.log(pnes) / itime)
[docs] def to_array(self, gid): """ Assuming self contains an array of rates, returns a composite array with fields sid, lid, gid, rate """ if len(gid) == 0: return numpy.array([], rates_dt) outs = [] for i, g in enumerate(gid): rates_g = self.array[:, :, i] outs.append(from_rates_g(rates_g, g, self.sids)) if len(outs) == 1: return outs[0] return numpy.concatenate(outs, dtype=rates_dt)
[docs] def interp4D(self, imtls, poes): """ :param imtls: a dictionary imt->imls with M items :param poes: a list of P PoEs :returns: an array of shape (N, M, P, Z) """ poes3 = self.array N, _L, Z = poes3.shape M = len(imtls) P = len(poes) L1 = len(imtls[next(iter(imtls))]) hmap4 = numpy.zeros((N, M, P, Z)) for m, imt in enumerate(imtls): slc = slice(m*L1, m*L1 + L1) for z in range(Z): hmap4[:, m, :, z] = compute_hazard_maps( poes3[:, slc, z], imtls[imt], poes) return hmap4
# dangerous since it changes the shape by removing sites
[docs] def remove_zeros(self): ok = self.array.sum(axis=(1, 2)) > 0 new = self.__class__(self.sids[ok], self.shape[1], self.shape[2], self.rates) new.array = self.array[ok] return new
# used in classical_risk from CSV
[docs] def to_dframe(self): """ :returns: a DataFrame with fields sid, gid, lid, poe """ G = self.array.shape[2] arr = self.to_rates().to_array(numpy.arange(G)) return pandas.DataFrame({name: arr[name] for name in arr.dtype.names})
[docs] def update_indep(self, poes, ctxt, itime): """ Update probabilities for independent ruptures """ rates = ctxt.occurrence_rate sidxs = self.sidx[ctxt.sids] if self.rates: update_pmap_r(self.array, poes, rates, ctxt.probs_occur, sidxs, itime) else: update_pmap_i(self.array, poes, rates, ctxt.probs_occur, sidxs, itime)
[docs] def update_mutex(self, poes, ctxt, itime, mutex_weight): """ Update probabilities for mutex ruptures """ rates = ctxt.occurrence_rate probs_occur = fix_probs_occur(ctxt.probs_occur) sidxs = self.sidx[ctxt.sids] weights = numpy.array([mutex_weight[src_id, rup_id] for src_id, rup_id in zip(ctxt.src_id, ctxt.rup_id)]) update_pmap_m(self.array, poes, rates, probs_occur, weights, sidxs, itime)
def __invert__(self): return self.new(1. - self.array) def __pow__(self, n): return self.new(self.array ** n) def __iadd__(self, other): # used in calc.mean_rates sidx = self.sidx[other.sids] G = other.array.shape[2] # NLG for i, g in enumerate(other.gid): iadd(self.array[:, :, g], other.array[:, :, i % G], sidx) return self def __repr__(self): tup = self.shape + (humansize(self.array.nbytes),) return f'<{self.__class__.__name__}(%d, %d, %d)[%s]>' % tup
[docs]@compile("(float32[:, :], float32[:, :], uint32[:])") def iadd(arr, array, sidx): for i, sid in enumerate(sidx): arr[sid] += array[i]
[docs]def from_rates_g(rates_g, g, sids): """ :param rates_g: an array of shape (N, L) :param g: an integer representing a GSIM index :param sids: an array of site IDs """ outs = [] for lid, rates in enumerate(rates_g.T): idxs, = rates.nonzero() if len(idxs): out = numpy.zeros(len(idxs), rates_dt) out['sid'] = sids[idxs] out['lid'] = lid out['gid'] = g out['rate'] = rates[idxs] outs.append(out) if not outs: return numpy.array([], rates_dt) elif len(outs) == 1: return outs[0] return numpy.concatenate(outs, dtype=rates_dt)
[docs]class RateMap: """ A kind of MapArray specifically for rates """ sidx = MapArray.sidx size_mb = MapArray.size_mb __repr__ = MapArray.__repr__ def __init__(self, sids, L, gids): self.sids = sids self.shape = len(sids), L, len(gids) self.array = numpy.zeros(self.shape, F32) self.jid = {g: j for j, g in enumerate(gids)} def __iadd__(self, other): G = self.shape[2] sidx = self.sidx[other.sids] for i, g in enumerate(other.gid): iadd(self.array[:, :, self.jid[g]], other.array[:, :, i % G], sidx) return self
[docs] def to_array(self, g): """ Assuming self contains an array of rates, returns a composite array with fields sid, lid, gid, rate """ rates_g = self.array[:, :, self.jid[g]] return from_rates_g(rates_g, g, self.sids)