# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import types
import logging
import warnings
from contextlib import contextmanager
from xml.sax.saxutils import escape, quoteattr
import numpy # this is needed by the doctests, don't remove it
from openquake.baselib.python3compat import unicode
from openquake.commonlib import InvalidFile
# format string used for scientific notation with five decimal digits
FIVEDIGITS = '%.5E'

# NOTE(review): the original source had a dangling '@contextmanager'
# decorator here with no following function definition (a SyntaxError,
# apparently a scraping artifact); it was removed. Confirm against the
# upstream file whether a context-manager definition was lost here.

# characters that make up a zero written in scientific notation, e.g.
# '0.00000E+00'; presumably used by a formatting helper elsewhere in the
# file to recognize/shorten zeros — TODO confirm against full source
zeroset = set(['E', '-', '+', '.', '0'])
class StreamingXMLWriter(object):
    """
    A binary stream XML writer. The typical usage is something like this::

        with StreamingXMLWriter(output_file) as writer:
            writer.start_tag('root')
            for node in nodegenerator():
                writer.serialize(node)
            writer.end_tag('root')
    """
    def __init__(self, bytestream, indent=4, encoding='utf-8', nsmap=None):
        """
        :param bytestream: the byte stream or file where to write the XML
        :param int indent: the indentation to use in the XML (default 4 spaces)
        :param str encoding: encoding used when writing (default 'utf-8')
        :param nsmap: optional dictionary mapping namespace URIs to prefixes
        """
        assert not isinstance(bytestream, io.StringIO)  # common error
        self.stream = bytestream
        self.indent = indent
        self.encoding = encoding
        self.indentlevel = 0
        self.nsmap = nsmap

    def shorten(self, tag):
        """
        Get the short representation of a fully qualified tag

        :param str tag: a (fully qualified or not) XML tag
        """
        if tag.startswith('{'):
            # split '{uri}name' and replace '{uri}' with the nsmap prefix
            ns, _tag = tag.rsplit('}')
            tag = self.nsmap.get(ns[1:], '') + _tag
        return tag

    def _write(self, text):
        """Write text by respecting the current indentlevel"""
        spaces = ' ' * (self.indent * self.indentlevel)
        t = spaces + text.strip() + '\n'
        if hasattr(t, 'encode'):
            t = t.encode(self.encoding, 'xmlcharrefreplace')
        self.stream.write(t)  # expected bytes

    def emptyElement(self, name, attrs):
        """Add an empty element (may have attributes)"""
        attr = ' '.join('%s=%s' % (n, quoteattr(scientificformat(v)))
                        for n, v in sorted(attrs.items()))
        self._write('<%s %s/>' % (name, attr))

    def start_tag(self, name, attrs=None):
        """Open an XML tag"""
        if not attrs:
            self._write('<%s>' % name)
        else:
            self._write('<' + name)
            # NB: the loop variable used to shadow the `name` parameter;
            # renamed for clarity (behavior is unchanged)
            for attr_name, value in sorted(attrs.items()):
                self._write(
                    ' %s=%s' % (attr_name, quoteattr(scientificformat(value))))
            self._write('>')
        self.indentlevel += 1

    def end_tag(self, name):
        """Close an XML tag"""
        self.indentlevel -= 1
        self._write('</%s>' % name)

    def serialize(self, node):
        """Serialize a node object (typically an ElementTree object)"""
        if isinstance(node.tag, types.FunctionType):
            # this looks like a bug of ElementTree: comments are stored as
            # functions!?? see https://hg.python.org/sandbox/python2.7/file/tip/Lib/xml/etree/ElementTree.py#l458
            return
        if self.nsmap is not None:
            tag = self.shorten(node.tag)
        else:
            tag = node.tag
        with warnings.catch_warnings():  # unwanted ElementTree warning
            warnings.simplefilter('ignore')
            leafnode = not node
        # NB: we cannot use len(node) to identify leafs since nodes containing
        # an iterator have no length. They are always True, even if empty :-(
        if leafnode and node.text is None:
            self.emptyElement(tag, node.attrib)
            return
        self.start_tag(tag, node.attrib)
        if node.text is not None:
            self._write(escape(scientificformat(node.text).strip()))
        for subnode in node:
            self.serialize(subnode)
        self.end_tag(tag)

    def __enter__(self):
        """Write the XML declaration"""
        self._write('<?xml version="1.0" encoding="%s"?>\n' %
                    self.encoding)
        return self

    def __exit__(self, etype, exc, tb):
        """Close the XML document (nothing to do here)"""
        pass
