# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import re
import gzip
import collections
import numpy
import h5py
from openquake.baselib import hdf5, performance, general
from openquake.commonlib.logs import (
    get_datadir, get_last_calc_id, CALC_REGEX, dbcmd, init)
# FIXME: you should never use this
[docs]def hdf5new(datadir=None):
    """
    Return a new `hdf5.File by` instance with name determined by the last
    calculation in the datadir (plus one). Set the .path attribute to the
    generated filename.
    """
    datadir = datadir or get_datadir()
    if not os.path.exists(datadir):
        os.makedirs(datadir)
    calc_id = get_last_calc_id(datadir) + 1
    fname = os.path.join(datadir, 'calc_%d.hdf5' % calc_id)
    new = hdf5.File(fname, 'w')
    new.path = fname
    performance.init_performance(new)
    return new 
def _read(calc_id: int, datadir, mode, haz_id=None):
    # low level function to read a datastore file
    ddir = datadir or get_datadir()
    ppath = None
    # look in the db
    job = dbcmd('get_job', calc_id)
    if job:
        jid = job.id
        path = job.ds_calc_dir + '.hdf5'
        hc_id = job.hazard_calculation_id
        if not hc_id and haz_id:
            dbcmd('update_job', jid, {'hazard_calculation_id': haz_id})
            hc_id = haz_id
        if hc_id and hc_id != jid:
            hc = dbcmd('get_job', hc_id)
            if hc:
                ppath = hc.ds_calc_dir + '.hdf5'
            else:
                ppath = os.path.join(ddir, 'calc_%d.hdf5' % hc_id)
    else:  # when using oq run there is no job in the db
        path = os.path.join(ddir, 'calc_%s.hdf5' % calc_id)
    return DataStore(path, ppath, mode)
[docs]def read(calc_id, mode='r', datadir=None, parentdir=None, read_parent=True):
    """
    :param calc_id: calculation ID or filename
    :param mode: 'r' or 'w'
    :param datadir: the directory where to look
    :param parentdir: the datadir of the parent calculation
    :param read_parent: read the parent calculation if it is there
    :returns: the corresponding DataStore instance
    Read the datastore, if it exists and it is accessible.
    """
    if isinstance(calc_id, str):  # pathname
        dstore = DataStore(calc_id, mode=mode)
    else:
        dstore = _read(calc_id, datadir, mode)
    try:
        hc_id = dstore['oqparam'].hazard_calculation_id
    except KeyError:  # no oqparam
        hc_id = None
    if read_parent and hc_id:
        dstore.parent = _read(hc_id, datadir, mode='r')
        dstore.ppath = dstore.parent.filename
    return dstore.open(mode) 
[docs]def new(calc_id, oqparam, datadir=None, mode=None):
    """
    :param calc_id:
        if integer > 0 look in the database and then on the filesystem
        if integer < 0 look at the old calculations in the filesystem
    :param oqparam:
        OqParam instance with the validated parameters of the calculation
    :returns:
        a DataStore instance associated to the given calc_id
    """
    dstore = _read(calc_id, mode, datadir)
    dstore['oqparam'] = oqparam
    if oqparam.hazard_calculation_id:
        dstore.ppath = read(calc_id, 'r', datadir).ppath
    return dstore 
[docs]def build_dstore_log(description='custom calculation', parent=(), ini=None):
    """
    :returns: DataStore instance associated to the .calc_id
    """
    if ini is not None:
        dic = ini
    else:
        dic = dict(description=description, calculation_mode='custom')
    log = init('job', dic)
    dstore = new(log.calc_id, log.get_oqparam(validate=False))
    dstore.parent = parent
    return dstore, log 
[docs]def read_hc_id(hdf5):
    """
    Getting the hazard_calculation_id, if any
    """
    try:
        oq = hdf5['oqparam']
    except KeyError:  # oqparam not saved yet
        return
    except OSError:  # file open by another process with oqparam not flushed
        return
    return oq.hazard_calculation_id 
