Source code for openquake.engine.performance

import functools
import os

from openquake.commonlib.parallel import PerformanceMonitor
from openquake.engine import logs
from openquake.engine.db import models


class EnginePerformanceMonitor(PerformanceMonitor):
    """
    Performance monitor specialized for the engine. It takes in input a
    string describing the operation, a job_id and (optionally) a celery
    task; the measures are saved as rows of the uiapi.performance table.

    When the monitor is built with ``autoflush=True`` the on_exit method
    saves a row as soon as the monitored block ends, provided no exception
    was recorded in ``self.exc``. Otherwise nothing is written until the
    .flush() method is called explicitly.

    NOTE(review): earlier versions of this docstring described a delayed,
    chunked saving through a ``.cache.flush()`` method; the code in this
    class writes one row per .flush() call and defines no ``cache``
    attribute -- confirm that no caller still relies on the old behavior.
    """
    # the monitor can also be used to measure the memory in postgres;
    # to that aim extract the pid with
    # connections['job_init'].cursor().connection.get_backend_pid()

    @classmethod
    def monitor(cls, method):
        """
        A decorator to add monitoring to calculator methods. The only
        constraint is that the instance owning the method has an
        attribute self.job.id.

        The decorated method runs inside a monitor named after the method
        and built with ``autoflush=True``. Positional and keyword arguments
        are passed through unchanged and the result of the method is
        returned.

        :param method: the calculator method to wrap
        :returns: the wrapping method, carrying the metadata of the original
        """
        # functools.wraps keeps __name__, __doc__ and __module__ of the
        # original method, so that introspection and generated
        # documentation still see the real calculator method
        @functools.wraps(method)
        def newmeth(self, *args, **kwargs):
            with cls(method.__name__, self.job.id, autoflush=True):
                return method(self, *args, **kwargs)
        return newmeth

    def __init__(self, operation, job_id, task=None, tracing=False,
                 measuremem=True, autoflush=False):
        """
        :param operation: a string describing the monitored operation
        :param job_id: the ID of the job, stored in the oq_job_id column
        :param task: a celery task, or None when not running in a task
        :param tracing: if true, wrap the block in logs.tracing(operation)
        :param measuremem: if true, pass the pid of the current process
                           to the base class, otherwise pass None
        :param autoflush: if true, on_exit saves the measure automatically
        """
        self.measuremem = measuremem
        # the base class receives the pid of the process to monitor;
        # None is passed when the memory measurement is not requested
        pid = os.getpid() if measuremem else None
        super(EnginePerformanceMonitor, self).__init__(
            operation, pid, autoflush=autoflush)
        self.job_id = job_id
        # any false value is normalized to None, since the task_id
        # property checks explicitly for None
        self.task = task or None
        self.tracing = tracing
        if tracing:
            self.tracer = logs.tracing(operation)

    @property
    def task_id(self):
        """Return the celery task ID or None"""
        return None if self.task is None else self.task.request.id

    def __call__(self, operation, task=None, **kw):
        """
        Return a copy of the monitor usable for a different operation
        in the same task.

        :param operation: the name of the new operation
        :param task: a task overriding the one of the current monitor
        :param kw: attributes set directly on the new monitor after it
                   has been built
        """
        new = self.__class__(operation, self.job_id, task or self.task,
                             self.tracing, self.measuremem, self.autoflush)
        vars(new).update(kw)
        return new

    def __enter__(self):
        # start measuring time and memory
        super(EnginePerformanceMonitor, self).__enter__()
        if self.tracing:
            self.tracer.__enter__()
        return self

    def __exit__(self, etype, exc, tb):
        # measuring time and memory
        try:
            super(EnginePerformanceMonitor, self).__exit__(etype, exc, tb)
        finally:
            # the tracer has been entered in __enter__, so it must be
            # closed even if the base class __exit__ raises (for instance
            # because saving the measure on the database failed)
            if self.tracing:
                self.tracer.__exit__(etype, exc, tb)

    def on_exit(self):
        """
        Save the measure on the uiapi.performance table, but only if
        autoflush is enabled and no exception was recorded.
        """
        # presumably invoked by the base class when the block ends, with
        # self.exc set to the exception raised inside it -- TODO confirm
        if self.autoflush and self.exc is None:  # save only valid measures
            self.flush()

    def flush(self):
        """
        Save a row in the performance table and reset the accumulated
        memory and duration to zero.
        """
        models.Performance.objects.create(
            oq_job_id=self.job_id,
            task_id=self.task_id,
            # self.task may be None: in that case no task name is stored
            task=getattr(self.task, '__name__', None),
            operation=self.operation,
            start_time=self.start_time,
            duration=self.duration,
            pymemory=self.mem if self.measuremem else None,
            pgmemory=None)
        # start from scratch, so that the same measure is not saved twice
        self.mem = 0
        self.duration = 0