def tostring(node, indent=4, nsmap=None):
    """
    Convert a node into an XML string by using the StreamingXMLWriter.
    This is useful for testing purposes.

    :param node: a node object (typically an ElementTree object)
    :param indent: the indentation to use in the XML (default 4 spaces)
    :param nsmap: optional mapping from namespace URIs to prefixes
    :returns: the serialized XML as a bytes string
    """
    buf = io.BytesIO()
    StreamingXMLWriter(buf, indent, nsmap=nsmap).serialize(node)
    return buf.getvalue()
# module-level translator used by the CSV reader/writer below to convert
# between short column names and their typed 'name:dtype' header form;
# HeaderTranslator is not visible in this chunk — presumably defined
# earlier in this file (TODO confirm). The dtype strings below are the
# on-disk representations of the corresponding columns.
htranslator = HeaderTranslator(
    asset_ref='asset_ref:|S20',
    rup_id='rup_id:uint32',
    taxonomy='taxonomy:|S100',
    rupserial='rupserial:uint32',
    multiplicity='multiplicity:uint16',
    numsites='numsites:uint32',
)
# recursive function used internally by build_header
def _build_header(dtype, root):
header = []
if dtype.fields is None:
if not root:
return []
return [root + (str(dtype), dtype.shape)]
for field in dtype.names:
dt = dtype.fields[field][0]
if dt.subdtype is None: # nested
header.extend(_build_header(dt, root + (field,)))
else:
numpytype = str(dt.subdtype[0])
header.append(root + (field, numpytype, dt.shape))
return header
# NB: the header written here can be read back by parse_header
def write_csv(dest, data, sep=',', fmt='%.6E', header=None):
    """
    Write an array as a CSV file, optionally with a typed header.

    :param dest: destination filename or io.StringIO instance
    :param data: array to save
    :param sep: separator to use (default comma)
    :param fmt: formatting string (default '%.6E')
    :param header:
        optional list with the names of the columns to display
    :returns: the content (for a StringIO dest) or the file name
    """
    if len(data) == 0:
        logging.warning('%s is empty', dest)  # logging.warn is deprecated
    if not hasattr(dest, 'getvalue'):
        # not a StringIO, assume dest is a filename
        dest = open(dest, 'w')
    try:
        # see if data is a composite numpy array
        data.dtype.fields
    except AttributeError:
        # not a composite array
        autoheader = []
    else:
        autoheader = build_header(data.dtype)
    try:
        someheader = header or autoheader
        if someheader:
            dest.write(sep.join(htranslator.write(someheader)) + u'\n')
        if autoheader:
            # field paths, e.g. 'a~b:float64' -> ['a', 'b']
            all_fields = [col.split(':', 1)[0].split('~')
                          for col in autoheader]
            for record in data:
                row = []
                for fields in all_fields:
                    val = extract_from(record, fields)
                    if fields == ['lon'] or fields == ['lat']:
                        # geographic coordinates use 5 decimal digits
                        row.append('%.5f' % val)
                    else:
                        row.append(scientificformat(val, fmt))
                dest.write(sep.join(row) + u'\n')
        else:
            for row in data:
                dest.write(sep.join(scientificformat(col, fmt)
                                    for col in row) + u'\n')
        if hasattr(dest, 'getvalue'):
            return dest.getvalue()[:-1]  # a newline is strangely added
        return dest.name
    finally:
        if not hasattr(dest, 'getvalue'):
            # close the file even if an error occurred while writing
            # (the original version leaked the handle on error)
            dest.close()
