import os
import time
from datetime import datetime
from multiprocessing.connection import Client

import numpy

from openquake.baselib.general import humansize
from openquake.baselib import hdf5

import psutil
# psutil renamed parts of its API across releases. Select the wrappers by
# feature detection rather than by comparing version strings: a string
# comparison is lexicographic, so e.g. '10.0.0' would sort before '2.0.0'.
if hasattr(psutil, 'virtual_memory'):  # Ubuntu 14.04 and later
    def virtual_memory():
        """Return the system-wide memory statistics reported by psutil."""
        return psutil.virtual_memory()

else:  # Ubuntu 12.04
    def virtual_memory():
        """Return the system-wide memory statistics reported by psutil."""
        return psutil.phymem_usage()


if hasattr(psutil.Process, 'memory_info'):  # Ubuntu 14.10 and later
    def memory_info(proc):
        """
        Return the memory information of a process.

        :param proc: a psutil.Process instance
        """
        return proc.memory_info()

else:  # Ubuntu 14.04 and older
    def memory_info(proc):
        """
        Return the memory information of a process.

        :param proc: a psutil.Process instance
        """
        return proc.get_memory_info()
# Layout of one performance record, as built by Monitor.get_data()
perf_dt = numpy.dtype([
    ('operation', (bytes, 50)),  # operation name, fixed-width byte string
    ('time_sec', float),         # cumulative duration in seconds
    ('memory_mb', float),        # cumulative memory delta in megabytes
    ('counts', int),             # number of measurements taken
])


# this is not thread-safe
class Monitor(object):
    """
    Measure the time spent and, optionally, the resident memory allocated
    by the current process during the execution of a block of code.
    Should be used as a context manager, as follows::

       with Monitor('do_something', measuremem=True) as mon:
           do_something()
       print(mon.mem)

    At the end of the block the Monitor object will have the
    following public attributes:

    .start_time: when the monitor started (a datetime object)
    .duration: time elapsed between start and stop (in seconds); it is
               cumulated if the monitor is used several times
    .exc: usually None; otherwise the exception happened in the `with` block
    .mem: the memory delta in bytes (cumulated; 0 if measuremem is false)
    .counts: how many times the monitor has been used since the last flush

    The behaviour of the Monitor can be customized by subclassing it
    and by overriding the method on_exit(), called at end and used to
    display or store the results of the analysis.

    NB: if the .address attribute is set, it is possible for the monitor
    to send commands to that address, assuming there is a
    :class:`multiprocessing.connection.Listener` listening.
    """
    address = None
    authkey = None
    calc_id = None

    def __init__(self, operation='dummy', hdf5path=None,
                 autoflush=False, measuremem=False):
        """
        :param operation: name of the operation being monitored
        :param hdf5path: path of the file where flush() stores the data
        :param autoflush: if true, flush() is called at each exit
        :param measuremem: if true, measure the memory as well as the time
        """
        self.operation = operation
        self.hdf5path = hdf5path
        self.autoflush = autoflush
        self.measuremem = measuremem
        self.mem = 0
        self.duration = 0
        self._start_time = self._stop_time = time.time()
        self.children = []
        self.counts = 0
        # instance-level default: it shadows the class attribute, so the
        # address has to be set on the instance to enable send()
        self.address = None
        # defined here too, so that .exc can be read before the first use
        self.exc = None

    @property
    def dt(self):
        """Last time interval measured"""
        return self._stop_time - self._start_time

    def measure_mem(self):
        """
        A memory measurement (in bytes) for the current process, or None
        if psutil denies access to the process information.
        """
        proc = psutil.Process(os.getpid())
        try:
            return memory_info(proc).rss
        except psutil.AccessDenied:
            # no access to information about this process
            return None

    @property
    def start_time(self):
        """
        Datetime instance recording when the monitoring started
        """
        return datetime.fromtimestamp(self._start_time)

    def get_data(self):
        """
        :returns:
          an array of dtype perf_dt, with the information
          of the monitor (operation, time_sec, memory_mb, counts);
          the length of the array can be 0 (for counts=0) or 1 (otherwise).
        """
        data = []
        if self.counts:
            time_sec = self.duration
            memory_mb = self.mem / 1024. / 1024. if self.measuremem else 0
            data.append((self.operation, time_sec, memory_mb, self.counts))
        return numpy.array(data, perf_dt)

    def __enter__(self):
        """Start the measurement and return the monitor itself"""
        self.exc = None  # exception
        self._start_time = time.time()
        if self.measuremem:
            self.start_mem = self.measure_mem()
        return self

    def __exit__(self, etype, exc, tb):
        """
        Stop the measurement, update .mem, .duration and .counts and call
        on_exit(). The exception (if any) is stored in .exc and is not
        suppressed.
        """
        self.exc = exc
        if self.measuremem:
            self.stop_mem = self.measure_mem()
            # measure_mem() returns None when access is denied: in that
            # case skip the delta instead of failing with a TypeError
            if self.start_mem is not None and self.stop_mem is not None:
                self.mem += self.stop_mem - self.start_mem
        self._stop_time = time.time()
        self.duration += self._stop_time - self._start_time
        self.counts += 1
        self.on_exit()

    def on_exit(self):
        "To be overridden in subclasses"
        if self.autoflush:
            self.flush()

    def send(self, *args):
        """
        Send a command to the listener. Add the .calc_id as last argument.
        Do nothing if the .address is not set.
        """
        if self.address:
            client = Client(self.address, authkey=self.authkey)
            try:
                client.send(args + (self.calc_id,))
            finally:
                client.close()

    def flush(self):
        """
        Flush the children, then save the measurements of this monitor
        on the performance file (if .hdf5path is set) and reset the
        monitor.

        :returns:
          the array of saved data, or an empty list if there was nothing
          to save (in that case the monitor is not reset)
        """
        for child in self.children:
            child.flush()
        data = self.get_data()
        if len(data) == 0:  # no information
            return []
        elif self.hdf5path:
            hdf5.extend3(self.hdf5path, 'performance_data', data)
        # reset monitor
        self.duration = 0
        self.mem = 0
        self.counts = 0
        return data

    # TODO: rename this as spawn; see what will break
    def __call__(self, operation='no operation', **kw):
        """
        Return a child of the monitor usable for a different operation.
        The child is registered in .children, so it is flushed together
        with its parent.
        """
        child = self.new(operation, **kw)
        self.children.append(child)
        return child

    def new(self, operation='no operation', **kw):
        """
        Return a copy of the monitor usable for a different operation.
        The copy has no children and zero counts; all the other attributes
        are taken from the original monitor, unless overridden via `kw`.
        """
        self_vars = vars(self).copy()
        # these attributes must not be inherited by the copy
        del self_vars['operation']
        del self_vars['children']
        del self_vars['counts']
        new = self.__class__(operation)
        vars(new).update(self_vars)
        vars(new).update(kw)
        return new

    def __repr__(self):
        calc_id = ' #%s ' % self.calc_id if self.calc_id else ' '
        msg = '%s%s%s' % (self.__class__.__name__, calc_id, self.operation)
        if self.measuremem:
            return '<%s, duration=%ss, memory=%s>' % (
                msg, self.duration, humansize(self.mem))
        elif self.duration:
            return '<%s, duration=%ss>' % (msg, self.duration)
        else:
            return '<%s>' % msg