# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import io
import os
import re
import gzip
import collections
import numpy
import h5py

from openquake.baselib import hdf5, performance, general
from openquake.commonlib.logs import get_datadir, CALC_REGEX, dbcmd, init

def extract_calc_id_datadir(filename):
    """
    Extract the calculation ID from the given filename or integer:

    >>> id, datadir = extract_calc_id_datadir('/mnt/ssd/oqdata/calc_25.hdf5')
    >>> id
    25
    >>> path_items = os.path.normpath(datadir).split(os.sep)[1:]
    >>> print(path_items)
    ['mnt', 'ssd', 'oqdata']

    >>> wrong_name = '/mnt/ssd/oqdata/wrong_name.hdf5'
    >>> try:
    ...     extract_calc_id_datadir(wrong_name)
    ... except ValueError as exc:
    ...     assert 'Cannot extract calc_id from' in str(exc)
    ...     assert 'wrong_name.hdf5' in str(exc)
    """
    filename = os.path.abspath(filename)
    datadir = os.path.dirname(filename)
    mo = re.match(CALC_REGEX, os.path.basename(filename))
    if mo is None:
        raise ValueError('Cannot extract calc_id from %s' % filename)
    calc_id = int(mo.group(2))
    return calc_id, datadir

def _read(calc_id: int, datadir, mode, haz_id=None):
    # low level function to read a datastore file
    ddir = datadir or get_datadir()
    ppath = None
    # look in the db
    job = dbcmd('get_job', calc_id)
    if job:
        jid = job.id
        path = job.ds_calc_dir + '.hdf5'
        hc_id = job.hazard_calculation_id
        if not hc_id and haz_id:
            dbcmd('update_job', jid, {'hazard_calculation_id': haz_id})
            hc_id = haz_id
        if hc_id and hc_id != jid:
            hc = dbcmd('get_job', hc_id)
            if hc:
                ppath = hc.ds_calc_dir + '.hdf5'
            else:
                ppath = os.path.join(ddir, 'calc_%d.hdf5' % hc_id)
    else:  # when using oq run there is no job in the db
        path = os.path.join(ddir, 'calc_%s.hdf5' % calc_id)
    return DataStore(path, ppath, mode)

def read(calc_id, mode='r', datadir=None, parentdir=None, read_parent=True):
    """
    :param calc_id: calculation ID or filename
    :param mode: 'r' or 'w'
    :param datadir: the directory where to look
    :param parentdir: the datadir of the parent calculation
    :param read_parent: read the parent calculation if it is there
    :returns: the corresponding DataStore instance

    Read the datastore, if it exists and it is accessible.
    """
    if isinstance(calc_id, str):  # pathname
        dstore = DataStore(calc_id, mode=mode)
    else:
        dstore = _read(calc_id, datadir, mode)
    try:
        hc_id = dstore['oqparam'].hazard_calculation_id
    except KeyError:  # no oqparam
        hc_id = None
    if read_parent and hc_id:
        dstore.parent = _read(hc_id, datadir, mode='r')
        dstore.ppath = dstore.parent.filename
    return dstore.open(mode)

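# Hedged usage sketch for read(); nothing here runs at import time, and the
# calc_id 42 and the pathname below are hypothetical examples, not values
# defined in this module:
#
#     from openquake.commonlib import datastore
#     dstore = datastore.read(42)                   # by calculation ID
#     # dstore = datastore.read('/tmp/oqdata/calc_42.hdf5')  # or by pathname
#     with dstore:
#         print(list(dstore))                       # the available keys
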
def new(calc_id, oqparam, datadir=None, mode=None):
    """
    :param calc_id:
        if integer > 0 look in the database and then on the filesystem
        if integer < 0 look at the old calculations in the filesystem
    :param oqparam:
        OqParam instance with the validated parameters of the calculation
    :returns:
        a DataStore instance associated to the given calc_id
    """
    dstore = _read(calc_id, datadir, mode)
    if 'oqparam' not in dstore:
        dstore['oqparam'] = oqparam
    if oqparam.hazard_calculation_id:
        dstore.ppath = read(calc_id, 'r', datadir).ppath
    return dstore

def create_job_dstore(description='custom calculation', parent=(), ini=None):
    """
    :returns: a <LogContext> and a <DataStore> associated to the calculation
    """
    if ini is not None:
        dic = ini
    else:
        dic = dict(description=description, calculation_mode='custom')
    log = init(dic)
    dstore = new(log.calc_id, log.get_oqparam(validate=False))
    dstore.parent = parent
    return log, dstore

def read_hc_id(hdf5):
    """
    Return the hazard_calculation_id, if any
    """
    try:
        oq = hdf5['oqparam']
    except KeyError:  # oqparam not saved yet
        return
    except OSError:  # file open by another process with oqparam not flushed
        return
    return oq.hazard_calculation_id

class DataStore(collections.abc.MutableMapping):
    """
    DataStore class to store the inputs/outputs of a calculation on the
    filesystem.

    Here is a minimal example of usage:

    >>> log, dstore = create_job_dstore()
    >>> with dstore, log:
    ...     dstore['example'] = 42
    ...     print(dstore['example'][()])
    42

    When reading the items, the DataStore will return a generator. The
    items will be ordered lexicographically according to their name.

    There is a serialization protocol to store objects in the datastore.
    An object is serializable if it has a method `__toh5__` returning an
    array and a dictionary, and a method `__fromh5__` taking an array and
    a dictionary and populating the object.
    For an example of use see :class:`openquake.hazardlib.site.SiteCollection`.
    """
    calc_id = None  # set at instantiation time
    job = None  # set at instantiation time
    opened = 0
    closed = 0

    def __init__(self, path, ppath=None, mode=None):
        self.filename = path
        self.ppath = ppath
        self.calc_id, datadir = extract_calc_id_datadir(path)
        self.tempname = self.filename[:-5] + '_tmp.hdf5'
        if not os.path.exists(datadir) and mode != 'r':
            os.makedirs(datadir)
        self.parent = ()  # can be set later
        self.datadir = datadir
        self.mode = mode or ('r+' if os.path.exists(self.filename) else 'w')
        if self.mode == 'r' and not os.path.exists(self.filename):
            raise IOError('File not found: %s' % self.filename)
        self.hdf5 = ()  # so that `key in self.hdf5` is valid
        self.open(self.mode)
        if mode != 'r':  # w, a or r+
            performance.init_performance(self.hdf5)

    def open(self, mode):
        """
        Open the underlying .hdf5 file
        """
        if self.hdf5 == ():  # not already open
            try:
                self.hdf5 = hdf5.File(self.filename, mode)
            except OSError as exc:
                raise OSError('%s in %s' % (exc, self.filename))
            hc_id = read_hc_id(self.hdf5)
            if hc_id:
                self.parent = read(hc_id)
        return self

    @property
    def export_dir(self):
        """
        Return the underlying export directory
        """
        edir = getattr(self, '_export_dir', None) or self['oqparam'].export_dir
        return edir

    @export_dir.setter
    def export_dir(self, value):
        """
        Set the export directory
        """
        self._export_dir = value

    def getitem(self, name):
        """
        Return a dataset by using h5py.File.__getitem__
        """
        try:
            return h5py.File.__getitem__(self.hdf5, name)
        except KeyError:
            if self.parent != ():
                if not self.parent.hdf5:
                    self.parent.open('r')
                return self.parent.getitem(name)
            else:
                raise

    def swmr_on(self):
        """
        Enable the SWMR mode on the underlying HDF5 file
        """
        self.close()  # flush everything
        self.open('a')
        if self.parent != ():
            self.parent.open('r')
        try:
            self.hdf5.swmr_mode = True
        except (ValueError, RuntimeError):  # already set
            pass

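    # Note: swmr_on() enables the HDF5 single-writer/multiple-readers
    # pattern: after it is called, this process keeps writing in 'a' mode
    # while other processes can open the same file in 'r' mode and see the
    # data that has been flushed so far.
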
    def set_attrs(self, key, **kw):
        """
        Set the HDF5 attributes of the given key
        """
        self.hdf5.save_attrs(key, kw)

    def set_shape_descr(self, key, **kw):
        """
        Set shape attributes
        """
        hdf5.set_shape_descr(self.hdf5, key, kw)

    def get_attr(self, key, name, default=None):
        """
        :param key: dataset path
        :param name: name of the attribute
        :param default: value to return if the attribute is missing
        """
        try:
            obj = h5py.File.__getitem__(self.hdf5, key)
        except KeyError:
            if self.parent != ():
                return self.parent.get_attr(key, name, default)
            else:
                raise
        try:
            return obj.attrs[name]
        except KeyError:
            if default is None:
                raise
            return default

    def get_attrs(self, key):
        """
        :param key: dataset path
        :returns: dictionary of attributes for that path
        """
        try:
            dset = h5py.File.__getitem__(self.hdf5, key)
        except KeyError:
            if self.parent != ():
                dset = h5py.File.__getitem__(self.parent.hdf5, key)
            else:
                raise
        return dict(dset.attrs)

    def create_dset(self, key, dtype, shape=(None,), compression=None,
                    fillvalue=0, attrs=None):
        """
        Create a one-dimensional HDF5 dataset.

        :param key: name of the dataset
        :param dtype: dtype of the dataset (usually composite)
        :param shape: shape of the dataset, possibly extendable
        :param compression: the kind of HDF5 compression to use
        :param attrs: dictionary of attributes of the dataset
        :returns: a HDF5 dataset
        """
        if isinstance(dtype, numpy.ndarray):
            dset = hdf5.create(self.hdf5, key, dtype.dtype, dtype.shape,
                               compression, fillvalue, attrs)
            dset[:] = dtype
            return dset
        return hdf5.create(self.hdf5, key, dtype, shape, compression,
                           fillvalue, attrs)

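    # Illustrative sketch of create_dset(); the dataset name 'times' and the
    # dtype are hypothetical examples, not datasets required by the engine:
    #
    #     dt = numpy.dtype([('task_no', numpy.uint32), ('duration', float)])
    #     dset = dstore.create_dset('times', dt)     # extendable 1D dataset
    #     hdf5.extend(dset, numpy.zeros(10, dt))     # append records later
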
    def create_df(self, key, nametypes, compression=None, **kw):
        """
        Create a HDF5 datagroup readable as a pandas DataFrame

        :param key: name of the dataset
        :param nametypes:
            list of pairs (name, dtype) or (name, array) or DataFrame
        :param compression: the kind of HDF5 compression to use
        :param kw: extra attributes to store
        """
        return self.hdf5.create_df(key, nametypes, compression, **kw)

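    # Illustrative sketch of create_df(); the group name and column names are
    # hypothetical examples of (name, array) pairs of equal length:
    #
    #     dstore.create_df('example_df', [('eid', numpy.arange(10)),
    #                                     ('gmv', numpy.ones(10))])
    #     df = dstore.read_df('example_df')   # back as a pandas DataFrame
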
    def export_path(self, relname, export_dir=None):
        """
        Return the path of the exported file by adding the export_dir in
        front, the calculation ID at the end.

        :param relname: relative file name
        :param export_dir: export directory (if None use .export_dir)
        """
        # removing inner slashes to avoid creating intermediate directories
        name, ext = relname.replace('/', '-').rsplit('.', 1)
        newname = '%s_%s.%s' % (name, self.calc_id, ext)
        if export_dir is None:
            export_dir = self.export_dir
        return os.path.join(export_dir, newname)

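    # For instance, assuming calc_id=42 and export_dir='/tmp', the call
    # dstore.export_path('hcurves/mean.csv') returns '/tmp/hcurves-mean_42.csv':
    # inner slashes become dashes and the calculation ID is appended before
    # the extension.
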
    def build_fname(self, prefix, postfix, fmt, export_dir=None):
        """
        Build a file name from a realization, by using prefix and extension.

        :param prefix: the prefix to use
        :param postfix: the postfix to use (can be a realization object)
        :param fmt: the extension ('csv', 'xml', etc)
        :param export_dir: export directory (if None use .export_dir)
        :returns: relative pathname including the extension
        """
        if hasattr(postfix, 'sm_lt_path'):  # is a realization
            fname = '%s-rlz-%03d.%s' % (prefix, postfix.ordinal, fmt)
        else:
            fname = prefix + ('-%s' % postfix if postfix else '') + '.' + fmt
        return self.export_path(fname, export_dir)

    def flush(self):
        """Flush the underlying hdf5 file"""
        if self.parent != ():
            self.parent.flush()
        if self.hdf5:  # is open
            self.hdf5.flush()

    def close(self):
        """Close the underlying hdf5 file"""
        if self.parent != ():
            self.parent.flush()
            self.parent.close()
        if self.hdf5:  # is open
            self.hdf5.flush()
            self.hdf5.close()
            self.hdf5 = ()

    def clear(self):
        """Remove the datastore from the file system"""
        self.close()
        os.remove(self.filename)

    def getsize(self, key='/'):
        """
        Return the size in bytes of the output associated to the given key.
        If no key is given, returns the size of the whole datastore file.
        """
        if key == '/':
            return os.path.getsize(self.filename)
        try:
            dset = self.getitem(key)
        except KeyError:
            if self.parent != ():
                dset = self.parent.getitem(key)
            else:
                raise
        return hdf5.ByteCounter.get_nbytes(dset)

    def get(self, key, default):
        """
        :returns: the value associated to the datastore key, or the default
        """
        try:
            return self[key]
        except KeyError:
            return default

    def store_files(self, fnames, where='input/'):
        """
        :param fnames: a set of full pathnames
        """
        prefix = len(os.path.commonprefix(fnames))
        for fname in fnames:
            with open(fname, 'rb') as f:
                data = gzip.compress(f.read())
            self[where + fname[prefix:]] = numpy.void(data)

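    # Illustrative sketch of store_files(); the pathnames are hypothetical.
    # The common prefix is stripped, so the gzipped contents end up under
    # keys such as 'input/job.ini' and 'input/source_model.xml':
    #
    #     dstore.store_files(['/tmp/demo/job.ini',
    #                         '/tmp/demo/source_model.xml'])
    #     dstore.get_file('input/job.ini').read()   # decompressed bytes
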
    def get_file(self, key):
        """
        :returns: a BytesIO object
        """
        data = bytes(numpy.asarray(self[key][()]))
        return io.BytesIO(gzip.decompress(data))

    def read_df(self, key, index=None, sel=(), slc=slice(None)):
        """
        :param key: name of the structured dataset
        :param index: pandas index (or multi-index), possibly None
        :param sel: dictionary used to select subsets of the dataset
        :param slc: slice object to extract a slice of the dataset
        :returns: pandas DataFrame associated to the dataset
        """
        if key in self.hdf5:
            return self.hdf5.read_df(key, index, sel, slc)
        if self.parent:
            return self.parent.read_df(key, index, sel, slc)
        raise KeyError(key)

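    # Illustrative sketch of read_df(); the key, index and selection column
    # are hypothetical and must match a datagroup created with create_df:
    #
    #     df = dstore.read_df('example_df', index='eid', sel={'gmv': 1.0})
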
    def read_unique(self, key, field):
        """
        :param key: key to a dataset containing a structured array
        :param field: a field in the structured array
        :returns: sorted, unique values

        Works in chunks of 10 million records
        """
        unique = set()
        dset = self.getitem(key)
        for slc in general.gen_slices(0, len(dset), 10_000_000):
            arr = numpy.unique(dset[slc][field])
            unique.update(arr)
        return sorted(unique)

    def sel(self, key, **kw):
        """
        Select a dataset with shape_descr. For instance
        dstore.sel('hcurves', imt='PGA', sid=2)
        """
        return hdf5.sel(self.getitem(key), kw)

    @property
    def metadata(self):
        """
        :returns: datastore metadata version, date, checksum as a dictionary
        """
        a = self.hdf5.attrs
        if 'aelo_version' in a:
            return dict(generated_by='AELO %s' % a['aelo_version'],
                        start_date=a['date'],
                        checksum=a['checksum32'])
        else:
            return dict(
                generated_by='OpenQuake engine %s' % a['engine_version'],
                start_date=a['date'],
                checksum=a['checksum32'])

    def __getitem__(self, key):
        if self.hdf5 == ():  # the datastore is closed
            raise ValueError('Cannot find %s in %s' % (key, self))
        try:
            val = self.hdf5[key]
        except KeyError:
            if self.parent != ():
                self.parent.open('r')
                try:
                    val = self.parent[key]
                except KeyError:
                    raise KeyError(
                        'No %r found in %s and ancestors' % (key, self))
            else:
                raise KeyError('No %r found in %s' % (key, self))
        return val

    def __setitem__(self, key, val):
        if key in self.hdf5:
            # there is a bug in the current version of HDF5 for composite
            # arrays: it is impossible to save the same key twice; so we
            # remove the key first, then it is possible to save it again
            del self[key]
        try:
            self.hdf5[key] = val
        except RuntimeError as exc:
            raise RuntimeError(
                'Could not save %s: %s in %s' % (key, exc, self.filename))

    def __delitem__(self, key):
        del self.hdf5[key]

    def __enter__(self):
        self.was_close = self.hdf5 == ()
        if self.was_close:
            self.open(self.mode)
        return self

    def __exit__(self, etype, exc, tb):
        if self.was_close:  # and has been opened in __enter__, close it
            self.close()
        del self.was_close

    def __getstate__(self):
        # make the datastore pickleable
        return dict(mode='r', parent=self.parent, calc_id=self.calc_id,
                    hdf5=(), filename=self.filename, ppath=self.ppath)

    def __iter__(self):
        if not self.hdf5:
            raise RuntimeError('%s is closed' % self)
        for path in sorted(self.hdf5):
            yield path

    def __contains__(self, key):
        return key in self.hdf5 or self.parent and key in self.parent.hdf5

    def __len__(self):
        if self.hdf5 == ():  # closed
            return 1
        return sum(1 for f in self.hdf5)

    def __hash__(self):
        return self.calc_id

    def __repr__(self):
        status = 'open' if self.hdf5 else 'closed'
        return '<%s %s %s>' % (self.__class__.__name__, self.filename, status)

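# Overall usage sketch (comments only, nothing runs at import time; the use
# of -1 to mean "the most recent calculation" follows the engine convention
# for negative calculation IDs and is an assumption here):
#
#     from openquake.commonlib import datastore
#     with datastore.read(-1) as dstore:
#         print(len(dstore), 'datasets')
#         print(dstore.getsize('/'), 'bytes on disk')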