Source code for openquake.commonlib.datastore

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import re
import collections

import numpy
import h5py

from openquake.baselib import hdf5
from openquake.baselib.general import CallableDict
from openquake.baselib.python3compat import pickle
from openquake.commonlib.writers import write_csv


# a dictionary of views datastore -> array
view = CallableDict()

DATADIR = os.environ.get('OQ_DATADIR', os.path.expanduser('~/oqdata'))


def get_nbytes(dset):
    """
    If the dataset has an attribute 'nbytes', return it. Otherwise get
    the size of the underlying array. Returns None if the dataset is
    actually a group.
    """
    if 'nbytes' in dset.attrs:
        # check if the dataset has an attribute nbytes
        return dset.attrs['nbytes']
    elif hasattr(dset, 'value'):
        # else extract nbytes from the underlying array
        return dset.size * numpy.zeros(1, dset.dtype).nbytes


class ByteCounter(object):
    """
    A visitor used to measure the dimensions of an HDF5 dataset or group.
    Use it as ByteCounter.get_nbytes(dset_or_group).
    """
    @classmethod
    def get_nbytes(cls, dset):
        nbytes = get_nbytes(dset)
        if nbytes is not None:
            return nbytes
        # else dip in the tree
        self = cls()
        dset.visititems(self)
        return self.nbytes

    def __init__(self, nbytes=0):
        self.nbytes = nbytes

    def __call__(self, name, dset_or_group):
        nbytes = get_nbytes(dset_or_group)
        if nbytes:
            self.nbytes += nbytes


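# Usage sketch (the file name and contents are hypothetical, not part of
# this module): ByteCounter measures a group by visiting the datasets
# below it.
#
#   with h5py.File('/tmp/scratch.hdf5', 'w') as f:
#       f['grp/data'] = numpy.zeros(10, numpy.float64)
#       ByteCounter.get_nbytes(f['grp'])  # -> 10 * 8 = 80 bytes

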
def get_calc_ids(datadir=DATADIR):
    """
    Extract the available calculation IDs from the datadir, in order.
    """
    if not os.path.exists(datadir):
        return []
    calc_ids = []
    for f in os.listdir(datadir):
        mo = re.match(r'calc_(\d+)\.hdf5', f)
        if mo:
            calc_ids.append(int(mo.group(1)))
    return sorted(calc_ids)


def get_last_calc_id(datadir):
    """
    Extract the latest calculation ID from the given directory.
    If none is found, return 0.
    """
    calcs = get_calc_ids(datadir)
    if not calcs:
        return 0
    return calcs[-1]


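# For instance, if the datadir contained the hypothetical files
# calc_1.hdf5 and calc_42.hdf5, get_calc_ids would return [1, 42],
# get_last_calc_id would return 42, and a brand new DataStore would
# get the calculation ID 43.

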
def read(calc_id, mode='r', datadir=DATADIR):
    """
    :param calc_id: calculation ID
    :param mode: 'r' or 'w'
    :param datadir: the directory where to look
    :returns: the corresponding DataStore instance

    Read the datastore, if it exists and it is accessible.
    """
    if calc_id < 0:  # retrieve an old datastore
        calc_id = get_calc_ids(datadir)[calc_id]
    fname = os.path.join(datadir, 'calc_%s.hdf5' % calc_id)
    open(fname).close()  # check if the file exists and is accessible
    dstore = DataStore(calc_id, datadir, mode=mode)
    try:
        hc_id = dstore['oqparam'].hazard_calculation_id
    except KeyError:  # no oqparam
        dstore.close()
        raise
    if hc_id:
        # TODO: we will need to store the parent directory to be able
        # to use hazard calculations generated by another user;
        # for the moment we assume that the datadir is the same for
        # parent and child calculations
        dstore.set_parent(read(hc_id, datadir=datadir))
    return dstore


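# Usage sketch (assuming at least one calculation exists in the datadir):
# negative IDs count from the end, so read(-1) opens the latest datastore.
#
#   with read(-1) as dstore:
#       print(dstore)  # e.g. <DataStore 42>

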
class DataStore(collections.MutableMapping):
    """
    DataStore class to store the inputs/outputs of a calculation on the
    filesystem. Here is a minimal example of usage:

    >>> ds = DataStore()
    >>> ds['example'] = 'hello world'
    >>> list(ds.items())
    [(u'example', 'hello world')]
    >>> ds.clear()

    When reading the items, the DataStore will return a generator. The
    items will be ordered lexicographically according to their name.

    There is a serialization protocol to store objects in the datastore.
    An object is serializable if it has a method `__toh5__` returning an
    array and a dictionary, and a method `__fromh5__` taking an array and
    a dictionary and populating the object.
    For an example of use see :class:`openquake.hazardlib.site.SiteCollection`.
    """
    def __init__(self, calc_id=None, datadir=DATADIR, export_dir='.',
                 params=(), mode=None):
        if not os.path.exists(datadir):
            os.makedirs(datadir)
        if calc_id is None:  # use a new datastore
            self.calc_id = get_last_calc_id(datadir) + 1
        elif calc_id < 0:  # use an old datastore
            calc_ids = get_calc_ids(datadir)
            try:
                self.calc_id = calc_ids[calc_id]
            except IndexError:
                raise IndexError(
                    'There are %d old calculations, cannot '
                    'retrieve the %s' % (len(calc_ids), calc_id))
        else:  # use the given datastore
            self.calc_id = calc_id
        self.parent = ()  # can be set later
        self.datadir = datadir
        self.calc_dir = os.path.join(datadir, 'calc_%s' % self.calc_id)
        self.export_dir = export_dir
        self.hdf5path = self.calc_dir + '.hdf5'
        mode = (mode or 'r+') if os.path.exists(self.hdf5path) else 'w'
        self.hdf5 = hdf5.File(self.hdf5path, mode, libver='latest')
        self.attrs = self.hdf5.attrs
        for name, value in params:
            self.attrs[name] = value

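    # A sketch of the __toh5__/__fromh5__ protocol described in the class
    # docstring; the Point class below is hypothetical and only shows the
    # expected method signatures.
    #
    #   class Point(object):
    #       def __init__(self, x=0, y=0):
    #           self.x, self.y = x, y
    #       def __toh5__(self):  # -> (array, dict of attributes)
    #           return numpy.array([self.x, self.y]), {'kind': 'cartesian'}
    #       def __fromh5__(self, array, attrs):  # inverse of __toh5__
    #           self.x, self.y = array
    #
    #   ds = DataStore()
    #   ds['point'] = Point(1, 2)  # serialized through __toh5__
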
    def set_parent(self, parent):
        """
        Give a parent to a datastore and update its .attrs with the parent
        attributes, which are assumed to be literal strings.
        """
        self.parent = parent
        # merge parent attrs into child attrs
        for name, value in self.parent.attrs.items():
            if name not in self.attrs:  # add missing parameter
                self.attrs[name] = value

    def set_nbytes(self, key, nbytes=None):
        """
        Set the `nbytes` attribute on the HDF5 object identified by `key`.
        """
        obj = h5py.File.__getitem__(self.hdf5, key)
        if nbytes is not None:  # size set from outside
            obj.attrs['nbytes'] = nbytes
        else:  # recursively determine the size of the datagroup
            obj.attrs['nbytes'] = nbytes = ByteCounter.get_nbytes(obj)
        return nbytes

    def set_attrs(self, key, **kw):
        """
        Set the HDF5 attributes of the given key
        """
        for k, v in kw.items():
            h5py.File.__getitem__(self.hdf5, key).attrs[k] = v

    def get_attr(self, key, name, default=None):
        """
        :param key: dataset path
        :param name: name of the attribute
        :param default: value to return if the attribute is missing
        """
        obj = h5py.File.__getitem__(self.hdf5, key)
        try:
            return obj.attrs[name]
        except KeyError:
            if default is None:
                raise
            return default

    def create_dset(self, key, dtype, size=None, compression=None):
        """
        Create a one-dimensional HDF5 dataset.

        :param key: name of the dataset
        :param dtype: dtype of the dataset (usually composite)
        :param size: size of the dataset (if None, the dataset is extendable)
        """
        return hdf5.Hdf5Dataset.create(
            self.hdf5, key, dtype, size, compression)

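    # Usage sketch (the key and dtype below are illustrative, not part of
    # this module): create an extendable composite dataset and tag it.
    #
    #   loss_dt = numpy.dtype([('eid', numpy.uint32),
    #                          ('loss', numpy.float32)])
    #   dset = ds.create_dset('losses', loss_dt)  # size=None -> extendable
    #   ds.set_attrs('losses', investigation_time=50.)
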
    def save(self, key, kw):
        """
        Update the object associated to `key` with the `kw` dictionary;
        works for LiteralAttrs objects and automatically flushes.
        """
        obj = self[key]
        vars(obj).update(kw)
        self[key] = obj
        self.flush()

    def export_path(self, relname, export_dir=None):
        """
        Return the path of the exported file by adding the export_dir in
        front and the calculation ID before the extension.

        :param relname: relative file name
        :param export_dir: export directory (if None use .export_dir)
        """
        assert not os.path.dirname(relname), relname
        name, ext = relname.rsplit('.', 1)
        newname = '%s_%s.%s' % (name, self.calc_id, ext)
        if export_dir is None:
            export_dir = self.export_dir
        return os.path.join(export_dir, newname)

    def build_fname(self, prefix, postfix, fmt, export_dir=None):
        """
        Build an export file name from a prefix, a postfix (possibly a
        realization object) and an extension.

        :param prefix: the prefix to use
        :param postfix: the postfix to use (can be a realization object)
        :param fmt: the extension ('csv', 'xml', etc)
        :param export_dir: export directory (if None use .export_dir)
        :returns: relative pathname including the extension
        """
        if hasattr(postfix, 'sm_lt_path'):  # is a realization
            fname = '%s-rlz-%03d.%s' % (prefix, postfix.ordinal, fmt)
        else:
            fname = '%s-%s.%s' % (prefix, postfix, fmt)
        return self.export_path(fname, export_dir)

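    # Naming sketch: with calc_id=42, export_path('hcurves.csv', '/tmp')
    # returns '/tmp/hcurves_42.csv', while build_fname('hcurves', 'mean',
    # 'csv', '/tmp') first builds 'hcurves-mean.csv' and then goes through
    # export_path, returning '/tmp/hcurves-mean_42.csv'.
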
    def export_csv(self, key):
        """
        Generic csv exporter
        """
        # builds a name like <key>_<calc_id>.csv inside the export_dir
        return write_csv(self.export_path(key + '.csv'), self[key])

    def flush(self):
        """Flush the underlying hdf5 file"""
        if self.parent != ():
            self.parent.flush()
        self.hdf5.flush()

    def close(self):
        """Close the underlying hdf5 file"""
        if self.parent != ():
            self.parent.close()
        if self.hdf5:  # is open
            self.hdf5.close()

    def clear(self):
        """Remove the datastore from the file system"""
        self.close()
        os.remove(self.hdf5path)

    def getsize(self, key=None):
        """
        Return the size in bytes of the output associated to the given key.
        If no key is given, returns the total size of the datastore file.
        """
        if key is None:
            return os.path.getsize(self.hdf5path)
        return ByteCounter.get_nbytes(
            h5py.File.__getitem__(self.hdf5, key))

    def get(self, key, default):
        """
        :returns: the value associated to the datastore key, or the default
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        try:
            val = self.hdf5[key]
        except KeyError:
            if self.parent:
                try:
                    val = self.parent.hdf5[key]
                except KeyError:
                    raise KeyError(
                        'No %r found in %s' % (key, [self, self.parent]))
            else:
                raise KeyError('No %r found in %s' % (key, self))
        try:
            shape = val.shape
        except AttributeError:  # val is a group
            return val
        if not shape:  # a zero-dimensional dataset contains a pickled object
            val = pickle.loads(val.value)
        return val

    def __setitem__(self, key, value):
        if isinstance(value, dict) or hasattr(value, '__toh5__'):
            val = value
        elif (not isinstance(value, numpy.ndarray) or
              value.dtype is numpy.dtype(object)):
            val = numpy.array(pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
        else:
            val = value
        if key in self.hdf5:
            # there is a bug in the current version of HDF5 for composite
            # arrays: it is impossible to save the same key twice; so we
            # remove the key first, then it is possible to save it again
            del self[key]
        try:
            self.hdf5[key] = val
        except RuntimeError as exc:
            raise RuntimeError('Could not save %s: %s in %s' %
                               (key, exc, self.hdf5path))

    def __delitem__(self, key):
        del self.hdf5[key]

    def __enter__(self):
        return self

    def __exit__(self, etype, exc, tb):
        self.close()

    def __iter__(self):
        if not self.hdf5:
            raise RuntimeError('%s is closed' % self)
        for path in sorted(self.hdf5):
            yield path

    def __contains__(self, key):
        return key in self.hdf5

    def __len__(self):
        return sum(1 for f in self)

    def __repr__(self):
        return '<%s %d>' % (self.__class__.__name__, self.calc_id)


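# Storage sketch showing the pickle fallback in __setitem__/__getitem__
# above (the key names are illustrative): objects that are not arrays,
# dicts or __toh5__ instances are pickled into a zero-dimensional array
# and transparently unpickled on read.
#
#   ds['tup'] = ('a', 1)           # not an array: pickled
#   ds['tup']                      # -> ('a', 1), via pickle.loads
#   ds['arr'] = numpy.arange(3)    # stored natively as an HDF5 dataset

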
class Fake(dict):
    """
    A fake datastore as a dict subclass, useful in tests and such
    """
    def __init__(self, attrs=None, **kwargs):
        self.attrs = {k: repr(v) for k, v in attrs.items()} if attrs else {}
        self.update(kwargs)


def persistent_attribute(key):
    """
    Persistent attributes are persisted to the datastore and cached.
    Modifications to mutable objects are not automagically persisted.
    If you have a huge object that does not fit in memory use the datastore
    directly (for instance, open a HDF5 file to create an empty array, then
    populate it). Notice that you can use any dict-like data structure in
    place of the datastore, provided you can set attributes on it. Here is
    an example:

    >>> class Datastore(dict):
    ...     "A fake datastore"

    >>> class Store(object):
    ...     a = persistent_attribute('a')
    ...     def __init__(self, a):
    ...         self.datastore = Datastore()
    ...         self.a = a  # this assignment will store the attribute

    >>> store = Store([1])
    >>> store.a  # this retrieves the attribute
    [1]
    >>> store.a.append(2)
    >>> store.a = store.a  # remember to store the modified attribute!

    :param key: the name of the attribute to be made persistent
    :returns: a property to be added to a class with a .datastore attribute
    """
    privatekey = '_' + key

    def getter(self):
        # try to get the value from the privatekey attribute (i.e. from
        # the cache of the datastore); if not possible, get the value
        # from the datastore (or its parent) and set the cache; if the
        # value cannot be retrieved, raise an AttributeError
        try:
            return getattr(self.datastore, privatekey)
        except AttributeError:
            value = self.datastore[key]
            setattr(self.datastore, privatekey, value)
            return value

    def setter(self, value):
        # update the datastore and the private key
        self.datastore[key] = value
        setattr(self.datastore, privatekey, value)

    return property(getter, setter)