Source code for openquake.calculators.export.hazard

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2017 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import re
import os
import logging
import operator
import collections

import numpy

from openquake.baselib import hdf5, parallel, performance
from openquake.baselib.general import (
    humansize, get_array, group_array, DictArray)
from openquake.hazardlib import valid
from openquake.hazardlib.imt import from_string
from openquake.hazardlib.calc import disagg, gmf
from openquake.calculators.views import view
from openquake.calculators.export import export
from openquake.risklib.riskinput import GmfDataGetter, gmf_data_dt
from openquake.commonlib import writers, hazard_writers, calc, util, source

F32 = numpy.float32
F64 = numpy.float64
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32


GMF_MAX_SIZE = 10 * 1024 * 1024  # 10 MB
GMF_WARNING = '''\
There are a lot of ground motion fields; the export will be slow.
Consider canceling the operation and accessing directly %s.'''

# with compression you can save 60% of space by losing only 10% of saving time
savez = numpy.savez_compressed
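
# A minimal usage sketch (not part of the original module): `savez` writes a
# dictionary of arrays into a compressed .npz archive, which is how the *_npz
# exporters below store their `dic` of composite arrays; the names used here
# are hypothetical:
#
# >>> import numpy, os, tempfile
# >>> fname = os.path.join(tempfile.mkdtemp(), 'hcurves.npz')
# >>> numpy.savez_compressed(fname, mean=numpy.zeros(3))
# >>> numpy.load(fname).files
# ['mean']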


def get_mesh(sitecol, complete=True):
    """
    Convert a site collection into a structured array with 'lon', 'lat'
    fields (plus 'depth', unless all the sites are at sea level).
    """
    sc = sitecol.complete if complete else sitecol
    if sc.at_sea_level():
        mesh = numpy.zeros(len(sc), [('lon', F64), ('lat', F64)])
        mesh['lon'] = sc.lons
        mesh['lat'] = sc.lats
    else:
        mesh = numpy.zeros(len(sc), [('lon', F64), ('lat', F64),
                                     ('depth', F64)])
        mesh['lon'] = sc.lons
        mesh['lat'] = sc.lats
        mesh['depth'] = sc.depths
    return mesh

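# Illustrative sketch (not part of the original module): for sites at sea
# level get_mesh returns a structured lon/lat array; an equivalent array can
# be built directly with numpy, using hypothetical coordinates:
#
# >>> import numpy
# >>> lons, lats = numpy.array([10., 11.]), numpy.array([45., 46.])
# >>> mesh = numpy.zeros(2, [('lon', numpy.float64), ('lat', numpy.float64)])
# >>> mesh['lon'], mesh['lat'] = lons, lats
# >>> mesh['lon'].tolist()
# [10.0, 11.0]
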
@export.add(('ruptures', 'xml'))
def export_ruptures_xml(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    fmt = ekey[-1]
    oq = dstore['oqparam']
    sm_by_grp = dstore['csm_info'].get_sm_by_grp()
    mesh = get_mesh(dstore['sitecol'])
    ruptures = {}
    for grp in dstore['ruptures']:
        grp_id = int(grp[4:])  # strip grp-
        ruptures[grp_id] = []
        for ebr in calc.get_ruptures(dstore, grp_id):
            ruptures[grp_id].append(ebr.export(mesh, sm_by_grp))
    dest = dstore.export_path('ses.' + fmt)
    writer = hazard_writers.SESXMLWriter(dest)
    writer.serialize(ruptures, oq.investigation_time)
    return [dest]

@export.add(('ruptures', 'csv'))
def export_ses_csv(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    if 'scenario' in oq.calculation_mode:
        return []
    dest = dstore.export_path('ruptures.csv')
    header = ('rupid multiplicity mag centroid_lon centroid_lat '
              'centroid_depth trt strike dip rake boundary').split()
    csm_info = dstore['csm_info']
    grp_trt = csm_info.grp_trt()
    gsims = csm_info.get_rlzs_assoc().gsims_by_grp_id
    rows = []
    for grp_id, trt in sorted(grp_trt.items()):
        rup_data = calc.RuptureData(trt, gsims[grp_id]).to_array(
            calc.get_ruptures(dstore, grp_id))
        for r in rup_data:
            rows.append(
                (r['rup_id'], r['multiplicity'], r['mag'],
                 r['lon'], r['lat'], r['depth'],
                 trt, r['strike'], r['dip'], r['rake'], r['boundary']))
    rows.sort()  # by rupture serial
    writers.write_csv(dest, rows, header=header, sep='\t')
    return [dest]

# #################### export Ground Motion fields ########################## #
class GmfSet(object):
    """
    Small wrapper around the list of Gmf objects associated to the given SES.
    """
    def __init__(self, gmfset, investigation_time, ses_idx):
        self.gmfset = gmfset
        self.investigation_time = investigation_time
        self.stochastic_event_set_id = ses_idx

    def __iter__(self):
        return iter(self.gmfset)

    def __bool__(self):
        return bool(self.gmfset)

    def __str__(self):
        return (
            'GMFsPerSES(investigation_time=%f, '
            'stochastic_event_set_id=%s,\n%s)' % (
                self.investigation_time,
                self.stochastic_event_set_id, '\n'.join(
                    sorted(str(g) for g in self.gmfset))))

class GroundMotionField(object):
    """
    The Ground Motion Field generated by the given rupture
    """
    def __init__(self, imt, sa_period, sa_damping, rupture_id, gmf_nodes):
        self.imt = imt
        self.sa_period = sa_period
        self.sa_damping = sa_damping
        self.rupture_id = rupture_id
        self.gmf_nodes = gmf_nodes

    def __iter__(self):
        return iter(self.gmf_nodes)

    def __getitem__(self, key):
        return self.gmf_nodes[key]

    def __str__(self):
        # string representation of a _GroundMotionField object showing the
        # content of the nodes (lon, lat and gmv). This is useful for
        # debugging and testing.
        mdata = ('imt=%(imt)s sa_period=%(sa_period)s '
                 'sa_damping=%(sa_damping)s rupture_id=%(rupture_id)s' %
                 vars(self))
        nodes = sorted(map(str, self.gmf_nodes))
        return 'GMF(%s\n%s)' % (mdata, '\n'.join(nodes))

class GroundMotionFieldNode(object):
    # the signature is not (gmv, x, y) because the XML writer expects
    # a location object
    def __init__(self, gmv, loc):
        self.gmv = gmv
        self.location = loc

    def __lt__(self, other):
        """
        A reproducible ordering by lon and lat; used in
        :function:`openquake.commonlib.hazard_writers.gen_gmfs`
        """
        return (self.location.x, self.location.y) < (
            other.location.x, other.location.y)

    def __str__(self):
        """Return lon, lat and gmv of the node in a compact string form"""
        return '<X=%9.5f, Y=%9.5f, GMV=%9.7f>' % (
            self.location.x, self.location.y, self.gmv)

class GmfCollection(object):
    """
    Object converting the parameters

    :param sitecol: SiteCollection
    :param ruptures: ruptures
    :param investigation_time: investigation time

    into an object with the right form for the EventBasedGMFXMLWriter.
    Iterating over a GmfCollection yields GmfSet objects.
    """
    def __init__(self, sitecol, imts, ruptures, investigation_time):
        self.sitecol = sitecol
        self.ruptures = ruptures
        self.imts = imts
        self.investigation_time = investigation_time

    def __iter__(self):
        completemesh = self.sitecol.complete.mesh
        gmfset = collections.defaultdict(list)
        for imti, imt_str in enumerate(self.imts):
            imt, sa_period, sa_damping = from_string(imt_str)
            for rupture in self.ruptures:
                mesh = completemesh[rupture.indices]
                gmf = get_array(rupture.gmfa, imti=imti)['gmv']
                assert len(mesh) == len(gmf), (len(mesh), len(gmf))
                nodes = (GroundMotionFieldNode(gmv, loc)
                         for gmv, loc in zip(gmf, mesh))
                gmfset[rupture.ses_idx].append(
                    GroundMotionField(
                        imt, sa_period, sa_damping, rupture.eid, nodes))
        for ses_idx in sorted(gmfset):
            yield GmfSet(gmfset[ses_idx], self.investigation_time, ses_idx)

# ####################### export hazard curves ############################ #

HazardCurve = collections.namedtuple('HazardCurve', 'location poes')

def convert_to_array(pmap, sitemesh, imtls):
    """
    Convert the probability map into a composite array with header
    of the form PGA-0.1, PGA-0.2 ...

    :param pmap: probability map
    :param sitemesh: mesh of N sites
    :param imtls: a DictArray with IMT and levels
    :returns: a composite array of length N
    """
    nsites = len(sitemesh)
    lst = []
    # build the export dtype, of the form PGA-0.1, PGA-0.2 ...
    for imt, imls in imtls.items():
        for iml in imls:
            lst.append(('%s-%s' % (imt, iml), F64))
    curves = numpy.zeros(nsites, numpy.dtype(lst))
    for sid, pcurve in pmap.items():
        curve = curves[sid]
        idx = 0
        for imt, imls in imtls.items():
            for iml in imls:
                curve['%s-%s' % (imt, iml)] = pcurve.array[idx]
                idx += 1
    return util.compose_arrays(sitemesh, curves)

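# Sketch of the "<IMT>-<iml>" column naming used above, with hypothetical
# intensity measure levels (a plain dict stands in for the DictArray):
#
# >>> imtls = {'PGA': [0.1, 0.2], 'SA(0.5)': [0.15]}
# >>> ['%s-%s' % (imt, iml) for imt, imls in imtls.items() for iml in imls]
# ['PGA-0.1', 'PGA-0.2', 'SA(0.5)-0.15']
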
def export_hazard_csv(key, dest, sitemesh, pmap, imtls, comment):
    """
    Export the curves of the given realization into CSV.

    :param key: output_type and export_type
    :param dest: name of the exported file
    :param sitemesh: site collection
    :param pmap: a ProbabilityMap
    :param dict imtls: intensity measure types and levels
    :param comment: comment to use as header of the exported CSV file
    """
    curves = convert_to_array(pmap, sitemesh, imtls)
    writers.write_csv(dest, curves, comment=comment)
    return [dest]

def add_imt(fname, imt):
    """
    >>> add_imt('/path/to/hcurve_23.csv', 'SA(0.1)')
    '/path/to/hcurve-SA(0.1)_23.csv'
    """
    name = os.path.basename(fname)
    newname = re.sub('(_\d+\.)', '-%s\\1' % imt, name)
    return os.path.join(os.path.dirname(fname), newname)

def export_hcurves_by_imt_csv(key, kind, rlzs_assoc, fname, sitecol, pmap, oq):
    """
    Export the curves of the given realization into CSV.

    :param key: output_type and export_type
    :param kind: a string with the kind of output (realization or statistics)
    :param rlzs_assoc: a :class:`openquake.commonlib.source.RlzsAssoc` instance
    :param fname: name of the exported file
    :param sitecol: site collection
    :param pmap: a probability map
    :param oq: job.ini parameters
    """
    nsites = len(sitecol)
    fnames = []
    slicedic = oq.imtls.slicedic
    for imt, imls in oq.imtls.items():
        dest = add_imt(fname, imt)
        lst = [('lon', F32), ('lat', F32), ('depth', F32)]
        for iml in imls:
            lst.append(('%s-%s' % (imt, iml), F32))
        hcurves = numpy.zeros(nsites, lst)
        for sid, lon, lat, dep in zip(
                range(nsites), sitecol.lons, sitecol.lats, sitecol.depths):
            poes = pmap.setdefault(sid, 0).array[slicedic[imt]]
            hcurves[sid] = (lon, lat, dep) + tuple(poes)
        fnames.append(writers.write_csv(dest, hcurves, comment=_comment(
            rlzs_assoc, kind, oq.investigation_time) + ',imt=%s' % imt))
    return fnames

def hazard_curve_name(dstore, ekey, kind, rlzs_assoc):
    """
    :param dstore: the datastore
    :param ekey: the export key
    :param kind: the kind of key
    :param rlzs_assoc: a RlzsAssoc instance
    """
    key, fmt = ekey
    prefix = {'hcurves': 'hazard_curve', 'hmaps': 'hazard_map',
              'uhs': 'hazard_uhs'}[key]
    if kind.startswith('quantile-'):  # strip the 7 characters 'hazard_'
        fname = dstore.build_fname('quantile_' + prefix[7:], kind[9:], fmt)
    else:
        fname = dstore.build_fname(prefix, kind, fmt)
    return fname

def _comment(rlzs_assoc, kind, investigation_time):
    rlz = rlzs_assoc.get_rlz(kind)
    if not rlz:
        return '%s, investigation_time=%s' % (kind, investigation_time)
    else:
        return (
            'source_model_tree_path=%s,gsim_tree_path=%s,'
            'investigation_time=%s' % (
                rlz.sm_lt_path, rlz.gsim_lt_path, investigation_time))


@util.reader
def build_hcurves(getter, imtls, monitor):
    """Build the hazard curves for a block of sites, one row per realization;
    used as a task by export_hcurves_rlzs."""
    with getter.dstore:
        pmaps = getter.get_pmaps(getter.sids)
        idx = dict(zip(getter.sids, range(len(getter.sids))))
        curves = numpy.zeros((len(getter.sids), len(pmaps)), imtls.dt)
        for r, pmap in enumerate(pmaps):
            for sid in pmap:
                curves[idx[sid], r] = pmap[sid].convert(imtls)
    return getter.sids, curves

@export.add(('hcurves-rlzs', 'hdf5'))
def export_hcurves_rlzs(ekey, dstore):
    """
    Export all hazard curves in a single .hdf5 file. This is not
    recommended, even if this exporter is parallel and very efficient.
    I was able to export 6 GB of curves per minute. However for large
    calculations it is then impossible to view the .hdf5 file with the
    hdfviewer because you will run out of memory. Also, compression is not
    enabled, otherwise all the time would be spent in the compression phase
    in the controller node with the workers doing nothing.
    The recommended way to postprocess large computations is to instantiate
    the PmapGetter and to work one block of sites at a time,
    discarding what is not needed. The exporter here is meant for
    small/medium calculations and as an example of what you should
    implement yourself if you need to postprocess the hazard curves.
    """
    oq = dstore['oqparam']
    imtls = oq.imtls
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    sitecol = dstore['sitecol']
    pgetter = calc.PmapGetter(dstore, rlzs_assoc)
    N = len(sitecol)
    R = len(rlzs_assoc.realizations)
    fname = dstore.export_path('%s.%s' % ekey)
    monitor = performance.Monitor(ekey[0], fname)
    size = humansize(dstore.get_attr('poes', 'nbytes'))
    logging.info('Reading %s of probability maps', size)
    allargs = [(pgetter.new(tile.sids), imtls, monitor)
               for tile in sitecol.split_in_tiles(R)]
    with hdf5.File(fname, 'w') as f:
        f['imtls'] = imtls
        dset = f.create_dataset('hcurves-rlzs', (N, R), imtls.dt)
        dset.attrs['investigation_time'] = oq.investigation_time
        logging.info('Building the hazard curves for %d sites, %d rlzs', N, R)
        for sids, allcurves in parallel.Processmap(build_hcurves, allargs):
            for sid, curves in zip(sids, allcurves):
                dset[sid] = curves
    return [fname]

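# A hedged, illustrative sketch (not part of the module) of the block-by-block
# postprocessing recommended in the docstring above; it reuses only calls that
# already appear in this module (calc.PmapGetter, split_in_tiles, get_pmaps),
# while `ntiles` and `postprocess` are hypothetical user-side names:
#
# pgetter = calc.PmapGetter(dstore, rlzs_assoc)
# for tile in dstore['sitecol'].split_in_tiles(ntiles):
#     getter = pgetter.new(tile.sids)
#     pmaps = getter.get_pmaps(getter.sids)  # probability maps for one block
#     postprocess(pmaps)  # then discard them and move on to the next block
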
@export.add(('hcurves', 'csv'), ('hmaps', 'csv'), ('uhs', 'csv'))
def export_hcurves_csv(ekey, dstore):
    """
    Exports the hazard curves into several .csv files

    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    sitecol = dstore['sitecol']
    sitemesh = get_mesh(sitecol)
    key, fmt = ekey
    if '/' in key:
        key, kind = key.rsplit('/', 1)
        ekey = (key, fmt)
    else:
        kind = ''
    fnames = []
    if oq.poes:
        pdic = DictArray({imt: oq.poes for imt in oq.imtls})
    for kind, hcurves in calc.PmapGetter(dstore).items(kind):
        fname = hazard_curve_name(dstore, ekey, kind, rlzs_assoc)
        comment = _comment(rlzs_assoc, kind, oq.investigation_time)
        if key == 'uhs' and oq.poes and oq.uniform_hazard_spectra:
            uhs_curves = calc.make_uhs(
                hcurves, oq.imtls, oq.poes, len(sitemesh))
            writers.write_csv(
                fname, util.compose_arrays(sitemesh, uhs_curves),
                comment=comment)
            fnames.append(fname)
        elif key == 'hmaps' and oq.poes and oq.hazard_maps:
            hmap = calc.make_hmap(hcurves, oq.imtls, oq.poes)
            fnames.extend(
                export_hazard_csv(ekey, fname, sitemesh, hmap, pdic, comment))
        elif key == 'hcurves':
            if export.from_db:  # called by export_from_db
                fnames.extend(
                    export_hcurves_by_imt_csv(
                        ekey, kind, rlzs_assoc, fname, sitecol, hcurves, oq))
            else:  # when exporting directly from the datastore
                fnames.extend(
                    export_hazard_csv(
                        ekey, fname, sitemesh, hcurves, oq.imtls, comment))
    return sorted(fnames)

UHS = collections.namedtuple('UHS', 'imls location')
def get_metadata(realizations, kind):
    """
    :param list realizations: realization objects
    :param str kind: kind of data, i.e. a key in the datastore
    :returns:
        a dictionary with smlt_path, gsimlt_path, statistics, quantile_value
    """
    metadata = {}
    if kind.startswith('rlz-'):
        rlz = realizations[int(kind[4:])]
        metadata['smlt_path'] = '_'.join(rlz.sm_lt_path)
        metadata['gsimlt_path'] = rlz.gsim_rlz.uid
    elif kind.startswith('quantile-'):
        metadata['statistics'] = 'quantile'
        metadata['quantile_value'] = float(kind[9:])
    elif kind == 'mean':
        metadata['statistics'] = 'mean'
    elif kind == 'max':
        metadata['statistics'] = 'max'
    return metadata

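# Doctest-style sketch (not in the original module): for statistical kinds
# the realizations list is never inspected, so an empty list is enough here.
#
# >>> get_metadata([], 'quantile-0.15')
# {'statistics': 'quantile', 'quantile_value': 0.15}
# >>> get_metadata([], 'mean')
# {'statistics': 'mean'}
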
@export.add(('uhs', 'xml'))
def export_uhs_xml(ekey, dstore):
    oq = dstore['oqparam']
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    pgetter = calc.PmapGetter(dstore)
    sitemesh = get_mesh(dstore['sitecol'].complete)
    key, fmt = ekey
    fnames = []
    periods = [imt for imt in oq.imtls
               if imt.startswith('SA') or imt == 'PGA']
    for kind, hcurves in pgetter.items():
        metadata = get_metadata(rlzs_assoc.realizations, kind)
        _, periods = calc.get_imts_periods(oq.imtls)
        uhs = calc.make_uhs(hcurves, oq.imtls, oq.poes, len(sitemesh))
        for poe in oq.poes:
            fname = hazard_curve_name(
                dstore, ekey, kind + '-%s' % poe, rlzs_assoc)
            writer = hazard_writers.UHSXMLWriter(
                fname, periods=periods, poe=poe,
                investigation_time=oq.investigation_time, **metadata)
            data = []
            for site, curve in zip(sitemesh, uhs[str(poe)]):
                data.append(UHS(curve, Location(site)))
            writer.serialize(data)
            fnames.append(fname)
    return sorted(fnames)

class Location(object):
    def __init__(self, xyz):
        self.x, self.y = tuple(xyz)[:2]
        self.wkt = 'POINT(%s %s)' % (self.x, self.y)

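# Doctest-style sketch (not in the original module): Location keeps only the
# first two coordinates and exposes a WKT representation; the coordinates
# used here are hypothetical.
#
# >>> loc = Location((10.0, 45.0, 5.0))
# >>> loc.wkt
# 'POINT(10.0 45.0)'
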
HazardCurve = collections.namedtuple('HazardCurve', 'location poes')
HazardMap = collections.namedtuple('HazardMap', 'lon lat iml')


@export.add(('hcurves', 'xml'), ('hcurves', 'geojson'))
def export_hcurves_xml_json(ekey, dstore):
    export_type = ekey[1]
    len_ext = len(export_type) + 1
    oq = dstore['oqparam']
    sitemesh = get_mesh(dstore['sitecol'])
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    fnames = []
    writercls = (hazard_writers.HazardCurveGeoJSONWriter
                 if export_type == 'geojson' else
                 hazard_writers.HazardCurveXMLWriter)
    for kind, hcurves in calc.PmapGetter(dstore).items():
        if kind.startswith('rlz-'):
            rlz = rlzs_assoc.realizations[int(kind[4:])]
            smlt_path = '_'.join(rlz.sm_lt_path)
            gsimlt_path = rlz.gsim_rlz.uid
        else:
            smlt_path = ''
            gsimlt_path = ''
        curves = hcurves.convert(oq.imtls, len(sitemesh))
        name = hazard_curve_name(dstore, ekey, kind, rlzs_assoc)
        for imt in oq.imtls:
            imtype, sa_period, sa_damping = from_string(imt)
            fname = name[:-len_ext] + '-' + imt + '.' + export_type
            data = [HazardCurve(Location(site), poes[imt])
                    for site, poes in zip(sitemesh, curves)]
            writer = writercls(fname,
                               investigation_time=oq.investigation_time,
                               imls=oq.imtls[imt], imt=imtype,
                               sa_period=sa_period, sa_damping=sa_damping,
                               smlt_path=smlt_path, gsimlt_path=gsimlt_path)
            writer.serialize(data)
            fnames.append(fname)
    return sorted(fnames)

@export.add(('hmaps', 'xml'), ('hmaps', 'geojson'))
def export_hmaps_xml_json(ekey, dstore):
    export_type = ekey[1]
    oq = dstore['oqparam']
    sitecol = dstore['sitecol']
    sitemesh = get_mesh(sitecol)
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    fnames = []
    writercls = (hazard_writers.HazardMapGeoJSONWriter
                 if export_type == 'geojson' else
                 hazard_writers.HazardMapXMLWriter)
    pdic = DictArray({imt: oq.poes for imt in oq.imtls})
    nsites = len(sitemesh)
    for kind, hcurves in calc.PmapGetter(dstore).items():
        hmaps = calc.make_hmap(
            hcurves, oq.imtls, oq.poes).convert(pdic, nsites)
        if kind.startswith('rlz-'):
            rlz = rlzs_assoc.realizations[int(kind[4:])]
            smlt_path = '_'.join(rlz.sm_lt_path)
            gsimlt_path = rlz.gsim_rlz.uid
        else:
            smlt_path = ''
            gsimlt_path = ''
        for imt in oq.imtls:
            for j, poe in enumerate(oq.poes):
                suffix = '-%s-%s' % (poe, imt)
                fname = hazard_curve_name(
                    dstore, ekey, kind + suffix, rlzs_assoc)
                data = [HazardMap(site[0], site[1], _extract(hmap, imt, j))
                        for site, hmap in zip(sitemesh, hmaps)]
                writer = writercls(
                    fname, investigation_time=oq.investigation_time,
                    imt=imt, poe=poe,
                    smlt_path=smlt_path, gsimlt_path=gsimlt_path)
                writer.serialize(data)
                fnames.append(fname)
    return sorted(fnames)

def _extract(hmap, imt, j):
    # hmap[imt] can be a tuple or a scalar if j=0
    tup = hmap[imt]
    if hasattr(tup, '__iter__'):
        return tup[j]
    assert j == 0
    return tup


@export.add(('hcurves', 'npz'))
def export_hcurves_npz(ekey, dstore):
    mesh = get_mesh(dstore['sitecol'])
    imtls = dstore['oqparam'].imtls
    fname = dstore.export_path('%s.%s' % ekey)
    arr = numpy.zeros(1, imtls.dt)
    for imt in imtls:
        arr[imt] = imtls[imt]
    dic = dict(imtls=arr[0])
    for kind, hcurves in calc.PmapGetter(dstore).items():
        curves = hcurves.convert(imtls, len(mesh))
        dic[kind] = util.compose_arrays(mesh, curves)
    savez(fname, **dic)
    return [fname]

@export.add(('uhs', 'npz'))
def export_uhs_npz(ekey, dstore):
    oq = dstore['oqparam']
    mesh = get_mesh(dstore['sitecol'])
    fname = dstore.export_path('%s.%s' % ekey)
    dic = {}
    for kind, hcurves in calc.PmapGetter(dstore).items():
        uhs_curves = calc.make_uhs(hcurves, oq.imtls, oq.poes, len(mesh))
        dic[kind] = util.compose_arrays(mesh, uhs_curves)
    savez(fname, **dic)
    return [fname]

@export.add(('hmaps', 'npz'))
def export_hmaps_npz(ekey, dstore):
    oq = dstore['oqparam']
    mesh = get_mesh(dstore['sitecol'])
    pdic = DictArray({imt: oq.poes for imt in oq.imtls})
    fname = dstore.export_path('%s.%s' % ekey)
    dic = {}
    for kind, hcurves in calc.PmapGetter(dstore).items():
        hmap = calc.make_hmap(hcurves, oq.imtls, oq.poes)
        dic[kind] = convert_to_array(hmap, mesh, pdic)
    savez(fname, **dic)
    return [fname]

@export.add(('gmf_data', 'xml'))
def export_gmf(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    sitecol = dstore['sitecol']
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    oq = dstore['oqparam']
    investigation_time = (None if oq.calculation_mode == 'scenario'
                          else oq.investigation_time)
    fmt = ekey[-1]
    gmf_data = dstore['gmf_data']
    nbytes = gmf_data.attrs['nbytes']
    logging.info('Internal size of the GMFs: %s', humansize(nbytes))
    if nbytes > GMF_MAX_SIZE:
        logging.warn(GMF_WARNING, dstore.hdf5path)
    fnames = []
    ruptures_by_rlz = collections.defaultdict(list)
    for grp_id, gsim in rlzs_assoc:
        key = 'grp-%02d' % grp_id
        try:
            events = dstore['events/' + key]
        except KeyError:  # source model producing zero ruptures
            continue
        eventdict = dict(zip(events['eid'], events))
        try:
            data = gmf_data['%s/%s' % (key, gsim)].value
        except KeyError:  # no GMFs for the given realization
            continue
        for rlzi, rlz in enumerate(rlzs_assoc[grp_id, gsim]):
            ruptures = ruptures_by_rlz[rlz]
            gmf_arr = get_array(data, rlzi=rlzi)
            for eid, gmfa in group_array(gmf_arr, 'eid').items():
                ses_idx = eventdict[eid]['ses']
                rup = Rup(eid, ses_idx, sorted(set(gmfa['sid'])), gmfa)
                ruptures.append(rup)
    for rlz in sorted(ruptures_by_rlz):
        ruptures_by_rlz[rlz].sort(key=operator.attrgetter('eid'))
        fname = dstore.build_fname('gmf', rlz, fmt)
        fnames.append(fname)
        globals()['export_gmf_%s' % fmt](
            ('gmf', fmt), fname, sitecol, oq.imtls, ruptures_by_rlz[rlz],
            rlz, investigation_time)
    return fnames

Rup = collections.namedtuple('Rup', ['eid', 'ses_idx', 'indices', 'gmfa'])
def export_gmf_xml(key, dest, sitecol, imts, ruptures, rlz,
                   investigation_time):
    """
    :param key: output_type and export_type
    :param dest: name of the exported file
    :param sitecol: the full site collection
    :param imts: the list of intensity measure types
    :param ruptures: an ordered list of ruptures
    :param rlz: a realization object
    :param investigation_time: investigation time (None for scenario)
    """
    if hasattr(rlz, 'gsim_rlz'):  # event based
        smltpath = '_'.join(rlz.sm_lt_path)
        gsimpath = rlz.gsim_rlz.uid
    else:  # scenario
        smltpath = ''
        gsimpath = rlz.uid
    writer = hazard_writers.EventBasedGMFXMLWriter(
        dest, sm_lt_path=smltpath, gsim_lt_path=gsimpath)
    writer.serialize(
        GmfCollection(sitecol, imts, ruptures, investigation_time))
    return {key: [dest]}

@export.add(('gmf_data', 'csv'))
def export_gmf_data_csv(ekey, dstore):
    oq = dstore['oqparam']
    rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
    if 'scenario' in oq.calculation_mode:
        imtls = dstore['oqparam'].imtls
        gsims = [str(rlz.gsim_rlz) for rlz in rlzs_assoc.realizations]
        n_gmfs = oq.number_of_ground_motion_fields
        fields = ['%03d' % i for i in range(n_gmfs)]
        dt = numpy.dtype([(f, F32) for f in fields])
        etags, gmfs_ = calc.get_gmfs(dstore)
        sitemesh = get_mesh(dstore['sitecol'])
        writer = writers.CsvWriter(fmt='%.5f')
        for gsim, gmfa in zip(gsims, gmfs_):  # gmfa of shape (N, I, E)
            for imti, imt in enumerate(imtls):
                gmfs = numpy.zeros(len(gmfa), dt)
                for e, event in enumerate(dt.names):
                    gmfs[event] = gmfa[:, imti, e]
                dest = dstore.build_fname(
                    'gmf', '%s-%s' % (gsim, imt), 'csv')
                data = util.compose_arrays(sitemesh, gmfs)
                writer.save(data, dest)
        return writer.getsaved()
    else:  # event based
        eid = int(ekey[0].split('/')[1]) if '/' in ekey[0] else None
        gmfa = numpy.fromiter(
            GmfDataGetter.gen_gmfs(dstore['gmf_data'], rlzs_assoc, eid),
            gmf_data_dt)
        if eid is None:  # new format
            fname = dstore.build_fname('gmf', 'data', 'csv')
            gmfa.sort(order=['rlzi', 'sid', 'eid', 'imti'])
            writers.write_csv(fname, gmfa)
            return [fname]
        # old format for single eid
        fnames = []
        imts = list(oq.imtls)
        for rlzi, array in group_array(gmfa, 'rlzi').items():
            rlz = rlzs_assoc.realizations[rlzi]
            data, comment = _build_csv_data(
                array, rlz, dstore['sitecol'], imts, oq.investigation_time)
            fname = dstore.build_fname(
                'gmf', '%d-rlz-%03d' % (eid, rlzi), 'csv')
            writers.write_csv(fname, data, comment=comment)
            fnames.append(fname)
        return fnames

def _build_csv_data(array, rlz, sitecol, imts, investigation_time):
    # lon, lat, gmv_imt1, ..., gmv_imtN
    smlt_path = '_'.join(rlz.sm_lt_path)
    gsimlt_path = rlz.gsim_rlz.uid
    comment = ('smlt_path=%s, gsimlt_path=%s, investigation_time=%s' %
               (smlt_path, gsimlt_path, investigation_time))
    rows = [['lon', 'lat'] + imts]
    irange = range(len(imts))
    for sid, data in group_array(array, 'sid').items():
        dic = dict(zip(data['imti'], data['gmv']))
        row = ['%.5f' % sitecol.lons[sid], '%.5f' % sitecol.lats[sid]] + [
            dic.get(imti, 0) for imti in irange]
        rows.append(row)
    return rows, comment


@export.add(('gmf_data', 'npz'))
def export_gmf_scenario_npz(ekey, dstore):
    oq = dstore['oqparam']
    dic = {}
    fname = dstore.export_path('%s.%s' % ekey)
    if 'scenario' in oq.calculation_mode:
        # compute the GMFs on the fly from the stored rupture
        # NB: for visualization purposes we want to export the full mesh
        # of points, including the ones outside the maximum distance
        # NB2: in the future, I want to add a sitecol output, then the
        # visualization of the mesh will be possible even without the GMFs;
        # at that point the line below will change to
        # sitemesh = get_mesh(dstore['sitecol'], complete=False)
        sitecol = dstore['sitecol'].complete
        sitemesh = get_mesh(sitecol)
        rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
        gsims = rlzs_assoc.gsims_by_grp_id[0]  # there is a single grp_id
        E = oq.number_of_ground_motion_fields
        correl_model = oq.get_correl_model()
        [ebrupture] = calc.get_ruptures(dstore, 0)
        computer = gmf.GmfComputer(
            ebrupture, sitecol, oq.imtls, gsims,
            oq.truncation_level, correl_model)
        gmf_dt = numpy.dtype([(imt, (F32, (E,))) for imt in oq.imtls])
        imts = list(oq.imtls)
        for gsim in gsims:
            arr = computer.compute(gsim, E, oq.random_seed)
            I, S, E = arr.shape  # #IMTs, #sites, #events
            gmfa = numpy.zeros(S, gmf_dt)
            for imti, imt in enumerate(imts):
                gmfa[imt] = arr[imti]
            dic[str(gsim)] = util.compose_arrays(sitemesh, gmfa)
    elif 'event_based' in oq.calculation_mode:
        dic['sitemesh'] = get_mesh(dstore['sitecol'])
        for grp in sorted(dstore['gmf_data']):
            for rlzno in sorted(dstore['gmf_data/' + grp]):
                dic['rlz-' + rlzno] = dstore[
                    'gmf_data/%s/%s' % (grp, rlzno)].value
    else:  # nothing to export
        return []
    savez(fname, **dic)
    return [fname]

DisaggMatrix = collections.namedtuple(
    'DisaggMatrix', 'poe iml dim_labels matrix')


@export.add(('disagg', 'xml'))
def export_disagg_xml(ekey, dstore):
    oq = dstore['oqparam']
    rlzs = dstore['csm_info'].get_rlzs_assoc().realizations
    group = dstore['disagg']
    fnames = []
    writercls = hazard_writers.DisaggXMLWriter
    for key in group:
        matrix = dstore['disagg/' + key]
        attrs = group[key].attrs
        rlz = rlzs[attrs['rlzi']]
        poe = attrs['poe']
        iml = attrs['iml']
        imt, sa_period, sa_damping = from_string(attrs['imt'])
        fname = dstore.export_path(key + '.xml')
        lon, lat = attrs['location']
        # TODO: add poe=poe below
        writer = writercls(
            fname, investigation_time=oq.investigation_time,
            imt=imt, smlt_path='_'.join(rlz.sm_lt_path),
            gsimlt_path=rlz.gsim_rlz.uid, lon=lon, lat=lat,
            sa_period=sa_period, sa_damping=sa_damping,
            mag_bin_edges=attrs['mag_bin_edges'],
            dist_bin_edges=attrs['dist_bin_edges'],
            lon_bin_edges=attrs['lon_bin_edges'],
            lat_bin_edges=attrs['lat_bin_edges'],
            eps_bin_edges=attrs['eps_bin_edges'],
            tectonic_region_types=attrs['trts'],
        )
        data = [
            DisaggMatrix(poe, iml, dim_labels, matrix['_'.join(dim_labels)])
            for i, dim_labels in enumerate(disagg.pmf_map)]
        writer.serialize(data)
        fnames.append(fname)
    return sorted(fnames)

# adapted from the nrml_converters
def save_disagg_to_csv(metadata, matrices):
    """
    Save disaggregation matrices to multiple .csv files.
    """
    skip_keys = ('Mag', 'Dist', 'Lon', 'Lat', 'Eps', 'TRT')
    base_header = ','.join(
        '%s=%s' % (key, value) for key, value in metadata.items()
        if value is not None and key not in skip_keys)
    for disag_tup, (poe, iml, matrix, fname) in matrices.items():
        header = '%s,poe=%s,iml=%s\n' % (base_header, poe, iml)
        if disag_tup == ('Mag', 'Lon', 'Lat'):
            matrix = numpy.swapaxes(matrix, 0, 1)
            matrix = numpy.swapaxes(matrix, 1, 2)
            disag_tup = ('Lon', 'Lat', 'Mag')
        axis = [metadata[v] for v in disag_tup]
        header += ','.join(v for v in disag_tup)
        header += ',poe'
        # compute axis mid points
        axis = [(ax[: -1] + ax[1:]) / 2. if ax.dtype == float else ax
                for ax in axis]
        values = None
        if len(axis) == 1:
            values = numpy.array([axis[0], matrix.flatten()]).T
        else:
            grids = numpy.meshgrid(*axis, indexing='ij')
            values = [g.flatten() for g in grids]
            values.append(matrix.flatten())
            values = numpy.array(values).T
        writers.write_csv(fname, values, comment=header, fmt='%.5E')

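# Sketch of the bin-edge to mid-point computation used above, with
# hypothetical magnitude bin edges:
#
# >>> import numpy
# >>> ax = numpy.array([5.0, 5.5, 6.0, 6.5])
# >>> ((ax[:-1] + ax[1:]) / 2.).tolist()
# [5.25, 5.75, 6.25]
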
@export.add(('disagg', 'csv'))
def export_disagg_csv(ekey, dstore):
    oq = dstore['oqparam']
    disagg_outputs = oq.disagg_outputs or valid.disagg_outs
    rlzs = dstore['csm_info'].get_rlzs_assoc().realizations
    group = dstore['disagg']
    fnames = []
    for key in group:
        matrix = dstore['disagg/' + key]
        attrs = group[key].attrs
        rlz = rlzs[attrs['rlzi']]
        poe = attrs['poe']
        iml = attrs['iml']
        imt, sa_period, sa_damping = from_string(attrs['imt'])
        lon, lat = attrs['location']
        metadata = collections.OrderedDict()
        # Loads "disaggMatrices" nodes
        metadata['smlt_path'] = '_'.join(rlz.sm_lt_path)
        metadata['gsimlt_path'] = rlz.gsim_rlz.uid
        metadata['imt'] = imt
        metadata['investigation_time'] = oq.investigation_time
        metadata['lon'] = lon
        metadata['lat'] = lat
        metadata['Mag'] = attrs['mag_bin_edges']
        metadata['Dist'] = attrs['dist_bin_edges']
        metadata['Lon'] = attrs['lon_bin_edges']
        metadata['Lat'] = attrs['lat_bin_edges']
        metadata['Eps'] = attrs['eps_bin_edges']
        metadata['TRT'] = attrs['trts']
        data = {}
        for label in disagg_outputs:
            tup = tuple(label.split('_'))
            fname = dstore.export_path(key + '_%s.csv' % label)
            data[tup] = poe, iml, matrix[label].value, fname
            fnames.append(fname)
        save_disagg_to_csv(metadata, data)
    return fnames

@export.add(('realizations', 'csv'))
def export_realizations(ekey, dstore):
    rlzs = dstore[ekey[0]]
    data = [['ordinal', 'uid', 'model', 'gsim', 'weight']]
    for i, rlz in enumerate(rlzs):
        data.append([i, rlz['uid'], rlz['model'], rlz['gsims'],
                     rlz['weight']])
    path = dstore.export_path('realizations.csv')
    writers.write_csv(path, data, fmt='%s')
    return [path]

@export.add(('sourcegroups', 'csv'))
def export_sourcegroups(ekey, dstore):
    csm_info = dstore['csm_info']
    data = [['grp_id', 'trt', 'eff_ruptures']]
    for i, sm in enumerate(csm_info.source_models):
        for src_group in sm.src_groups:
            trt = source.capitalize(src_group.trt)
            er = src_group.eff_ruptures
            data.append((src_group.id, trt, er))
    path = dstore.export_path('sourcegroups.csv')
    writers.write_csv(path, data, fmt='%s')
    return [path]

# because of the code in server.views.calc_results we are not visualizing
# .txt outputs, so we use .rst here
@export.add(('fullreport', 'rst'))
def export_fullreport(ekey, dstore):
    with open(dstore.export_path('report.rst'), 'w') as f:
        f.write(view('fullreport', dstore))
    return [f.name]