Source code for openquake.engine.engine

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""Engine: A collection of fundamental functions for initializing and running
calculations."""

import os
import sys
import time
import getpass
import itertools
import operator
import traceback
from contextlib import contextmanager
from datetime import datetime

import celery.task.control

import openquake.engine

from django.core import exceptions
from django import db as django_db

from openquake.baselib.performance import PerformanceMonitor
from openquake.engine import logs
from openquake.server.db import models
from openquake.engine.utils import config, tasks
from openquake.engine.celery_node_monitor import CeleryNodeMonitor
from openquake.server.db.schema.upgrades import upgrader

from openquake import hazardlib, risklib, commonlib

from openquake.commonlib import readinput, valid, datastore, export


def get_calc_id(job_id=None):
    """
    Return the latest calc_id by looking both at the datastore
    and the database.
    """
    calcs = datastore.get_calc_ids(datastore.DATADIR)
    calc_id = 1 if not calcs else calcs[-1]
    if job_id is None:
        try:
            job_id = models.OqJob.objects.latest('id').id
        except exceptions.ObjectDoesNotExist:
            job_id = 1
    return max(calc_id, job_id)
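

# Illustrative sketch, not part of the original module: get_calc_id keeps
# the datastore calculation ids and the database job ids aligned, so the
# next free id is one past the latest id seen in either place (this is
# exactly what create_job does below). The helper name is hypothetical.
def _example_next_calc_id():
    return get_calc_id() + 1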


INPUT_TYPES = set(dict(models.INPUT_TYPE_CHOICES))

UNABLE_TO_DEL_HC_FMT = 'Unable to delete hazard calculation: %s'
UNABLE_TO_DEL_RC_FMT = 'Unable to delete risk calculation: %s'

TERMINATE = valid.boolean(
    config.get('celery', 'terminate_workers_on_revoke') or 'false')

USE_CELERY = valid.boolean(config.get('celery', 'use_celery') or 'false')


class InvalidCalculationID(Exception):
    pass


RISK_HAZARD_MAP = dict(
    scenario_risk=['scenario', 'scenario_risk'],
    scenario_damage=['scenario', 'scenario_damage'],
    classical_risk=['classical', 'classical_risk'],
    classical_bcr=['classical', 'classical_bcr'],
    classical_damage=['classical', 'classical_damage'],
    event_based_risk=['event_based', 'event_based_risk'])


# this is called only if USE_CELERY is True
def cleanup_after_job(job, terminate, task_ids=()):
    """
    Release the resources used by an openquake job.
    In particular revoke the running tasks (if any).

    :param job: the job instance
    :param bool terminate: the celery revoke command terminate flag
    :param task_ids: celery task IDs
    """
    # Using the celery API, revoke (and possibly terminate) any running
    # tasks associated with the current job.
    if task_ids:
        logs.LOG.warn('Revoking %d tasks', len(task_ids))
    else:  # this is normal when OQ_NO_DISTRIBUTE=1
        logs.LOG.debug('No task to revoke')
    for tid in task_ids:
        celery.task.control.revoke(tid, terminate=terminate)
        logs.LOG.debug('Revoked task %s', tid)


@contextmanager
def job_stats(job):
    """
    A context manager saving information such as the number of sites
    in the job_stats table. The information is saved at the end of the
    job, even if the job fails.
    """
    js = job.jobstats
    try:
        yield
    finally:
        tb = traceback.format_exc()  # get the traceback of the error, if any
        job.is_running = False
        if tb != 'None\n':
            # rollback the transactions; unfortunately, for mysterious
            # reasons, this is not enough and an OperationalError may still
            # show up in the finalization phase when forks are involved
            for conn in django_db.connections.all():
                conn.rollback()
        # try to save the job stats on the database and then clean up;
        # if there was an error in the calculation, this part may fail; in
        # such a situation, we simply log the cleanup error without taking
        # further action, so that the real error can propagate
        try:
            job.save()
            js.stop_time = datetime.utcnow()
            js.save()
            if USE_CELERY:
                cleanup_after_job(
                    job, TERMINATE, tasks.OqTaskManager.task_ids)
        except:
            # log the finalization error only if there is no real error
            if tb == 'None\n':
                logs.LOG.error('finalizing', exc_info=True)

        # log the real error, if any
        if tb != 'None\n':
            try:
                logs.LOG.critical(tb)
            except:  # an OperationalError may always happen
                sys.stderr.write(tb)
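

# Illustrative sketch, not part of the original module: job_stats is meant
# to wrap the whole run so that the job record and its JobStats row are
# saved even if the calculation fails. The helper name is hypothetical.
def _example_job_stats_usage(job):
    with job_stats(job):
        pass  # run the calculation here; the stats are persisted on exit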


def create_job(user_name="openquake", log_level='progress', hc_id=None):
    """
    Create job for the given user, return it.

    :param str user_name:
        Username of the user who owns/started this job. If the username
        doesn't exist, a user record for this name will be created.
    :param str log_level:
        Defaults to 'progress'. Specify a logging level for this job. This
        level can be passed, for example, from the command line interface
        using the `--log-level` directive.
    :param hc_id:
        If not None, then the created job is a risk job
    :returns:
        :class:`openquake.server.db.models.OqJob` instance.
    """
    job = models.OqJob.objects.create(
        id=get_calc_id() + 1,
        user_name=user_name,
        log_level=log_level,
        oq_version=openquake.engine.__version__,
        hazardlib_version=hazardlib.__version__,
        risklib_version=risklib.__version__,
        commonlib_version=commonlib.__version__)
    if hc_id:
        job.hazard_calculation = models.OqJob.objects.get(pk=hc_id)
    return job


class EnginePerformanceMonitor(PerformanceMonitor):
    """
    PerformanceMonitor that writes both in the datastore and in the database
    """
    def flush(self):
        curs = models.getcursor('job_init')
        data = PerformanceMonitor.flush(self)
        for rec in data:
            curs.execute("""INSERT INTO uiapi.performance
                (oq_job_id, operation, start_time, duration, pymemory)
                VALUES (%s, %s, %s, %s, %s)""", (
                self.job_id, rec['operation'], self.start_time,
                rec['time_sec'], rec['memory_mb']))


# used by bin/openquake and openquake.server.views
def run_calc(job, log_level, log_file, exports,
             hazard_calculation_id=None):
    """
    Run a calculation.

    :param job:
        :class:`openquake.server.db.models.OqJob` instance
    :param str log_level:
        The desired logging level. Valid choices are 'debug', 'info',
        'progress', 'warn', 'error', and 'critical'.
    :param str log_file:
        Complete path (including file name) to a file where logs will be
        written. If `None`, logging will just be printed to standard output.
    :param exports:
        A comma-separated string of export types.
    """
    # let's import the calculator classes here, when they are needed;
    # the reason is that the command `$ oq-engine --upgrade-db`
    # does not need them and would raise strange errors during installation
    # time if the PYTHONPATH is not set and commonlib is not visible
    from openquake.calculators import base
    calculator = base.calculators(job.get_oqparam(), calc_id=job.id)
    calculator.job = job
    calculator.monitor = EnginePerformanceMonitor(
        '', calculator.datastore.hdf5path)
    calculator.monitor.job_id = job.id

    # first of all check the database version and exit if the db is outdated
    upgrader.check_versions(django_db.connections['admin'])

    # run the job
    with logs.handle(job, log_level, log_file), job_stats(job):
        _do_run_calc(calculator, exports, hazard_calculation_id)
        job.ds_calc_dir = calculator.datastore.calc_dir
        job.save()
        expose_outputs(calculator.datastore, job)
    return calculator
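

# Illustrative sketch, not part of the original module: a minimal driver
# for run_calc on an already created job; the helper name and the export
# string are hypothetical.
def _example_run_calc(job):
    calc = run_calc(job, 'info', log_file=None, exports='xml')
    return calc.datastore.calc_dir  # where the results have been written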


def _do_run_calc(calc, exports, hazard_calculation_id):
    """
    Step through all of the phases of a calculation, updating the job
    status at each phase.

    :param calc:
        An :class:`~openquake.engine.calculators.base.Calculator` instance.
    :param exports:
        a (potentially empty) comma-separated string of export targets
    """
    calc.save_params()
    calc.run(exports=exports, hazard_calculation_id=hazard_calculation_id)
    calc.job.status = 'complete'


def del_calc(job_id):
    """
    Delete a calculation and all associated outputs.

    :param job_id:
        ID of a :class:`~openquake.server.db.models.OqJob`.
    """
    try:
        job = models.OqJob.objects.get(id=job_id)
    except exceptions.ObjectDoesNotExist:
        raise RuntimeError('Unable to delete hazard calculation: '
                           'ID=%s does not exist' % job_id)

    user = getpass.getuser()
    if job.user_name == user:
        # we are allowed to delete this, but first check if any risk
        # calculations are referencing any of our outputs, or the hazard
        # calculation itself
        msg = UNABLE_TO_DEL_HC_FMT % (
            'The following risk calculations are referencing this hazard'
            ' calculation: %s')
        assoc_outputs = models.OqJob.objects.filter(hazard_calculation=job)
        if assoc_outputs.count() > 0:
            raise RuntimeError(
                msg % ', '.join(str(x.id) for x in assoc_outputs))

        # no risk calculations are referencing what we want to delete,
        # so carry on with the deletion; notice that we cannot use
        # job.delete() directly because Django reads from the database
        # all the records to delete before deleting them, thus running
        # out of memory for large calculations
        curs = models.getcursor('admin')
        curs.execute('DELETE FROM uiapi.oq_job WHERE id=%s', (job_id,))
    else:  # this doesn't belong to the current user
        raise RuntimeError(UNABLE_TO_DEL_HC_FMT % 'Access denied')
    try:
        os.remove(job.ds_calc_dir + '.hdf5')
    except:  # the datastore file may be missing
        pass
    else:
        print('Removed %s.hdf5' % job.ds_calc_dir)


def list_outputs(job_id, full=True):
    """
    List the outputs for a given
    :class:`~openquake.server.db.models.OqJob`.

    :param job_id:
        ID of a calculation.
    :param bool full:
        If True produce a full listing, otherwise a short version
    """
    outputs = get_outputs(job_id)
    if models.oqparam(job_id).calculation_mode == 'scenario':
        # ignore SES output
        outputs = [o for o in outputs if o.output_type != 'ses']
    print_outputs_summary(outputs, full)


# this function is called only by openquake_cli.py, not by the engine server
def run_job(cfg_file, log_level, log_file, exports='',
            hazard_output_id=None, hazard_calculation_id=None):
    """
    Run a job using the specified config file and other options.

    :param str cfg_file:
        Path to the calculation config (INI-style) file.
    :param str log_level:
        'debug', 'info', 'warn', 'error', or 'critical'
    :param str log_file:
        Path to log file.
    :param exports:
        A comma-separated string of export types requested by the user.
        Currently only 'xml' is supported.
    """
    # first of all check the database version and exit if the db is outdated
    upgrader.check_versions(django_db.connections['admin'])
    with CeleryNodeMonitor(openquake.engine.no_distribute(), interval=3):
        job = job_from_file(
            cfg_file, getpass.getuser(), log_level, exports,
            hazard_output_id=hazard_output_id,
            hazard_calculation_id=hazard_calculation_id)
        job.ds_calc_dir = datastore.DataStore(job.id).calc_dir
        job.save()
        t0 = time.time()
        run_calc(job, log_level, log_file, exports,
                 hazard_calculation_id=hazard_calculation_id)
        duration = time.time() - t0
        if job.status == 'complete':
            print_results(job.id, duration, list_outputs)
        else:
            sys.exit('Calculation %s failed' % job.id)
    return job
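

# Illustrative sketch, not part of the original module: the typical
# chaining of a hazard run and a risk run that reuses it. The helper name
# and the .ini paths are hypothetical.
def _example_hazard_then_risk():
    haz_job = run_job('hazard.ini', 'info', log_file=None, exports='xml')
    return run_job('risk.ini', 'info', log_file=None, exports='xml',
                   hazard_calculation_id=haz_job.id)

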
DISPLAY_NAME = dict(dmg_by_asset='dmg_by_asset_and_collapse_map')


def expose_outputs(dstore, job):
    """
    Build a correspondence between the outputs in the datastore and the
    ones in the database.

    :param dstore: a datastore instance
    :param job: an OqJob instance
    """
    exportable = set(ekey[0] for ekey in export.export)
    # small hack: remove the sescollection outputs from scenario
    # calculators, as requested by Vitor
    calcmode = job.get_param('calculation_mode')
    if 'scenario' in calcmode and 'sescollection' in exportable:
        exportable.remove('sescollection')
    uhs = job.get_param('uniform_hazard_spectra', False)
    if uhs and 'hmaps' in dstore:
        out = models.Output.objects.create_output(
            job, 'uhs', output_type='datastore')
        out.ds_key = 'uhs'
        out.save()
    for key in dstore:
        if key in exportable:
            if key == 'realizations' and len(dstore['realizations']) == 1:
                # there is no point in exporting a single realization
                continue
            out = models.Output.objects.create_output(
                job, DISPLAY_NAME.get(key, key), output_type='datastore')
            out.ds_key = key
            out.save()


def check_hazard_risk_consistency(haz_job, risk_mode):
    """
    Make sure that the provided hazard job is the right one for the
    current risk calculator.

    :param haz_job:
        an OqJob instance referring to the previous hazard calculation
    :param risk_mode:
        the `calculation_mode` string of the current risk calculation
    """
    # check for obsolete calculation_mode
    if risk_mode in ('classical', 'event_based', 'scenario'):
        raise ValueError('Please change calculation_mode=%s into %s_risk '
                         'in the .ini file' % (risk_mode, risk_mode))

    # check calculation_mode consistency
    prev_mode = haz_job.get_param('calculation_mode')
    ok_mode = RISK_HAZARD_MAP[risk_mode]
    if prev_mode not in ok_mode:
        raise InvalidCalculationID(
            'In order to run a risk calculation of kind %r, '
            'you need to provide a calculation of kind %r, '
            'but you provided a %r instead' % (risk_mode, ok_mode, prev_mode))
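

# Illustrative sketch, not part of the original module: what the check
# above accepts and rejects, based on RISK_HAZARD_MAP. The helper name is
# hypothetical.
def _example_consistency_check(haz_job):
    # fine if haz_job ran in 'classical' or 'classical_risk' mode
    check_hazard_risk_consistency(haz_job, 'classical_risk')
    try:
        check_hazard_risk_consistency(haz_job, 'scenario')
    except ValueError:
        pass  # obsolete mode: the .ini file should say scenario_risk

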
@django_db.transaction.atomic
def job_from_file(cfg_file, username, log_level='info', exports='',
                  hazard_output_id=None, hazard_calculation_id=None,
                  **extras):
    """
    Create a full job profile from a job config file.

    :param str cfg_file:
        Path to the job.ini file.
    :param str username:
        The user who will own this job profile and all results.
    :param str log_level:
        Desired log level.
    :param exports:
        Comma-separated string of desired export types.
    :param hazard_output_id:
        Hazard output ID
    :param hazard_calculation_id:
        Hazard calculation ID
    :param extras:
        Extra parameters (used only in the tests to override the params)
    :returns:
        :class:`openquake.server.db.models.OqJob` object
    :raises:
        `RuntimeError` if the input job configuration is not valid
    """
    from openquake.calculators import base

    # create the current job
    job = create_job(username, log_level, hazard_calculation_id)
    models.JobStats.objects.create(oq_job=job)
    with logs.handle(job, log_level):
        # read calculation params and create the calculation profile
        params = readinput.get_params([cfg_file])
        params.update(extras)
        # build and validate an OqParam object
        oqparam = readinput.get_oqparam(params, calculators=base.calculators)
        job.save_params(vars(oqparam))
        job.save()
    return job
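

# Illustrative sketch, not part of the original module: job_from_file only
# builds and validates the job profile; running it is a separate step, as
# run_job does above. The helper name and the .ini path are hypothetical.
def _example_job_from_file():
    job = job_from_file('job.ini', getpass.getuser(), 'info', exports='xml')
    return job.get_oqparam().calculation_mode

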
# this is patched in the tests
def get_outputs(job_id):
    """
    :param job_id:
        ID of a calculation.
    :returns:
        A sequence of :class:`openquake.server.db.models.Output` objects
    """
    return models.Output.objects.filter(oq_job=job_id)