Source code for openquake.commonlib.readinput

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2018 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
import os
import csv
import zlib
import zipfile
import logging
import operator
import tempfile
import collections
import numpy

from openquake.baselib.general import groupby, AccumDict, DictArray, deprecated
from openquake.baselib.python3compat import configparser, decode
from openquake.baselib.node import Node
from openquake.baselib import hdf5
from openquake.hazardlib import (
    calc, geo, site, imt, valid, sourceconverter, nrml, InvalidFile)
from openquake.hazardlib.source.rupture import EBRupture
from openquake.hazardlib.probability_map import ProbabilityMap
from openquake.risklib import asset, riskinput
from openquake.risklib.riskmodels import get_risk_models
from openquake.baselib import datastore
from openquake.commonlib.oqvalidation import OqParam
from openquake.commonlib import logictree, source, writers

# the following is quite arbitrary, it gives output weights that I like (MS)
NORMALIZATION_FACTOR = 1E-2
TWO16 = 2 ** 16  # 65,536
F32 = numpy.float32
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64

Site = collections.namedtuple('Site', 'sid lon lat')
stored_event_dt = numpy.dtype([
    ('eid', U64), ('rup_id', U32), ('grp_id', U16), ('year', U32),
    ('ses', U32), ('sample', U32)])


[docs]class DuplicatedPoint(Exception): """ Raised when reading a CSV file with duplicated (lon, lat) pairs """
[docs]def collect_files(dirpath, cond=lambda fullname: True): """ Recursively collect the files contained inside dirpath. :param dirpath: path to a readable directory :param cond: condition on the path to collect the file """ files = [] for fname in os.listdir(dirpath): fullname = os.path.join(dirpath, fname) if os.path.isdir(fullname): # navigate inside files.extend(collect_files(fullname)) else: # collect files if cond(fullname): files.append(fullname) return files
[docs]def extract_from_zip(path, candidates): """ Given a zip archive and a function to detect the presence of a given filename, unzip the archive into a temporary directory and return the full path of the file. Raise an IOError if the file cannot be found within the archive. :param path: pathname of the archive :param candidates: list of names to search for """ temp_dir = tempfile.mkdtemp() with zipfile.ZipFile(path) as archive: archive.extractall(temp_dir) return [f for f in collect_files(temp_dir) if os.path.basename(f) in candidates]
def _update(params, items, base_path): for key, value in items: if key.endswith(('_file', '_csv')): if os.path.isabs(value): raise ValueError('%s=%s is an absolute path' % (key, value)) input_type, _ext = key.rsplit('_', 1) params['inputs'][input_type] = ( os.path.join(base_path, value) if value else '') else: params[key] = value
[docs]def get_params(job_inis, **kw): """ Parse one or more INI-style config files. :param job_inis: List of configuration files (or list containing a single zip archive) :param kw: Optionally override some parameters :returns: A dictionary of parameters """ if len(job_inis) == 1 and job_inis[0].endswith('.zip'): job_inis = extract_from_zip( job_inis[0], ['job_hazard.ini', 'job_haz.ini', 'job.ini', 'job_risk.ini']) not_found = [ini for ini in job_inis if not os.path.exists(ini)] if not_found: # something was not found raise IOError('File not found: %s' % not_found[0]) cp = configparser.ConfigParser() cp.read(job_inis) # directory containing the config files we're parsing job_ini = os.path.abspath(job_inis[0]) base_path = decode(os.path.dirname(job_ini)) params = dict(base_path=base_path, inputs={'job_ini': job_ini}) for sect in cp.sections(): _update(params, cp.items(sect), base_path) _update(params, kw.items(), base_path) # override on demand # populate the 'source' list inputs = params['inputs'] smlt = inputs.get('source_model_logic_tree') if smlt: inputs['source'] = [] for paths in gen_sm_paths(smlt): inputs['source'].extend(paths) elif 'source_model' in inputs: inputs['source'] = [inputs['source_model']] return params
[docs]def gen_sm_paths(smlt): """ Yields the path names for the source models listed in the smlt file, a block at the time. """ base_path = os.path.dirname(smlt) for model in source.collect_source_model_paths(smlt): paths = [] for name in model.split(): if os.path.isabs(name): raise InvalidFile('%s: %s must be a relative path' % (smlt, name)) fname = os.path.abspath(os.path.join(base_path, name)) if os.path.exists(fname): # consider only real paths paths.append(fname) yield paths
[docs]def get_oqparam(job_ini, pkg=None, calculators=None, hc_id=None): """ Parse a dictionary of parameters from an INI-style config file. :param job_ini: Path to configuration file/archive or dictionary of parameters :param pkg: Python package where to find the configuration file (optional) :param calculators: Sequence of calculator names (optional) used to restrict the valid choices for `calculation_mode` :param hc_id: Not None only when called from a post calculation :returns: An :class:`openquake.commonlib.oqvalidation.OqParam` instance containing the validate and casted parameters/values parsed from the job.ini file as well as a subdictionary 'inputs' containing absolute paths to all of the files referenced in the job.ini, keyed by the parameter name. """ # UGLY: this is here to avoid circular imports from openquake.calculators import base OqParam.calculation_mode.validator.choices = tuple( calculators or base.calculators) if not isinstance(job_ini, dict): basedir = os.path.dirname(pkg.__file__) if pkg else '' job_ini = get_params([os.path.join(basedir, job_ini)]) if hc_id: job_ini.update(hazard_calculation_id=str(hc_id)) oqparam = OqParam(**job_ini) oqparam.validate() return oqparam
pmap = None # set as side effect when the user reads hazard_curves from a file # the hazard curves format does not split the site locations from the data (an # unhappy legacy design choice that I fixed in the GMFs CSV format only) thus # this hack is necessary, otherwise we would have to parse the file twice
[docs]def get_mesh(oqparam): """ Extract the mesh of points to compute from the sites, the sites_csv, or the region. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance """ global pmap if oqparam.sites: return geo.Mesh.from_coords(sorted(oqparam.sites)) elif 'sites' in oqparam.inputs: csv_data = open(oqparam.inputs['sites'], 'U').readlines() has_header = csv_data[0].startswith('site_id') if has_header: # strip site_id data = [] for i, line in enumerate(csv_data[1:]): row = line.replace(',', ' ').split() sid = row[0] if sid != str(i): raise InvalidFile('%s: expected site_id=%d, got %s' % ( oqparam.inputs['sites'], i, sid)) data.append(' '.join(row[1:])) elif 'gmfs' in oqparam.inputs: raise InvalidFile('Missing header in %(sites)s' % oqparam.inputs) else: data = [line.replace(',', ' ') for line in csv_data] coords = valid.coordinates(','.join(data)) start, stop = oqparam.sites_slice c = coords[start:stop] if has_header else sorted(coords[start:stop]) return geo.Mesh.from_coords(c) elif 'hazard_curves' in oqparam.inputs: fname = oqparam.inputs['hazard_curves'] if fname.endswith('.csv'): mesh, pmap = get_pmap_from_csv(oqparam, fname) elif fname.endswith('.xml'): mesh, pmap = get_pmap_from_nrml(oqparam, fname) else: raise NotImplementedError('Reading from %s' % fname) return mesh elif oqparam.region: # close the linear polygon ring by appending the first # point to the end firstpoint = geo.Point(*oqparam.region[0]) points = [geo.Point(*xy) for xy in oqparam.region] + [firstpoint] try: mesh = geo.Polygon(points).discretize(oqparam.region_grid_spacing) lons, lats = zip(*sorted(zip(mesh.lons, mesh.lats))) return geo.Mesh(numpy.array(lons), numpy.array(lats)) except: raise ValueError( 'Could not discretize region %(region)s with grid spacing ' '%(region_grid_spacing)s' % vars(oqparam)) elif 'exposure' in oqparam.inputs: # the mesh will be extracted from the exposure later return elif 'site_model' in oqparam.inputs: coords = [(param['lon'], param['lat'], 0) for param in get_site_model(oqparam)] mesh = geo.Mesh.from_coords(sorted(coords)) mesh.from_site_model = True return mesh
site_model_dt = numpy.dtype([ ('lon', numpy.float64), ('lat', numpy.float64), ('vs30', numpy.float64), ('vs30measured', numpy.bool), ('z1pt0', numpy.float64), ('z2pt5', numpy.float64), ('backarc', numpy.bool), ])
[docs]def get_site_model(oqparam): """ Convert the NRML file into an array of site parameters. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :returns: an array with fields lon, lat, vs30, measured, z1pt0, z2pt5, backarc """ nodes = nrml.read(oqparam.inputs['site_model']).siteModel params = sorted(valid.site_param(**node.attrib) for node in nodes) array = numpy.zeros(len(params), site_model_dt) for i, param in enumerate(params): rec = array[i] for name in site_model_dt.names: rec[name] = getattr(param, name) return array
[docs]def get_site_collection(oqparam, mesh=None): """ Returns a SiteCollection instance by looking at the points and the site model defined by the configuration parameters. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param mesh: a mesh of hazardlib points; if None the mesh is determined by invoking get_mesh """ if mesh is None: mesh = get_mesh(oqparam) if mesh is None and oqparam.hazard_calculation_id: with datastore.read(oqparam.hazard_calculation_id) as dstore: return dstore['sitecol'].complete elif mesh is None: return if oqparam.inputs.get('site_model'): sm = get_site_model(oqparam) if getattr(mesh, 'from_site_model', False): return site.SiteCollection.from_points( mesh.lons, mesh.lats, None, sm) # associate the site parameters to the mesh site_model_params = geo.utils.GeographicObjects( sm, operator.itemgetter('lon'), operator.itemgetter('lat')) sitecol = site.SiteCollection.from_points(mesh.lons, mesh.lats) dic = site_model_params.assoc( sitecol, oqparam.max_site_model_distance, 'warn') for sid in dic: for name in site_model_dt.names[2:]: # all names except lon, lat sitecol.array[sid][name] = dic[sid][name] return sitecol # else use the default site params return site.SiteCollection.from_points( mesh.lons, mesh.lats, mesh.depths, oqparam)
[docs]def get_gsim_lt(oqparam, trts=['*']): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param trts: a sequence of tectonic region types as strings; trts=['*'] means that there is no filtering :returns: a GsimLogicTree instance obtained by filtering on the provided tectonic region types. """ if 'gsim_logic_tree' not in oqparam.inputs: return logictree.GsimLogicTree.from_(oqparam.gsim) gsim_file = os.path.join( oqparam.base_path, oqparam.inputs['gsim_logic_tree']) gsim_lt = logictree.GsimLogicTree(gsim_file, trts) return gsim_lt
[docs]def get_gsims(oqparam): """ Return an ordered list of GSIM instances from the gsim name in the configuration file or from the gsim logic tree file. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance """ return [rlz.value[0] for rlz in get_gsim_lt(oqparam)]
[docs]def get_rlzs_by_gsim(oqparam): """ Return an ordered dictionary gsim -> [realization index]. Work for gsim logic trees with a single tectonic region type. """ cinfo = source.CompositionInfo.fake(get_gsim_lt(oqparam)) ra = cinfo.get_rlzs_assoc() dic = collections.OrderedDict() for rlzi, gsim_by_trt in enumerate(ra.gsim_by_trt): dic[gsim_by_trt['*']] = [rlzi] return dic
[docs]def get_rupture_sitecol(oqparam, sitecol): """ Read the `rupture_model` file and by filter the site collection :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param sitecol: a :class:`openquake.hazardlib.site.SiteCollection` instance :returns: a pair (EBRupture, SiteCollection) """ rup_model = oqparam.inputs['rupture_model'] [rup_node] = nrml.read(rup_model) conv = sourceconverter.RuptureConverter( oqparam.rupture_mesh_spacing, oqparam.complex_fault_mesh_spacing) rup = conv.convert_node(rup_node) rup.tectonic_region_type = '*' # there is not TRT for scenario ruptures rup.seed = oqparam.random_seed maxdist = oqparam.maximum_distance['default'] sc = calc.filters.filter_sites_by_distance_to_rupture( rup, maxdist, sitecol) if sc is None: raise RuntimeError( 'All sites were filtered out! maximum_distance=%s km' % maxdist) n = oqparam.number_of_ground_motion_fields events = numpy.zeros(n, stored_event_dt) events['eid'] = numpy.arange(n) ebr = EBRupture(rup, sc.sids, events) return ebr, sc
[docs]def get_source_model_lt(oqparam): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :returns: a :class:`openquake.commonlib.logictree.SourceModelLogicTree` instance """ fname = oqparam.inputs.get('source_model_logic_tree') if fname: # NB: converting the random_seed into an integer is needed on Windows return logictree.SourceModelLogicTree( fname, validate=False, seed=int(oqparam.random_seed), num_samples=oqparam.number_of_logic_tree_samples) return logictree.FakeSmlt(oqparam.inputs['source_model'], int(oqparam.random_seed), oqparam.number_of_logic_tree_samples)
[docs]def get_source_models(oqparam, gsim_lt, source_model_lt, in_memory=True): """ Build all the source models generated by the logic tree. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param gsim_lt: a :class:`openquake.commonlib.logictree.GsimLogicTree` instance :param source_model_lt: a :class:`openquake.commonlib.logictree.SourceModelLogicTree` instance :param in_memory: if True, keep in memory the sources, else just collect the TRTs :returns: an iterator over :class:`openquake.commonlib.logictree.SourceModel` tuples """ converter = sourceconverter.SourceConverter( oqparam.investigation_time, oqparam.rupture_mesh_spacing, oqparam.complex_fault_mesh_spacing, oqparam.width_of_mfd_bin, oqparam.area_source_discretization) psr = nrml.SourceModelParser(converter) # consider only the effective realizations smlt_dir = os.path.dirname(source_model_lt.filename) for sm in source_model_lt.gen_source_models(gsim_lt): src_groups = [] for name in sm.names.split(): fname = os.path.abspath(os.path.join(smlt_dir, name)) if in_memory: apply_unc = source_model_lt.make_apply_uncertainties(sm.path) logging.info('Reading %s', fname) src_groups.extend(psr.parse_src_groups(fname, apply_unc)) else: # just collect the TRT models smodel = nrml.read(fname).sourceModel if smodel[0].tag.endswith('sourceGroup'): # NRML 0.5 format for sg_node in smodel: sg = sourceconverter.SourceGroup( sg_node['tectonicRegion']) sg.sources = sg_node.nodes src_groups.append(sg) else: # NRML 0.4 format: smodel is a list of source nodes src_groups.extend( sourceconverter.SourceGroup.collect(smodel)) num_sources = sum(len(sg.sources) for sg in src_groups) sm.src_groups = src_groups trts = [mod.trt for mod in src_groups] source_model_lt.tectonic_region_types.update(trts) logging.info( 'Processed source model %d with %d potential gsim path(s) and %d ' 'sources', sm.ordinal + 1, sm.num_gsim_paths, num_sources) gsim_file = oqparam.inputs.get('gsim_logic_tree') if gsim_file: # check TRTs for src_group in src_groups: if src_group.trt not in gsim_lt.values: raise ValueError( "Found in %r a tectonic region type %r inconsistent " "with the ones in %r" % (sm, src_group.trt, gsim_file)) yield sm # check investigation_time psr.check_nonparametric_sources(oqparam.investigation_time) # log if some source file is being used more than once for fname, hits in psr.fname_hits.items(): if hits > 1: logging.info('%s has been considered %d times', fname, hits)
[docs]def getid(src): try: return src.source_id except AttributeError: return src['id']
[docs]def get_composite_source_model(oqparam, in_memory=True): """ Parse the XML and build a complete composite source model in memory. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param in_memory: if False, just parse the XML without instantiating the sources """ smodels = [] grp_id = 0 idx = 0 gsim_lt = get_gsim_lt(oqparam) source_model_lt = get_source_model_lt(oqparam) for source_model in get_source_models( oqparam, gsim_lt, source_model_lt, in_memory=in_memory): for src_group in source_model.src_groups: src_group.sources = sorted(src_group, key=getid) src_group.id = grp_id for src in src_group: # there are two cases depending on the flag in_memory: # 1) src is a hazardlib source and has a src_group_id # attribute; in that case the source has to be numbered # 2) src is a Node object, then nothing must be done if isinstance(src, Node): continue src.src_group_id = grp_id src.id = idx idx += 1 grp_id += 1 if grp_id >= TWO16: # the limit is really needed only for event based calculations raise ValueError('There is a limit of %d src groups!' % TWO16) smodels.append(source_model) csm = source.CompositeSourceModel(gsim_lt, source_model_lt, smodels, oqparam.optimize_same_id_sources) for sm in csm.source_models: srcs = [] for sg in sm.src_groups: srcs.extend(map(getid, sg)) if len(set(srcs)) < len(srcs): raise nrml.DuplicatedID( 'Found duplicated source IDs: use oq info %s', sm, oqparam.inputs['job_ini']) return csm
[docs]def get_imts(oqparam): """ Return a sorted list of IMTs as hazardlib objects """ return list(map(imt.from_string, sorted(oqparam.imtls)))
[docs]def get_risk_model(oqparam): """ Return a :class:`openquake.risklib.riskinput.CompositeRiskModel` instance :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance """ rmdict = get_risk_models(oqparam) oqparam.set_risk_imtls(rmdict) if oqparam.calculation_mode.endswith('_bcr'): retro = get_risk_models(oqparam, 'vulnerability_retrofitted') else: retro = {} return riskinput.CompositeRiskModel(oqparam, rmdict, retro)
[docs]def get_cost_calculator(oqparam): """ Read the first lines of the exposure file and infers the cost calculator """ exposure = asset._get_exposure(oqparam.inputs['exposure'], stop='assets') return exposure[0].cost_calculator
[docs]def get_exposure(oqparam): """ Read the full exposure in memory and build a list of :class:`openquake.risklib.asset.Asset` instances. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :returns: an :class:`Exposure` instance or a compatible AssetCollection """ return asset.Exposure.read( oqparam.inputs['exposure'], oqparam.calculation_mode, oqparam.region_constraint, oqparam.ignore_missing_costs)
def _get_mesh_assets_by_site(oqparam, exposure): assets_by_loc = groupby(exposure, key=lambda a: a.location) lons, lats = zip(*sorted(assets_by_loc)) mesh = geo.Mesh(numpy.array(lons), numpy.array(lats)) assets_by_site = [ assets_by_loc[lonlat] for lonlat in zip(mesh.lons, mesh.lats)] return mesh, assets_by_site
[docs]def get_sitecol_assetcol(oqparam, haz_sitecol=None): """ :param oqparam: calculation parameters :param haz_sitecol: a pre-existing site collection, if any :returns: (site collection, asset collection) instances """ exposure = get_exposure(oqparam) mesh, assets_by_site = _get_mesh_assets_by_site(oqparam, exposure) if haz_sitecol: tot_assets = sum(len(assets) for assets in assets_by_site) all_sids = haz_sitecol.complete.sids sids = set(haz_sitecol.sids) # associate the assets to the hazard sites asset_hazard_distance = oqparam.asset_hazard_distance siteobjects = geo.utils.GeographicObjects( Site(sid, lon, lat) for sid, lon, lat in zip(haz_sitecol.sids, haz_sitecol.lons, haz_sitecol.lats)) assets_by_sid = AccumDict(accum=[]) for assets in assets_by_site: lon, lat = assets[0].location site, distance = siteobjects.get_closest(lon, lat) if site.sid in sids and distance <= asset_hazard_distance: # keep the assets, otherwise discard them assets_by_sid += {site.sid: list(assets)} if not assets_by_sid: raise geo.utils.SiteAssociationError( 'Could not associate any site to any assets within the ' 'asset_hazard_distance of %s km' % asset_hazard_distance) mask = numpy.array( [sid in assets_by_sid for sid in all_sids]) assets_by_site = [ sorted(assets_by_sid[sid], key=operator.attrgetter('ordinal')) for sid in all_sids] num_assets = sum(len(assets) for assets in assets_by_site) sitecol = haz_sitecol.complete.filter(mask) logging.info('Associated %d/%d assets to %d sites', num_assets, tot_assets, len(sitecol)) else: # use the exposure sites as hazard sites sitecol = get_site_collection(oqparam, mesh) asset_refs = [exposure.asset_refs[asset.ordinal] for assets in assets_by_site for asset in assets] assetcol = asset.AssetCollection( asset_refs, assets_by_site, exposure.tagcol, exposure.cost_calculator, oqparam.time_event, occupancy_periods=hdf5.array_of_vstr( sorted(exposure.occupancy_periods))) return sitecol, assetcol
[docs]def get_mesh_csvdata(csvfile, imts, num_values, validvalues): """ Read CSV data in the format `IMT lon lat value1 ... valueN`. :param csvfile: a file or file-like object with the CSV data :param imts: a list of intensity measure types :param num_values: dictionary with the number of expected values per IMT :param validvalues: validation function for the values :returns: the mesh of points and the data as a dictionary imt -> array of curves for each site """ number_of_values = dict(zip(imts, num_values)) lon_lats = {imt: set() for imt in imts} data = AccumDict() # imt -> list of arrays check_imt = valid.Choice(*imts) for line, row in enumerate(csv.reader(csvfile, delimiter=' '), 1): try: imt = check_imt(row[0]) lon_lat = valid.longitude(row[1]), valid.latitude(row[2]) if lon_lat in lon_lats[imt]: raise DuplicatedPoint(lon_lat) lon_lats[imt].add(lon_lat) values = validvalues(' '.join(row[3:])) if len(values) != number_of_values[imt]: raise ValueError('Found %d values, expected %d' % (len(values), number_of_values[imt])) except (ValueError, DuplicatedPoint) as err: raise err.__class__('%s: file %s, line %d' % (err, csvfile, line)) data += {imt: [numpy.array(values)]} points = lon_lats.pop(imts[0]) for other_imt, other_points in lon_lats.items(): if points != other_points: raise ValueError('Inconsistent locations between %s and %s' % (imts[0], other_imt)) lons, lats = zip(*sorted(points)) mesh = geo.Mesh(numpy.array(lons), numpy.array(lats)) return mesh, {imt: numpy.array(lst) for imt, lst in data.items()}
[docs]def get_gmfs(oqparam): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :returns: sitecol, eids, gmf array of shape (R, N, E, M) """ M = len(oqparam.imtls) fname = oqparam.inputs['gmfs'] if fname.endswith('.csv'): array = writers.read_composite_array(fname) R = len(numpy.unique(array['rlzi'])) if R > 1: raise InvalidFile('%s: found %d realizations, currently only one ' 'realization is supported' % (fname, R)) # the array has the structure rlzi, sid, eid, gmv_PGA, gmv_... dtlist = [(name, array.dtype[name]) for name in array.dtype.names[:3]] required_imts = list(oqparam.imtls) imts = [name[4:] for name in array.dtype.names[3:]] if imts != required_imts: raise ValueError('Required %s, but %s contains %s' % ( required_imts, fname, imts)) dtlist.append(('gmv', (F32, M))) eids = numpy.unique(array['eid']) E = len(eids) found_eids = set(eids) expected_eids = set(range(E)) # expected incremental eids missing_eids = expected_eids - found_eids if missing_eids: raise InvalidFile('Missing eids in the gmfs.csv file: %s' % missing_eids) assert expected_eids == found_eids, (expected_eids, found_eids) eidx = {eid: e for e, eid in enumerate(eids)} sitecol = get_site_collection(oqparam) expected_sids = set(sitecol.sids) found_sids = set(numpy.unique(array['sid'])) missing_sids = found_sids - expected_sids if missing_sids: raise InvalidFile( 'Found site IDs missing in the sites.csv file: %s' % missing_sids) N = len(sitecol) gmfs = numpy.zeros((R, N, E, M), F32) counter = collections.Counter() for row in array.view(dtlist): key = row['rlzi'], row['sid'], eidx[row['eid']] gmfs[key] = row['gmv'] counter[key] += 1 dupl = [key for key in counter if counter[key] > 1] if dupl: raise InvalidFile('Duplicated (rlzi, sid, eid) in the GMFs file: ' '%s' % dupl) elif fname.endswith('.xml'): eids, gmfs_by_imt = get_scenario_from_nrml(oqparam, fname) N, E = gmfs_by_imt.shape gmfs = numpy.zeros((1, N, E, M), F32) for imti, imtstr in enumerate(oqparam.imtls): gmfs[0, :, :, imti] = gmfs_by_imt[imtstr] else: raise NotImplemented('Reading from %s' % fname) return eids, gmfs
[docs]@deprecated('Reading hazard curves from CSV may change in the future') def get_pmap_from_csv(oqparam, fname): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param fname: a .txt file with format `IMT lon lat poe1 ... poeN` :returns: the site mesh and the hazard curves read by the .txt file """ if not oqparam.imtls: oqparam.set_risk_imtls(get_risk_models(oqparam)) if not oqparam.imtls: raise ValueError('Missing intensity_measure_types_and_levels in %s' % oqparam.inputs['job_ini']) num_values = list(map(len, list(oqparam.imtls.values()))) with open(oqparam.inputs['hazard_curves']) as csvfile: mesh, hcurves = get_mesh_csvdata( csvfile, list(oqparam.imtls), num_values, valid.decreasing_probabilities) array = numpy.zeros((len(mesh), sum(num_values))) for imt_ in hcurves: array[:, oqparam.imtls.slicedic[imt_]] = hcurves[imt_] return mesh, ProbabilityMap.from_array(array, range(len(mesh)))
[docs]def get_pmap_from_nrml(oqparam, fname): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param fname: an XML file containing hazard curves :returns: site mesh, curve array """ hcurves_by_imt = {} oqparam.hazard_imtls = imtls = collections.OrderedDict() for hcurves in nrml.read(fname): imt = hcurves['IMT'] oqparam.investigation_time = hcurves['investigationTime'] if imt == 'SA': imt += '(%s)' % hcurves['saPeriod'] imtls[imt] = ~hcurves.IMLs data = sorted((~node.Point.pos, ~node.poEs) for node in hcurves[1:]) hcurves_by_imt[imt] = numpy.array([d[1] for d in data]) lons, lats = [], [] for xy, poes in data: lons.append(xy[0]) lats.append(xy[1]) mesh = geo.Mesh(numpy.array(lons), numpy.array(lats)) num_levels = sum(len(v) for v in imtls.values()) array = numpy.zeros((len(mesh), num_levels)) imtls = DictArray(imtls) for imt_ in hcurves_by_imt: array[:, imtls.slicedic[imt_]] = hcurves_by_imt[imt_] return mesh, ProbabilityMap.from_array(array, range(len(mesh)))
# used in get_scenario_from_nrml def _extract_eids_sitecounts(gmfset): eids = set() counter = collections.Counter() for gmf in gmfset: eids.add(gmf['ruptureId']) for node in gmf: counter[node['lon'], node['lat']] += 1 return numpy.array(sorted(eids), numpy.uint64), counter
[docs]@deprecated('Use the .csv format for the GMFs instead') def get_scenario_from_nrml(oqparam, fname): """ :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param fname: the NRML files containing the GMFs :returns: a triple (sitecol, eids, gmf array) """ if not oqparam.imtls: oqparam.set_risk_imtls(get_risk_models(oqparam)) imts = sorted(oqparam.imtls) num_imts = len(imts) imt_dt = numpy.dtype([(imt, F32) for imt in imts]) gmfset = nrml.read(fname).gmfCollection.gmfSet eids, sitecounts = _extract_eids_sitecounts(gmfset) coords = sorted(sitecounts) oqparam.sites = [(lon, lat, 0) for lon, lat in coords] site_idx = {lonlat: i for i, lonlat in enumerate(coords)} oqparam.number_of_ground_motion_fields = num_events = len(eids) num_sites = len(oqparam.sites) gmf_by_imt = numpy.zeros((num_events, num_sites), imt_dt) counts = collections.Counter() for i, gmf in enumerate(gmfset): if len(gmf) != num_sites: # there must be one node per site raise InvalidFile('Expected %d sites, got %d nodes in %s, line %d' % (num_sites, len(gmf), fname, gmf.lineno)) counts[gmf['ruptureId']] += 1 imt = gmf['IMT'] if imt == 'SA': imt = 'SA(%s)' % gmf['saPeriod'] for node in gmf: sid = site_idx[node['lon'], node['lat']] gmf_by_imt[imt][i % num_events, sid] = node['gmv'] for rupid, count in sorted(counts.items()): if count < num_imts: raise InvalidFile("Found a missing ruptureId %d in %s" % (rupid, fname)) elif count > num_imts: raise InvalidFile("Found a duplicated ruptureId '%s' in %s" % (rupid, fname)) expected_gmvs_per_site = num_imts * len(eids) for lonlat, counts in sitecounts.items(): if counts != expected_gmvs_per_site: raise InvalidFile( '%s: expected %d gmvs at location %s, found %d' % (fname, expected_gmvs_per_site, lonlat, counts)) return eids, gmf_by_imt.T
[docs]def get_mesh_hcurves(oqparam): """ Read CSV data in the format `lon lat, v1-vN, w1-wN, ...`. :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :returns: the mesh of points and the data as a dictionary imt -> array of curves for each site """ imtls = oqparam.imtls lon_lats = set() data = AccumDict() # imt -> list of arrays ncols = len(imtls) + 1 # lon_lat + curve_per_imt ... csvfile = oqparam.inputs['hazard_curves'] for line, row in enumerate(csv.reader(csvfile), 1): try: if len(row) != ncols: raise ValueError('Expected %d columns, found %d' % ncols, len(row)) x, y = row[0].split() lon_lat = valid.longitude(x), valid.latitude(y) if lon_lat in lon_lats: raise DuplicatedPoint(lon_lat) lon_lats.add(lon_lat) for i, imt_ in enumerate(imtls, 1): values = valid.decreasing_probabilities(row[i]) if len(values) != len(imtls[imt_]): raise ValueError('Found %d values, expected %d' % (len(values), len(imtls([imt_])))) data += {imt_: [numpy.array(values)]} except (ValueError, DuplicatedPoint) as err: raise err.__class__('%s: file %s, line %d' % (err, csvfile, line)) lons, lats = zip(*sorted(lon_lats)) mesh = geo.Mesh(numpy.array(lons), numpy.array(lats)) return mesh, {imt: numpy.array(lst) for imt, lst in data.items()}
[docs]def get_checksum32(oqparam): """ Build an unsigned 32 bit integer from the input files of the calculation """ # NB: using adler32 & 0xffffffff is the documented way to get a checksum # which is the same between Python 2 and Python 3 checksum = 0 for key in sorted(oqparam.inputs): fname = oqparam.inputs[key] if not fname: continue elif key == 'source': # list of fnames and/or strings for f in fname: data = open(f, 'rb').read() checksum = zlib.adler32(data, checksum) & 0xffffffff elif os.path.exists(fname): data = open(fname, 'rb').read() checksum = zlib.adler32(data, checksum) & 0xffffffff else: raise ValueError('%s does not exist or is not a file' % fname) return checksum