[docs]class DataStore(collections.abc.MutableMapping):
    """
    DataStore class to store the inputs/outputs of a calculation on the
    filesystem.
    Here is a minimal example of usage:
    >>> dstore, log = build_dstore_log()
    >>> with dstore, log:
    ...     dstore['example'] = 42
    ...     print(dstore['example'][()])
    42
    When reading the items, the DataStore will return a generator. The
    items will be ordered lexicographically according to their name.
    There is a serialization protocol to store objects in the datastore.
    An object is serializable if it has a method `__toh5__` returning
    an array and a dictionary, and a method `__fromh5__` taking an array
    and a dictionary and populating the object.
    For an example of use see :class:`openquake.hazardlib.site.SiteCollection`.
    """
    calc_id = None  # set at instantiation time
    job = None  # set at instantiation time
    opened = 0
    closed = 0
    def __init__(self, path, ppath=None, mode=None):
        self.filename = path
        self.ppath = ppath
        self.calc_id, datadir = extract_calc_id_datadir(path)
        self.tempname = self.filename[:-5] + '_tmp.hdf5'
        if not os.path.exists(datadir) and mode != 'r':
            os.makedirs(datadir)
        self.parent = ()  # can be set later
        self.datadir = datadir
        self.mode = mode or ('r+' if os.path.exists(self.filename) else 'w')
        if self.mode == 'r' and not os.path.exists(self.filename):
            raise IOError('File not found: %s' % self.filename)
        self.hdf5 = ()  # so that `key in self.hdf5` is valid
        self.open(self.mode)
        if mode != 'r':  # w, a or r+
            performance.init_performance(self.hdf5)
[docs]    def open(self, mode):
        """
        Open the underlying .hdf5 file
        """
        if self.hdf5 == ():  # not already open
            try:
                self.hdf5 = hdf5.File(self.filename, mode)
            except OSError as exc:
                raise OSError('%s in %s' % (exc, self.filename))
            hc_id = read_hc_id(self.hdf5)
            if hc_id:
                self.parent = read(hc_id)
        return self 
    @property
    def export_dir(self):
        """
        Return the underlying export directory
        """
        edir = getattr(self, '_export_dir', None) or self['oqparam'].export_dir
        return edir
    @export_dir.setter
    def export_dir(self, value):
        """
        Set the export directory
        """
        self._export_dir = value
[docs]    def getitem(self, name):
        """
        Return a dataset by using h5py.File.__getitem__
        """
        try:
            return h5py.File.__getitem__(self.hdf5, name)
        except KeyError:
            if self.parent != ():
                if not self.parent.hdf5:
                    self.parent.open('r')
                return self.parent.getitem(name)
            else:
                raise 
[docs]    def swmr_on(self):
        """
        Enable the SWMR mode on the underlying HDF5 file
        """
        self.close()  # flush everything
        self.open('a')
        if self.parent != ():
            self.parent.open('r')
        try:
            self.hdf5.swmr_mode = True
        except (ValueError, RuntimeError):  # already set
            pass 
[docs]    def set_attrs(self, key, **kw):
        """
        Set the HDF5 attributes of the given key
        """
        self.hdf5.save_attrs(key, kw) 
[docs]    def set_shape_descr(self, key, **kw):
        """
        Set shape attributes
        """
        hdf5.set_shape_descr(self.hdf5, key, kw) 
[docs]    def get_attr(self, key, name, default=None):
        """
        :param key: dataset path
        :param name: name of the attribute
        :param default: value to return if the attribute is missing
        """
        try:
            obj = h5py.File.__getitem__(self.hdf5, key)
        except KeyError:
            if self.parent != ():
                return self.parent.get_attr(key, name, default)
            else:
                raise
        try:
            return obj.attrs[name]
        except KeyError:
            if default is None:
                raise
            return default 
[docs]    def get_attrs(self, key):
        """
        :param key: dataset path
        :returns: dictionary of attributes for that path
        """
        try:
            dset = h5py.File.__getitem__(self.hdf5, key)
        except KeyError:
            if self.parent != ():
                dset = h5py.File.__getitem__(self.parent.hdf5, key)
            else:
                raise
        return dict(dset.attrs) 
