# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import re
import os
import sys
import operator
import collections
import numpy
from openquake.baselib.general import (
group_array, deprecated, AccumDict, DictArray)
from openquake.hazardlib.imt import from_string
from openquake.hazardlib.calc import disagg
from openquake.calculators.views import view
from openquake.calculators.extract import extract, get_mesh, get_info
from openquake.calculators.export import export
from openquake.calculators.getters import gen_rgetters
from openquake.commonlib import writers, hazard_writers, calc, util
F32 = numpy.float32
F64 = numpy.float64
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
# with compression you can save 60% of space at the cost of only ~10% more
# saving time
savez = numpy.savez_compressed
def add_quotes(values):
    # used to quote the source names in CSV files
    return ['"%s"' % val for val in values]
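# a minimal usage sketch (hypothetical source names):
# add_quotes(['Point Source 1', 'Area Source 2'])
# -> ['"Point Source 1"', '"Area Source 2"']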
@export.add(('ruptures', 'xml'))
@deprecated(msg='This exporter will disappear in the future')
def export_ruptures_xml(ekey, dstore):
"""
:param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
fmt = ekey[-1]
oq = dstore['oqparam']
events = group_array(dstore['events'][()], 'rup_id')
ruptures_by_grp = AccumDict(accum=[])
for rgetter in gen_rgetters(dstore):
ebrs = []
for proxy in rgetter.get_proxies():
events_by_ses = group_array(events[proxy['id']], 'ses_id')
ebr = proxy.to_ebr(rgetter.trt, rgetter.samples)
ebrs.append(ebr.export(events_by_ses))
ruptures_by_grp[rgetter.grp_id].extend(ebrs)
dest = dstore.export_path('ses.' + fmt)
writer = hazard_writers.SESXMLWriter(dest)
writer.serialize(ruptures_by_grp, oq.investigation_time)
return [dest]
@export.add(('ruptures', 'csv'))
def export_ruptures_csv(ekey, dstore):
"""
:param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
oq = dstore['oqparam']
if 'scenario' in oq.calculation_mode:
return []
dest = dstore.export_path('ruptures.csv')
arr = extract(dstore, 'rupture_info')
if export.sanity_check:
bad = view('bad_ruptures', dstore)
if bad.count('\n') > 3: # nonempty rst_table
print(bad, file=sys.stderr)
comment = dstore.metadata
comment.update(investigation_time=oq.investigation_time,
ses_per_logic_tree_path=oq.ses_per_logic_tree_path)
arr.array.sort(order='rup_id')
writers.write_csv(dest, arr, comment=comment)
return [dest]
# ####################### export hazard curves ############################ #
HazardCurve = collections.namedtuple('HazardCurve', 'location poes')
def export_hmaps_csv(key, dest, sitemesh, array, comment):
    """
    Export the hazard maps of the given realization into CSV.

    :param key: output_type and export_type
:param dest: name of the exported file
:param sitemesh: site collection
:param array: a composite array of dtype hmap_dt
:param comment: comment to use as header of the exported CSV file
"""
curves = util.compose_arrays(sitemesh, array)
writers.write_csv(dest, curves, comment=comment)
return [dest]
def add_imt(fname, imt):
"""
>>> add_imt('/path/to/hcurve_23.csv', 'SA(0.1)')
'/path/to/hcurve-SA(0.1)_23.csv'
"""
name = os.path.basename(fname)
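    # insert '-<imt>' right before the trailing '_<number>.' of the file name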
newname = re.sub(r'(_\d+\.)', '-%s\\1' % imt, name)
return os.path.join(os.path.dirname(fname), newname)
def export_hcurves_by_imt_csv(
        key, kind, fname, sitecol, array, imtls, comment):
    """
    Export the curves of the given realization into CSV.

    :param key: output_type and export_type
:param kind: a string with the kind of output (realization or statistics)
:param fname: name of the exported file
:param sitecol: site collection
:param array: an array of shape (N, L) and dtype numpy.float32
:param imtls: intensity measure types and levels
:param comment: comment dictionary
"""
nsites = len(sitecol)
fnames = []
for imt, imls in imtls.items():
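        # slice of the flattened curve array corresponding to this IMT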
slc = imtls(imt)
dest = add_imt(fname, imt)
lst = [('lon', F32), ('lat', F32), ('depth', F32)]
for iml in imls:
lst.append(('poe-%.7f' % iml, F32))
hcurves = numpy.zeros(nsites, lst)
for sid, lon, lat, dep in zip(
range(nsites), sitecol.lons, sitecol.lats, sitecol.depths):
hcurves[sid] = (lon, lat, dep) + tuple(array[sid, slc])
comment.update(imt=imt)
fnames.append(
writers.write_csv(dest, hcurves, comment=comment,
header=[name for (name, dt) in lst]))
return fnames
def hazard_curve_name(dstore, ekey, kind):
    """
    :param dstore: datastore object
    :param ekey: the export key
    :param kind: the kind of key
    """
key, fmt = ekey
prefix = {'hcurves': 'hazard_curve', 'hmaps': 'hazard_map',
'uhs': 'hazard_uhs'}[key]
if kind.startswith('quantile-'): # strip the 7 characters 'hazard_'
fname = dstore.build_fname('quantile_' + prefix[7:], kind[9:], fmt)
else:
fname = dstore.build_fname(prefix, kind, fmt)
return fname
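# for instance, ekey=('hcurves', 'csv') with kind='mean' produces a name
# like hazard_curve-mean_<calc_id>.csv, while kind='quantile-0.15' produces
# quantile_curve-0.15_<calc_id>.csv (the exact pattern is determined by
# dstore.build_fname)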
def get_kkf(ekey):
"""
:param ekey: export key, for instance ('uhs/rlz-1', 'xml')
:returns: key, kind and fmt from the export key, i.e. 'uhs', 'rlz-1', 'xml'
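
    For instance:

    >>> get_kkf(('uhs/rlz-1', 'xml'))
    ('uhs', 'rlz-1', 'xml')
    >>> get_kkf(('hmaps', 'csv'))
    ('hmaps', '', 'csv')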
"""
key, fmt = ekey
if '/' in key:
key, kind = key.split('/', 1)
else:
kind = ''
return key, kind, fmt
@export.add(('hcurves', 'csv'), ('hmaps', 'csv'), ('uhs', 'csv'))
def export_hcurves_csv(ekey, dstore):
    """
    Exports the hazard curves into several .csv files.

    :param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
oq = dstore['oqparam']
info = get_info(dstore)
R = dstore['full_lt'].get_num_rlzs()
sitecol = dstore['sitecol']
sitemesh = get_mesh(sitecol)
key, kind, fmt = get_kkf(ekey)
fnames = []
comment = dstore.metadata
hmap_dt = oq.hmap_dt()
for kind in oq.get_kinds(kind, R):
fname = hazard_curve_name(dstore, (key, fmt), kind)
comment.update(kind=kind, investigation_time=oq.investigation_time)
if (key in ('hmaps', 'uhs') and oq.uniform_hazard_spectra or
oq.hazard_maps):
hmap = extract(dstore, 'hmaps?kind=' + kind)[kind]
if key == 'uhs' and oq.poes and oq.uniform_hazard_spectra:
uhs_curves = calc.make_uhs(hmap, info)
writers.write_csv(
fname, util.compose_arrays(sitemesh, uhs_curves),
comment=comment)
fnames.append(fname)
elif key == 'hmaps' and oq.poes and oq.hazard_maps:
fnames.extend(
export_hmaps_csv(ekey, fname, sitemesh,
hmap.flatten().view(hmap_dt), comment))
elif key == 'hcurves':
hcurves = extract(dstore, 'hcurves?kind=' + kind)[kind]
if 'amplification' in oq.inputs:
imtls = DictArray(
{imt: oq.soil_intensities for imt in oq.imtls})
else:
imtls = oq.imtls
fnames.extend(
export_hcurves_by_imt_csv(
ekey, kind, fname, sitecol, hcurves, imtls, comment))
return sorted(fnames)
UHS = collections.namedtuple('UHS', 'imls location')
@export.add(('uhs', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_uhs_xml(ekey, dstore):
oq = dstore['oqparam']
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
sitemesh = get_mesh(dstore['sitecol'].complete)
key, kind, fmt = get_kkf(ekey)
fnames = []
periods = [imt.period for imt in oq.imt_periods()]
for kind in oq.get_kinds(kind, R):
metadata = get_metadata(rlzs, kind)
uhs = extract(dstore, 'uhs?kind=' + kind)[kind]
for p, poe in enumerate(oq.poes):
fname = hazard_curve_name(dstore, (key, fmt), kind + '-%s' % poe)
writer = hazard_writers.UHSXMLWriter(
fname, periods=periods, poe=poe,
investigation_time=oq.investigation_time, **metadata)
data = []
for site, curve in zip(sitemesh, uhs):
data.append(UHS(curve[:, p], Location(site)))
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
class Location(object):
def __init__(self, xyz):
self.x, self.y = tuple(xyz)[:2]
self.wkt = 'POINT(%s %s)' % (self.x, self.y)
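# for instance, Location((10.0, 45.0, 5.0)) has .wkt == 'POINT(10.0 45.0)';
# the depth component, if present, is ignored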
HazardMap = collections.namedtuple('HazardMap', 'lon lat iml')
@export.add(('hcurves', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_hcurves_xml(ekey, dstore):
key, kind, fmt = get_kkf(ekey)
len_ext = len(fmt) + 1
oq = dstore['oqparam']
sitemesh = get_mesh(dstore['sitecol'])
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
fnames = []
writercls = hazard_writers.HazardCurveXMLWriter
for kind in oq.get_kinds(kind, R):
if kind.startswith('rlz-'):
rlz = rlzs[int(kind[4:])]
smlt_path = '_'.join(rlz.sm_lt_path)
gsimlt_path = rlz.gsim_rlz.pid
else:
smlt_path = ''
gsimlt_path = ''
name = hazard_curve_name(dstore, ekey, kind)
hcurves = extract(dstore, 'hcurves?kind=' + kind)[kind]
for im in oq.imtls:
slc = oq.imtls(im)
imt = from_string(im)
fname = name[:-len_ext] + '-' + im + '.' + fmt
data = [HazardCurve(Location(site), poes[slc])
for site, poes in zip(sitemesh, hcurves)]
writer = writercls(fname,
investigation_time=oq.investigation_time,
imls=oq.imtls[im], imt=imt.name,
sa_period=getattr(imt, 'period', None) or None,
sa_damping=getattr(imt, 'damping', None),
smlt_path=smlt_path, gsimlt_path=gsimlt_path)
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
@export.add(('hmaps', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_hmaps_xml(ekey, dstore):
key, kind, fmt = get_kkf(ekey)
oq = dstore['oqparam']
sitecol = dstore['sitecol']
sitemesh = get_mesh(sitecol)
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
fnames = []
writercls = hazard_writers.HazardMapXMLWriter
for kind in oq.get_kinds(kind, R):
# shape (N, M, P)
hmaps = extract(dstore, 'hmaps?kind=' + kind)[kind]
if kind.startswith('rlz-'):
rlz = rlzs[int(kind[4:])]
smlt_path = '_'.join(rlz.sm_lt_path)
gsimlt_path = rlz.gsim_rlz.pid
else:
smlt_path = ''
gsimlt_path = ''
for m, imt in enumerate(oq.imtls):
for p, poe in enumerate(oq.poes):
suffix = '-%s-%s' % (poe, imt)
fname = hazard_curve_name(dstore, ekey, kind + suffix)
data = [HazardMap(site[0], site[1], hmap[m, p])
for site, hmap in zip(sitemesh, hmaps)]
writer = writercls(
fname, investigation_time=oq.investigation_time,
imt=imt, poe=poe,
smlt_path=smlt_path, gsimlt_path=gsimlt_path)
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
def _extract(hmap, imt, j):
# hmap[imt] can be a tuple or a scalar if j=0
tup = hmap[imt]
if hasattr(tup, '__iter__'):
return tup[j]
assert j == 0
return tup
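# for instance, if hmap['PGA'] == (0.1, 0.2) then _extract(hmap, 'PGA', 1)
# gives 0.2; with a scalar hmap['PGA'] == 0.1 only j=0 is accepted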
@export.add(('hcurves', 'npz'), ('hmaps', 'npz'), ('uhs', 'npz'),
('gmf_data', 'npz'),
('losses_by_asset', 'npz'), ('dmg_by_asset', 'npz'))
def export_hazard_npz(ekey, dstore):
fname = dstore.export_path('%s.%s' % ekey)
out = extract(dstore, ekey[0])
kw = {k: v for k, v in vars(out).items() if not k.startswith('_')}
savez(fname, **kw)
return [fname]
@export.add(('gmf_data', 'csv'))
def export_gmf_data_csv(ekey, dstore):
oq = dstore['oqparam']
rlzs = dstore['full_lt'].get_realizations()
imts = list(oq.imtls)
sc = dstore['sitecol'].array
arr = sc[['lon', 'lat']]
eid = int(ekey[0].split('/')[1]) if '/' in ekey[0] else None
gmfa = dstore['gmf_data/data'][('eid', 'sid', 'gmv')]
event_id = dstore['events']['id']
gmfa['eid'] = event_id[gmfa['eid']]
if eid is None: # we cannot use extract here
f = dstore.build_fname('sitemesh', '', 'csv')
sids = numpy.arange(len(arr), dtype=U32)
sites = util.compose_arrays(sids, arr, 'site_id')
writers.write_csv(f, sites)
fname = dstore.build_fname('gmf', 'data', 'csv')
gmfa.sort(order=['eid', 'sid'])
writers.write_csv(fname, _expand_gmv(gmfa, imts),
renamedict={'sid': 'site_id', 'eid': 'event_id'})
if 'sigma_epsilon' in dstore['gmf_data']:
sig_eps_csv = dstore.build_fname('sigma_epsilon', '', 'csv')
sig_eps = dstore['gmf_data/sigma_epsilon'][()]
sig_eps['eid'] = event_id[sig_eps['eid']]
sig_eps.sort(order='eid')
header = list(sig_eps.dtype.names)
header[0] = 'event_id'
writers.write_csv(sig_eps_csv, sig_eps, header=header)
return [fname, sig_eps_csv, f]
else:
return [fname, f]
# old format for single eid
# TODO: is this still used?
gmfa = gmfa[gmfa['eid'] == eid]
eid2rlz = dict(dstore['events'])
rlzi = eid2rlz[eid]
rlz = rlzs[rlzi]
data, comment = _build_csv_data(
gmfa, rlz, dstore['sitecol'], imts, oq.investigation_time)
fname = dstore.build_fname(
'gmf', '%d-rlz-%03d' % (eid, rlzi), 'csv')
return writers.write_csv(fname, data, comment=comment)
def _expand_gmv(array, imts):
# the array-field gmv becomes a set of scalar fields gmv_<imt>
dtype = array.dtype
assert dtype['gmv'].shape[0] == len(imts)
dtlist = []
for name in dtype.names:
dt = dtype[name]
if name == 'gmv':
for imt in imts:
dtlist.append(('gmv_' + imt, F32))
else:
dtlist.append((name, dt))
new = numpy.zeros(len(array), dtlist)
for name in dtype.names:
if name == 'gmv':
for i, imt in enumerate(imts):
new['gmv_' + imt] = array['gmv'][:, i]
else:
new[name] = array[name]
return new
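# for instance, with imts=['PGA', 'SA(0.5)'] an array with a vector field
# gmv of shape (N, 2) becomes an array with scalar fields gmv_PGA and
# gmv_SA(0.5); all the other fields are copied unchanged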
def _build_csv_data(array, rlz, sitecol, imts, investigation_time):
# lon, lat, gmv_imt1, ..., gmv_imtN
smlt_path = '_'.join(rlz.sm_lt_path)
gsimlt_path = rlz.gsim_rlz.pid
comment = ('smlt_path=%s, gsimlt_path=%s, investigation_time=%s' %
(smlt_path, gsimlt_path, investigation_time))
rows = [['lon', 'lat'] + imts]
for sid, data in group_array(array, 'sid').items():
row = ['%.5f' % sitecol.lons[sid], '%.5f' % sitecol.lats[sid]] + list(
data['gmv'])
rows.append(row)
return rows, comment
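# for instance, with a single IMT the returned rows look like
# [['lon', 'lat', 'PGA'], ['10.00000', '45.00000', 0.012], ...]
# (hypothetical values), plus a comment string with the logic tree paths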
DisaggMatrix = collections.namedtuple(
'DisaggMatrix', 'poe iml dim_labels matrix')
# NB: this is not able to export the mean files
@export.add(('disagg', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_disagg_xml(ekey, dstore):
oq = dstore['oqparam']
rlzs = dstore['full_lt'].get_realizations()
group = dstore['disagg']
fnames = []
writercls = hazard_writers.DisaggXMLWriter
trts = dstore.get_attr('full_lt', 'trts')
for key in group:
if not key.startswith('rlz-'):
continue
matrix = dstore['disagg/' + key]
attrs = group[key].attrs
rlz = rlzs[attrs['rlzi']]
poe_agg = attrs['poe_agg']
iml = attrs['iml']
imt = from_string(attrs['imt'])
fname = dstore.export_path(key + '.xml')
lon, lat = attrs['location']
writer = writercls(
fname, investigation_time=oq.investigation_time,
imt=imt.name, smlt_path='_'.join(rlz.sm_lt_path),
gsimlt_path=rlz.gsim_rlz.pid, lon=lon, lat=lat,
sa_period=getattr(imt, 'period', None) or None,
sa_damping=getattr(imt, 'damping', None),
mag_bin_edges=attrs['mag_bin_edges'],
dist_bin_edges=attrs['dist_bin_edges'],
lon_bin_edges=attrs['lon_bin_edges'],
lat_bin_edges=attrs['lat_bin_edges'],
eps_bin_edges=attrs['eps_bin_edges'],
tectonic_region_types=trts)
data = []
for poe, k in zip(poe_agg, oq.disagg_outputs or disagg.pmf_map):
data.append(DisaggMatrix(poe, iml, k.split('_'), matrix[k]))
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
@export.add(('disagg', 'csv'))
def export_disagg_csv(ekey, dstore):
oq = dstore['oqparam']
rlzs = dstore['full_lt'].get_realizations()
group = dstore[ekey[0]]
fnames = []
skip_keys = ('Mag', 'Dist', 'Lon', 'Lat', 'Eps', 'TRT')
for key in group:
attrs = group[key].attrs
try:
poe = attrs['poe']
except Exception: # no poes_disagg were given
poe = attrs['poe_agg'][0]
try:
rlz = rlzs[attrs['rlzi']]
except TypeError:
rlz = ''
dic = dict(poe='%.7f' % poe)
ex = 'disagg?kind=%s&imt=%s&site_id=%s&poe_id=%d'
else:
dic = dict(poe='%.7f' % poe, iml='%.7e' % attrs['iml'],
rlz=rlz.ordinal)
ex = ('disagg?kind=%s&imt=%s&site_id=%s&poe_id=%d&rlz=' +
str(rlz.ordinal))
imt = from_string(attrs['imt'])
site_id = attrs['site_id']
lon, lat = attrs['location']
metadata = dstore.metadata
# Loads "disaggMatrices" nodes
if hasattr(rlz, 'sm_lt_path'):
metadata['smlt_path'] = '_'.join(rlz.sm_lt_path)
metadata['gsimlt_path'] = rlz.gsim_rlz.pid
metadata['imt'] = imt.name
metadata['investigation_time'] = oq.investigation_time
metadata['lon'] = lon
metadata['lat'] = lat
metadata['Mag'] = attrs['mag_bin_edges']
metadata['Dist'] = attrs['dist_bin_edges']
metadata['Lon'] = attrs['lon_bin_edges']
metadata['Lat'] = attrs['lat_bin_edges']
metadata['Eps'] = attrs['eps_bin_edges']
metadata['TRT'] = attrs['trt_bin_edges']
# example: key = 'rlz-0-PGA-sid-0-poe-0'
poe_id = int(key.rsplit('-', 1)[1])
for label, dset in sorted(group[key].items()):
header = label.lower().split('_') + ['poe']
com = {key: value for key, value in metadata.items()
if value is not None and key not in skip_keys}
com.update(dic)
fname = dstore.export_path(key + '_%s.csv' % label)
values = extract(dstore, ex % (label, imt, site_id, poe_id))
writers.write_csv(fname, values, header=header, comment=com,
fmt='%.5E')
fnames.append(fname)
return fnames
@export.add(('disagg_by_src', 'csv'))
def export_disagg_by_src_csv(ekey, dstore):
paths = []
srcdata = dstore['disagg_by_grp'][()]
header = ['source_id', 'poe']
by_poe = operator.itemgetter(1)
for name in dstore['disagg_by_src']:
probs = dstore['disagg_by_src/' + name][()]
ok = probs > 0
src = srcdata[ok]
data = [header] + sorted(zip(add_quotes(src['grp_name']), probs[ok]),
key=by_poe, reverse=True)
path = dstore.export_path(name + '_Src.csv')
writers.write_csv(path, data, fmt='%.7e')
paths.append(path)
return paths
@export.add(('realizations', 'csv'))
def export_realizations(ekey, dstore):
data = extract(dstore, 'realizations').array
path = dstore.export_path('realizations.csv')
writers.write_csv(path, data, fmt='%.7e')
return [path]
@export.add(('events', 'csv'))
def export_events(ekey, dstore):
events = dstore['events'][()]
path = dstore.export_path('events.csv')
writers.write_csv(path, events, fmt='%s', renamedict=dict(id='event_id'))
return [path]
# because of the code in server.views.calc_results we are not visualizing
# .txt outputs, so we use .rst here
@export.add(('fullreport', 'rst'))
def export_fullreport(ekey, dstore):
with open(dstore.export_path('report.rst'), 'w') as f:
f.write(view('fullreport', dstore))
return [f.name]