Source code for openquake.commonlib.readinput

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import csv
import copy
import zlib
import shutil
import zipfile
import logging
import tempfile
import configparser
import collections
import numpy

from openquake.baselib import performance
from openquake.baselib.general import (
    AccumDict, DictArray, deprecated, random_filter)
from openquake.baselib.python3compat import decode, zip
from openquake.baselib.node import Node
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.source.base import BaseSeismicSource
from openquake.hazardlib.calc.gmf import CorrelationButNoInterIntraStdDevs
from openquake.hazardlib import (
    geo, site, imt, valid, sourceconverter, nrml, InvalidFile)
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.risklib import asset, riskinput
from openquake.risklib.riskmodels import get_risk_models
from openquake.commonlib.oqvalidation import OqParam
from openquake.commonlib import logictree, source, writers

# the following is quite arbitrary, it gives output weights that I like (MS)
NORMALIZATION_FACTOR = 1E-2
TWO16 = 2 ** 16  # 65,536
F32 = numpy.float32
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64

Site = collections.namedtuple('Site', 'sid lon lat')
stored_event_dt = numpy.dtype([
    ('eid', U64), ('rup_id', U32), ('grp_id', U16), ('year', U32),
    ('ses', U32), ('sample', U32)])


class DuplicatedPoint(Exception):
    """
    Raised when reading a CSV file with duplicated (lon, lat) pairs
    """

class LargeExposureGrid(Exception):
    msg = '''
The point of automatic gridding of the exposure is to reduce the hazard
mesh, however you are increasing it from %d points to %d points. Either
your region_grid_spacing (%d km) is too small or the bounding box of your
exposure is too large. Currently you have longitudes in the range [%d, %d]
and latitudes in the range [%d, %d]: please plot your assets and check.'''

    def __init__(self, exposure_mesh, hazard_mesh, spacing):
        l1 = len(exposure_mesh)
        l2 = len(hazard_mesh)
        lon1, lon2 = exposure_mesh.lons.min(), exposure_mesh.lons.max()
        lat1, lat2 = exposure_mesh.lats.min(), exposure_mesh.lats.max()
        self.args = [self.msg % (l2, l1, spacing, lon1, lon2, lat1, lat2)]

def collect_files(dirpath, cond=lambda fullname: True):
    """
    Recursively collect the files contained inside dirpath.

    :param dirpath: path to a readable directory
    :param cond: condition on the path to collect the file
    """
    files = []
    for fname in os.listdir(dirpath):
        fullname = os.path.join(dirpath, fname)
        if os.path.isdir(fullname):  # navigate inside
            files.extend(collect_files(fullname, cond))
        else:  # collect files satisfying the condition
            if cond(fullname):
                files.append(fullname)
    return files

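# A hedged usage sketch (not part of the original module): gathering all
# .xml files under a hypothetical directory with collect_files.
def _demo_collect_xml(dirpath='/tmp/inputs'):  # hypothetical helper
    return collect_files(dirpath, cond=lambda f: f.endswith('.xml'))
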
def extract_from_zip(path, candidates):
    """
    Given a zip archive and a list of candidate filenames, unzip the
    archive into a temporary directory and return the full paths of the
    extracted files whose basename is among the candidates (possibly an
    empty list, if no candidate is found).

    :param path: pathname of the archive
    :param candidates: list of names to search for
    """
    temp_dir = tempfile.mkdtemp()
    with zipfile.ZipFile(path) as archive:
        archive.extractall(temp_dir)
    return [f for f in collect_files(temp_dir)
            if os.path.basename(f) in candidates]

def _update(params, items, base_path):
    for key, value in items:
        if key.endswith(('_file', '_csv', '_hdf5')):
            if os.path.isabs(value):
                raise ValueError('%s=%s is an absolute path' % (key, value))
            input_type, _ext = key.rsplit('_', 1)
            params['inputs'][input_type] = (
                os.path.normpath(os.path.join(base_path, value))
                if value else '')
        else:
            params[key] = value

def get_params(job_inis, **kw):
    """
    Parse one or more INI-style config files.

    :param job_inis:
        List of configuration files (or list containing a single zip
        archive)
    :param kw:
        Optionally override some parameters
    :returns:
        A dictionary of parameters
    """
    input_zip = None
    if len(job_inis) == 1 and job_inis[0].endswith('.zip'):
        input_zip = job_inis[0]
        job_inis = extract_from_zip(
            job_inis[0], ['job_hazard.ini', 'job_haz.ini', 'job.ini',
                          'job_risk.ini'])

    not_found = [ini for ini in job_inis if not os.path.exists(ini)]
    if not_found:  # something was not found
        raise IOError('File not found: %s' % not_found[0])

    cp = configparser.ConfigParser()
    cp.read(job_inis)

    # directory containing the config files we're parsing
    job_ini = os.path.abspath(job_inis[0])
    base_path = decode(os.path.dirname(job_ini))
    params = dict(base_path=base_path, inputs={'job_ini': job_ini})
    if input_zip:
        params['inputs']['input_zip'] = os.path.abspath(input_zip)

    for sect in cp.sections():
        _update(params, cp.items(sect), base_path)
    _update(params, kw.items(), base_path)  # override on demand

    # populate the 'source' list
    inputs = params['inputs']
    smlt = inputs.get('source_model_logic_tree')
    if smlt:
        inputs['source'] = logictree.collect_info(smlt).smpaths
    elif 'source_model' in inputs:
        inputs['source'] = [inputs['source_model']]
    return params

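# A minimal sketch (not in the original module) of calling get_params
# directly; 'job.ini' is a hypothetical path and the keyword argument
# demonstrates the **kw override mechanism above.
def _demo_get_params():  # hypothetical helper
    params = get_params(['job.ini'], number_of_logic_tree_samples='10')
    return params['base_path'], sorted(params['inputs'])
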
def get_oqparam(job_ini, pkg=None, calculators=None, hc_id=None, validate=1):
    """
    Parse a dictionary of parameters from an INI-style config file.

    :param job_ini:
        Path to configuration file/archive or dictionary of parameters
    :param pkg:
        Python package where to find the configuration file (optional)
    :param calculators:
        Sequence of calculator names (optional) used to restrict the
        valid choices for `calculation_mode`
    :param hc_id:
        Not None only when called from a post calculation
    :param validate:
        Flag. By default it is true and the parameters are validated
    :returns:
        An :class:`openquake.commonlib.oqvalidation.OqParam` instance
        containing the validated and cast parameters/values parsed from
        the job.ini file as well as a subdictionary 'inputs' containing
        absolute paths to all of the files referenced in the job.ini,
        keyed by the parameter name.
    """
    # UGLY: this is here to avoid circular imports
    from openquake.calculators import base
    OqParam.calculation_mode.validator.choices = tuple(
        calculators or base.calculators)
    if not isinstance(job_ini, dict):
        basedir = os.path.dirname(pkg.__file__) if pkg else ''
        job_ini = get_params([os.path.join(basedir, job_ini)])
    if hc_id:
        job_ini.update(hazard_calculation_id=str(hc_id))
    oqparam = OqParam(**job_ini)
    if validate:
        oqparam.validate()
    BaseSeismicSource.min_mag = oqparam.minimum_magnitude
    return oqparam

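# A hedged sketch of the higher level entry point; again 'job.ini' is a
# hypothetical path. Passing validate=0 skips OqParam.validate().
def _demo_get_oqparam():  # hypothetical helper
    oq = get_oqparam('job.ini')  # parsed and validated
    return oq.calculation_mode, oq.inputs['job_ini']
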
pmap = None  # set as a side effect when the user reads hazard_curves
# from a file; the hazard curves format does not split the site locations
# from the data (an unhappy legacy design choice that I fixed in the GMFs
# CSV format only) thus this hack is necessary, otherwise we would have
# to parse the file twice

exposure = None  # set as a side effect when the user reads the site mesh
# this hack is necessary, otherwise we would have to parse the exposure twice

gmfs, eids = None, None  # set as a side effect when reading gmfs.xml
# this hack is necessary, otherwise we would have to parse the file twice

def get_mesh(oqparam):
    """
    Extract the mesh of points to compute from the sites, the sites_csv,
    or the region.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    """
    global pmap, exposure, gmfs, eids
    if 'exposure' in oqparam.inputs and exposure is None:
        # read it only once
        exposure = get_exposure(oqparam)
    if oqparam.sites:
        return geo.Mesh.from_coords(oqparam.sites)
    elif 'sites' in oqparam.inputs:
        csv_data = open(oqparam.inputs['sites'], 'U').readlines()
        has_header = csv_data[0].startswith('site_id')
        if has_header:  # strip site_id
            data = []
            for i, line in enumerate(csv_data[1:]):
                row = line.replace(',', ' ').split()
                sid = row[0]
                if sid != str(i):
                    raise InvalidFile('%s: expected site_id=%d, got %s' % (
                        oqparam.inputs['sites'], i, sid))
                data.append(' '.join(row[1:]))
        elif 'gmfs' in oqparam.inputs:
            raise InvalidFile('Missing header in %(sites)s' % oqparam.inputs)
        else:
            data = [line.replace(',', ' ') for line in csv_data]
        coords = valid.coordinates(','.join(data))
        start, stop = oqparam.sites_slice
        c = (coords[start:stop] if has_header
             else sorted(coords[start:stop]))
        return geo.Mesh.from_coords(c)
    elif 'hazard_curves' in oqparam.inputs:
        fname = oqparam.inputs['hazard_curves']
        if fname.endswith('.csv'):
            mesh, pmap = get_pmap_from_csv(oqparam, fname)
        elif fname.endswith('.xml'):
            mesh, pmap = get_pmap_from_nrml(oqparam, fname)
        else:
            raise NotImplementedError('Reading from %s' % fname)
        return mesh
    elif 'gmfs' in oqparam.inputs:
        eids, gmfs = _get_gmfs(oqparam)  # sets oqparam.sites
        return geo.Mesh.from_coords(oqparam.sites)
    elif oqparam.region and oqparam.region_grid_spacing:
        poly = geo.Polygon.from_wkt(oqparam.region)
        try:
            mesh = poly.discretize(oqparam.region_grid_spacing)
            return geo.Mesh.from_coords(zip(mesh.lons, mesh.lats))
        except Exception:
            raise ValueError(
                'Could not discretize region %(region)s with grid spacing '
                '%(region_grid_spacing)s' % vars(oqparam))
    elif 'exposure' in oqparam.inputs:
        return exposure.mesh

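# The 'sites' CSV parsed above comes in two flavours (derived from the
# logic in get_mesh): with a 'site_id' header the IDs must be incremental
# and the site order is kept; without a header the rows are plain
# coordinates (commas or spaces both work) and the sites are sorted.
#
#   site_id,lon,lat        |    0.0 0.0
#   0,0.0,0.0              |    0.5 0.0
#   1,0.5,0.0              |    1.0 0.0
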
def get_site_model(oqparam, req_site_params):
    """
    Convert the NRML file into an array of site parameters.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param req_site_params:
        required site parameters
    :returns:
        an array with fields lon, lat, vs30, measured, z1pt0, z2pt5,
        backarc
    """
    nodes = nrml.read(oqparam.inputs['site_model']).siteModel
    params = [valid.site_param(node.attrib) for node in nodes]
    missing = req_site_params - set(params[0])
    if missing == set(['backarc']):  # use a default of False
        for param in params:
            param['backarc'] = False
    elif missing:
        raise InvalidFile('%s: missing parameter %s' %
                          (oqparam.inputs['site_model'],
                           ', '.join(missing)))
    # NB: the sorted in sorted(params[0]) is essential, otherwise there is
    # a heisenbug in scenario/test_case_4
    site_model_dt = numpy.dtype([(p, site.site_param_dt[p])
                                 for p in sorted(params[0])])
    tuples = [tuple(param[name] for name in site_model_dt.names)
              for param in params]
    return numpy.array(tuples, site_model_dt)

def get_site_collection(oqparam, mesh=None):
    """
    Returns a SiteCollection instance by looking at the points and the
    site model defined by the configuration parameters.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param mesh:
        the mesh to use; if None, it is extracted from the job.ini
    """
    mesh = mesh or get_mesh(oqparam)
    req_site_params = get_gsim_lt(oqparam).req_site_params
    if oqparam.inputs.get('site_model'):
        sm = get_site_model(oqparam, req_site_params)
        try:
            # in the future we could have elevation in the site model
            depth = sm['depth']
        except ValueError:
            # this is the normal case
            depth = None
        if mesh is None:
            # extract the site collection directly from the site model
            sitecol = site.SiteCollection.from_points(
                sm['lon'], sm['lat'], depth, sm, req_site_params)
        else:
            # associate the site parameters to the mesh
            sitecol = site.SiteCollection.from_points(
                mesh.lons, mesh.lats, mesh.depths, None, req_site_params)
            sc, params = geo.utils.assoc(
                sm, sitecol, oqparam.max_site_model_distance, 'warn')
            for name in req_site_params:
                sitecol._set(name, params[name])
    else:  # use the default site params
        sitecol = site.SiteCollection.from_points(
            mesh.lons, mesh.lats, mesh.depths, oqparam, req_site_params)
    ss = os.environ.get('OQ_SAMPLE_SITES')
    if ss:
        # debugging tip to reduce the size of a calculation
        # OQ_SAMPLE_SITES=.1 oq engine --run job.ini
        # will run a computation with 10 times fewer sites
        sitecol.array = numpy.array(random_filter(sitecol.array, float(ss)))
        sitecol.make_complete()
    return sitecol

def get_gsim_lt(oqparam, trts=['*']):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param trts:
        a sequence of tectonic region types as strings; trts=['*']
        means that there is no filtering
    :returns:
        a GsimLogicTree instance obtained by filtering on the provided
        tectonic region types.
    """
    if 'gsim_logic_tree' not in oqparam.inputs:
        return logictree.GsimLogicTree.from_(oqparam.gsim)
    gsim_file = os.path.join(
        oqparam.base_path, oqparam.inputs['gsim_logic_tree'])
    gsim_lt = logictree.GsimLogicTree(gsim_file, trts)
    gmfcorr = oqparam.correl_model
    for trt, gsims in gsim_lt.values.items():
        for gsim in gsims:
            if gmfcorr and (gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES ==
                            set([StdDev.TOTAL])):
                raise CorrelationButNoInterIntraStdDevs(gmfcorr, gsim)
    return gsim_lt

def get_gsims(oqparam):
    """
    Return an ordered list of GSIM instances from the gsim name in the
    configuration file or from the gsim logic tree file.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    """
    return [rlz.value[0] for rlz in get_gsim_lt(oqparam)]

def get_rlzs_by_gsim(oqparam):
    """
    Return an ordered dictionary gsim -> [realization index]. Works for
    gsim logic trees with a single tectonic region type.
    """
    cinfo = source.CompositionInfo.fake(get_gsim_lt(oqparam))
    ra = cinfo.get_rlzs_assoc()
    dic = collections.OrderedDict()
    for rlzi, gsim_by_trt in enumerate(ra.gsim_by_trt):
        dic[gsim_by_trt['*']] = [rlzi]
    return dic

def get_rupture(oqparam):
    """
    Read the `rupture_model` file and build a hazardlib rupture.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        a hazardlib rupture
    """
    rup_model = oqparam.inputs['rupture_model']
    [rup_node] = nrml.read(rup_model)
    conv = sourceconverter.RuptureConverter(
        oqparam.rupture_mesh_spacing, oqparam.complex_fault_mesh_spacing)
    rup = conv.convert_node(rup_node)
    rup.tectonic_region_type = '*'  # there is no TRT for scenario ruptures
    rup.seed = oqparam.random_seed
    return rup

def get_source_model_lt(oqparam):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        a :class:`openquake.commonlib.logictree.SourceModelLogicTree`
        instance
    """
    fname = oqparam.inputs.get('source_model_logic_tree')
    if fname:
        # NB: converting the random_seed into an integer is needed on
        # Windows
        return logictree.SourceModelLogicTree(
            fname, validate=False, seed=int(oqparam.random_seed),
            num_samples=oqparam.number_of_logic_tree_samples)
    return logictree.FakeSmlt(oqparam.inputs['source_model'],
                              int(oqparam.random_seed),
                              oqparam.number_of_logic_tree_samples)

def get_source_models(oqparam, gsim_lt, source_model_lt, monitor,
                      in_memory=True):
    """
    Build all the source models generated by the logic tree.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param gsim_lt:
        a :class:`openquake.commonlib.logictree.GsimLogicTree` instance
    :param source_model_lt:
        a :class:`openquake.commonlib.logictree.SourceModelLogicTree`
        instance
    :param monitor:
        a `openquake.baselib.performance.Monitor` instance
    :param in_memory:
        if True, keep in memory the sources, else just collect the TRTs
    :returns:
        an iterator over
        :class:`openquake.commonlib.logictree.LtSourceModel` tuples
    """
    converter = sourceconverter.SourceConverter(
        oqparam.investigation_time,
        oqparam.rupture_mesh_spacing,
        oqparam.complex_fault_mesh_spacing,
        oqparam.width_of_mfd_bin,
        oqparam.area_source_discretization)
    psr = nrml.SourceModelParser(converter)
    pik = {}
    if oqparam.calculation_mode.startswith('ucerf'):
        [grp] = nrml.to_python(oqparam.inputs["source_model"], converter)
    elif in_memory:
        logging.info('Pickling the source model(s)')
        pik = logictree.parallel_pickle_source_models(
            gsim_lt, source_model_lt, converter, monitor)

    # consider only the effective realizations
    smlt_dir = os.path.dirname(source_model_lt.filename)
    for sm in source_model_lt.gen_source_models(gsim_lt):
        src_groups = []
        for name in sm.names.split():
            fname = os.path.abspath(os.path.join(smlt_dir, name))
            if oqparam.calculation_mode.startswith('ucerf'):
                sg = copy.copy(grp)
                sg.id = sm.ordinal
                sg.sources = [sg[0].new(sm.ordinal, sm.names)]  # one source
                src_groups.append(sg)
            elif in_memory:
                apply_unc = source_model_lt.make_apply_uncertainties(sm.path)
                src_groups.extend(psr.parse(
                    fname, pik[fname], apply_unc,
                    oqparam.investigation_time))
            else:  # just collect the TRT models
                src_groups.extend(logictree.read_source_groups(fname))
        num_sources = sum(len(sg.sources) for sg in src_groups)
        sm.src_groups = src_groups
        trts = [mod.trt for mod in src_groups]
        source_model_lt.tectonic_region_types.update(trts)
        logging.info(
            'Processed source model %d with %d potential gsim path(s) and '
            '%d sources', sm.ordinal + 1, sm.num_gsim_paths, num_sources)

        gsim_file = oqparam.inputs.get('gsim_logic_tree')
        if gsim_file:  # check TRTs
            for src_group in src_groups:
                if src_group.trt not in gsim_lt.values:
                    raise ValueError(
                        "Found in %r a tectonic region type %r "
                        "inconsistent with the ones in %r" %
                        (sm, src_group.trt, gsim_file))
        yield sm

    # log if some source file is being used more than once
    dupl = 0
    for fname, hits in psr.fname_hits.items():
        if hits > 1:
            logging.info('%s has been considered %d times', fname, hits)
            if not psr.changed_sources:
                dupl += hits
    if (dupl and not oqparam.optimize_same_id_sources and
            'event_based' not in oqparam.calculation_mode):
        logging.warn('You are doing redundant calculations: please make '
                     'sure that different sources have different IDs and '
                     'set optimize_same_id_sources=true in your .ini file')
    # file cleanup
    for pikfile in pik.values():
        os.remove(pikfile)

def getid(src):
    try:
        return src.source_id
    except AttributeError:
        return src['id']

def get_composite_source_model(oqparam, monitor=performance.Monitor(),
                               in_memory=True):
    """
    Parse the XML and build a complete composite source model in memory.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param monitor:
        a `openquake.baselib.performance.Monitor` instance
    :param in_memory:
        if False, just parse the XML without instantiating the sources
    """
    smodels = []
    grp_id = 0
    idx = 0
    gsim_lt = get_gsim_lt(oqparam)
    source_model_lt = get_source_model_lt(oqparam)
    if oqparam.number_of_logic_tree_samples == 0:
        logging.info('Potential number of logic tree paths = {:,d}'.format(
            source_model_lt.num_paths * gsim_lt.get_num_paths()))
    if source_model_lt.on_each_source:
        logging.info('There is a logic tree on each source')
    for source_model in get_source_models(
            oqparam, gsim_lt, source_model_lt, monitor,
            in_memory=in_memory):
        for src_group in source_model.src_groups:
            src_group.sources = sorted(src_group, key=getid)
            src_group.id = grp_id
            for src in src_group:
                # there are two cases depending on the flag in_memory:
                # 1) src is a hazardlib source and has a src_group_id
                #    attribute; in that case the source has to be numbered
                # 2) src is a Node object, then nothing must be done
                if isinstance(src, Node):
                    continue
                src.src_group_id = grp_id
                src.id = idx
                idx += 1
            grp_id += 1
            if grp_id >= TWO16:
                # the limit is really needed only for event based
                # calculations
                raise ValueError('There is a limit of %d src groups!' %
                                 TWO16)
        smodels.append(source_model)
    csm = source.CompositeSourceModel(gsim_lt, source_model_lt, smodels,
                                      oqparam.optimize_same_id_sources)
    for sm in csm.source_models:
        counter = collections.Counter()
        for sg in sm.src_groups:
            for srcid in map(getid, sg):
                counter[srcid] += 1
        dupl = [srcid for srcid in counter if counter[srcid] > 1]
        if dupl:
            raise nrml.DuplicatedID('Found duplicated source IDs in %s: %s'
                                    % (sm, dupl))
    if 'event_based' in oqparam.calculation_mode:
        # initialize the rupture serial numbers before filtering; in
        # this way the serials are independent from the site collection
        csm.init_serials(oqparam.ses_seed)
    return csm

def get_imts(oqparam):
    """
    Return a sorted list of IMTs as hazardlib objects
    """
    return list(map(imt.from_string, sorted(oqparam.imtls)))

def get_risk_model(oqparam):
    """
    Return a :class:`openquake.risklib.riskinput.CompositeRiskModel`
    instance

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    """
    rmdict = get_risk_models(oqparam)
    oqparam.set_risk_imtls(rmdict)
    if oqparam.calculation_mode.endswith('_bcr'):
        retro = get_risk_models(oqparam, 'vulnerability_retrofitted')
    else:
        retro = {}
    return riskinput.CompositeRiskModel(oqparam, rmdict, retro)

def get_exposure(oqparam):
    """
    Read the full exposure in memory and build a list of
    :class:`openquake.risklib.asset.Asset` instances.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        an :class:`Exposure` instance or a compatible AssetCollection
    """
    logging.info('Reading the exposure')
    exposure = asset.Exposure.read(
        oqparam.inputs['exposure'], oqparam.calculation_mode,
        oqparam.region, oqparam.ignore_missing_costs)
    exposure.mesh, exposure.assets_by_site = (
        exposure.get_mesh_assets_by_site())
    return exposure

def get_sitecol_assetcol(oqparam, haz_sitecol=None, cost_types=()):
    """
    :param oqparam: calculation parameters
    :param haz_sitecol: the hazard site collection
    :param cost_types: the expected cost types
    :returns: (site collection, asset collection) instances
    """
    global exposure
    if exposure is None:
        # haz_sitecol not extracted from the exposure
        exposure = get_exposure(oqparam)
    if haz_sitecol is None:
        haz_sitecol = get_site_collection(oqparam)
    missing = set(cost_types) - set(exposure.cost_types['name']) - set(
        ['occupants'])  # TODO: remove occupants and fragility special cases
    if missing and not oqparam.calculation_mode.endswith('damage'):
        expo = oqparam.inputs.get('exposure', '')
        raise InvalidFile(
            'Expected cost types %s but the exposure %r contains %s' % (
                cost_types, expo, exposure.cost_types['name']))
    if oqparam.region_grid_spacing and not oqparam.region:
        # extract the hazard grid from the exposure
        poly = exposure.mesh.get_convex_hull()
        mesh = poly.dilate(oqparam.region_grid_spacing).discretize(
            oqparam.region_grid_spacing)
        if len(mesh) > len(haz_sitecol):
            raise LargeExposureGrid(mesh, haz_sitecol.mesh,
                                    oqparam.region_grid_spacing)
        haz_sitecol = get_site_collection(oqparam, mesh)  # redefine
        haz_distance = oqparam.region_grid_spacing
        if haz_distance != oqparam.asset_hazard_distance:
            logging.info('Using asset_hazard_distance=%d km instead of '
                         '%d km', haz_distance,
                         oqparam.asset_hazard_distance)
    else:
        haz_distance = oqparam.asset_hazard_distance

    if haz_sitecol.mesh != exposure.mesh:
        # associate the assets to the hazard sites
        tot_assets = sum(len(assets) for assets in exposure.assets_by_site)
        mode = 'strict' if oqparam.region_grid_spacing else 'filter'
        sitecol, assets_by = geo.utils.assoc(
            exposure.assets_by_site, haz_sitecol, haz_distance, mode)
        assets_by_site = [[] for _ in sitecol.complete.sids]
        num_assets = 0
        for sid, assets in zip(sitecol.sids, assets_by):
            assets_by_site[sid] = assets
            num_assets += len(assets)
        logging.info(
            'Associated %d assets to %d sites', num_assets, len(sitecol))
        if num_assets < tot_assets:
            logging.warn('Discarded %d assets outside the '
                         'asset_hazard_distance of %d km',
                         tot_assets - num_assets, haz_distance)
    else:
        # asset sites and hazard sites are the same
        sitecol = haz_sitecol
        assets_by_site = exposure.assets_by_site

    asset_refs = numpy.array(
        [exposure.asset_refs[asset.ordinal]
         for assets in assets_by_site for asset in assets])
    assetcol = asset.AssetCollection(
        asset_refs, assets_by_site, exposure.tagcol,
        exposure.cost_calculator, oqparam.time_event,
        exposure.occupancy_periods)
    return sitecol, assetcol

def get_mesh_csvdata(csvfile, imts, num_values, validvalues):
    """
    Read CSV data in the format `IMT lon lat value1 ... valueN`.

    :param csvfile:
        a file or file-like object with the CSV data
    :param imts:
        a list of intensity measure types
    :param num_values:
        a sequence with the number of expected values per IMT
    :param validvalues:
        validation function for the values
    :returns:
        the mesh of points and the data as a dictionary
        imt -> array of curves for each site
    """
    number_of_values = dict(zip(imts, num_values))
    lon_lats = {imt: set() for imt in imts}
    data = AccumDict()  # imt -> list of arrays
    check_imt = valid.Choice(*imts)
    for line, row in enumerate(csv.reader(csvfile, delimiter=' '), 1):
        try:
            imt = check_imt(row[0])
            lon_lat = valid.longitude(row[1]), valid.latitude(row[2])
            if lon_lat in lon_lats[imt]:
                raise DuplicatedPoint(lon_lat)
            lon_lats[imt].add(lon_lat)
            values = validvalues(' '.join(row[3:]))
            if len(values) != number_of_values[imt]:
                raise ValueError('Found %d values, expected %d' %
                                 (len(values), number_of_values[imt]))
        except (ValueError, DuplicatedPoint) as err:
            raise err.__class__('%s: file %s, line %d' %
                                (err, csvfile, line))
        data += {imt: [numpy.array(values)]}
    points = lon_lats.pop(imts[0])
    for other_imt, other_points in lon_lats.items():
        if points != other_points:
            raise ValueError('Inconsistent locations between %s and %s' %
                             (imts[0], other_imt))
    lons, lats = zip(*sorted(points))
    mesh = geo.Mesh(numpy.array(lons), numpy.array(lats))
    return mesh, {imt: numpy.array(lst) for imt, lst in data.items()}

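# A hedged usage sketch of get_mesh_csvdata with an in-memory file; using
# valid.positivefloats as the value validator is an assumption for
# generic values (get_pmap_from_csv below passes
# valid.decreasing_probabilities instead).
def _demo_get_mesh_csvdata():  # hypothetical helper
    import io
    csvfile = io.StringIO('PGA 0.0 0.0 0.1 0.2 0.3\n'
                          'PGA 1.0 0.0 0.4 0.5 0.6\n')
    # two sites, one IMT, three values per site
    return get_mesh_csvdata(csvfile, ['PGA'], [3], valid.positivefloats)
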
def _get_gmfs(oqparam):
    M = len(oqparam.imtls)
    assert M, ('oqparam.imtls is empty, did you call '
               'oqparam.set_risk_imtls(get_risk_models(oqparam))?')
    fname = oqparam.inputs['gmfs']
    if fname.endswith('.csv'):
        array = writers.read_composite_array(fname).array
        R = len(numpy.unique(array['rlzi']))
        if R > 1:
            raise InvalidFile('%s: found %d realizations, currently only '
                              'one realization is supported' % (fname, R))
        # the array has the structure rlzi, sid, eid, gmv_PGA, gmv_...
        dtlist = [(name, array.dtype[name])
                  for name in array.dtype.names[:3]]
        required_imts = list(oqparam.imtls)
        imts = [name[4:] for name in array.dtype.names[3:]]
        if imts != required_imts:
            raise ValueError('Required %s, but %s contains %s' % (
                required_imts, fname, imts))
        dtlist.append(('gmv', (F32, M)))
        eids = numpy.unique(array['eid'])
        E = len(eids)
        found_eids = set(eids)
        expected_eids = set(range(E))  # expected incremental eids
        missing_eids = expected_eids - found_eids
        if missing_eids:
            raise InvalidFile('Missing eids in the gmfs.csv file: %s'
                              % missing_eids)
        assert expected_eids == found_eids, (expected_eids, found_eids)
        eidx = {eid: e for e, eid in enumerate(eids)}
        sitecol = get_site_collection(oqparam)
        expected_sids = set(sitecol.sids)
        found_sids = set(numpy.unique(array['sid']))
        missing_sids = found_sids - expected_sids
        if missing_sids:
            raise InvalidFile(
                'Found site IDs missing in the sites.csv file: %s' %
                missing_sids)
        N = len(sitecol)
        gmfs = numpy.zeros((R, N, E, M), F32)
        counter = collections.Counter()
        for row in array.view(dtlist):
            key = row['rlzi'], row['sid'], eidx[row['eid']]
            gmfs[key] = row['gmv']
            counter[key] += 1
        dupl = [key for key in counter if counter[key] > 1]
        if dupl:
            raise InvalidFile('Duplicated (rlzi, sid, eid) in the GMFs '
                              'file: %s' % dupl)
    elif fname.endswith('.xml'):
        eids, gmfs_by_imt = get_scenario_from_nrml(oqparam, fname)
        N, E = gmfs_by_imt.shape
        gmfs = numpy.zeros((1, N, E, M), F32)
        for imti, imtstr in enumerate(oqparam.imtls):
            gmfs[0, :, :, imti] = gmfs_by_imt[imtstr]
    else:
        raise NotImplementedError('Reading from %s' % fname)
    return eids, gmfs

@deprecated('Reading hazard curves from CSV may change in the future')
def get_pmap_from_csv(oqparam, fname):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param fname:
        a .txt file with format `IMT lon lat poe1 ... poeN`
    :returns:
        the site mesh and the hazard curves read by the .txt file
    """
    if not oqparam.imtls:
        oqparam.set_risk_imtls(get_risk_models(oqparam))
    if not oqparam.imtls:
        raise ValueError('Missing intensity_measure_types_and_levels in %s'
                         % oqparam.inputs['job_ini'])
    num_values = list(map(len, list(oqparam.imtls.values())))
    with open(oqparam.inputs['hazard_curves']) as csvfile:
        mesh, hcurves = get_mesh_csvdata(
            csvfile, list(oqparam.imtls), num_values,
            valid.decreasing_probabilities)
    array = numpy.zeros((len(mesh), sum(num_values)))
    for imt_ in hcurves:
        array[:, oqparam.imtls(imt_)] = hcurves[imt_]
    return mesh, ProbabilityMap.from_array(array, range(len(mesh)))

def get_pmap_from_nrml(oqparam, fname):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param fname:
        an XML file containing hazard curves
    :returns:
        site mesh, curve array
    """
    hcurves_by_imt = {}
    oqparam.hazard_imtls = imtls = collections.OrderedDict()
    for hcurves in nrml.read(fname):
        imt = hcurves['IMT']
        oqparam.investigation_time = hcurves['investigationTime']
        if imt == 'SA':
            imt += '(%s)' % hcurves['saPeriod']
        imtls[imt] = ~hcurves.IMLs
        data = sorted((~node.Point.pos, ~node.poEs)
                      for node in hcurves[1:])
        hcurves_by_imt[imt] = numpy.array([d[1] for d in data])
    lons, lats = [], []
    for xy, poes in data:
        lons.append(xy[0])
        lats.append(xy[1])
    mesh = geo.Mesh(numpy.array(lons), numpy.array(lats))
    num_levels = sum(len(v) for v in imtls.values())
    array = numpy.zeros((len(mesh), num_levels))
    imtls = DictArray(imtls)
    for imt_ in hcurves_by_imt:
        array[:, imtls(imt_)] = hcurves_by_imt[imt_]
    return mesh, ProbabilityMap.from_array(array, range(len(mesh)))

# used in get_scenario_from_nrml
def _extract_eids_sitecounts(gmfset):
    eids = set()
    counter = collections.Counter()
    for gmf in gmfset:
        eids.add(gmf['ruptureId'])
        for node in gmf:
            counter[node['lon'], node['lat']] += 1
    eids = numpy.array(sorted(eids), numpy.uint64)
    if (eids != numpy.arange(len(eids), dtype=numpy.uint64)).any():
        raise ValueError('There are ruptureIds in the gmfs_file not in '
                         'the range [0, %d)' % len(eids))
    return eids, counter

@deprecated('Use the .csv format for the GMFs instead')
def get_scenario_from_nrml(oqparam, fname):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param fname:
        the NRML file containing the GMFs
    :returns:
        a pair (eids, gmf array)
    """
    if not oqparam.imtls:
        oqparam.set_risk_imtls(get_risk_models(oqparam))
    imts = sorted(oqparam.imtls)
    num_imts = len(imts)
    imt_dt = numpy.dtype([(imt, F32) for imt in imts])
    gmfset = nrml.read(fname).gmfCollection.gmfSet
    eids, sitecounts = _extract_eids_sitecounts(gmfset)
    coords = sorted(sitecounts)
    oqparam.sites = [(lon, lat, 0) for lon, lat in coords]
    site_idx = {lonlat: i for i, lonlat in enumerate(coords)}
    oqparam.number_of_ground_motion_fields = num_events = len(eids)
    num_sites = len(oqparam.sites)
    gmf_by_imt = numpy.zeros((num_events, num_sites), imt_dt)
    counts = collections.Counter()
    for i, gmf in enumerate(gmfset):
        if len(gmf) != num_sites:  # there must be one node per site
            raise InvalidFile(
                'Expected %d sites, got %d nodes in %s, line %d' % (
                    num_sites, len(gmf), fname, gmf.lineno))
        counts[gmf['ruptureId']] += 1
        imt = gmf['IMT']
        if imt == 'SA':
            imt = 'SA(%s)' % gmf['saPeriod']
        for node in gmf:
            sid = site_idx[node['lon'], node['lat']]
            gmf_by_imt[imt][i % num_events, sid] = node['gmv']
    for rupid, count in sorted(counts.items()):
        if count < num_imts:
            raise InvalidFile("Found a missing ruptureId %d in %s" %
                              (rupid, fname))
        elif count > num_imts:
            raise InvalidFile("Found a duplicated ruptureId '%s' in %s" %
                              (rupid, fname))
    expected_gmvs_per_site = num_imts * len(eids)
    for lonlat, counts in sitecounts.items():
        if counts != expected_gmvs_per_site:
            raise InvalidFile(
                '%s: expected %d gmvs at location %s, found %d' %
                (fname, expected_gmvs_per_site, lonlat, counts))
    return eids, gmf_by_imt.T

def get_mesh_hcurves(oqparam):
    """
    Read CSV data in the format `lon lat, v1-vN, w1-wN, ...`.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        the mesh of points and the data as a dictionary
        imt -> array of curves for each site
    """
    imtls = oqparam.imtls
    lon_lats = set()
    data = AccumDict()  # imt -> list of arrays
    ncols = len(imtls) + 1  # lon_lat + curve_per_imt ...
    csvfile = oqparam.inputs['hazard_curves']
    for line, row in enumerate(csv.reader(csvfile), 1):
        try:
            if len(row) != ncols:
                raise ValueError('Expected %d columns, found %d' %
                                 (ncols, len(row)))
            x, y = row[0].split()
            lon_lat = valid.longitude(x), valid.latitude(y)
            if lon_lat in lon_lats:
                raise DuplicatedPoint(lon_lat)
            lon_lats.add(lon_lat)
            for i, imt_ in enumerate(imtls, 1):
                values = valid.decreasing_probabilities(row[i])
                if len(values) != len(imtls[imt_]):
                    raise ValueError('Found %d values, expected %d' %
                                     (len(values), len(imtls[imt_])))
                data += {imt_: [numpy.array(values)]}
        except (ValueError, DuplicatedPoint) as err:
            raise err.__class__('%s: file %s, line %d' %
                                (err, csvfile, line))
    lons, lats = zip(*sorted(lon_lats))
    mesh = geo.Mesh(numpy.array(lons), numpy.array(lats))
    return mesh, {imt: numpy.array(lst) for imt, lst in data.items()}

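# Row format expected by get_mesh_hcurves (derived from the code above):
# one "lon lat" pair, then one group of decreasing probabilities per IMT,
# groups separated by commas; e.g. with two IMTs and three levels each:
#
#   0.0 0.0,0.9 0.5 0.1,0.8 0.4 0.05
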
# used in utils/reduce_sm and utils/extract_source
def reduce_source_model(smlt_file, source_ids, remove=True):
    """
    Extract sources from the composite source model
    """
    for path in logictree.collect_info(smlt_file).smpaths:
        root = nrml.read(path)
        model = Node('sourceModel', root[0].attrib)
        origmodel = root[0]
        if root['xmlns'] == 'http://openquake.org/xmlns/nrml/0.4':
            for src_node in origmodel:
                if src_node['id'] in source_ids:
                    model.nodes.append(src_node)
        else:  # nrml/0.5
            for src_group in origmodel:
                sg = copy.copy(src_group)
                sg.nodes = []
                weights = src_group.get('srcs_weights')
                if weights:
                    assert len(weights) == len(src_group.nodes)
                else:
                    weights = [1] * len(src_group.nodes)
                src_group['srcs_weights'] = reduced_weights = []
                for src_node, weight in zip(src_group, weights):
                    if src_node['id'] in source_ids:
                        sg.nodes.append(src_node)
                        reduced_weights.append(weight)
                if sg.nodes:
                    model.nodes.append(sg)
        shutil.copy(path, path + '.bak')
        if model:
            with open(path, 'wb') as f:
                nrml.write([model], f, xmlns=root['xmlns'])
            logging.warn('Reduced %s' % path)
        elif remove:  # remove the files completely reduced
            os.remove(path)

def get_checksum32(oqparam):
    """
    Build an unsigned 32 bit integer from the input files of the
    calculation
    """
    # NB: using adler32 & 0xffffffff is the documented way to get a
    # checksum which is the same between Python 2 and Python 3
    checksum = 0
    for key in sorted(oqparam.inputs):
        fname = oqparam.inputs[key]
        if not fname:
            continue
        elif key == 'source':  # list of fnames and/or strings
            for f in fname:
                data = open(f, 'rb').read()
                checksum = zlib.adler32(data, checksum) & 0xffffffff
        elif os.path.exists(fname):
            data = open(fname, 'rb').read()
            checksum = zlib.adler32(data, checksum) & 0xffffffff
        else:
            raise ValueError('%s does not exist or is not a file' % fname)
    return checksum

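# A minimal sketch (not in the original module) of the adler32 idiom used
# above: the & 0xffffffff mask keeps the running checksum an unsigned
# 32 bit integer with the same value on both Python 2 and Python 3.
def _demo_checksum(fnames):  # hypothetical helper
    checksum = 0
    for fname in fnames:
        with open(fname, 'rb') as f:
            checksum = zlib.adler32(f.read(), checksum) & 0xffffffff
    return checksum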