[docs]    def create_dset(self, key, dtype, shape=(None,), compression=None,
                    fillvalue=0, attrs=None):
        """
        Create a one-dimensional HDF5 dataset.
        :param key: name of the dataset
        :param dtype: dtype of the dataset (usually composite)
        :param shape: shape of the dataset, possibly extendable
        :param compression: the kind of HDF5 compression to use
        :param attrs: dictionary of attributes of the dataset
        :returns: a HDF5 dataset
        """
        if isinstance(dtype, numpy.ndarray):
            dset = hdf5.create(
                self.hdf5, key, dtype.dtype, dtype.shape,
                compression, fillvalue, attrs)
            dset[:] = dtype
            return dset
        return hdf5.create(
            self.hdf5, key, dtype, shape, compression, fillvalue, attrs) 
[docs]    def create_df(self, key, nametypes, compression=None, **kw):
        """
        Create a HDF5 datagroup readable as a pandas DataFrame
        :param key:
            name of the dataset
        :param nametypes:
            list of pairs (name, dtype) or (name, array) or DataFrame
        :param compression:
            the kind of HDF5 compression to use
        :param kw:
            extra attributes to store
        """
        return self.hdf5.create_df(key, nametypes, compression, **kw) 
[docs]    def export_path(self, relname, export_dir=None):
        """
        Return the path of the exported file by adding the export_dir in
        front, the calculation ID at the end.
        :param relname: relative file name
        :param export_dir: export directory (if None use .export_dir)
        """
        # removing inner slashed to avoid creating intermediate directories
        name, ext = relname.replace('/', '-').rsplit('.', 1)
        newname = '%s_%s.%s' % (name, self.calc_id, ext)
        if export_dir is None:
            export_dir = self.export_dir
        return os.path.join(export_dir, newname) 
[docs]    def build_fname(self, prefix, postfix, fmt, export_dir=None):
        """
        Build a file name from a realization, by using prefix and extension.
        :param prefix: the prefix to use
        :param postfix: the postfix to use (can be a realization object)
        :param fmt: the extension ('csv', 'xml', etc)
        :param export_dir: export directory (if None use .export_dir)
        :returns: relative pathname including the extension
        """
        if hasattr(postfix, 'sm_lt_path'):  # is a realization
            fname = '%s-rlz-%03d.%s' % (prefix, postfix.ordinal, fmt)
        else:
            fname = prefix + ('-%s' % postfix if postfix else '') + '.' + fmt
        return self.export_path(fname, export_dir) 
[docs]    def flush(self):
        """Flush the underlying hdf5 file"""
        if self.parent != ():
            self.parent.flush()
        if self.hdf5:  # is open
            self.hdf5.flush() 
[docs]    def close(self):
        """Close the underlying hdf5 file"""
        if self.parent != ():
            self.parent.flush()
            self.parent.close()
        if self.hdf5:  # is open
            self.hdf5.flush()
            self.hdf5.close()
            self.hdf5 = () 
[docs]    def clear(self):
        """Remove the datastore from the file system"""
        self.close()
        os.remove(self.filename) 
[docs]    def getsize(self, key='/'):
        """
        Return the size in byte of the output associated to the given key.
        If no key is given, returns the total size of all files.
        """
        if key == '/':
            return os.path.getsize(self.filename)
        try:
            dset = self.getitem(key)
        except KeyError:
            if self.parent != ():
                dset = self.parent.getitem(key)
            else:
                raise
        return hdf5.ByteCounter.get_nbytes(dset) 
[docs]    def get(self, key, default):
        """
        :returns: the value associated to the datastore key, or the default
        """
        try:
            return self[key]
        except KeyError:
            return default 
[docs]    def store_files(self, fnames, where='input/'):
        """
        :param fnames: a set of full pathnames
        """
        prefix = len(os.path.commonprefix(fnames))
        for fname in fnames:
            with open(fname, 'rb') as f:
                data = gzip.compress(f.read())
            self[where + fname[prefix:]] = numpy.void(data) 
