# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2010-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportcsvimporttempfileimportnumpy# this is needed by the doctests, don't remove itimportpandasfromopenquake.baselib.nodeimportscientificformatFIVEDIGITS='%.5E'# recursive function used internally by build_headerdef_build_header(dtype,root):header=[]ifdtype.fieldsisNone:ifnotroot:return[]return[root+(str(dtype),dtype.shape)]forfieldindtype.names:dt=dtype.fields[field][0]ifdt.subdtypeisNone:# nestedheader.extend(_build_header(dt,root+(field,)))else:numpytype=str(dt.subdtype[0])header.append(root+(field,numpytype,dt.shape))returnheader
[docs]defbuild_header(dtype):""" Convert a numpy nested dtype into a list of strings suitable as header of csv file. >>> imt_dt = numpy.dtype([('PGA', numpy.float32, 3), ... ('PGV', numpy.float32, 4)]) >>> build_header(imt_dt) ['PGA:3', 'PGV:4'] >>> gmf_dt = numpy.dtype([('A', imt_dt), ('B', imt_dt), ... ('idx', numpy.uint32)]) >>> build_header(gmf_dt) ['A~PGA:3', 'A~PGV:4', 'B~PGA:3', 'B~PGV:4', 'idx'] """header=_build_header(dtype,())h=[]forcolinheader:name='~'.join(col[:-2])shape=col[-1]coldescr=nameifshape:coldescr+=':'+':'.join(map(str,shape))h.append(coldescr)returnh
[docs]defwrite_csv(dest,data,sep=',',fmt='%.6E',header=(),comment=None,renamedict=None):""" :param dest: None, file, filename or io.StringIO instance :param data: array to save :param sep: separator to use (default comma) :param fmt: formatting string (default '%12.8E') :param header: optional list with the names of the columns to display :param comment: optional comment dictionary """ifcommentisnotNone:comment=', '.join('%s=%r'%itemforitemincomment.items())close=TrueifdestisNone:# write on a temporary filefd,dest=tempfile.mkstemp(suffix='.csv')os.close(fd)ifhasattr(dest,'write'):# file-like object in append mode# it must be closed by client codeclose=Falseelifnothasattr(dest,'getvalue'):# assume dest is a filenamedest=open(dest,'w',newline='',encoding='utf-8')w=csv.writer(dest,delimiter=sep)try:# see if data is a composite numpy arraydata.dtype.fieldsexceptAttributeError:# not a composite arrayautoheader=[]else:autoheader=build_header(data.dtype)nfields=len(autoheader)orlen(header)orlen(data[0])ifcomment:w.writerow(['#']+['']*(nfields-2)+[comment])someheader=headerorautoheaderifheader!='no-header'andsomeheader:w.writerow(_header(someheader,renamedict))defformat(val):returnscientificformat(val,fmt)ifautoheader:all_fields=[col.split(':',1)[0].split('~')forcolinautoheader]forrecordindata:row=[]forfieldsinall_fields:val=extract_from(record,fields)iffields[0]in('lon','lat','depth'):row.append('%.5f'%val)else:row.append(format(val))w.writerow(_header(row,renamedict))else:forrowindata:w.writerow([format(col)forcolinrow])ifhasattr(dest,'getvalue'):returnelifclose:dest.close()returndest.name
[docs]classCsvWriter(object):""" Class used in the exporters to save a bunch of CSV files """def__init__(self,sep=',',fmt='%12.8E'):self.sep=sepself.fmt=fmtself.fnames=set()
[docs]defsave(self,data,fname,header=(),comment=None,renamedict=None):""" Save data on fname. :param data: numpy array, list of lists or pandas DataFrame :param fname: path name :param header: header to use :param comment: optional dictionary to be converted in a comment :param renamedict: a dictionary for renaming the columns """ifisinstance(data,pandas.DataFrame):ifcommentisNone:data.to_csv(fname,index=False,float_format=self.fmt,lineterminator='\r\n',na_rep='nan')else:write_csv(fname,[],self.sep,self.fmt,list(data.columns),comment=comment)data.to_csv(fname,index=False,float_format=self.fmt,lineterminator='\r\n',na_rep='nan',header=False,mode='a')else:write_csv(fname,data,self.sep,self.fmt,header,comment,renamedict)self.fnames.add(getattr(fname,'name',fname))
[docs]defsave_block(self,data,dest):""" Save data on dest, which is a file open in 'a' mode """write_csv(dest,data,self.sep,self.fmt,'no-header')
[docs]defgetsaved(self):""" Returns the list of files saved by this CsvWriter """returnsorted(self.fnames)
[docs]defcastable_to_int(s):""" Return True if the string `s` can be interpreted as an integer """try:int(s)exceptValueError:returnFalseelse:returnTrue
[docs]defparse_header(header):""" Convert a list of the form `['fieldname:fieldtype:fieldsize',...]` into a numpy composite dtype. The parser understands headers generated by :func:`openquake.baselib.writers.build_header`. Here is an example: >>> parse_header(['PGA:float32', 'PGV', 'avg:float32:2']) (['PGA', 'PGV', 'avg'], dtype([('PGA', '<f4'), ('PGV', '<f4'), ('avg', '<f4', (2,))])) :params header: a list of type descriptions :returns: column names and the corresponding composite dtype """triples=[]fields=[]forcol_strinheader:col=col_str.strip().split(':')n=len(col)ifn==1:# default dtype and no shapecol=[col[0],'float32','']elifn==2:ifcastable_to_int(col[1]):# default dtype and shapecol=[col[0],'float32',col[1]]else:# dtype and no shapecol=[col[0],col[1],'']elifn>3:raiseValueError('Invalid column description: %s'%col_str)field=col[0]numpytype=col[1]shape=()ifnotcol[2].strip()else(int(col[2]),)triples.append((field,numpytype,shape))fields.append(field)returnfields,numpy.dtype(triples)
if__name__=='__main__':# pretty print of NRML filesimportsysimportshutilfromopenquake.hazardlibimportnrmlnrmlfiles=sys.argv[1:]forfnameinnrmlfiles:node=nrml.read(fname)shutil.copy(fname,fname+'.bak')withopen(fname,'w')asout:nrml.write(list(node),out)