Source code for openquake.commands.sample

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import shutil
import numpy
import pandas
from openquake.hazardlib import (
    valid, nrml, sourceconverter, sourcewriter, logictree)
from openquake.baselib import general


def save_bak(fname, node, num_nodes, total):
    shutil.copy(fname, fname + '.bak')
    print('Copied the original file in %s.bak' % fname)
    with open(fname, 'wb') as f:
        nrml.write(node, f, xmlns=node['xmlns'])
    print('Extracted %d nodes out of %d' % (num_nodes, total))


def reduce_source_model(fname, reduction_factor, itime):
    """
    Reduce the source model by sampling the sources; as a special case,
    multiPointSources are split into pointSources and then sampled.
    """
    conv = sourceconverter.SourceConverter(area_source_discretization=20.,
                                           investigation_time=itime)
    [smodel] = nrml.read_source_models([fname], conv)
    grp = smodel.src_groups[0]
    if any(src.code == b'M' for src in grp):  # multiPoint
        for src in grp:
            if src.code == b'M':
                grp.sources = general.random_filter(src, reduction_factor)
                print('Extracted %d point sources out of %d'
                      % (len(grp), len(src)))
                break
    elif any(src.code == b'F' for src in grp):  # multiFault
        for src in grp:
            if src.code == b'F':
                rids = numpy.arange(src.count_ruptures())
                ok = general.random_filter(rids, reduction_factor)
                src.mags = src.mags[ok]
                src.rakes = src.rakes[ok]
                src.probs_occur = src.probs_occur[ok]
                src._rupture_idxs = [src._rupture_idxs[o] for o in ok]
                print('Extracted %d ruptures out of %d'
                      % (len(ok), len(rids)))
                break
    else:
        total = len(grp)
        grp.sources = general.random_filter(grp, reduction_factor)
        print('Extracted %d nodes out of %d' % (len(grp), total))
    smodel.src_groups = [grp]
    shutil.copy(fname, fname + '.bak')
    print('Copied the original file in %s.bak' % fname)
    sourcewriter.write_source_model(fname, smodel)


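# Illustrative sketch, not part of the upstream module: the sampling above is
# delegated to general.random_filter, which returns a random subset of the
# objects it receives, roughly `reduction_factor` of them. The helper below
# mimics that behaviour with plain numpy; the seeding strategy shown here is
# an assumption for the example, not taken from the original implementation.
def _random_subset_sketch(objects, reduction_factor, seed=42):
    """Keep each element with probability `reduction_factor` (example only)"""
    rng = numpy.random.default_rng(seed)
    mask = rng.random(len(objects)) < reduction_factor
    return [obj for obj, keep in zip(objects, mask) if keep]

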
def main(fname,
         reduction_factor: valid.probability,
         investigation_time: valid.positivefloat = 50.):
    """
    Produce a submodel from `fname` by sampling the nodes randomly.
    Supports source models, site models, exposure models and source model
    logic trees. As a special case, it is also able to reduce .csv and .npy
    files by sampling their rows. This is a debugging utility to reduce
    large computations to small ones.
    """
    if fname.endswith('.csv'):
        df = pandas.read_csv(fname, dtype=str)
        idxs = general.random_filter(numpy.arange(len(df)), reduction_factor)
        shutil.copy(fname, fname + '.bak')
        print('Copied the original file in %s.bak' % fname)
        df.loc[idxs].to_csv(fname, index=False, lineterminator='\r\n',
                            na_rep='nan')
        print('Extracted %d lines out of %d' % (len(idxs), len(df)))
        return
    elif fname.endswith('.npy'):
        array = numpy.load(fname)
        shutil.copy(fname, fname + '.bak')
        print('Copied the original file in %s.bak' % fname)
        arr = numpy.array(general.random_filter(array, reduction_factor))
        numpy.save(fname, arr)
        print('Extracted %d rows out of %d' % (len(arr), len(array)))
        return
    node = nrml.read(fname)
    model = node[0]
    if model.tag.endswith('exposureModel'):
        total = len(model.assets)
        model.assets.nodes = general.random_filter(
            model.assets, reduction_factor)
        num_nodes = len(model.assets)
    elif model.tag.endswith('siteModel'):
        total = len(model)
        model.nodes = general.random_filter(model, reduction_factor)
        num_nodes = len(model)
    elif model.tag.endswith('sourceModel'):
        reduce_source_model(fname, reduction_factor, investigation_time)
        return
    elif model.tag.endswith('logicTree'):
        for smpath in logictree.collect_info(fname).smpaths:
            reduce_source_model(smpath, reduction_factor, investigation_time)
        return
    else:
        raise RuntimeError('Unknown model tag: %s' % model.tag)
    save_bak(fname, node, num_nodes, total)


main.fname = 'path to the model file'
main.reduction_factor = 'reduction factor in the range 0..1'
main.investigation_time = 'investigation_time used in read_source_models'
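

# Usage sketch, not part of the upstream module (the file name below is
# hypothetical). The command is typically invoked through the `oq` console
# script, for example `oq sample source_model.xml 0.1`, but main() can also
# be called directly from Python:
def _example_usage():
    # reduce a source model to roughly 10% of its sources; the original file
    # is kept as a backup in source_model.xml.bak
    main('source_model.xml', reduction_factor=0.1, investigation_time=50.)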