# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import time
import pstats
import pickle
import signal
import logging
import getpass
import tempfile
import operator
import itertools
import subprocess
import collections
from datetime import datetime
from contextlib import contextmanager
from decorator import decorator

import psutil
import numpy
import pandas
import numba

from openquake.baselib.general import humansize, fast_agg
from openquake.baselib import hdf5

# NB: one can use vstr fields in extensible datasets, but then reading
# them on-the-fly in SWMR mode will fail with an OSError:
# Can't read data (address of object past end of allocation)
# this is why below I am using '<S50' byte strings
perf_dt = numpy.dtype([('operation', '<S50'), ('time_sec', float),
                       ('memory_mb', float), ('counts', int),
                       ('task_no', numpy.int16)])
task_info_dt = numpy.dtype(
    [('taskname', '<S50'), ('task_no', numpy.uint32),
     ('weight', numpy.float32), ('duration', numpy.float32),
     ('received', numpy.int64), ('mem_gb', numpy.float32)])

F16 = numpy.float16
F64 = numpy.float64
I64 = numpy.int64

PStatData = collections.namedtuple(
    'PStatData', 'ncalls tottime percall cumtime percall2 path')

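# For instance, a single performance record can be built as follows
# (the values are purely illustrative):
#
#     numpy.array([(b'computing mean', 1.5, 120., 3, 0)], perf_dt)
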
@contextmanager
def perf_stat():
    """
    Profile the current process by using the linux `perf` command
    """
    p = subprocess.Popen(["perf", "stat", "-p", str(os.getpid())])
    time.sleep(0.5)
    yield
    p.send_signal(signal.SIGINT)

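# A usage sketch, assuming the linux `perf` tool is installed and the
# kernel allows attaching to the current PID (`do_work` is hypothetical):
#
#     with perf_stat():
#         do_work()
#
# the SIGINT sent at the end of the block makes `perf stat` print the
# counters collected for the current process.
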
def print_stats(pr, fname):
    """
    Print the stats of a Profile instance
    """
    with open(fname, 'w') as f:
        ps = pstats.Stats(pr, stream=f).sort_stats(pstats.SortKey.CUMULATIVE)
        ps.print_stats()

def get_pstats(pstatfile, n):
    """
    Return profiling information as a list [(ncalls, cumtime, path), ...]

    :param pstatfile: path to a .pstat file
    :param n: the maximum number of stats to retrieve
    """
    with tempfile.TemporaryFile(mode='w+') as stream:
        ps = pstats.Stats(pstatfile, stream=stream)
        ps.sort_stats('cumtime')
        ps.print_stats(n)
        stream.seek(0)
        lines = list(stream)
    for i, line in enumerate(lines):
        if line.startswith('   ncalls'):
            break
    data = []
    for line in lines[i + 2:]:
        columns = line.split()
        if len(columns) == 6:
            columns[-1] = os.path.basename(columns[-1])
            data.append(PStatData(*columns))
    rows = [(rec.ncalls, rec.cumtime, rec.path) for rec in data]
    return rows

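# A sketch of producing a .pstat file with cProfile and reading it back
# (the file name is illustrative):
#
#     import cProfile
#     cProfile.run('sorted(range(10**6))', '/tmp/x.pstat')
#     get_pstats('/tmp/x.pstat', 5)  # top 5 rows (ncalls, cumtime, path)
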
def init_performance(hdf5file, swmr=False):
    """
    :param hdf5file: file name or hdf5.File instance
    """
    fname = isinstance(hdf5file, str)
    h5 = hdf5.File(hdf5file, 'a') if fname else hdf5file
    if 'performance_data' not in h5:
        hdf5.create(h5, 'performance_data', perf_dt)
    if 'task_info' not in h5:
        hdf5.create(h5, 'task_info', task_info_dt)
    if 'task_sent' not in h5:
        h5['task_sent'] = '{}'
    if swmr:
        try:
            h5.swmr_mode = True
        except ValueError as exc:
            raise ValueError('%s: %s' % (hdf5file, exc))
    if fname:
        h5.close()

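# A minimal sketch of preparing a file for monitoring (the file name is
# hypothetical):
#
#     init_performance('/tmp/calc_1.hdf5')
#     with hdf5.File('/tmp/calc_1.hdf5', 'r') as h5:
#         print(len(h5['performance_data']))  # 0, nothing measured yet
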
def performance_view(dstore):
    """
    Returns the performance view as a numpy array.
    """
    pdata = dstore['performance_data']
    pdata.refresh()
    data = sorted(pdata[()], key=operator.itemgetter(0))
    out = []
    for operation, group in itertools.groupby(data, operator.itemgetter(0)):
        counts = 0
        time = 0
        mem = 0
        for rec in group:
            counts += rec['counts']
            time += rec['time_sec']
            mem = max(mem, rec['memory_mb'])
        out.append((operation, time, mem, counts))
    out.sort(key=operator.itemgetter(1), reverse=True)  # sort by time
    mems = dstore['task_info']['mem_gb']
    maxmem = ', maxmem=%.1f GB' % mems.max() if len(mems) else ''
    if hasattr(dstore, 'calc_id'):
        operation = 'calc_%d%s' % (dstore.calc_id, maxmem)
    else:
        operation = 'operation'
    dtlist = [(operation, perf_dt['operation'])]
    dtlist.extend((n, perf_dt[n]) for n in perf_dt.names[1:-1])
    return numpy.array(out, dtlist)

def memory_rss(pid):
    """
    :returns: the RSS memory allocated by a process
    """
    try:
        return psutil.Process(pid).memory_info().rss
    except psutil.NoSuchProcess:
        return 0

def memory_gb(pids=()):
    """
    :param pids: a list of PIDs running on the same machine
    :returns: the total memory allocated by the current process
              and all the PIDs
    """
    return sum(map(memory_rss, [os.getpid()] + list(pids))) / 1024 ** 3

# this is not thread-safe
class Monitor(object):
    """
    Measure the resident memory occupied by a list of processes during
    the execution of a block of code. Should be used as a context manager,
    as follows::

        with Monitor('do_something') as mon:
            do_something()
        print(mon.mem)

    At the end of the block the Monitor object will have the following
    public attributes:

    .start_time: when the monitor started (a datetime object)
    .duration: time elapsed between start and stop (in seconds)
    .exc: usually None; otherwise the exception that happened
          in the `with` block
    .mem: the memory delta in bytes

    The behaviour of the Monitor can be customized by subclassing it
    and by overriding the method on_exit(), called at the end and used
    to display or store the results of the analysis.

    NB: if the .address attribute is set, it is possible for the monitor
    to send commands to that address, assuming there is a
    :class:`multiprocessing.connection.Listener` listening.
    """
    address = None
    authkey = None
    calc_id = None
    inject = None
    # config = config

    def __init__(self, operation='', measuremem=False, inner_loop=False,
                 h5=None, version=None, dbserver_host='127.0.0.1'):
        self.operation = operation
        self.measuremem = measuremem
        self.inner_loop = inner_loop
        self.h5 = h5
        self.version = version
        self._mem = 0
        self.duration = 0
        self._start_time = self._stop_time = time.time()
        self.children = []
        self.counts = 0
        self.address = None
        self.username = getpass.getuser()
        self.task_no = -1  # overridden in parallel
        self.dbserver_host = dbserver_host

    @property
    def mem(self):
        """Mean memory allocation"""
        return self._mem / (self.counts or 1)

    @property
    def dt(self):
        """Last time interval measured"""
        return self._stop_time - self._start_time

    def measure_mem(self):
        """A memory measurement (in bytes)"""
        try:
            return memory_rss(os.getpid())
        except psutil.AccessDenied:
            # no access to information about this process
            pass

    @property
    def start_time(self):
        """
        Datetime instance recording when the monitoring started
        """
        return datetime.fromtimestamp(self._start_time)

    def _get_data(self):
        data = []
        if self.counts:
            time_sec = self.duration
            memory_mb = self.mem / 1024. / 1024. if self.measuremem else 0
            data.append((self.operation, time_sec, memory_mb,
                         self.counts, self.task_no))
        return numpy.array(data, perf_dt)

    def get_data(self):
        """
        :returns:
            an array of dtype perf_dt, with the information of the monitor
            (operation, time_sec, memory_mb, counts); the length of the
            array can be 0 (for counts=0) or 1 (otherwise)
        """
        if not self.children:
            data = self._get_data()
        else:
            lst = [self._get_data()]
            for child in self.children:
                lst.append(child.get_data())
                child.reset()
            data = numpy.concatenate(lst, dtype=perf_dt)
        return data

    def save_task_info(self, h5, res, name, mem_gb=0):
        """
        Called by parallel.IterResult.

        :param h5: where to save the info
        :param res: a :class:`Result` object
        :param name: name of the task function
        :param mem_gb: memory consumption at the saving time (optional)
        """
        t = (name, self.task_no, self.weight, self.duration,
             len(res.pik), mem_gb)
        data = numpy.array([t], task_info_dt)
        hdf5.extend(h5['task_info'], data)
        h5['task_info'].flush()  # notify the reader

    def flush(self, h5):
        """
        Save the measurements on the performance file
        """
        data = self.get_data()
        if len(data):
            hdf5.extend(h5['performance_data'], data)
            h5['performance_data'].flush()  # notify the reader
        self.reset()

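    # A sketch tying the monitor to an open hdf5 file prepared with
    # init_performance (`do_work` is hypothetical):
    #
    #     mon = Monitor('my-operation', measuremem=True)
    #     with mon:
    #         do_work()
    #     mon.flush(h5)  # appends one row to 'performance_data'
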
    # TODO: rename this as spawn; see what will break
    def __call__(self, operation='no operation', **kw):
        """
        Return a child of the monitor usable for a different operation.
        """
        child = self.new(operation, **kw)
        self.children.append(child)
        return child

    def new(self, operation='no operation', **kw):
        """
        Return a copy of the monitor usable for a different operation.
        """
        new = object.__new__(self.__class__)
        # reset the measurement state; `_mem` (not the read-only property
        # `mem`) is the attribute backing the memory measurements
        vars(new).update(vars(self), operation=operation, children=[],
                         counts=0, _mem=0, duration=0)
        vars(new).update(kw)
        return new

    def save(self, key, obj):
        """
        :param key: key in the _tmp.hdf5 file
        :param obj: big object to store in pickle format
        :returns: True if saved, False if not, because the key was taken
        """
        tmp = self.filename[:-5] + '_tmp.hdf5'
        f = hdf5.File(tmp, 'a') if os.path.exists(tmp) else hdf5.File(tmp, 'w')
        with f:
            if key in f:  # already saved
                return False
            if isinstance(obj, numpy.ndarray):
                f[key] = obj
            elif isinstance(obj, pandas.DataFrame):
                f.create_df(key, obj)
            else:
                f[key] = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        return True

    def read(self, key, slc=slice(None)):
        """
        :param key: key in the _tmp.hdf5 file
        :param slc: slice to read (default all)
        :returns: unpickled object
        """
        tmp = self.filename[:-5] + '_tmp.hdf5'
        with hdf5.File(tmp, 'r') as f:
            dset = f[key]
            if '__pdcolumns__' in dset.attrs:
                return f.read_df(key, slc=slc)
            elif dset.shape:
                return dset[slc]
            return pickle.loads(dset[()])

    def iter(self, genobj, atstop=lambda: None):
        """
        :param genobj: a generator object
        :param atstop: optional thunk to call at StopIteration
        :yields: the elements of the generator object
        """
        while True:
            try:
                self._mem = 0
                with self:
                    obj = next(genobj)
            except StopIteration:
                atstop()
                return
            else:
                yield obj

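    # A sketch of consuming a generator under monitoring (`read_blocks`
    # and `process` are hypothetical): each call to next() is timed by
    # the `with self` block above:
    #
    #     mon = Monitor('reading')
    #     for block in mon.iter(read_blocks(fname)):
    #         process(block)

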
def vectorize_arg(idx):
    """
    Vectorize a function efficiently, if the argument with index `idx`
    contains many repetitions.
    """
    def caller(func, *args):
        args = list(args)
        uniq, inv = numpy.unique(args[idx], return_inverse=True)
        res = []
        for arg in uniq:
            args[idx] = arg
            res.append(func(*args))
        return numpy.array(res)[inv]
    return decorator(caller)

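# For instance, the decorated function below is really called only once
# per unique value of its first argument (a sketch):
#
#     @vectorize_arg(0)
#     def square(x):
#         return x ** 2
#
#     square(numpy.array([2, 2, 3]))  # -> array([4, 4, 9]), 2 real calls
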
# numba helpers

# NB: without cache=True the tests would take hours!!
def jittable(func):
    """Calls numba.njit with a cache"""
    jitfunc = numba.njit(func, error_model='numpy', cache=True)
    jitfunc.jittable = True
    return jitfunc

def compile(sigstr):
    """
    Compile a function Ahead-Of-Time using the given signature string
    """
    return numba.njit(sigstr, error_model='numpy', cache=True)

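# For instance, one can compile a trivial kernel ahead-of-time (a sketch;
# the arguments must match the signature exactly):
#
#     @compile("float64(float64[:])")
#     def total(xs):
#         acc = 0.
#         for x in xs:
#             acc += x
#         return acc
#
#     total(numpy.ones(10))  # -> 10.0
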
# used when reading _rates/sid
@compile(["int64[:, :](uint8[:])",
          "int64[:, :](uint16[:])",
          "int64[:, :](uint32[:])",
          "int64[:, :](int32[:])",
          "int64[:, :](int64[:])"])
def idx_start_stop(integers):
    # given an array of integers returns an array int64 of shape (n, 3)
    out = []
    start = i = 0
    prev = integers[0]
    for i, val in enumerate(integers[1:], 1):
        if val != prev:
            out.append((I64(prev), start, i))
            start = i
        prev = val
    out.append((I64(prev), start, i + 1))
    return numpy.array(out, I64)

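# For instance, three runs of equal values produce three
# (value, start, stop) triples:
#
#     idx_start_stop(numpy.array([1, 1, 2, 2, 3], numpy.uint32))
#     # -> array([[1, 0, 2], [2, 2, 4], [3, 4, 5]])
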
[docs]@compile("int64[:, :](uint32[:], uint32)")defsplit_slices(integers,size):# given an array of integers returns an array int64 of shape (n, 2)out=[]start=i=0prev=integers[0]totsize=1fori,valinenumerate(integers[1:],1):totsize+=1ifval!=prevandtotsize>=size:out.append((start,i))totsize=0start=iprev=valout.append((start,i+1))returnnumpy.array(out,I64)
# this is used in split_array and it may dominate the performance
# of classical calculations, so it has to be fast
@compile(["uint32[:](uint32[:], int64[:], int64[:], int64[:])",
          "uint32[:](uint16[:], int64[:], int64[:], int64[:])"])
def _split(uint32s, indices, counts, cumcounts):
    n = len(uint32s)
    assert len(indices) == n
    assert len(counts) <= n
    out = numpy.zeros(n, numpy.uint32)
    for idx, val in zip(indices, uint32s):
        cumcounts[idx] -= 1
        out[cumcounts[idx]] = val
    return out


# 3-argument version tested in SplitArrayTestCase
def split_array(arr, indices, counts=None):
    """
    :param arr: an array with N elements
    :param indices: a set of integers with repetitions
    :param counts: if None the indices MUST be ordered
    :returns: a list of K arrays, split on the integers

    >>> arr = numpy.array([.1, .2, .3, .4, .5])
    >>> idx = numpy.array([1, 1, 2, 2, 3])
    >>> split_array(arr, idx)
    [array([0.1, 0.2]), array([0.3, 0.4]), array([0.5])]
    """
    if counts is None:  # ordered indices
        return [arr[s1:s2] for i, s1, s2 in idx_start_stop(indices)]
    # indices and counts coming from numpy.unique(arr)
    # this part can be slow, but it is still 10x faster than pandas for EUR!
    cumcounts = counts.cumsum()
    out = _split(arr, indices, counts, cumcounts)
    return [out[s1:s2][::-1] for s1, s2 in zip(cumcounts, cumcounts + counts)]

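# The unordered case requires indices and counts as returned by
# numpy.unique and, given the signatures of _split, an array of unsigned
# integers; for instance:
#
#     arr = numpy.array([10, 20, 30, 40, 50], numpy.uint32)
#     _, inv, cnt = numpy.unique([1, 2, 1, 2, 3],
#                                return_inverse=True, return_counts=True)
#     split_array(arr, inv, cnt)
#     # -> [array([10, 30]), array([20, 40]), array([50])]
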
def kollapse(array, kfields, kround=kround0, mfields=(), afield=''):
    """
    Given a structured array of N elements with a discrete kfield with
    K <= N unique values, returns a structured array of K elements
    obtained by averaging the values associated to the kfield.
    """
    k_array = kround(array, kfields)
    uniq, indices, counts = numpy.unique(
        k_array, return_inverse=True, return_counts=True)
    klist = [(k, k_array.dtype[k]) for k in kfields]
    for mfield in mfields:
        klist.append((mfield, array.dtype[mfield]))
    res = numpy.zeros(len(uniq), klist)
    for kfield in kfields:
        res[kfield] = uniq[kfield]
    for mfield in mfields:
        values = array[mfield]
        res[mfield] = fast_agg(indices, values) / (
            counts if len(values.shape) == 1 else counts.reshape(-1, 1))
    if afield:
        return res, split_array(array[afield], indices, counts)
    return res

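# A usage sketch with a hypothetical identity "rounding" function (the
# real default kround0 is defined elsewhere in the engine):
#
#     def ident(array, kfields):
#         out = numpy.zeros(len(array),
#                           [(k, array.dtype[k]) for k in kfields])
#         for k in kfields:
#             out[k] = array[k]
#         return out
#
#     dt = numpy.dtype([('site', numpy.uint32), ('value', float)])
#     arr = numpy.array([(0, 1.), (0, 3.), (1, 5.)], dt)
#     kollapse(arr, ['site'], kround=ident, mfields=['value'])
#     # -> array([(0, 2.), (1, 5.)], ...), i.e. the mean value per site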