# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4

#
# LICENSE
#
# Copyright (C) 2015-2023 GEM Foundation, G. Weatherill, M. Pagani
#
# The Hazard Modeller's Toolkit is free software: you can redistribute
# it and/or modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either version
# 3 of the License, or (at your option) any later version.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>
#
# DISCLAIMER
#
# The software Hazard Modeller's Toolkit (openquake.hmtk) provided herein
# is released as a prototype implementation on behalf of
# scientists and engineers working within the GEM Foundation (Global
# Earthquake Model).
#
# It is distributed for the purpose of open collaboration and in the
# hope that it will be useful to the scientific, engineering, disaster
# risk and software design communities.
#
# The software is NOT distributed as part of GEM’s OpenQuake suite
# (https://www.globalquakemodel.org/tools-products) and must be considered as a
# separate entity. The software provided herein is designed and implemented
# by scientific staff. It is not developed to the design standards, nor
# subject to same level of critical review by professional software
# developers, as GEM’s OpenQuake software suite.
#
# Feedback and contribution to the software is welcome, and can be
# directed to the hazard scientific staff of the GEM Model Facility
# (hazard@globalquakemodel.org).
#
# The Hazard Modeller's Toolkit (openquake.hmtk) is therefore distributed
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# The GEM Foundation, and the authors of the software, assume no
# liability for use of the software.

import numpy as np
from openquake.hmtk.seismicity.occurrence.base import (
    SeismicityOccurrence,
    OCCURRENCE_METHODS,
)


@OCCURRENCE_METHODS.add(
    "calculate",
    completeness=True,
    reference_magnitude=0.0,
    mmax=None,
    b_prior=1.0,
    b_prior_weight=1.0,
    area=0.0,
    a_prior_weight=1.0,
)
class PenalizedMLE(SeismicityOccurrence):
    """
    Test implementation of the penalized maximum likelihood function
    """

    IERR = {
        1: "No events in catalogue - returning prior",
        2: "Failure of convergence - returning prior",
    }

    def _null_outcome(self, config, ierr):
        """
        Method to handle failures and return information on their causes

        :param dict config:
            Configuration of method
        :param int ierr:
            Error code
        """
        print(self.IERR[ierr])
        a_4 = 0.05 * config["area"]
        # Return the prior b-value and the rate implied by the prior as a
        # flat (b, sigma_b, rate, sigma_rate) tuple, matching the return
        # signature of calculate()
        return (
            config["b_prior"],
            0.0,
            10.0
            ** (
                np.log10(a_4)
                + (4.0 - config["reference_magnitude"]) * config["b_prior"]
            ),
            0.0,
        )
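
    # Worked example of the fallback above (illustrative numbers, not from
    # the original source): with area=100.0, b_prior=1.0 and
    # reference_magnitude=0.0, a_4 = 0.05 * 100.0 = 5.0, so the returned
    # prior rate is 10.0 ** (np.log10(5.0) + 4.0 * 1.0) = 5.0e4 events per
    # year at M >= 0.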

    def calculate(self, catalogue, config, completeness):
        """
        Calculates the b-value and rates (and their corresponding
        standard deviations) using the penalized MLE approach

        :param dict config:
            Configuration parameters
        :param catalogue:
            Earthquake catalogue as instance of
            :class:`openquake.hmtk.seismicity.catalogue.Catalogue`
        :param completeness:
            Completeness table
        :returns:
            b-value, standard deviation on b, rate (or a-value),
            standard deviation on the rate (or a-value)
        """
        # Setup
        if config["b_prior"]:
            betap = config["b_prior"] * np.log(10.0)
            beta = np.copy(betap)
            wbu = config["b_prior_weight"] / np.log(10.0)
            # Treat a missing or zero a-value prior weight as 0.0
            wau = (
                config["a_prior_weight"] if config["a_prior_weight"] else 0.0
            )
        else:
            # No prior assumed. Take initial b-value of 1.0 for beta
            betap = 0.0
            beta = np.log(10.0)
            wbu = 1.0e-5
            wau = 0.0
        # Get the counts of earthquakes in their completeness windows
        (
            delta,
            kval,
            tval,
            _lamda,
            _cum_rate,
            cum_count,
            _nmx,
            _nmt,
        ) = self._get_rate_counts(catalogue, config, completeness)
        n_val = np.sum(kval)
        if not n_val:
            return self._null_outcome(config, 1)
        assert n_val == cum_count[0]
        # Get the penalized MLE value (also returns the correlation
        # coefficient rho - but this is not used!)
        bval, sigmab, rate, sigma_rate, _rho = self._run_penalized_mle(
            config, delta, kval, tval, cum_count, betap, beta, wbu, wau
        )
        if config.get("reference_magnitude"):
            # Scale the rate to the reference magnitude. The uncertainty
            # is scaled first, while `rate` still refers to the minimum
            # completeness magnitude
            dm = config["reference_magnitude"] - completeness[0, 1]
            sigma_rate = 10.0 ** (
                np.log10(rate + sigma_rate) - bval * dm
            ) - 10.0 ** (np.log10(rate) - bval * dm)
            rate = 10.0 ** (np.log10(rate) - bval * dm)
        else:
            # No reference magnitude: return the a-value (log10 of the
            # rate at M = 0) and its uncertainty in log10 units, again
            # taking the uncertainty before `rate` is overwritten
            dm = -completeness[0, 1]
            sigma_rate = np.log10(rate + sigma_rate) - np.log10(rate)
            rate = np.log10(rate) - bval * dm
        return bval, sigmab, rate, sigma_rate
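
    # A sketch of the inputs calculate() expects (the values are
    # illustrative placeholders, mirroring the defaults registered in the
    # decorator above):
    #
    #     config = {"reference_magnitude": 0.0, "mmax": 8.0,
    #               "b_prior": 1.0, "b_prior_weight": 1.0,
    #               "a_prior_weight": 0.0, "area": 10000.0}
    #     completeness = np.array([[1990.0, 4.0],   # complete for M >= 4.0
    #                              [1960.0, 5.0]])  # complete for M >= 5.0
    #
    # Completeness rows are (year, magnitude) pairs in descending year
    # order, as consumed by _get_rate_counts below.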

    def _run_penalized_mle(
        self, config, delta, kval, tval, cum_count, betap, beta, wbu, wau
    ):
        """
        Implements the core of the penalised maximum likelihood method
        for the b-value, using a Newton-Raphson iteration on beta
        """
        nrloop = 0
        while nrloop <= 10:
            e_b = np.exp(beta * delta)
            deb = e_b[:-1] - e_b[1:]
            # First- and second-derivative terms of the penalised
            # log-likelihood with respect to beta
            skmeb = np.sum(
                kval * ((delta[:-1] * e_b[:-1]) - (delta[1:] * e_b[1:])) / deb
            )
            skm2eb = np.sum(
                kval
                * (
                    (
                        (
                            ((delta[:-1] ** 2.0) * e_b[:-1])
                            - ((delta[1:] ** 2.0) * e_b[1:])
                        )
                        / deb
                    )
                    - (
                        ((delta[:-1] * e_b[:-1]) - (delta[1:] * e_b[1:]))
                        / deb
                    )
                    ** 2.0
                )
            )
            sateb = np.sum(config["area"] * tval * deb)
            satmeb = np.sum(
                config["area"]
                * tval
                * ((delta[:-1] * e_b[:-1]) - (delta[1:] * e_b[1:]))
            )
            satm2eb = np.sum(
                config["area"]
                * tval
                * (
                    ((delta[:-1] ** 2.0) * e_b[:-1])
                    - ((delta[1:] ** 2.0) * e_b[1:])
                )
            )
            dldb = (
                skmeb
                - cum_count[0] * (satmeb / sateb)
                - (wbu * (beta - betap))
            )
            d2ldb2 = (
                skm2eb
                - cum_count[0] * (satm2eb / sateb - (satmeb / sateb) ** 2.0)
                - wbu
            )
            beta0 = np.copy(beta)
            am0 = cum_count[0] * (1.0 - e_b[-1]) / sateb
            if cum_count[1]:
                # More than one interval - take a Newton-Raphson step
                beta -= dldb / d2ldb2
                if beta < 0.0:
                    # Restart from a smaller trial beta; give up and
                    # return the prior after ten restarts
                    nrloop += 1
                    if nrloop > 10:
                        # Total failure of convergence - return prior
                        # (with a zero correlation coefficient appended so
                        # that the caller can still unpack five values)
                        return self._null_outcome(config, 2) + (0.0,)
                    beta = np.log(10.0) / (1.0 + float(nrloop))
                    continue
                if np.abs(beta - beta0) < 1.0e-5:
                    break
            else:
                break
        bval = beta / np.log(10.0)
        # Build the 2 x 2 information matrix and invert it to obtain the
        # variances of the rate and the b-value
        v11 = (cum_count[0] / ((am0 * config["area"]) ** 2.0)) + (wau / am0)
        v22 = -d2ldb2 * (np.log(10.0) ** 2.0)
        v12 = (
            np.log(10.0)
            * (
                (satmeb / (1.0 - e_b[-1]))
                + delta[-1] * e_b[-1] * sateb / ((1.0 - e_b[-1]) ** 2.0)
            )
            / config["area"]
        )
        vmat = np.array([[v11, v12], [v12, v22]])
        error_mat = np.linalg.inv(vmat)
        sigmab = np.sqrt(error_mat[1, 1])
        sigma_rate = np.sqrt(error_mat[0, 0])
        rho = error_mat[0, 1] / np.sqrt(error_mat[0, 0] * error_mat[1, 1])
        return bval, sigmab, am0 * config["area"], sigma_rate, rho

    def _get_rate_counts(self, catalogue, config, completeness):
        """
        Using the earthquake catalogue and the completeness table,
        determine the number of complete earthquakes in each time and
        magnitude bin

        :returns:
            delta: Mmin - Mi
            kval: Number of earthquakes in magnitude bin
            tval: Effective duration of completeness for magnitude bin
            lamda: Rate of earthquakes in bin
            cum_rate: Cumulative rate of earthquakes >= Mi
            cum_count: Number of earthquakes >= Mi
            nmx: Number of magnitude bins
            nmt: Number of time bins
        """
        # If the observed mmax is greater than the specified mmax then
        # replace with the observed mmax
        mmax_inp = np.max(
            [config["mmax"], np.max(catalogue.data["magnitude"])]
        )
        # Stack a maximum magnitude on the completeness magnitudes
        if mmax_inp > np.max(completeness[:, 1]):
            cmag = np.hstack([completeness[:, 1], mmax_inp + 1.0e-10])
            high_event = True
        else:
            cmag = np.hstack(
                [completeness[:, 1], completeness[-1, 1] + 1.0e-10]
            )
            high_event = False
        # Prepend the year after the end of the catalogue to the
        # completeness years
        cyear = np.hstack([catalogue.end_year + 1, completeness[:, 0]])
        count_table = np.zeros([len(cmag) - 1, len(cyear) - 1])
        nmx, nmt = count_table.shape
        count_years = np.zeros_like(count_table)
        for i in range(len(cyear) - 1):
            time_idx = np.logical_and(
                catalogue.data["dtime"] < cyear[i],
                catalogue.data["dtime"] >= cyear[i + 1],
            )
            nyrs = cyear[i] - cyear[i + 1]
            sel_mags = catalogue.data["magnitude"][time_idx]
            # Only the magnitude bins complete within this time window
            # are counted
            for j in range(i, len(cmag) - 1):
                mag_idx = np.logical_and(
                    sel_mags >= cmag[j], sel_mags < cmag[j + 1]
                )
                count_table[j, i] += float(np.sum(mag_idx))
                count_years[j, i] += float(nyrs)
        delta = cmag[0] - cmag
        if not high_event:
            # Remove the last (empty) magnitude row
            delta = delta[:-1]
            count_table = count_table[:-1, :]
            count_years = count_years[:-1, :]
            nmx -= 1
        kval = np.sum(count_table, axis=1)
        tval = np.sum(count_years, axis=1)
        lamda = kval / tval
        # Get cumulative rates and counts of earthquakes with M >= Mi
        cum_rate = np.array([np.sum(lamda[i:]) for i in range(nmx)])
        cum_count = np.array([np.sum(kval[i:]) for i in range(nmx)])
        return delta, kval, tval, lamda, cum_rate, cum_count, nmx, nmt
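

# Minimal usage sketch (synthetic data, illustrative only). A real
# application would pass an instance of
# openquake.hmtk.seismicity.catalogue.Catalogue; a SimpleNamespace stands
# in here because calculate() only reads catalogue.data["magnitude"],
# catalogue.data["dtime"] and catalogue.end_year.
if __name__ == "__main__":
    from types import SimpleNamespace

    rng = np.random.default_rng(42)
    n_events = 500
    # Synthetic Gutenberg-Richter sample with b ~ 1 above M 4.0:
    # magnitudes above Mmin are exponential with scale 1 / (b * ln(10))
    magnitudes = 4.0 + rng.exponential(1.0 / np.log(10.0), n_events)
    catalogue = SimpleNamespace(
        data={
            "magnitude": magnitudes,
            "dtime": rng.uniform(1960.0, 2010.0, n_events),
        },
        end_year=2010,
    )
    completeness = np.array([[1990.0, 4.0], [1960.0, 5.0]])
    config = {
        "reference_magnitude": 0.0,
        "mmax": 8.0,
        "b_prior": 1.0,
        "b_prior_weight": 1.0,
        "a_prior_weight": 0.0,
        "area": 10000.0,
    }
    bval, sigma_b, aval, sigma_a = PenalizedMLE().calculate(
        catalogue, config, completeness
    )
    print(
        "b = %.3f (+/- %.3f), a = %.3f (+/- %.3f)"
        % (bval, sigma_b, aval, sigma_a)
    )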