# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import re
import os
import sys
import itertools
import collections
import numpy
import pandas
from openquake.baselib.general import deprecated, DictArray
from openquake.baselib import hdf5, writers
from openquake.baselib.python3compat import decode
from openquake.hazardlib.imt import from_string
from openquake.calculators.views import view, text_table
from openquake.calculators.extract import extract, get_sites, get_info
from openquake.calculators.export import export
from openquake.commonlib import hazard_writers, calc, util
F32 = numpy.float32
F64 = numpy.float64
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
# with compression you can save 60% of space by losing only 10% of saving time
savez = numpy.savez_compressed
[docs]def add_quotes(values):
# used to source names in CSV files
return ['"%s"' % val for val in values]
[docs]@export.add(('ruptures', 'csv'))
def export_ruptures_csv(ekey, dstore):
"""
:param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
oq = dstore['oqparam']
if 'scenario' in oq.calculation_mode:
return []
dest = dstore.export_path('ruptures.csv')
arr = extract(dstore, 'rupture_info')
if export.sanity_check:
bad = view('bad_ruptures', dstore)
if len(bad): # nonempty
print(text_table(bad), file=sys.stderr)
comment = dstore.metadata
comment.update(investigation_time=oq.investigation_time,
ses_per_logic_tree_path=oq.ses_per_logic_tree_path)
arr.array.sort(order='rup_id')
writers.write_csv(dest, arr, comment=comment)
return [dest]
# ####################### export hazard curves ############################ #
HazardCurve = collections.namedtuple('HazardCurve', 'location poes')
[docs]def export_hmaps_csv(key, dest, sitemesh, array, comment):
"""
Export the hazard maps of the given realization into CSV.
:param key: output_type and export_type
:param dest: name of the exported file
:param sitemesh: site collection
:param array: a composite array of dtype hmap_dt
:param comment: comment to use as header of the exported CSV file
"""
curves = util.compose_arrays(sitemesh, array)
writers.write_csv(dest, curves, comment=comment)
return [dest]
[docs]def add_imt(fname, imt):
"""
>>> add_imt('/path/to/hcurve_23.csv', 'SA(0.1)')
'/path/to/hcurve-SA(0.1)_23.csv'
"""
name = os.path.basename(fname)
newname = re.sub(r'(_\d+\.)', '-%s\\1' % imt, name)
return os.path.join(os.path.dirname(fname), newname)
[docs]def export_hcurves_by_imt_csv(
key, kind, fname, sitecol, array, imt, imls, comment):
"""
Export the curves of the given realization into CSV.
:param key: output_type and export_type
:param kind: a string with the kind of output (realization or statistics)
:param fname: name of the exported file
:param sitecol: site collection
:param array: an array of shape (N, 1, L1) and dtype numpy.float32
:param imt: intensity measure type
:param imls: intensity measure levels
:param comment: comment dictionary
"""
nsites = len(sitecol)
dest = add_imt(fname, imt)
lst = [('lon', F32), ('lat', F32), ('depth', F32)]
for iml in imls:
lst.append(('poe-%.7f' % iml, F32))
custom = 'custom_site_id' in sitecol.array.dtype.names
if custom:
lst.insert(0, ('custom_site_id', 'S8'))
hcurves = numpy.zeros(nsites, lst)
if custom:
for sid, csi, lon, lat, dep in zip(
range(nsites), sitecol.custom_site_id,
sitecol.lons, sitecol.lats, sitecol.depths):
hcurves[sid] = (csi, lon, lat, dep) + tuple(array[sid, 0, :])
else:
hcurves = numpy.zeros(nsites, lst)
for sid, lon, lat, dep in zip(
range(nsites), sitecol.lons, sitecol.lats, sitecol.depths):
hcurves[sid] = (lon, lat, dep) + tuple(array[sid, 0, :])
comment.update(imt=imt)
return writers.write_csv(dest, hcurves, comment=comment,
header=[name for (name, dt) in lst])
[docs]def hazard_curve_name(dstore, ekey, kind):
"""
:param calc_id: the calculation ID
:param ekey: the export key
:param kind: the kind of key
"""
key, fmt = ekey
prefix = {'hcurves': 'hazard_curve', 'hmaps': 'hazard_map',
'uhs': 'hazard_uhs'}[key]
if kind.startswith('quantile-'): # strip the 7 characters 'hazard_'
fname = dstore.build_fname('quantile_' + prefix[7:], kind[9:], fmt)
else:
fname = dstore.build_fname(prefix, kind, fmt)
return fname
[docs]def get_kkf(ekey):
"""
:param ekey: export key, for instance ('uhs/rlz-1', 'xml')
:returns: key, kind and fmt from the export key, i.e. 'uhs', 'rlz-1', 'xml'
"""
key, fmt = ekey
if '/' in key:
key, kind = key.split('/', 1)
else:
kind = ''
return key, kind, fmt
[docs]@export.add(('hcurves', 'csv'), ('hmaps', 'csv'), ('uhs', 'csv'))
def export_hcurves_csv(ekey, dstore):
"""
Exports the hazard curves into several .csv files
:param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
oq = dstore['oqparam']
info = get_info(dstore)
R = dstore['full_lt'].get_num_rlzs()
sitecol = dstore['sitecol']
sitemesh = get_sites(sitecol)
key, kind, fmt = get_kkf(ekey)
fnames = []
comment = dstore.metadata
hmap_dt = oq.hmap_dt()
for kind in oq.get_kinds(kind, R):
fname = hazard_curve_name(dstore, (key, fmt), kind)
comment.update(kind=kind, investigation_time=oq.investigation_time)
if (key in ('hmaps', 'uhs') and oq.uniform_hazard_spectra or
oq.hazard_maps):
hmap = extract(dstore, 'hmaps?kind=' + kind)[kind]
if key == 'uhs' and oq.poes and oq.uniform_hazard_spectra:
uhs_curves = calc.make_uhs(hmap, info)
writers.write_csv(
fname, util.compose_arrays(sitemesh, uhs_curves),
comment=comment)
fnames.append(fname)
elif key == 'hmaps' and oq.poes and oq.hazard_maps:
fnames.extend(
export_hmaps_csv(ekey, fname, sitemesh,
hmap.flatten().view(hmap_dt), comment))
elif key == 'hcurves':
# shape (N, R|S, M, L1)
if ('amplification' in oq.inputs and
oq.amplification_method == 'convolution'):
imtls = DictArray(
{imt: oq.soil_intensities for imt in oq.imtls})
else:
imtls = oq.imtls
for imt, imls in imtls.items():
hcurves = extract(
dstore, 'hcurves?kind=%s&imt=%s' % (kind, imt))[kind]
fnames.append(
export_hcurves_by_imt_csv(
ekey, kind, fname, sitecol, hcurves, imt, imls,
comment))
return sorted(fnames)
UHS = collections.namedtuple('UHS', 'imls location')
[docs]@export.add(('uhs', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_uhs_xml(ekey, dstore):
oq = dstore['oqparam']
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
sitemesh = get_sites(dstore['sitecol'].complete)
key, kind, fmt = get_kkf(ekey)
fnames = []
periods = [imt.period for imt in oq.imt_periods()]
for kind in oq.get_kinds(kind, R):
metadata = get_metadata(rlzs, kind)
uhs = extract(dstore, 'uhs?kind=' + kind)[kind]
for p, poe in enumerate(oq.poes):
fname = hazard_curve_name(dstore, (key, fmt), kind + '-%s' % poe)
writer = hazard_writers.UHSXMLWriter(
fname, periods=periods, poe=poe,
investigation_time=oq.investigation_time, **metadata)
data = []
for site, curve in zip(sitemesh, uhs):
data.append(UHS(curve['%.6f' % poe], Location(site)))
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
[docs]class Location(object):
def __init__(self, xyz):
self.x, self.y = tuple(xyz)[:2]
self.wkt = 'POINT(%s %s)' % (self.x, self.y)
HazardCurve = collections.namedtuple('HazardCurve', 'location poes')
HazardMap = collections.namedtuple('HazardMap', 'lon lat iml')
[docs]@export.add(('hcurves', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_hcurves_xml(ekey, dstore):
key, kind, fmt = get_kkf(ekey)
len_ext = len(fmt) + 1
oq = dstore['oqparam']
sitemesh = get_sites(dstore['sitecol'])
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
fnames = []
writercls = hazard_writers.HazardCurveXMLWriter
for kind in oq.get_kinds(kind, R):
if kind.startswith('rlz-'):
rlz = rlzs[int(kind[4:])]
smlt_path = '_'.join(rlz.sm_lt_path)
gsimlt_path = rlz.gsim_rlz.pid
else:
smlt_path = ''
gsimlt_path = ''
name = hazard_curve_name(dstore, ekey, kind)
for im in oq.imtls:
key = 'hcurves?kind=%s&imt=%s' % (kind, im)
hcurves = extract(dstore, key)[kind] # shape (N, 1, L1)
imt = from_string(im)
fname = name[:-len_ext] + '-' + im + '.' + fmt
data = [HazardCurve(Location(site), poes[0])
for site, poes in zip(sitemesh, hcurves)]
imt_name = 'SA' if im.startswith('SA') else im
writer = writercls(fname,
investigation_time=oq.investigation_time,
imls=oq.imtls[im], imt=imt_name,
sa_period=getattr(imt, 'period', None) or None,
sa_damping=getattr(imt, 'damping', None),
smlt_path=smlt_path, gsimlt_path=gsimlt_path)
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
[docs]@export.add(('hmaps', 'xml'))
@deprecated(msg='Use the CSV exporter instead')
def export_hmaps_xml(ekey, dstore):
key, kind, fmt = get_kkf(ekey)
oq = dstore['oqparam']
sitecol = dstore['sitecol']
sitemesh = get_sites(sitecol)
rlzs = dstore['full_lt'].get_realizations()
R = len(rlzs)
fnames = []
writercls = hazard_writers.HazardMapXMLWriter
for kind in oq.get_kinds(kind, R):
# shape (N, M, P)
hmaps = extract(dstore, 'hmaps?kind=' + kind)[kind]
if kind.startswith('rlz-'):
rlz = rlzs[int(kind[4:])]
smlt_path = '_'.join(rlz.sm_lt_path)
gsimlt_path = rlz.gsim_rlz.pid
else:
smlt_path = ''
gsimlt_path = ''
for m, imt in enumerate(oq.imtls):
for p, poe in enumerate(oq.poes):
suffix = '-%s-%s' % (poe, imt)
fname = hazard_curve_name(dstore, ekey, kind + suffix)
data = [HazardMap(site[0], site[1], hmap[m, p])
for site, hmap in zip(sitemesh, hmaps)]
writer = writercls(
fname, investigation_time=oq.investigation_time,
imt=imt, poe=poe,
smlt_path=smlt_path, gsimlt_path=gsimlt_path)
writer.serialize(data)
fnames.append(fname)
return sorted(fnames)
[docs]@export.add(('cs-stats', 'csv'))
def export_cond_spectra(ekey, dstore):
sitecol = dstore['sitecol']
dset = dstore[ekey[0]] # shape (1, M, N, 2, P)
periods = dset.attrs['periods']
imls = dset.attrs['imls']
writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
fnames = []
for n in sitecol.sids:
spe = dset[0, :, n, 0] # shape M, P
std = dset[0, :, n, 1] # shape M, P
fname = dstore.export_path('conditional-spectrum-%d.csv' % n)
dic = dict(sa_period=periods)
for p in range(len(imls)):
dic['val%d' % p] = spe[:, p]
dic['std%d' % p] = std[:, p]
df = pandas.DataFrame(dic)
comment = dstore.metadata.copy()
comment['imls'] = list(imls)
comment['site_id'] = n
comment['lon'] = sitecol.lons[n]
comment['lat'] = sitecol.lats[n]
writer.save(df, fname, comment=comment)
fnames.append(fname)
return fnames
# TODO: see if I can remove this
def _extract(hmap, imt, j):
# hmap[imt] can be a tuple or a scalar if j=0
tup = hmap[imt]
if hasattr(tup, '__iter__'):
return tup[j]
assert j == 0
return tup
[docs]@export.add(('hcurves', 'npz'), ('hmaps', 'npz'), ('uhs', 'npz'),
('losses_by_asset', 'npz'), ('damages-rlzs', 'npz'))
def export_hazard_npz(ekey, dstore):
fname = dstore.export_path('%s.%s' % ekey)
out = extract(dstore, ekey[0])
kw = {k: v for k, v in vars(out).items() if not k.startswith('_')}
savez(fname, **kw)
return [fname]
[docs]@export.add(('gmf_data', 'csv'))
def export_gmf_data_csv(ekey, dstore):
oq = dstore['oqparam']
imts = list(oq.imtls)
# exporting sitemesh
f = dstore.build_fname('sitemesh', '', 'csv')
sitecol = dstore['sitecol']
names = sitecol.array.dtype.names
arr = sitecol[['lon', 'lat']]
if 'custom_site_id' in names:
sites = util.compose_arrays(
sitecol.custom_site_id, arr, 'custom_site_id')
else:
sites = util.compose_arrays(sitecol.sids, arr, 'site_id')
writers.write_csv(f, sites, comment=dstore.metadata)
# exporting gmfs
df = dstore.read_df('gmf_data').sort_values(['eid', 'sid'])
if 'custom_site_id' in names:
df['csi'] = decode(sitecol.custom_site_id[df.sid])
ren = {'csi': 'custom_site_id', 'eid': 'event_id'}
del df['sid']
else:
ren = {'sid': 'site_id', 'eid': 'event_id'}
for m, imt in enumerate(imts):
ren[f'gmv_{m}'] = 'gmv_' + imt
for imt in oq.get_sec_imts():
ren[imt] = f'sep_{imt}'
df.rename(columns=ren, inplace=True)
event_id = dstore['events']['id']
fname = dstore.build_fname('gmf', 'data', 'csv')
writers.CsvWriter(fmt=writers.FIVEDIGITS).save(
df, fname, comment=dstore.metadata)
# exporting sigma_epsilon
if 'sigma_epsilon' in dstore['gmf_data']:
sig_eps_csv = dstore.build_fname('sigma_epsilon', '', 'csv')
sig_eps = dstore['gmf_data/sigma_epsilon'][()]
sig_eps['eid'] = event_id[sig_eps['eid']]
sig_eps.sort(order='eid')
header = list(sig_eps.dtype.names)
header[0] = 'event_id'
writers.write_csv(sig_eps_csv, sig_eps, header=header,
comment=dstore.metadata)
return [fname, sig_eps_csv, f]
else:
return [fname, f]
[docs]@export.add(('gmf_data', 'hdf5'))
def export_gmf_data_hdf5(ekey, dstore):
fname = dstore.build_fname('gmf', 'data', 'hdf5')
with hdf5.File(fname, 'w') as f:
f['sitecol'] = dstore['sitecol'].complete
dstore.hdf5.copy('gmf_data', f)
return [fname]
[docs]@export.add(('avg_gmf', 'csv'))
def export_avg_gmf_csv(ekey, dstore):
oq = dstore['oqparam']
sitecol = dstore['sitecol'].complete
if 'custom_site_id' in sitecol.array.dtype.names:
dic = dict(custom_site_id=decode(sitecol.custom_site_id))
else:
dic = dict(site_id=sitecol.sids)
dic['lon'] = sitecol.lons
dic['lat'] = sitecol.lats
data = dstore['avg_gmf'][:] # shape (2, N, M)
for m, imt in enumerate(oq.imtls):
dic['gmv_' + imt] = data[0, :, m]
dic['gsd_' + imt] = data[1, :, m]
fname = dstore.build_fname('avg_gmf', '', 'csv')
writers.CsvWriter(fmt=writers.FIVEDIGITS).save(
pandas.DataFrame(dic), fname, comment=dstore.metadata)
return [fname]
def _expand_gmv(array, imts):
# the array-field gmv becomes a set of scalar fields gmv_<imt>
dtype = array.dtype
assert dtype['gmv'].shape[0] == len(imts)
dtlist = []
for name in dtype.names:
dt = dtype[name]
if name == 'gmv':
for imt in imts:
dtlist.append(('gmv_' + imt, F32))
elif name in ('sid', 'eid'):
dtlist.append((name, dt))
else: # secondary perils
dtlist.append((name, dt))
new = numpy.zeros(len(array), dtlist)
imti = {imt: i for i, imt in enumerate(imts)}
for name, _dt in dtlist:
if name.startswith('gmv_'):
new[name] = array['gmv'][:, imti[name[4:]]]
else:
new[name] = array[name]
return new
DisaggMatrix = collections.namedtuple(
'DisaggMatrix', 'poe iml dim_labels matrix')
[docs]def iproduct(*sizes):
ranges = [range(size) for size in sizes]
return itertools.product(*ranges)
[docs]@export.add(('disagg', 'csv'), ('disagg_traditional', 'csv'))
def export_disagg_csv(ekey, dstore):
oq = dstore['oqparam']
sitecol = dstore['sitecol']
hmap4 = dstore['hmap4']
rlzs = dstore['full_lt'].get_realizations()
best_rlzs = dstore['best_rlzs'][:]
N, M, P, Z = hmap4.shape
imts = list(oq.imtls)
fnames = []
bins = {name: dset[:] for name, dset in dstore['disagg-bins'].items()}
ex = 'disagg?kind=%s&imt=%s&site_id=%s&poe_id=%d'
if ekey[0] == 'disagg_traditional':
ex += '&traditional=1'
trad = '-traditional'
else:
trad = ''
skip_keys = ('Mag', 'Dist', 'Lon', 'Lat', 'Eps', 'TRT')
metadata = dstore.metadata
poes_disagg = ['nan'] * P
for p in range(P):
try:
poes_disagg[p] = str(oq.poes_disagg[p])
except IndexError:
pass
for s in range(N):
rlzcols = ['rlz%d' % r for r in best_rlzs[s]]
lon, lat = sitecol.lons[s], sitecol.lats[s]
weights = numpy.array([rlzs[r].weight['weight'] for r in best_rlzs[s]])
weights /= weights.sum() # normalize to 1
metadata.update(investigation_time=oq.investigation_time,
mag_bin_edges=bins['Mag'].tolist(),
dist_bin_edges=bins['Dist'].tolist(),
lon_bin_edges=bins['Lon'][s].tolist(),
lat_bin_edges=bins['Lat'][s].tolist(),
eps_bin_edges=bins['Eps'].tolist(),
tectonic_region_types=decode(bins['TRT'].tolist()),
rlz_ids=best_rlzs[s].tolist(),
weights=weights.tolist(),
lon=lon, lat=lat)
for k in oq.disagg_outputs:
splits = k.lower().split('_')
header = ['imt', 'poe'] + splits + rlzcols
values = []
nonzeros = []
for m, p in iproduct(M, P):
imt = imts[m]
aw = extract(dstore, ex % (k, imt, s, p))
# for instance for Mag_Dist [(mag, dist, poe0, poe1), ...]
poes = aw[:, len(splits):]
if 'trt' in header:
nonzeros.append(True)
else:
nonzeros.append(poes.any()) # nonzero poes
for row in aw:
values.append([imt, poes_disagg[p]] + list(row))
if any(nonzeros):
com = {key: value for key, value in metadata.items()
if value is not None and key not in skip_keys}
com.update(metadata)
fname = dstore.export_path('%s%s-%d.csv' % (k, trad, s))
writers.write_csv(fname, values, header=header,
comment=com, fmt='%.5E')
fnames.append(fname)
return sorted(fnames)
[docs]@export.add(('realizations', 'csv'))
def export_realizations(ekey, dstore):
data = extract(dstore, 'realizations').array
path = dstore.export_path('realizations.csv')
writers.write_csv(path, data, fmt='%.7e', comment=dstore.metadata)
return [path]
[docs]@export.add(('events', 'csv'))
def export_events(ekey, dstore):
events = dstore['events'][()]
path = dstore.export_path('events.csv')
writers.write_csv(path, events, fmt='%s', renamedict=dict(id='event_id'),
comment=dstore.metadata)
return [path]
# because of the code in server.views.calc_results we are not visualizing
# .txt outputs, so we use .rst here
[docs]@export.add(('fullreport', 'rst'))
def export_fullreport(ekey, dstore):
with open(dstore.export_path('report.rst'), 'w') as f:
f.write(view('fullreport', dstore))
return [f.name]