class CsvWriter(object):
    """
    Helper used by the exporters to write out several CSV files while
    keeping track of their names.
    """
    def __init__(self, sep=',', fmt='%12.8E'):
        self.sep = sep      # column separator
        self.fmt = fmt      # format string for the numeric fields
        self.fnames = []    # paths of the files written so far

    def save(self, data, fname, header=None):
        """
        Write `data` to `fname` as CSV and remember the file name.

        :param data: numpy array or list of lists
        :param fname: path name
        :param header: header to use
        """
        write_csv(fname, data, self.sep, self.fmt, header)
        self.fnames.append(fname)

    def getsaved(self):
        """
        Returns the sorted list of files saved by this CsvWriter
        """
        return sorted(self.fnames)
def castable_to_int(s):
    """
    Return True if the string `s` can be interpreted as an integer
    """
    try:
        int(s)
        return True
    except ValueError:
        return False
def _cast(col, ntype, shape, lineno, fname):
# convert strings into tuples or numbers, used inside read_composite_array
if shape:
return tuple(map(ntype, col.split()))
else:
return ntype(col)
# NB: this only works with flat composite arrays
def read_composite_array(fname, sep=','):
    r"""
    Convert a CSV file with header into a numpy array of records.

    :param fname: path of the CSV file to read
    :param sep: column separator (default comma)
    :raises InvalidFile: on malformed rows or uncastable cells

    >>> from openquake.baselib.general import writetmp
    >>> fname = writetmp('PGA:float64:3,PGV:float64:2,avg:float64:1\n'
    ...                  '.1 .2 .3,.4 .5,.6\n')
    >>> print(read_composite_array(fname))  # array of shape (1,)
    [([0.1, 0.2, 0.3], [0.4, 0.5], [0.6])]
    """
    with open(fname) as f:
        header = next(f)
        fields, dtype = parse_header(htranslator.read(header.split(sep)))
        ts_pairs = []  # [(type, shape), ...]
        for name in fields:
            dt = dtype.fields[name][0]
            # for sub-array fields take the base type, else the field type
            ts_pairs.append((dt.subdtype[0].type if dt.subdtype else dt.type,
                             dt.shape))
        col_ids = list(range(1, len(ts_pairs) + 1))
        num_columns = len(col_ids)
        records = []
        col, col_id = '', 0  # defaults used in the error message below
        for i, line in enumerate(f, 2):  # the header is line 1
            row = line.split(sep)
            if len(row) != num_columns:
                raise InvalidFile(
                    'expected %d columns, found %d in file %s, line %d' %
                    (num_columns, len(row), fname, i))
            try:
                record = []
                for (ntype, shape), col, col_id in zip(ts_pairs, row, col_ids):
                    record.append(_cast(col, ntype, shape, i, fname))
                records.append(tuple(record))
            except Exception as e:
                raise InvalidFile(
                    'Could not cast %r in file %s, line %d, column %d '
                    'using %s: %s' % (col, fname, i, col_id,
                                      (ntype.__name__,) + shape, e))
        return numpy.array(records, dtype)
# this is simple and without error checking for the moment
def read_array(fname, sep=','):
    r"""
    Convert a CSV file without header into a numpy array of floats.

    :param fname: path of the CSV file to read
    :param sep: column separator (default comma); whitespace inside a
        column separates the elements of a sub-array

    >>> from openquake.baselib.general import writetmp
    >>> print(read_array(writetmp('.1 .2, .3 .4, .5 .6\n')))
    [[[ 0.1  0.2]
      [ 0.3  0.4]
      [ 0.5  0.6]]]
    """
    with open(fname) as f:
        records = []
        for line in f:
            row = line.split(sep)
            record = [list(map(float, col.split())) for col in row]
            records.append(record)
        return numpy.array(records)
if __name__ == '__main__':  # pretty print of NRML files
    import sys
    import shutil
    from openquake.commonlib import nrml
    for fname in sys.argv[1:]:
        node = nrml.read(fname)
        # keep a backup copy before overwriting the original file
        shutil.copy(fname, fname + '.bak')
        with open(fname, 'w') as out:
            nrml.write(list(node), out)