# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2021, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import re
import math
import scipy
import numpy as np
from openquake.baselib.general import RecordBuilder
from openquake.hazardlib.imt import from_string
SA_LIKE_PREFIXES = ['SA', 'EA', 'FA', 'DR']
class CoeffsTable(object):
r"""
Instances of :class:`CoeffsTable` encapsulate tables of coefficients
corresponding to different IMTs.
Tables are defined in a space-separated tabular form in a simple string
literal (heading and trailing whitespace does not matter). The first column
in the table must be named "IMT" (or "imt") and thus should represent IMTs:
>>> CoeffsTable(table='''imf z
... pga 1''')
Traceback (most recent call last):
...
ValueError: first column in a table must be IMT
Names of other columns are used as coefficients dicts keys. The values
in the first column should correspond to real intensity measure types,
see :mod:`openquake.hazardlib.imt`:
>>> CoeffsTable(table='''imt z
... pgx 2''')
Traceback (most recent call last):
...
KeyError: 'PGX'
Note that :class:`CoeffsTable` requires passing the arguments explicitly.
>>> CoeffsTable(table='', foo=1)
Traceback (most recent call last):
...
TypeError: CoeffsTable got unexpected kwargs: {'foo': 1}
If there are :class:`~openquake.hazardlib.imt.SA` IMTs in the table, they
are not referenced by name, because they require parametrization:
>>> CoeffsTable(table='''imt x
... sa 15''')
Traceback (most recent call last):
...
ValueError: specify period as float value to declare SA IMT
So proper table defining SA looks like this:
>>> ct = CoeffsTable(sa_damping=5, table='''
... imt a b c d
... pga 1 2.4 -5 0.01
... pgd 7.6 12 0 44.1
... 0.1 10 20 30 40
... 1.0 1 2 3 4
... 10 2 4 6 8
... ''')
Table objects could be indexed by IMT objects (this returns a dictionary
of coefficients):
>>> from openquake.hazardlib import imt
>>> ct[imt.PGA()]
(1., 2.4, -5., 0.01)
>>> ct[imt.PGD()]
(7.6, 12., 0., 44.1)
>>> ct[imt.SA(damping=5, period=0.1)]
(10., 20., 30., 40.)
>>> ct[imt.PGV()]
Traceback (most recent call last):
...
KeyError: PGV
>>> ct[imt.SA(1.0, 4)]
Traceback (most recent call last):
...
KeyError: SA(1.0, 4)
Table of coefficients for spectral acceleration could be indexed
by instances of :class:`openquake.hazardlib.imt.SA` with period
value that is not specified in the table. The coefficients then
get interpolated between the ones for closest higher and closest
lower period. That scaling of coefficients works in a logarithmic
scale of periods and only within the same damping:
>>> '%.5f' % ct[imt.SA(period=0.2, damping=5)]['a']
'7.29073'
>>> '%.5f' % ct[imt.SA(period=0.9, damping=5)]['c']
'4.23545'
>>> '%.5f' % ct[imt.SA(period=5, damping=5)]['c']
'5.09691'
>>> ct[imt.SA(period=0.9, damping=15)]
Traceback (most recent call last):
...
KeyError: SA(0.9, 15)
Extrapolation is not possible:
>>> ct[imt.SA(period=0.01, damping=5)]
Traceback (most recent call last):
...
KeyError: SA(0.01)
It is also possible to instantiate a table from a tuple of dictionaries,
corresponding to the SA coefficients and non-SA coefficients:
>>> coeffs = {imt.SA(0.1): {"a": 1.0, "b": 2.0},
... imt.SA(1.0): {"a": 3.0, "b": 4.0},
... imt.PGA(): {"a": 0.1, "b": 1.0},
... imt.PGV(): {"a": 0.5, "b": 10.0}}
>>> ct = CoeffsTable.fromdict(coeffs)
"""
@classmethod
def fromdict(cls, ddic, logratio=True, opt=0):
"""
:param ddic: a dictionary of dictionaries
:param logratio: flag (default True)
:param opt: int (default 0)
"""
firstdic = ddic[next(iter(ddic))]
self = object.__new__(cls)
self.rb = RecordBuilder(**firstdic)
self._coeffs = {imt: self.rb(**dic) for imt, dic in ddic.items()}
self.logratio = logratio
self.opt = opt
return self
def __init__(self, table, **kwargs):
self._coeffs = {} # cache
self.opt = kwargs.pop('opt', 0)
self.logratio = kwargs.pop('logratio', True)
sa_damping = kwargs.pop('sa_damping', None)
if kwargs:
raise TypeError('CoeffsTable got unexpected kwargs: %r' % kwargs)
self.rb = self._setup_table_from_str(table, sa_damping)
if self.opt == 1:
keys = list(self._coeffs.keys())
num_coeff = len(self._coeffs[keys[0]])
self.cmtx = np.zeros((len(self._coeffs.keys()), num_coeff))
periods = np.array([i.period for i in keys])
idxs = np.argsort(periods)
tmp = []
for i, idx in enumerate(idxs):
key = keys[i]
tmp.append(np.array(self._coeffs[key].tolist()))
tmp = np.array(tmp)
self.cmtx = tmp[idxs, :]
self.periods = periods[idxs]
def _setup_table_from_str(self, table, sa_damping):
"""
Builds the input tables from a string definition
"""
lines = table.strip().splitlines()
header = lines.pop(0).split()
if not header[0].upper() == "IMT":
raise ValueError('first column in a table must be IMT')
dt = RecordBuilder(**{name: 0. for name in header[1:]})
for line in lines:
row = line.split()
imt_name_or_period = row[0].upper()
if imt_name_or_period == 'SA': # protect against stupid mistakes
raise ValueError('specify period as float value '
'to declare SA IMT')
imt = from_string(imt_name_or_period, sa_damping)
self._coeffs[imt] = dt(*row[1:])
return dt
@property
def sa_coeffs(self):
return {imt: self._coeffs[imt] for imt in self._coeffs
if imt.string[:2] in SA_LIKE_PREFIXES}
@property
def non_sa_coeffs(self):
return {imt: self._coeffs[imt] for imt in self._coeffs
if imt.string[:2] not in SA_LIKE_PREFIXES}
def get_coeffs(self, coeff_list):
"""
:param coeff_list:
A list with the names of the coefficients
"""
coeffs = []
pof = []
for imt in self._coeffs:
if re.search('^(SA|EAS|FAS|DRVT)', imt.string):
tmp = np.array(self._coeffs[imt])
coeffs.append([tmp[i] for i in coeff_list])
if re.search('^(SA)', imt.string):
pof.append(imt.period)
elif re.search('^(EAS|FAS|DRVT)', imt.string):
pof.append(imt.frequency)
else:
raise ValueError('Unknown IMT: {:s}'.format(imt.string))
pof = np.array(pof)
coeffs = np.array(coeffs)
idx = np.argsort(pof)
pof = pof[idx]
coeffs = coeffs[idx, :]
return pof, coeffs
def __getitem__(self, imt):
"""
Return a dictionary of coefficients corresponding to ``imt``
from this table (if there is a line for requested IMT in it),
or the dictionary of interpolated coefficients, if ``imt`` is
of type :class:`~openquake.hazardlib.imt.SA` and interpolation
is possible.
:raises KeyError:
If ``imt`` is not available in the table and no interpolation
can be done.
"""
try: # see if already in cache
return self._coeffs[imt]
except KeyError: # populate the cache
pass
if self.opt == 0:
max_below = min_above = None
for unscaled_imt in list(self.sa_coeffs):
if unscaled_imt.damping != getattr(imt, 'damping', None):
pass
elif unscaled_imt.period > imt.period:
if (min_above is None or
unscaled_imt.period < min_above.period):
min_above = unscaled_imt
elif unscaled_imt.period < imt.period:
if (max_below is None or
unscaled_imt.period > max_below.period):
max_below = unscaled_imt
if max_below is None or min_above is None:
raise KeyError(imt)
if self.logratio: # regular case
# ratio tends to 1 when target period tends to a minimum
# known period above and to 0 if target period is close
# to maximum period below.
ratio = ((math.log(imt.period) - math.log(max_below.period)) /
(math.log(min_above.period) -
math.log(max_below.period)))
else: # in the ACME project
ratio = ((imt.period - max_below.period) /
(min_above.period - max_below.period))
below = self.sa_coeffs[max_below]
above = self.sa_coeffs[min_above]
lst = [(above[n] - below[n]) * ratio + below[n]
for n in self.rb.names]
self._coeffs[imt] = c = self.rb(*lst)
elif self.opt == 1:
if imt.period < self.periods[0] or imt.period > self.periods[-1]:
raise KeyError(imt)
fit = scipy.interpolate.interp1d(np.log10(self.periods), self.cmtx,
axis=0, kind='cubic')
vals = fit(np.log10(imt.period))
self._coeffs[imt] = c = self.rb(*vals)
return c
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, ' '.join(self.rb.names))