Source code for openquake.calculators.classical

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import copy
import time
import logging
import operator
from datetime import datetime
import itertools
import numpy

from openquake.baselib import parallel, hdf5
from openquake.baselib.general import AccumDict, block_splitter
from openquake.hazardlib import mfd
from openquake.hazardlib.contexts import (
    ContextMaker, Effect, get_effect_by_mag)
from openquake.hazardlib.calc.filters import split_sources, getdefault
from openquake.hazardlib.calc.hazard_curve import classical
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.hazardlib.site_amplification import Amplifier
from openquake.commonlib import calc, util, logs
from openquake.commonlib.source_reader import random_filtered_sources
from openquake.calculators import getters
from openquake.calculators import base

U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
MINWEIGHT = 1000
weight = operator.attrgetter('weight')
grp_extreme_dt = numpy.dtype([('grp_id', U16), ('grp_name', hdf5.vstr),
                             ('extreme_poe', F32)])


def get_src_ids(sources):
    """
    :returns:
        a string with the source IDs of the given sources, stripping the
        extension after the colon, if any
    """
    src_ids = []
    for src in sources:
        long_src_id = src.source_id
        try:
            src_id, ext = long_src_id.rsplit(':', 1)
        except ValueError:
            src_id = long_src_id
        src_ids.append(src_id)
    return ' '.join(set(src_ids))
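
# Illustrative sketch, not part of the original module: it shows the intended
# behaviour of get_src_ids on split sources, whose IDs carry a ':<n>' suffix.
# _FakeSrc is a hypothetical stand-in carrying only a source_id attribute.
def _example_get_src_ids():
    class _FakeSrc(object):
        def __init__(self, source_id):
            self.source_id = source_id
    srcs = [_FakeSrc('SFLT1:0'), _FakeSrc('SFLT1:1'), _FakeSrc('SFLT2')]
    # the suffix after the colon is stripped and duplicates are collapsed,
    # so the result contains 'SFLT1' and 'SFLT2' (order is not guaranteed)
    return get_src_ids(srcs)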

def get_extreme_poe(array, imtls):
    """
    :param array: array of shape (L, G) with L=num_levels, G=num_gsims
    :param imtls: DictArray imt -> levels
    :returns:
        the maximum PoE corresponding to the maximum level for IMTs and GSIMs
    """
    return max(array[imtls(imt).stop - 1].max() for imt in imtls)
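
# Illustrative sketch, not part of the original module: get_extreme_poe looks
# at the last (highest) intensity level of each IMT and takes the maximum PoE
# across GSIMs. _FakeImtls is a hypothetical stand-in mimicking only the two
# features of the DictArray interface used above: iteration over the IMT
# names and imtls(imt), which returns the slice of levels for that IMT.
def _example_get_extreme_poe():
    class _FakeImtls(object):
        slicedic = {'PGA': slice(0, 3), 'SA(1.0)': slice(3, 6)}

        def __iter__(self):
            return iter(self.slicedic)

        def __call__(self, imt):
            return self.slicedic[imt]
    # 6 levels (3 per IMT) x 2 GSIMs; rows 2 and 5 are the highest levels
    array = numpy.array([[.9, .8], [.5, .4], [.2, .1],
                         [.7, .6], [.3, .2], [.05, .15]])
    return get_extreme_poe(array, _FakeImtls())  # max(.2, .15) -> 0.2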
# NB: this is NOT called if split_by_magnitude is true
def classical_split_filter(srcs, srcfilter, gsims, params, monitor):
    """
    Split the given sources, filter the subsources and then compute the
    PoEs. Yield back subtasks if the split sources contain more than
    maxweight ruptures.
    """
    # first check if we are sampling the sources
    ss = int(os.environ.get('OQ_SAMPLE_SOURCES', 0))
    if ss:
        splits, stime = split_sources(srcs)
        srcs = random_filtered_sources(splits, srcfilter, ss)
        yield classical(srcs, srcfilter, gsims, params, monitor)
        return
    # NB: splitting all the sources improves the distribution significantly,
    # compared to splitting only the big sources
    with monitor("splitting/filtering sources"):
        splits, _stime = split_sources(srcs)
        sources = list(srcfilter.filter(splits))
    if not sources:
        yield {'pmap': {}}
        return
    maxw = min(sum(src.weight for src in sources)/5, params['max_weight'])
    if maxw < MINWEIGHT*5:  # task too small to be resubmitted
        yield classical(sources, srcfilter, gsims, params, monitor)
        return
    blocks = list(block_splitter(sources, maxw, weight))
    subtasks = len(blocks) - 1
    for block in blocks[:-1]:
        yield classical, block, srcfilter, gsims, params
    if monitor.calc_id and subtasks:
        msg = 'produced %d subtask(s) with max weight=%d' % (
            subtasks, max(b.weight for b in blocks))
        try:
            logs.dbcmd('log', monitor.calc_id, datetime.utcnow(), 'DEBUG',
                       'classical_split_filter#%d' % monitor.task_no, msg)
        except Exception:
            # a foreign key error in case of `oq run` is expected
            print(msg)
    yield classical(blocks[-1], srcfilter, gsims, params, monitor)
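
# Illustrative sketch, not part of the original module: it reproduces the
# weight threshold used by classical_split_filter with plain numbers. The
# weights and the default max_weight value below are made up; only MINWEIGHT
# comes from this module.
def _example_split_filter_weights(src_weights, max_weight=10000):
    maxw = min(sum(src_weights) / 5, max_weight)
    if maxw < MINWEIGHT * 5:
        # the task is too small to be worth resubmitting:
        # everything is computed directly, with no subtasks
        return 'compute directly'
    return 'regroup the sources in blocks of weight <= %d' % maxw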

def split_by_mag(sources):
    """
    Split sources by magnitude
    """
    out = []
    for src in sources:
        if hasattr(src, 'get_annual_occurrence_rates'):
            for mag, rate in src.get_annual_occurrence_rates():
                new = copy.copy(src)
                new.mfd = mfd.ArbitraryMFD([mag], [rate])
                new.num_ruptures = new.count_ruptures()
                out.append(new)
        else:  # nonparametric source
            # data is a list of pairs (rup, pmf)
            for mag, group in itertools.groupby(
                    src.data, lambda pair: pair[0].mag):
                new = src.__class__(src.source_id, src.name,
                                    src.tectonic_region_type, list(group))
                out.append(new)
    return out
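
# Illustrative sketch, not part of the original module: the nonparametric
# branch of split_by_mag groups the (rupture, pmf) pairs by magnitude with
# itertools.groupby, which assumes the pairs are already ordered by magnitude.
# _FakeRup is a hypothetical stand-in carrying only a mag attribute.
def _example_group_by_mag():
    class _FakeRup(object):
        def __init__(self, mag):
            self.mag = mag
    data = [(_FakeRup(5.0), 'pmf1'), (_FakeRup(5.0), 'pmf2'),
            (_FakeRup(6.0), 'pmf3')]  # already sorted by magnitude
    return {mag: len(list(group)) for mag, group in itertools.groupby(
        data, lambda pair: pair[0].mag)}  # -> {5.0: 2, 6.0: 1}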

def preclassical(srcs, srcfilter, gsims, params, monitor):
    """
    Prefilter the sources
    """
    calc_times = AccumDict(accum=numpy.zeros(3, F32))  # nrups, nsites, time
    pmap = AccumDict(accum=0)
    with monitor("splitting/filtering sources"):
        splits, _stime = split_sources(srcs)
    for src in splits:
        t0 = time.time()
        if srcfilter.get_close_sites(src) is None:
            continue
        dt = time.time() - t0
        calc_times[src.id] += F32([src.num_ruptures, src.nsites, dt])
        for grp_id in src.src_group_ids:
            pmap[grp_id] += 0
    return dict(pmap=pmap, calc_times=calc_times, rup_data={'grp_id': []},
                extra=dict(task_no=monitor.task_no,
                           totrups=src.num_ruptures))

@base.calculators.add('classical')
class ClassicalCalculator(base.HazardCalculator):
    """
    Classical PSHA calculator
    """
    core_task = classical_split_filter
    accept_precalc = ['classical']

    def agg_dicts(self, acc, dic):
        """
        Aggregate dictionaries of hazard curves by updating the accumulator.

        :param acc: accumulator dictionary
        :param dic: dict with keys pmap, calc_times, rup_data
        """
        # NB: dic should be a dictionary, but when the calculation dies
        # for an OOM it can become None, thus giving a very confusing error
        if dic is None:
            raise MemoryError('You ran out of memory!')
        if not dic['pmap']:
            return acc
        with self.monitor('aggregate curves'):
            extra = dic['extra']
            self.totrups += extra['totrups']
            d = dic['calc_times']  # srcid -> eff_rups, eff_sites, dt
            self.calc_times += d
            srcids = []
            eff_rups = 0
            eff_sites = 0
            for srcid, rec in d.items():
                srcids.append(srcid)
                eff_rups += rec[0]
                if rec[0]:
                    eff_sites += rec[1] / rec[0]
            self.by_task[extra['task_no']] = (
                eff_rups, eff_sites, U32(srcids))
            for grp_id, pmap in dic['pmap'].items():
                if pmap:
                    acc[grp_id] |= pmap
                acc.eff_ruptures[grp_id] += eff_rups
            rup_data = dic['rup_data']
            nr = len(rup_data['grp_id'])
            if nr:
                default = (numpy.ones(nr, F32) * numpy.nan,
                           [numpy.zeros(0, F32)] * nr)
                for k in self.rparams:  # variable length arrays
                    vlen = k.endswith('_') or k == 'probs_occur'
                    try:
                        v = rup_data[k]
                    except KeyError:
                        v = default[vlen]
                    if vlen:
                        self.datastore.hdf5.save_vlen('rup/' + k, v)
                    else:
                        dset = self.datastore['rup/' + k]
                        hdf5.extend(dset, v)
        return acc
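
    # Illustrative sketch, not part of the original class: it mirrors the
    # bookkeeping done in agg_dicts on the calc_times dictionary, where each
    # source ID maps to a triple (effective ruptures, effective sites, dt).
    # The source IDs and figures below are made up.
    def _example_agg_bookkeeping(self):
        d = {1: F32([100, 50, 0.2]), 2: F32([10, 5, 0.1])}
        eff_rups = sum(rec[0] for rec in d.values())  # 110 ruptures
        eff_sites = sum(rec[1] / rec[0] for rec in d.values()
                        if rec[0])  # 0.5 + 0.5 = 1.0 sites per rupture
        return eff_rups, eff_sites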

    def acc0(self):
        """
        Initial accumulator, a dict grp_id -> ProbabilityMap(L, G)
        """
        zd = AccumDict()
        num_levels = len(self.oqparam.imtls.array)
        rparams = {'grp_id', 'occurrence_rate', 'weight', 'probs_occur',
                   'sid_', 'lon_', 'lat_', 'rrup_'}
        gsims_by_trt = self.csm_info.get_gsims_by_trt()
        for sm in self.csm_info.source_models:
            for grp in sm.src_groups:
                gsims = gsims_by_trt[grp.trt]
                cm = ContextMaker(grp.trt, gsims)
                rparams.update(cm.REQUIRES_RUPTURE_PARAMETERS)
                for dparam in cm.REQUIRES_DISTANCES:
                    rparams.add(dparam + '_')
                zd[grp.id] = ProbabilityMap(num_levels, len(gsims))
        zd.eff_ruptures = AccumDict(accum=0)  # grp_id -> eff_ruptures
        self.rparams = sorted(rparams)
        for k in self.rparams:  # variable length arrays
            vlen = k.endswith('_') or k == 'probs_occur'
            if k == 'grp_id':
                dt = U16
            elif k == 'sid_':
                dt = hdf5.vuint16
            elif vlen:
                dt = hdf5.vfloat32
            else:
                dt = F32
            self.datastore.create_dset('rup/' + k, dt)
        rparams = [p for p in self.rparams if not p.endswith('_')]
        dparams = [p[:-1] for p in self.rparams if p.endswith('_')]
        logging.info('Scalar parameters %s', rparams)
        logging.info('Vector parameters %s', dparams)
        self.by_task = {}  # task_no => src_ids
        self.totrups = 0  # total number of ruptures before collapsing
        return zd

    def execute(self):
        """
        Run in parallel `core_task(sources, sitecol, monitor)`, by
        parallelizing on the sources according to their weight and
        tectonic region type.
        """
        oq = self.oqparam
        if oq.hazard_calculation_id and not oq.compare_with_classical:
            with util.read(self.oqparam.hazard_calculation_id) as parent:
                self.csm_info = parent['csm_info']
            self.calc_stats()  # post-processing
            return {}
        mags = self.datastore['source_mags'][()]
        if len(mags) == 0:  # everything was discarded
            raise RuntimeError('All sources were discarded!?')
        gsims_by_trt = self.csm_info.get_gsims_by_trt()
        dist_bins = {trt: oq.maximum_distance.get_dist_bins(trt)
                     for trt in gsims_by_trt}
        if oq.pointsource_distance:
            logging.info('Computing effect of the ruptures')
            mon = self.monitor('rupture effect')
            effect = parallel.Starmap.apply(
                get_effect_by_mag,
                (mags, self.sitecol.one(), gsims_by_trt,
                 oq.maximum_distance, oq.imtls, mon)).reduce()
            self.datastore['effect'] = effect
            self.datastore.set_attrs('effect', **dist_bins)
            self.effect = {
                trt: Effect({mag: effect[mag][:, t] for mag in effect},
                            dist_bins[trt],
                            getdefault(oq.pointsource_distance, trt))
                for t, trt in enumerate(gsims_by_trt)}
            for trt, eff in self.effect.items():
                oq.maximum_distance.magdist[trt] = eff.dist_by_mag()
                oq.pointsource_distance[trt] = eff.dist_by_mag(
                    eff.collapse_value)
        else:
            self.effect = {}
        smap = parallel.Starmap(self.core_task.__func__,
                                h5=self.datastore.hdf5,
                                num_cores=oq.num_cores)
        smap.task_queue = list(self.gen_task_queue())  # really fast
        acc0 = self.acc0()  # create the rup/ datasets BEFORE swmr_on()
        self.datastore.swmr_on()
        smap.h5 = self.datastore.hdf5
        self.calc_times = AccumDict(accum=numpy.zeros(3, F32))
        try:
            acc = smap.get_results().reduce(self.agg_dicts, acc0)
            self.store_rlz_info(acc.eff_ruptures)
        finally:
            with self.monitor('store source_info'):
                self.store_source_info(self.calc_times)
            if self.by_task:
                logging.info('Storing by_task information')
                num_tasks = max(self.by_task) + 1,  # 1-tuple used as shape
                er = self.datastore.create_dset('by_task/eff_ruptures',
                                                U32, num_tasks)
                es = self.datastore.create_dset('by_task/eff_sites',
                                                U32, num_tasks)
                si = self.datastore.create_dset('by_task/srcids',
                                                hdf5.vuint32, num_tasks,
                                                fillvalue=None)
                for task_no, rec in self.by_task.items():
                    effrups, effsites, srcids = rec
                    er[task_no] = effrups
                    es[task_no] = effsites
                    si[task_no] = srcids
                self.by_task.clear()
            numrups = sum(arr[0] for arr in self.calc_times.values())
            numsites = sum(arr[1] for arr in self.calc_times.values())
            logging.info('Effective number of ruptures: %d/%d',
                         numrups, self.totrups)
            logging.info('Effective number of sites per rupture: %d',
                         numsites / numrups)
            self.calc_times.clear()  # save a bit of memory
        return acc

    def gen_task_queue(self):
        """
        Build a task queue to be attached to the Starmap instance
        """
        oq = self.oqparam
        gsims_by_trt = self.csm_info.get_gsims_by_trt()
        trt_sources = self.csm.get_trt_sources(optimize_dupl=True)
        del self.csm  # save memory

        def srcweight(src):
            trt = src.tectonic_region_type
            g = len(gsims_by_trt[trt])
            m = (oq.maximum_distance(trt) / 300) ** 2
            return src.weight * g * m

        totweight = sum(sum(srcweight(src) for src in sources)
                        for trt, sources, atomic in trt_sources)
        param = dict(
            truncation_level=oq.truncation_level, imtls=oq.imtls,
            filter_distance=oq.filter_distance, reqv=oq.get_reqv(),
            maximum_distance=oq.maximum_distance,
            pointsource_distance=oq.pointsource_distance,
            shift_hypo=oq.shift_hypo, max_weight=oq.max_weight,
            max_sites_disagg=oq.max_sites_disagg)
        srcfilter = self.src_filter(self.datastore.tempname)
        if oq.calculation_mode == 'preclassical':
            f1 = f2 = preclassical
        elif oq.split_by_magnitude:
            f1 = f2 = classical
        else:
            f1, f2 = classical, classical_split_filter
        C = oq.concurrent_tasks or 1
        for trt, sources, atomic in trt_sources:
            param['effect'] = self.effect.get(trt)
            gsims = gsims_by_trt[trt]
            if atomic:  # do not split atomic groups
                nb = 1
                yield f1, (sources, srcfilter, gsims, param)
            else:  # regroup the sources in blocks
                if oq.split_by_magnitude:
                    sources = split_by_mag(sources)
                blocks = list(block_splitter(sources, totweight/C, srcweight))
                nb = len(blocks)
                for block in blocks:
                    logging.debug('Sending %d sources with weight %d',
                                  len(block), block.weight)
                    yield f2, (block, srcfilter, gsims, param)
            nr = sum(src.weight for src in sources)
            logging.info('TRT = %s', trt)
            logging.info('max_dist=%d km, gsims=%d, ruptures=%d, blocks=%d',
                         oq.maximum_distance(trt), len(gsims), nr, nb)
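
    # Illustrative sketch, not part of the original class: the srcweight
    # closure in gen_task_queue scales the intrinsic source weight by the
    # number of GSIMs for its tectonic region type and by the square of the
    # maximum distance normalized to 300 km. The default figures below are
    # made up, only the formula comes from gen_task_queue.
    def _example_srcweight(self, weight=1000, num_gsims=4, max_dist_km=200):
        return weight * num_gsims * (max_dist_km / 300) ** 2  # about 1778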

    def save_hazard(self, acc, pmap_by_kind):
        """
        Works by side effect by saving hcurves and hmaps on the datastore

        :param acc: ignored
        :param pmap_by_kind: a dictionary of ProbabilityMaps

        kind can be ('hcurves', 'mean'), ('hmaps', 'mean'), ...
        """
        with self.monitor('saving statistics'):
            for kind in pmap_by_kind:  # i.e. kind == 'hcurves-stats'
                pmaps = pmap_by_kind[kind]
                if kind in ('hmaps-rlzs', 'hmaps-stats'):
                    # pmaps is a list of R pmaps
                    dset = self.datastore.getitem(kind)
                    for r, pmap in enumerate(pmaps):
                        for s in pmap:
                            dset[s, r] = pmap[s].array  # shape (M, P)
                elif kind in ('hcurves-rlzs', 'hcurves-stats'):
                    dset = self.datastore.getitem(kind)
                    for r, pmap in enumerate(pmaps):
                        for s in pmap:
                            dset[s, r] = pmap[s].array[:, 0]  # shape L
            self.datastore.flush()

    def post_execute(self, pmap_by_grp_id):
        """
        Collect the hazard curves by realization and export them.

        :param pmap_by_grp_id: a dictionary grp_id -> hazard curves
        """
        oq = self.oqparam
        trt_by_grp = self.csm_info.grp_by("trt")
        grp_name = {grp.id: grp.name for sm in self.csm_info.source_models
                    for grp in sm.src_groups}
        data = []
        with self.monitor('saving probability maps'):
            for grp_id, pmap in pmap_by_grp_id.items():
                if pmap:  # pmap can be missing if the group is filtered away
                    base.fix_ones(pmap)  # avoid saving PoEs == 1
                    trt = trt_by_grp[grp_id]
                    key = 'poes/grp-%02d' % grp_id
                    self.datastore[key] = pmap
                    self.datastore.set_attrs(key, trt=trt)
                    extreme = max(
                        get_extreme_poe(pmap[sid].array, oq.imtls)
                        for sid in pmap)
                    data.append((grp_id, grp_name[grp_id], extreme))
        if oq.hazard_calculation_id is None and 'poes' in self.datastore:
            self.datastore['disagg_by_grp'] = numpy.array(
                sorted(data), grp_extreme_dt)
            self.calc_stats()

    def calc_stats(self):
        oq = self.oqparam
        hstats = oq.hazard_stats()
        # initialize datasets
        N = len(self.sitecol.complete)
        P = len(oq.poes)
        M = len(oq.imtls)
        if oq.soil_intensities is not None:
            L = M * len(oq.soil_intensities)
        else:
            L = len(oq.imtls.array)
        R = len(self.rlzs_assoc.realizations)
        S = len(hstats)
        if R > 1 and oq.individual_curves or not hstats:
            self.datastore.create_dset('hcurves-rlzs', F32, (N, R, L))
            if oq.poes:
                self.datastore.create_dset('hmaps-rlzs', F32, (N, R, M, P))
        if hstats:
            self.datastore.create_dset('hcurves-stats', F32, (N, S, L))
            if oq.poes:
                self.datastore.create_dset('hmaps-stats', F32, (N, S, M, P))
        ct = oq.concurrent_tasks
        logging.info('Building hazard statistics with %d concurrent_tasks',
                     ct)
        weights = [rlz.weight for rlz in self.rlzs_assoc.realizations]
        if 'amplification' in oq.inputs:
            amplifier = Amplifier(oq.imtls, self.datastore['amplification'],
                                  oq.soil_intensities)
            amplifier.check(self.sitecol.vs30, oq.vs30_tolerance)
        else:
            amplifier = None
        allargs = [  # this list is very fast to generate
            (getters.PmapGetter(self.datastore, weights, t.sids, oq.poes),
             N, hstats, oq.individual_curves, oq.max_sites_disagg, amplifier)
            for t in self.sitecol.split_in_tiles(ct)]
        self.datastore.swmr_on()
        parallel.Starmap(
            build_hazard, allargs, h5=self.datastore.hdf5
        ).reduce(self.save_hazard)

@base.calculators.add('preclassical')
class PreCalculator(ClassicalCalculator):
    """
    Calculator to filter the sources and compute the number of effective
    ruptures
    """
    core_task = preclassical

def build_hazard(pgetter, N, hstats, individual_curves,
                 max_sites_disagg, amplifier, monitor):
    """
    :param pgetter: an :class:`openquake.commonlib.getters.PmapGetter`
    :param N: the total number of sites
    :param hstats: a dictionary statname -> statfunc
    :param individual_curves: if True, also build the individual curves
    :param max_sites_disagg: if there are fewer sites than this, store rup info
    :param amplifier: instance of Amplifier or None
    :param monitor: instance of Monitor
    :returns: a dictionary kind -> ProbabilityMap

    The "kind" is a string of the form 'rlz-XXX' or 'mean' or 'quantile-XXX'
    used to specify the kind of output.
    """
    with monitor('read PoEs'):
        pgetter.init()
    if amplifier:
        ampcode = pgetter.dstore['sitecol'].ampcode
    imtls, poes, weights = pgetter.imtls, pgetter.poes, pgetter.weights
    M = len(imtls)
    L = len(imtls.array) if amplifier is None else len(
        amplifier.amplevels) * M
    R = len(weights)
    S = len(hstats)
    pmap_by_kind = {}
    if R > 1 and individual_curves or not hstats:
        pmap_by_kind['hcurves-rlzs'] = [ProbabilityMap(L) for r in range(R)]
    if hstats:
        pmap_by_kind['hcurves-stats'] = [ProbabilityMap(L) for r in range(S)]
        if poes:
            pmap_by_kind['hmaps-stats'] = [
                ProbabilityMap(L) for r in range(S)]
    combine_mon = monitor('combine pmaps', measuremem=False)
    compute_mon = monitor('compute stats', measuremem=False)
    for sid in pgetter.sids:
        with combine_mon:
            pcurves = pgetter.get_pcurves(sid)
            if amplifier:
                pcurves = amplifier.amplify(ampcode[sid], pcurves)
        if sum(pc.array.sum() for pc in pcurves) == 0:  # no data
            continue
        with compute_mon:
            if hstats:
                arr = numpy.array([pc.array for pc in pcurves])
                for s, (statname, stat) in enumerate(hstats.items()):
                    pc = getters.build_stat_curve(arr, imtls, stat, weights)
                    pmap_by_kind['hcurves-stats'][s][sid] = pc
                    if poes:
                        hmap = calc.make_hmap(pc, pgetter.imtls, poes, sid)
                        pmap_by_kind['hmaps-stats'][s].update(hmap)
            if R > 1 and individual_curves or not hstats:
                for pmap, pc in zip(pmap_by_kind['hcurves-rlzs'], pcurves):
                    pmap[sid] = pc
                if poes:
                    pmap_by_kind['hmaps-rlzs'] = [
                        calc.make_hmap(pc, imtls, poes, sid)
                        for pc in pcurves]
    return pmap_by_kind