# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import csv
import sys
import inspect
import tempfile
import warnings
import importlib
import itertools
from dataclasses import dataclass
from urllib.parse import quote_plus, unquote_plus
import collections
import json
import toml
import pandas
import numpy
import h5py
from openquake.baselib import InvalidFile, general
from openquake.baselib.python3compat import encode, decode

vbytes = h5py.special_dtype(vlen=bytes)
vstr = h5py.special_dtype(vlen=str)
vuint8 = h5py.special_dtype(vlen=numpy.uint8)
vuint16 = h5py.special_dtype(vlen=numpy.uint16)
vuint32 = h5py.special_dtype(vlen=numpy.uint32)
vfloat32 = h5py.special_dtype(vlen=numpy.float32)
vfloat64 = h5py.special_dtype(vlen=numpy.float64)
FLOAT = (float, numpy.float32, numpy.float64)
INT = (int, numpy.int32, numpy.uint32, numpy.int64, numpy.uint64)
MAX_ROWS = 10_000_000

if sys.platform == 'win32':
    # go back to the behavior before hdf5==1.12 i.e. h5py==3.4
    os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'

def sanitize(value):
    """
    Sanitize the value so that it can be stored as an HDF5 attribute
    """
    if isinstance(value, bytes):
        return numpy.void(value)
    elif isinstance(value, (list, tuple)):
        if value and isinstance(value[0], str):
            return encode(value)
    elif isinstance(value, int) and value > sys.maxsize:
        return float(value)
    return value

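# Illustrative usage of `sanitize` (a sketch, not part of the library code):
# large Python integers and lists of strings are turned into HDF5-storable
# values, e.g.
#   sanitize(2 ** 64)              # larger than sys.maxsize -> stored as float
#   sanitize(['PGA', 'SA(0.1)'])   # list of str -> encoded to bytes
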
def create(hdf5, name, dtype, shape=(None,), compression=None,
           fillvalue=0, attrs=None):
    """
    :param hdf5: a h5py.File object
    :param name: an hdf5 key string
    :param dtype: dtype of the dataset (usually composite)
    :param shape: shape of the dataset (can be extendable)
    :param compression: None or 'gzip' are recommended
    :param attrs: dictionary of attributes of the dataset
    :returns: a HDF5 dataset
    """
    if shape[0] is None:  # extendable dataset
        dset = hdf5.create_dataset(
            name, (0,) + shape[1:], dtype, chunks=True, maxshape=shape,
            compression=compression)
    else:  # fixed-shape dataset
        dset = hdf5.create_dataset(name, shape, dtype, fillvalue=fillvalue,
                                   compression=compression)
    if attrs:
        for k, v in attrs.items():
            dset.attrs[k] = sanitize(v)
    return dset

def preshape(obj):
    """
    :returns: the shape of obj, except the last dimension
    """
    if hasattr(obj, 'shape'):  # array
        return obj.shape[:-1]
    return ()

class FakeDataset:
    """
    Used for null saving
    """

def extend(dset, array, **attrs):
    """
    Extend an extensible dataset with an array of a compatible dtype.

    :param dset: an h5py dataset
    :param array: an array of length L
    :returns: the total length of the dataset (i.e. initial length + L)
    """
    if isinstance(dset, FakeDataset):  # save nothing
        return 0
    length = len(dset)
    if len(array) == 0:
        return length
    newlength = length + len(array)
    if array.dtype.name == 'object':  # vlen array
        shape = (newlength,) + preshape(array[0])
    else:
        shape = (newlength,) + array.shape[1:]
    dset.resize(shape)
    dset[length:newlength] = array
    for key, val in attrs.items():
        dset.attrs[key] = val
    return newlength

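# Illustrative sketch of the create/extend protocol (the file name and
# dataset name below are only for the example):
#   with File('/tmp/example.h5', 'w') as f:
#       dset = create(f, 'values', numpy.float32, shape=(None,))
#       extend(dset, numpy.ones(10, numpy.float32))  # -> 10
#       extend(dset, numpy.ones(5, numpy.float32))   # -> 15
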
def cls2dotname(cls):
    """
    The full Python name (i.e. `pkg.subpkg.mod.cls`) of a class
    """
    return '%s.%s' % (cls.__module__, cls.__name__)

def dotname2cls(dotname):
    """
    The class associated to the given dotname (i.e. `pkg.subpkg.mod.cls`)
    """
    modname, clsname = dotname.rsplit('.', 1)
    return getattr(importlib.import_module(modname), clsname)

def get_nbytes(dset):
    """
    :param dset: an HDF5 group or dataset
    :returns: the size of the underlying array or None if the dataset is
        actually a group.
    """
    if hasattr(dset, 'dtype'):
        # extract nbytes from the underlying array
        return dset.size * numpy.zeros(1, dset.dtype).nbytes

class ByteCounter(object):
    """
    A visitor used to measure the dimensions of a HDF5 dataset or group.
    Use it as ByteCounter.get_nbytes(dset_or_group).
    """
    @classmethod
    def get_nbytes(cls, dset):
        nbytes = get_nbytes(dset)
        if nbytes is not None:
            return nbytes
        # else dip in the tree
        self = cls()
        dset.visititems(self)
        return self.nbytes
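    # The visitor members used by `dset.visititems(self)` above are not shown
    # in this extract; a minimal sketch consistent with that usage (an
    # assumption, not necessarily the original implementation):
    nbytes = 0

    def __call__(self, name, dset_or_group):
        # accumulate the size of every visited dataset; groups return None
        nbytes = get_nbytes(dset_or_group)
        if nbytes:
            self.nbytes += nbytes
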
class Group(collections.abc.Mapping):
    """
    A mock for a h5py group object
    """
    def __init__(self, items, attrs):
        self.dic = {quote_plus(k): v for k, v in items}
        self.attrs = attrs

    def __getitem__(self, key):
        return self.dic[key]

    def __setitem__(self, key, value):
        self.dic[key] = value

    def __iter__(self):
        yield from self.dic

    def __len__(self):
        return len(self.dic)

def sel(dset, filterdict):
    """
    Select a dataset with shape_descr. For instance
    dstore.sel('hcurves', imt='PGA', sid=2)
    """
    dic = get_shape_descr(dset.attrs['json'])
    lst = []
    for dim in dic['shape_descr']:
        if dim in filterdict:
            val = filterdict[dim]
            values = dic[dim]
            if isinstance(val, INT) and val < 0:
                # for instance sid=-1 means the last sid
                idx = values[val]
            else:
                idx = values.index(val)
            lst.append(slice(idx, idx + 1))
        else:
            lst.append(slice(None))
    return dset[tuple(lst)]

def dset2df(dset, indexfield, filterdict):
    """
    Converts an HDF5 dataset with an attribute shape_descr into a Pandas
    dataframe. NB: this is very slow for large datasets.
    """
    arr = sel(dset, filterdict)
    dic = get_shape_descr(dset.attrs['json'])
    tags = []
    idxs = []
    for dim in dic['shape_descr']:
        values = dic[dim]
        if dim in filterdict:
            val = filterdict[dim]
            idx = values.index(val)
            idxs.append([idx])
            values = [val]
        elif hasattr(values, 'stop'):  # a range object already
            idxs.append(values)
        else:
            idxs.append(range(len(values)))
        tags.append(values)
    acc = general.AccumDict(accum=[])
    index = []
    for idx, vals in zip(itertools.product(*idxs), itertools.product(*tags)):
        for field, val in zip(dic['shape_descr'], vals):
            if field == indexfield:
                index.append(val)
            else:
                acc[field].append(val)
        acc['value'].append(arr[idx])
    return pandas.DataFrame(acc, index or None)

def is_ok(value, expected):
    """
    :returns: True if the value is expected
    """
    if hasattr(expected, '__len__'):
        return numpy.isin(value, expected)
    return value == expected

def extract_cols(datagrp, sel, slices, columns):
    """
    :param datagrp: something like an HDF5 data group
    :param sel: dictionary column name -> value specifying a selection
    :param slices: list of slices
    :param columns: the full list of column names
    :returns: a dictionary col -> array of values
    """
    acc = general.AccumDict(accum=[])  # col -> arrays
    if sel:
        for slc in slices:
            ok = slice(None)
            dic = {col: datagrp[col][slc] for col in sel}
            for col in sel:
                if isinstance(ok, slice):  # first selection
                    ok = is_ok(dic[col], sel[col])
                else:  # other selections
                    ok &= is_ok(dic[col], sel[col])
            for col in columns:
                acc[col].append(datagrp[col][slc][ok])
    else:  # avoid making unneeded copies
        for col in columns:
            dset = datagrp[col]
            for slc in slices:
                acc[col].append(dset[slc])
    for k, vs in acc.items():
        acc[k] = arr = numpy.concatenate(vs, dtype=vs[0].dtype)
        if len(arr) and isinstance(arr[0], bytes):
            acc[k] = numpy.array(decode(arr))
    return acc

class File(h5py.File):
    """
    Subclass of :class:`h5py.File` able to store and retrieve objects
    conforming to the HDF5 protocol used by the OpenQuake software.
    It works recursively also for dictionaries of the form name->obj.

    >>> f = File('/tmp/x.h5', 'w')
    >>> f['dic'] = dict(a=dict(x=1, y=2), b=3)
    >>> dic = f['dic']
    >>> dic['a']['x'][()]
    1
    >>> dic['b'][()]
    3
    >>> f.close()
    """
    class EmptyDataset(ValueError):
        """Raised when reading an empty dataset"""
    @classmethod
    def temporary(cls):
        """
        Returns a temporary hdf5 file, open for writing.
        The temporary name is stored in the .path attribute.
        It is the user responsibility to remove the file when closed.
        """
        fh, path = tempfile.mkstemp(suffix='.hdf5')
        os.close(fh)
        self = cls(path, 'w')
        self.path = path
        return self
    def create_df(self, key, nametypes, compression=None, **kw):
        """
        Create a HDF5 datagroup readable as a pandas DataFrame

        :param key: name of the dataset
        :param nametypes: pairs (name, dtype) or (name, array) or
            structured array or DataFrame
        :param compression: the kind of HDF5 compression to use
        :param kw: extra attributes to store
        """
        if hasattr(nametypes, 'dtype') and nametypes.dtype.names:
            nametypes = [(name, nametypes[name])
                         for name in nametypes.dtype.names]
        elif isinstance(nametypes, pandas.DataFrame):
            nametypes = [(name, nametypes[name].to_numpy())
                         for name in nametypes.columns]
        names = []
        for name, value in nametypes:
            is_array = isinstance(value, numpy.ndarray)
            if is_array and len(value) and isinstance(value[0], str):
                dt = vstr
            elif is_array:
                dt = value.dtype
            else:
                dt = value
            dset = create(self, f'{key}/{name}', dt, (None,), compression)
            if is_array:
                extend(dset, value)
            names.append(name)
        attrs = self[key].attrs
        attrs['__pdcolumns__'] = ' '.join(names)
        for k, v in kw.items():
            attrs[k] = v
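    # Illustrative round-trip through create_df/read_df (a sketch; the group
    # name, column names and values below are only for the example):
    #   with File('/tmp/example.h5', 'w') as f:
    #       f.create_df('people', [('name', numpy.array(['a', 'b'])),
    #                              ('age', numpy.array([30, 40]))],
    #                   units='years')
    #       df = f.read_df('people')  # DataFrame with columns name, age
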
    def read_df(self, key, index=None, sel=(), slc=slice(None), slices=()):
        """
        :param key: name of the structured dataset
        :param index: pandas index (or multi-index), possibly None
        :param sel: dictionary used to select subsets of the dataset
        :param slc: slice object to extract a slice of the dataset
        :param slices: an array of shape (N, 2) with start,stop indices
        :returns: pandas DataFrame associated to the dataset
        """
        dset = self.getitem(key)
        if len(dset) == 0:
            raise self.EmptyDataset('Dataset %s is empty' % key)
        elif 'json' in dset.attrs:
            return dset2df(dset, index, sel)
        elif '__pdcolumns__' in dset.attrs:
            columns = dset.attrs['__pdcolumns__'].split()
            if len(slices):
                slcs = [slice(s0, s1) for s0, s1 in slices]
            elif slc.start is None and slc.stop is None:  # split in slices
                slcs = list(general.gen_slices(
                    0, len(dset[columns[0]]), MAX_ROWS))
            else:
                slcs = [slc]
            dic = extract_cols(dset, sel, slcs, columns)
            if index is None:
                return pandas.DataFrame(dic)
            else:
                return pandas.DataFrame(dic).set_index(index)

        dtlist = []
        for name in dset.dtype.names:
            dt = dset.dtype[name]
            if dt.shape:  # vector field
                templ = name + '_%d' * len(dt.shape)
                for i, _ in numpy.ndenumerate(numpy.zeros(dt.shape)):
                    dtlist.append((templ % i, dt.base))
            else:  # scalar field
                dtlist.append((name, dt))
        data = numpy.zeros(len(dset), dtlist)
        for name in dset.dtype.names:
            arr = dset[name]
            dt = dset.dtype[name]
            if dt.shape:  # vector field
                templ = name + '_%d' * len(dt.shape)
                for i, _ in numpy.ndenumerate(numpy.zeros(dt.shape)):
                    data[templ % i] = arr[(slice(None),) + i]
            else:  # scalar field
                data[name] = arr
        if sel:
            for k, v in sel.items():
                data = data[data[k] == v]
        return pandas.DataFrame.from_records(data, index=index)
    def save_vlen(self, key, data):  # used in SourceWriterTestCase
        """
        Save a sequence of variable-length arrays

        :param key: name of the dataset
        :param data: data to store as a list of arrays
        """
        shape = (None,) + data[0].shape[:-1]
        try:
            dset = self[key]
        except KeyError:
            vdt = h5py.special_dtype(vlen=data[0].dtype)
            dset = create(self, key, vdt, shape, fillvalue=None)
        length = len(dset)
        dset.resize((length + len(data),) + shape[1:])
        dset[length:length + len(data)] = data
    def save_attrs(self, path, attrs, **kw):
        items = list(attrs.items()) + list(kw.items())
        if items:
            a = super().__getitem__(path).attrs
            for k, v in sorted(items):
                try:
                    a[k] = sanitize(v)
                except Exception as exc:
                    raise TypeError(
                        'Could not store attribute %s=%s: %s' % (k, v, exc))
    def __setitem__(self, path, obj):
        cls = obj.__class__
        if hasattr(obj, '__toh5__'):
            obj, attrs = obj.__toh5__()
            pyclass = cls2dotname(cls)
        else:
            pyclass = ''
        if isinstance(obj, (list, tuple)) and len(obj) and isinstance(
                obj[0], (str, bytes)):
            # flat sequence of strings
            obj = numpy.array(encode(obj))
        if isinstance(obj, (dict, Group)) and obj:
            for k, v in obj.items():
                # NB: there was a line sorted(obj.items()) here
                # it was removed because it caused the absurd issue
                # https://github.com/gem/oq-engine/issues/4761
                # for an exposure with more than 65536 assets
                if isinstance(k, tuple):  # multikey
                    k = '-'.join(k)
                key = '%s/%s' % (path, k)
                self[key] = v
            if isinstance(obj, Group):
                self.save_attrs(
                    path, obj.attrs, __pyclass__=cls2dotname(Group))
        elif (isinstance(obj, numpy.ndarray) and obj.shape and len(obj)
              and isinstance(obj[0], str)):
            self.create_dataset(path, obj.shape, vstr)[:] = obj
        elif isinstance(obj, numpy.ndarray) and obj.shape:
            d = self.create_dataset(path, obj.shape, obj.dtype,
                                    fillvalue=None)
            d[:] = obj
        elif (isinstance(obj, numpy.ndarray)
              and obj.dtype.name.startswith('bytes')):
            self._set(path, numpy.void(bytes(obj)))
        elif isinstance(obj, list) and len(obj) and isinstance(
                obj[0], numpy.ndarray):
            self.save_vlen(path, obj)
        elif isinstance(obj, bytes):
            self._set(path, numpy.void(obj))
        else:
            self._set(path, obj)
        if pyclass:
            self.flush()  # make sure it is fully saved
            self.save_attrs(path, attrs, __pyclass__=pyclass)

    def _set(self, path, obj):
        try:
            super().__setitem__(path, obj)
        except Exception as exc:
            raise exc.__class__('Could not set %s=%r' % (path, obj))

    def __getitem__(self, path):
        h5obj = super().__getitem__(path)
        h5attrs = h5obj.attrs
        if '__pyclass__' in h5attrs:
            cls = dotname2cls(h5attrs['__pyclass__'])
            obj = cls.__new__(cls)
            if hasattr(h5obj, 'items'):  # is group
                h5obj = {unquote_plus(k): self['%s/%s' % (path, k)]
                         for k, v in h5obj.items()}
            elif hasattr(h5obj, 'shape'):
                h5obj = h5obj[()]
            if hasattr(obj, '__fromh5__'):
                obj.__fromh5__(h5obj, h5attrs)
            else:  # Group object
                obj.dic = h5obj
                obj.attrs = h5attrs
            return obj
        else:
            return h5obj

    def __getstate__(self):
        # make the file pickleable
        return {'_id': 0}
    def getitem(self, name):
        """
        Return a dataset by using h5py.File.__getitem__
        """
        return h5py.File.__getitem__(self, name)

def array_of_vstr(lst):
    """
    :param lst: a list of strings or bytes
    :returns: an array of variable length ASCII strings
    """
    ls = []
    for el in lst:
        try:
            ls.append(el.encode('utf-8'))
        except AttributeError:
            ls.append(el)
    return numpy.array(ls, vstr)

def dumps(dic):
    """
    Dump a dictionary in json. Extend json.dumps to work on numpy objects.
    """
    new = {}
    for k, v in dic.items():
        if v is None or isinstance(k, str) and k.startswith('_'):
            pass
        elif isinstance(v, (list, tuple)) and v:
            if isinstance(v[0], INT):
                new[k] = [int(x) for x in v]
            elif isinstance(v[0], FLOAT):
                new[k] = [float(x) for x in v]
            elif isinstance(v[0], numpy.bytes_):
                new[k] = json.dumps(decode(v))
            else:
                new[k] = json.dumps(v)
        elif isinstance(v, FLOAT):
            new[k] = float(v)
        elif isinstance(v, INT):
            new[k] = int(v)
        elif hasattr(v, 'tolist'):
            lst = v.tolist()
            if lst and isinstance(lst[0], bytes):
                new[k] = json.dumps(decode_array(v))
            else:
                new[k] = json.dumps(lst)
        elif isinstance(v, dict):
            new[k] = dumps(v)
        elif hasattr(v, '__dict__'):
            new[k] = {cls2dotname(v.__class__): dumps(vars(v))}
        else:
            new[k] = json.dumps(v)
    return "{%s}" % ','.join('\n"%s": %s' % it for it in new.items())

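# Illustrative usage of `dumps` (a sketch): numpy scalars and arrays are
# converted into plain JSON values, e.g. something like
#   dumps({'imls': numpy.array([0.1, 0.2]), 'n': numpy.uint32(3)})
#   # -> '{\n"imls": [0.1, 0.2],\n"n": 3}'
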
def set_shape_descr(hdf5file, dsetname, kw):
    """
    Set shape attributes on a dataset (and possibly other attributes)
    """
    dset = hdf5file[dsetname]
    S = len(dset.shape)
    if len(kw) < S:
        raise ValueError(
            'The dataset %s has %d dimensions but you passed %d axis'
            % (dsetname, S, len(kw)))
    keys = list(kw)
    fields, extra = keys[:S], keys[S:]
    dic = dict(shape_descr=fields)
    for f in fields:
        dic[f] = kw[f]
    dset.attrs['json'] = dumps(dic)
    for e in extra:
        dset.attrs[e] = kw[e]

def get_shape_descr(json_string):
    """
    :param json_string: JSON string containing the shape_descr
    :returns: a dictionary field -> values extracted from the shape_descr
    """
    dic = json.loads(json_string)
    for field in dic['shape_descr']:
        val = dic[field]
        if isinstance(val, INT):
            dic[field] = list(range(val))
    return dic

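# Illustrative sketch of the shape_descr protocol (dataset and axis names
# below are only for the example):
#   with File('/tmp/example.h5', 'w') as f:
#       f['hcurves'] = numpy.zeros((2, 3))
#       set_shape_descr(f, 'hcurves',
#                       dict(sid=[0, 1], imt=['PGA', 'PGV', 'SA(0.1)']))
#       sel(f['hcurves'], dict(imt='PGA')).shape  # -> (2, 1)
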
class ArrayWrapper(object):
    """
    A pickleable and serializable wrapper over an array, HDF5 dataset or
    group

    :param array: an array (or the empty tuple)
    :param attrs: metadata of the array (or dictionary of arrays)
    """
    @classmethod
    def from_(cls, obj, extra='value'):
        if isinstance(obj, cls):  # it is already an ArrayWrapper
            return obj
        elif inspect.isgenerator(obj):
            array, attrs = (), dict(obj)
        elif hasattr(obj, '__toh5__'):
            return obj
        elif hasattr(obj, 'attrs'):  # is a dataset
            array, attrs = obj[()], dict(obj.attrs)
            if 'json' in attrs:
                attrs.update(get_shape_descr(attrs.pop('json')))
        else:  # assume obj is an array
            array, attrs = obj, {}
        return cls(array, attrs, (extra,))
    def __init__(self, array, attrs, extra=('value',)):
        vars(self).update(attrs)
        self.extra = list(extra)
        if len(array):
            self.array = array

    def __iter__(self):
        if hasattr(self, 'array'):
            return iter(self.array)
        else:
            return iter(vars(self).items())

    def __len__(self):
        if hasattr(self, 'array'):
            return len(self.array)
        else:
            return len(vars(self))

    def __getitem__(self, idx):
        if isinstance(idx, str) and idx in self.__dict__:
            return getattr(self, idx)
        return self.array[idx]

    def __setitem__(self, idx, val):
        if isinstance(idx, str) and idx in self.__dict__:
            setattr(self, idx, val)
        else:
            self.array[idx] = val

    def __toh5__(self):
        return vars(self), {}

    def __fromh5__(self, dic, attrs):
        for k, v in dic.items():
            if isinstance(v, h5py.Dataset):
                arr = v[()]
                if isinstance(arr, INT):
                    arr = numpy.arange(arr)
                elif len(arr) and isinstance(arr[0], bytes):
                    arr = decode(arr)
                setattr(self, k, arr)
            else:
                setattr(self, k, v)
        vars(self).update(attrs)

    def __repr__(self):
        if hasattr(self, 'shape_descr'):
            sd = decode(self.shape_descr)
            lst = ['%s=%d' % (des, size)
                   for des, size in zip(sd, self.shape)]
            return '<%s(%s)>' % (self.__class__.__name__, ', '.join(lst))
        elif hasattr(self, 'shape'):
            return '<%s%s>' % (self.__class__.__name__, self.shape)
        else:
            return '<%s%d bytes>' % (self.__class__.__name__,
                                     len(self.array))

    @property
    def dtype(self):
        """dtype of the underlying array"""
        return self.array.dtype

    @property
    def shape(self):
        """shape of the underlying array"""
        return self.array.shape if hasattr(self, 'array') else ()
    def toml(self):
        """
        :returns: a TOML string representation of the ArrayWrapper
        """
        if self.shape:
            return toml.dumps(self.array)
        dic = {}
        for k, v in vars(self).items():
            if k.startswith('_'):
                continue
            elif k == 'json':
                dic.update(json.loads(bytes(v)))
            else:
                dic[k] = v
        return toml.dumps(dic)
    def to_dframe(self, skip_zeros=True):
        """
        Convert an ArrayWrapper with shape (D1, ..., DN) and attributes
        (T1, ..., TN) which are lists of tags of lengths (D1, ..., DN) into
        a DataFrame with rows (tag1, ..., tagN, value) of maximum length
        D1 * ... * DN. Zero values are discarded.

        >>> from pprint import pprint
        >>> dic = dict(shape_descr=['taxonomy', 'occupancy'],
        ...            taxonomy=['RC', 'WOOD'],
        ...            occupancy=['RES', 'IND', 'COM'])
        >>> arr = numpy.zeros((2, 3))
        >>> arr[0, 0] = 2000
        >>> arr[0, 1] = 5000
        >>> arr[1, 0] = 500
        >>> aw = ArrayWrapper(arr, dic)
        >>> pprint(aw.to_dframe())
          taxonomy occupancy   value
        0       RC       RES  2000.0
        1       RC       IND  5000.0
        2     WOOD       RES   500.0

        It is also possible to pass M > 1 extra fields and convert an array
        of shape (D1, ..., DN, M) and attributes (T1, ..., TN) into a
        DataFrame with rows (tag1, ..., tagN, value1, ..., valueM).

        >>> dic = dict(shape_descr=['taxonomy'], taxonomy=['RC', 'WOOD'])
        >>> aw = ArrayWrapper(arr, dic, ['RES', 'IND', 'COM'])
        >>> pprint(aw.to_dframe())
          taxonomy     RES     IND  COM
        0       RC  2000.0  5000.0  0.0
        1     WOOD   500.0     0.0  0.0
        """
        if hasattr(self, 'array'):
            names = self.array.dtype.names
            if names:  # wrapper over a structured array
                return pandas.DataFrame({n: self[n] for n in names})
        if hasattr(self, 'json'):
            vars(self).update(json.loads(self.json))
        shape = self.shape
        tup = len(self.extra) > 1
        if tup:
            if shape[-1] != len(self.extra):
                raise ValueError(
                    'There are %d extra-fields but %d dimensions in %s'
                    % (len(self.extra), shape[-1], self))
        shape_descr = tuple(decode(d) for d in self.shape_descr)
        extra = tuple(decode(d) for d in self.extra)
        fields = shape_descr + extra
        out = []
        tags = []
        idxs = []
        for i, tagname in enumerate(shape_descr):
            values = getattr(self, tagname)
            if len(values) != shape[i]:
                raise ValueError(
                    'The tag %s with %d values is inconsistent with %s'
                    % (tagname, len(values), self))
            tags.append(decode_array(values))
            idxs.append(range(len(values)))
        for idx, values in zip(itertools.product(*idxs),
                               itertools.product(*tags)):
            val = self.array[idx]
            if isinstance(val, numpy.ndarray):
                tup = tuple(val)
            else:
                tup = (val,)
            if skip_zeros:
                if sum(tup):
                    out.append(values + tup)
            else:
                out.append(values + tuple(0 if x == 0 else x for x in tup))
        return pandas.DataFrame(out, columns=fields)
    def to_dict(self):
        """
        Convert the public attributes into a dictionary
        """
        return {k: v for k, v in vars(self).items()
                if not k.startswith('_')}

def decode_array(values):
    """
    Decode the values which are bytestrings.
    """
    out = []
    for val in values:
        try:
            out.append(val.decode('utf8'))
        except AttributeError:
            out.append(val)
    return out

def parse_comment(comment):
    """
    Parse a comment of the form `investigation_time=50.0, imt="PGA", ...`
    and return it as pairs of strings:

    >>> parse_comment('''path=['b1'], time=50.0, imt="PGA"''')
    [('path', ['b1']), ('time', 50.0), ('imt', 'PGA')]
    """
    if comment[0] == '"' and comment[-1] == '"':
        comment = comment[1:-1]
    try:
        dic = toml.loads('{%s}' % comment.replace('""', '"'))
    except toml.TomlDecodeError as err:
        raise ValueError('%s in %s' % (err, comment))
    return list(dic.items())

def build_dt(dtypedict, names, fname):
    """
    Build a composite dtype for a list of names and dictionary
    name -> dtype with a None entry corresponding to the default dtype.
    """
    lst = []
    for name in names:
        try:
            dt = dtypedict[name]
        except KeyError:
            if None in dtypedict:
                dt = dtypedict[None]
            else:
                raise InvalidFile('%s: missing dtype for field %r' %
                                  (fname, name))
        lst.append((name, vstr if dt is str else dt))
    return numpy.dtype(lst)

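# Illustrative usage of `build_dt` (field names below are only for the
# example): fields without an explicit dtype fall back to the None entry,
# and `str` is mapped to variable-length strings (vstr):
#   build_dt({'lon': float, 'lat': float, None: str},
#            ['lon', 'lat', 'taxonomy'], 'exposure.csv')
#   # -> composite dtype with float64 lon/lat and vstr taxonomy
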
def check_length(field, size):
    """
    :param field: a bytes field in the exposure
    :param size: maximum size of the field
    :returns: a function checking that the value is below the size
    """
    def check(val):
        if len(val) > size:
            raise ValueError('%s=%r has length %d > %d' %
                             (field, val, len(val), size))
        return val
    return check

def _read_csv(fileobj, compositedt, usecols=None):
    dic = {}
    conv = {}
    for name in compositedt.names:
        dt = compositedt[name]
        # NOTE: pandas.read_csv raises a warning and ignores a field dtype
        # if a converter for the same field is given
        if dt.kind == 'S':  # byte-fields
            conv[name] = check_length(name, dt.itemsize)
        else:
            dic[name] = dt
    df = pandas.read_csv(fileobj, names=compositedt.names, converters=conv,
                         dtype=dic, usecols=usecols,
                         keep_default_na=False, na_filter=False)
    return df

def find_error(fname, errors, dtype):
    """
    Given a CSV file with an error, parse it with the csv.reader and get
    a better exception including the first line with an error
    """
    with open(fname, encoding='utf-8-sig', errors=errors) as f:
        reader = csv.reader(f)
        start = 1
        while True:
            names = next(reader)  # header
            start += 1
            if not names[0].startswith('#'):
                break
        try:
            for i, row in enumerate(reader, start):
                for name, val in zip(names, row):
                    numpy.array([val], dtype[name])
        except Exception as exc:
            exc.lineno = i
            exc.line = ','.join(row)
            return exc

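# The CSVFile record used by `sniff` below is not part of this extract;
# a minimal sketch consistent with its usage there (the field names are
# assumptions, only the order is implied by the call in `sniff`):
CSVFile = collections.namedtuple(
    'CSVFile', ['fname', 'header', 'fields', 'size', 'skip'])
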
def sniff(fnames, sep=',', ignore=set()):
    """
    Read the first line of a set of CSV files by stripping the pre-headers.

    :returns: a list of CSVFile namedtuples.
    """
    common = None
    files = []
    for fname in fnames:
        with open(fname, encoding='utf-8-sig', errors='ignore') as f:
            skip = 0
            while True:
                first = next(f)
                if first.startswith('#'):
                    skip += 1
                    continue
                break
            header = first.strip().split(sep)
            if common is None:
                common = set(header)
            else:
                common &= set(header)
            files.append(CSVFile(fname, header, common,
                                 os.path.getsize(fname), skip))
    common -= ignore
    assert common, 'There is no common header subset among %s' % fnames
    return files

# NB: it would be nice to use numpy.loadtxt(
#    f, build_dt(dtypedict, header), delimiter=sep, ndmin=1, comments=None)
# however numpy does not support quoting, and "foo,bar" would be split :-(
def read_csv(fname, dtypedict={None: float}, renamedict={}, sep=',',
             index=None, errors=None, usecols=None):
    """
    :param fname: a CSV file with a header and float fields
    :param dtypedict: a dictionary fieldname -> dtype, None -> default
    :param renamedict: aliases for the fields to rename
    :param sep: separator (default comma)
    :param index: if not None, returns a pandas DataFrame
    :param errors: passed to the underlying open function (default None)
    :param usecols: columns to read
    :returns: an ArrayWrapper, unless there is an index
    """
    attrs = {}
    with open(fname, encoding='utf-8-sig', errors=errors) as f:
        while True:
            first = next(f)
            if first.startswith('#'):
                attrs = dict(parse_comment(first.strip('#,\n ')))
                continue
            break
        header = first.strip().split(sep)
        dt = build_dt(dtypedict, header, fname)
        try:
            df = _read_csv(f, dt, usecols)
        except Exception as exc:
            err = find_error(fname, errors, dt)
            if err:
                raise InvalidFile('%s: %s\nline:%d:%s' %
                                  (fname, err, err.lineno, err.line))
            else:
                raise InvalidFile('%s: %s' % (fname, exc))
        arr = numpy.zeros(len(df), dt)
        for col in df.columns:
            arr[col] = df[col].to_numpy()
        if renamedict:
            newnames = []
            for name in arr.dtype.names:
                new = renamedict.get(name, name)
                newnames.append(new)
            arr.dtype.names = newnames
        if index:
            df = pandas.DataFrame.from_records(arr, index)
            vars(df).update(attrs)
            return df
        return ArrayWrapper(arr, attrs)

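# Illustrative usage of `read_csv` (a sketch; the path and field names are
# only for the example): given a CSV file with header "lon,lat,value",
#   aw = read_csv('/tmp/data.csv', {None: float})   # -> ArrayWrapper
#   aw.array['lon']                                 # structured array column
#   df = read_csv('/tmp/data.csv', {None: float}, index='lon')  # DataFrame
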
def _fix_array(arr, key):
    """
    :param arr: array or array-like object
    :param key: string associated to the error (appears in the error message)

    If `arr` is a numpy array with dtype object containing strings, convert
    it into a numpy array containing bytes, unless it has more than 2
    dimensions or contains non-strings (these are errors). Return `arr`
    unchanged in the other cases.
    """
    if arr is None:
        return ()
    if not isinstance(arr, numpy.ndarray):
        return arr
    if arr.dtype.names:
        # for extract_assets d[0] is the pair
        # ('id', ('|S50', {'h5py_encoding': 'ascii'}))
        # this is a horrible workaround for the h5py 2.10.0 issue
        # https://github.com/numpy/numpy/issues/14142
        dtlist = []
        for i, n in enumerate(arr.dtype.names):
            if isinstance(arr.dtype.descr[i][1], tuple):
                dtlist.append((n, str(arr.dtype[n])))
            else:
                dtlist.append((n, arr.dtype[n]))
        arr.dtype = dtlist
    return arr

def save_npz(obj, path):
    """
    :param obj: object to serialize
    :param path: an .npz pathname
    """
    if isinstance(obj, pandas.DataFrame):
        a = {col: obj[col].to_numpy() for col in obj.columns}
    else:
        a = {}
        for key, val in vars(obj).items():
            if key.startswith('_'):
                continue
            elif isinstance(val, str):
                # without this oq extract would fail
                a[key] = val.encode('utf-8')
            else:
                a[key] = _fix_array(val, key)
    # turn into an error https://github.com/numpy/numpy/issues/14142
    with warnings.catch_warnings():
        warnings.filterwarnings("error", category=UserWarning)
        numpy.savez_compressed(path, **a)

def obj_to_json(obj):
    """
    :param obj: a Python object with a .__dict__
    :returns: a JSON string
    """
    return dumps({cls2dotname(obj.__class__): vars(obj)})

def json_to_obj(js):
    """
    :param js: a JSON string with the form {"cls": {"arg1": ...}}
    :returns: an instance cls(arg1, ...)
    """
    [(dotname, attrs)] = json.loads(js).items()
    cls = dotname2cls(dotname)
    obj = cls.__new__(cls)
    vars(obj).update(attrs)
    return obj
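
# Illustrative round-trip through obj_to_json/json_to_obj (a sketch; the
# Point class is hypothetical and assumed importable from its module):
#   class Point:
#       def __init__(self, x, y):
#           self.x, self.y = x, y
#   js = obj_to_json(Point(1, 2))   # '{"<module>.Point": {"x": 1, "y": 2}}'
#   p = json_to_obj(js)             # a Point instance with p.x == 1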