[docs]    def retrieve_files(self, prefix='input'):
        """
        :yields: pairs (relative path, data)
        """
        for k, v in self[prefix].items():
            if hasattr(v, 'items'):
                yield from self.retrieve_files(prefix + '/' + k)
            else:
                yield prefix + '/' + k, gzip.decompress(
                    bytes(numpy.asarray(v[()]))) 
[docs]    def get_file(self, key):
        """
        :returns: a BytesIO object
        """
        data = bytes(numpy.asarray(self[key][()]))
        return io.BytesIO(gzip.decompress(data)) 
[docs]    def read_df(self, key, index=None, sel=(), slc=slice(None)):
        """
        :param key: name of the structured dataset
        :param index: pandas index (or multi-index), possibly None
        :param sel: dictionary used to select subsets of the dataset
        :param slc: slice object to extract a slice of the dataset
        :returns: pandas DataFrame associated to the dataset
        """
        if key in self.hdf5:
            return self.hdf5.read_df(key, index, sel, slc)
        if self.parent:
            return self.parent.read_df(key, index, sel, slc)
        raise KeyError(key) 
[docs]    def read_unique(self, key, field):
        """
        :param key: key to a dataset containing a structured array
        :param field: a field in the structured array
        :returns: sorted, unique values
        Works with chunks of 1M records
        """
        unique = set()
        dset = self.getitem(key)
        for slc in general.gen_slices(0, len(dset), 10_000_000):
            arr = numpy.unique(dset[slc][field])
            unique.update(arr)
        return sorted(unique) 
[docs]    def sel(self, key, **kw):
        """
        Select a dataset with shape_descr. For instance
        dstore.sel('hcurves', imt='PGA', sid=2)
        """
        return hdf5.sel(self.getitem(key), kw) 
    @property
    def metadata(self):
        """
        :returns: datastore metadata version, date, checksum as a dictionary
        """
        a = self.hdf5.attrs
        return dict(generated_by='OpenQuake engine %s' % a['engine_version'],
                    start_date=a['date'], checksum=a['checksum32'])
    def __getitem__(self, key):
        if self.hdf5 == ():  # the datastore is closed
            raise ValueError('Cannot find %s in %s' % (key, self))
        try:
            val = self.hdf5[key]
        except KeyError:
            if self.parent != ():
                self.parent.open('r')
                try:
                    val = self.parent[key]
                except KeyError:
                    raise KeyError(
                        'No %r found in %s and ancestors' % (key, self))
            else:
                raise KeyError('No %r found in %s' % (key, self))
        return val
    def __setitem__(self, key, val):
        if key in self.hdf5:
            # there is a bug in the current version of HDF5 for composite
            # arrays: is impossible to save twice the same key; so we remove
            # the key first, then it is possible to save it again
            del self[key]
        try:
            self.hdf5[key] = val
        except RuntimeError as exc:
            raise RuntimeError('Could not save %s: %s in %s' %
                               (key, exc, self.filename))
    def __delitem__(self, key):
        del self.hdf5[key]
    def __enter__(self):
        self.was_close = self.hdf5 == ()
        if self.was_close:
            self.open(self.mode)
        return self
    def __exit__(self, etype, exc, tb):
        if self.was_close:  # and has been opened in __enter__, close it
            self.close()
        del self.was_close
    def __getstate__(self):
        # make the datastore pickleable
        return dict(mode='r',
                    parent=self.parent,
                    calc_id=self.calc_id,
                    hdf5=(),
                    filename=self.filename,
                    ppath=self.ppath)
    def __iter__(self):
        if not self.hdf5:
            raise RuntimeError('%s is closed' % self)
        for path in sorted(self.hdf5):
            yield path
    def __contains__(self, key):
        return key in self.hdf5 or self.parent and key in self.parent.hdf5
    def __len__(self):
        if self.hdf5 == ():  # closed
            return 1
        return sum(1 for f in self.hdf5)
    def __hash__(self):
        return self.calc_id
    def __repr__(self):
        status = 'open' if self.hdf5 else 'closed'
        return '<%s %s %s>' % (self.__class__.__name__, self.filename, status)