Source code for openquake.server.db.actions

#  -*- coding: utf-8 -*-
#  vim: tabstop=4 shiftwidth=4 softtabstop=4

#  Copyright (C) 2016 GEM Foundation

#  OpenQuake is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  OpenQuake is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU Affero General Public License
#  along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.
import os
import operator
from datetime import datetime

from django.core import exceptions
from django import db

from openquake.risklib import valid
from openquake.commonlib import datastore
from openquake.server.db import models
from openquake.server.db.schema.upgrades import upgrader
from openquake.server.db import upgrade_manager


[docs]class InvalidCalculationID(Exception): pass
RISK_HAZARD_MAP = dict( scenario_risk=['scenario', 'scenario_risk'], scenario_damage=['scenario', 'scenario_damage'], classical_risk=['classical', 'classical_risk'], classical_bcr=['classical', 'classical_bcr'], classical_damage=['classical', 'classical_damage'], event_based_risk=['event_based', 'event_based_risk']) INPUT_TYPES = set(dict(models.INPUT_TYPE_CHOICES)) UNABLE_TO_DEL_HC_FMT = 'Unable to delete hazard calculation: %s' UNABLE_TO_DEL_RC_FMT = 'Unable to delete risk calculation: %s'
[docs]def check_outdated(): """ Check if the db is outdated, called before starting anything """ return upgrader.check_versions(db.connection)
[docs]def reset_is_running(): """ Reset the flag job.is_running to False. This is called when the Web UI is re-started: the idea is that it is restarted only when all computations are completed. """ db.connection.cursor().execute( # reset the flag job.is_running 'UPDATE job SET is_running=0 WHERE is_running=1')
[docs]def create_job(calc_mode, description, user_name, datadir, hc_id=None): """ Create job for the given user, return it. :param str calc_mode: Calculation mode, such as classical, event_based, etc :param user_name: User who owns/started this job. :param datadir: Data directory of the user who owns/started this job. :param description: Description of the calculation :param hc_id: If not None, then the created job is a risk job :returns: :class:`openquake.server.db.models.OqJob` instance. """ calc_id = get_calc_id(datadir) + 1 job = models.OqJob.objects.create( id=calc_id, calculation_mode=calc_mode, description=description, user_name=user_name, ds_calc_dir=os.path.join('%s/calc_%s' % (datadir, calc_id))) if hc_id: job.hazard_calculation = models.get(models.OqJob, pk=hc_id) job.save() return job.id
[docs]def delete_uncompleted_calculations(user): """ Delete the uncompleted calculations of the given user """ for job in models.OqJob.objects.filter( oqjob__user_name=user).exclude( oqjob__status="complete"): del_calc(job.id, user)
[docs]def get_job_id(job_id, username): """ If job_id is negative, return the last calculation of the current user, otherwise returns the job_id unchanged. """ job_id = int(job_id) if job_id > 0: return job_id my_jobs = models.OqJob.objects.filter(user_name=username).order_by('id') n = my_jobs.count() if n == 0: # no jobs return else: # typically job_id is -1 return my_jobs[n + job_id].id
[docs]def get_calc_id(datadir, job_id=None): """ Return the latest calc_id by looking both at the datastore and the database. """ calcs = datastore.get_calc_ids(datadir) calc_id = 0 if not calcs else calcs[-1] if job_id is None: try: job_id = models.OqJob.objects.latest('id').id except exceptions.ObjectDoesNotExist: job_id = 0 return max(calc_id, job_id)
[docs]def list_calculations(job_type, user_name): """ Yield a summary of past calculations. :param job_type: 'hazard' or 'risk' """ jobs = [job for job in models.OqJob.objects.filter( user_name=user_name).order_by('start_time') if job.job_type == job_type] if len(jobs) == 0: yield 'None' else: yield ('job_id | status | start_time | ' ' description') for job in jobs: descr = job.description latest_job = job if latest_job.is_running: status = 'pending' else: if latest_job.status == 'complete': status = 'successful' else: status = 'failed' start_time = latest_job.start_time.strftime( '%Y-%m-%d %H:%M:%S %Z' ) yield ('%6d | %10s | %s| %s' % ( job.id, status, start_time, descr)).encode('utf-8')
[docs]def list_outputs(job_id, full=True): """ List the outputs for a given :class:`~openquake.server.db.models.OqJob`. :param job_id: ID of a calculation. :param bool full: If True produce a full listing, otherwise a short version """ outputs = get_outputs(job_id) return print_outputs_summary(outputs, full)
[docs]def get_outputs(job_id): """ :param job_id: ID of a calculation. :returns: A sequence of :class:`openquake.server.db.models.Output` objects """ return models.Output.objects.filter(oq_job=job_id)
DISPLAY_NAME = dict(dmg_by_asset='dmg_by_asset_and_collapse_map')
[docs]def create_outputs(job_id, dskeys): """ Build a correspondence between the outputs in the datastore and the ones in the database. :param job_id: ID of the current job :param dskeys: a list of datastore keys """ job = models.get(models.OqJob, pk=job_id) for key in dskeys: models.Output.objects.create_output( job, DISPLAY_NAME.get(key, key), ds_key=key)
[docs]def check_hazard_risk_consistency(haz_job, risk_mode): """ Make sure that the provided hazard job is the right one for the current risk calculator. :param job: an OqJob instance referring to the previous hazard calculation :param risk_mode: the `calculation_mode` string of the current risk calculation """ # check for obsolete calculation_mode if risk_mode in ('classical', 'event_based', 'scenario'): raise ValueError('Please change calculation_mode=%s into %s_risk ' 'in the .ini file' % (risk_mode, risk_mode)) # check calculation_mode consistency prev_mode = haz_job.calculation_mode ok_mode = RISK_HAZARD_MAP[risk_mode] if prev_mode not in ok_mode: raise InvalidCalculationID( 'In order to run a risk calculation of kind %r, ' 'you need to provide a calculation of kind %r, ' 'but you provided a %r instead' % (risk_mode, ok_mode, prev_mode))
[docs]def finish(job_id, status): """ Set the job columns `is_running`, `status`, and `stop_time` """ job = models.get(models.OqJob, pk=job_id) job.is_running = False job.status = status job.stop_time = datetime.utcnow() job.save()
[docs]def del_calc(job_id, user): """ Delete a calculation and all associated outputs. :param job_id: ID of a :class:`~openquake.server.db.models.OqJob`. """ try: job = models.get(models.OqJob, pk=job_id) except exceptions.ObjectDoesNotExist: raise RuntimeError('Unable to delete hazard calculation: ' 'ID=%s does not exist' % job_id) if job.user_name == user: # we are allowed to delete this # but first, check if any risk calculations are referencing any of our # outputs, or the hazard calculation itself msg = UNABLE_TO_DEL_HC_FMT % ( 'The following risk calculations are referencing this hazard' ' calculation: %s') assoc_outputs = models.OqJob.objects.filter(hazard_calculation=job) if assoc_outputs.count() > 0: raise RuntimeError( msg % ', '.join(str(x.id) for x in assoc_outputs)) job.delete() else: # this doesn't belong to the current user raise RuntimeError(UNABLE_TO_DEL_HC_FMT % 'Access denied') try: os.remove(job.ds_calc_dir + '.hdf5') except: # already removed or missing permission pass
[docs]def log(job_id, timestamp, level, process, message): """ Write a log record in the database """ db.connection.cursor().execute( 'INSERT INTO log (job_id, timestamp, level, process, message) VALUES' '(%s, %s, %s, %s, %s)', (job_id, timestamp, level, process, message))
[docs]def get_log(job_id): """ Extract the logs as a big string """ logs = models.Log.objects.filter(job=job_id).order_by('id') for log in logs: time = str(log.timestamp)[:-4] # strip decimals yield '[%s #%d %s] %s' % (time, job_id, log.level, log.message)
[docs]def get_output(output_id): """ :param output_id: ID of an Output object :returns: (ds_key, calc_id, dirname) """ out = models.get(models.Output, pk=output_id) return out.ds_key, out.oq_job.id, os.path.dirname(out.oq_job.ds_calc_dir)
[docs]def save_performance(job_id, records): """ Save in the database the performance information about the given job """ for rec in records: models.Performance.objects.create( job_id=job_id, operation=rec['operation'], time_sec=rec['time_sec'], memory_mb=rec['memory_mb'], counts=rec['counts'])
# used in make_report
[docs]def fetch(templ, *args): """ Run queries directly on the database. Return header + rows """ curs = db.connection.cursor() curs.execute(templ, args) header = [r[0] for r in curs.description] return [header] + curs.fetchall()
[docs]def get_dbpath(): """ Returns the path to the database file """ curs = db.connection.cursor() curs.execute('PRAGMA database_list') # return a row with fields (id, dbname, dbpath) return curs.fetchall()[0][-1]
# ########################## upgrade operations ########################## #
[docs]def what_if_I_upgrade(extract_scripts): db.connection.cursor() # bind the connection conn = db.connection.connection return upgrade_manager.what_if_I_upgrade( conn, extract_scripts=extract_scripts)
[docs]def version_db(): db.connection.cursor() # bind the connection conn = db.connection.connection return upgrade_manager.version_db(conn)
[docs]def upgrade_db(): db.connection.cursor() # bind the connection conn = db.connection.connection return upgrade_manager.upgrade_db(conn)
# ################### used in Web UI ######################## #
[docs]def calc_info(calc_id): """ :param calc_id: calculation ID :returns: dictionary of info about the given calculation """ job = models.get(models.OqJob, pk=calc_id) response_data = {} response_data['user_name'] = job.user_name response_data['status'] = job.status response_data['start_time'] = str(job.start_time) response_data['stop_time'] = str(job.stop_time) response_data['is_running'] = job.is_running return response_data
[docs]def get_calcs(request_get_dict, user_name, user_acl_on=False, id=None): """ :returns: list of tuples (job_id, user_name, job_status, job_type, job_is_running, job_description) """ # helper to get job+calculation data from the oq-engine database jobs = models.OqJob.objects.filter() # user_acl_on is true if settings.ACL_ON = True or when the user is a # Django super user if user_acl_on: jobs = jobs.filter(user_name=user_name) if id is not None: jobs = jobs.filter(id=id) if 'job_type' in request_get_dict: job_type = request_get_dict.get('job_type') jobs = jobs.filter(hazard_calculation__isnull=job_type == 'hazard') if 'is_running' in request_get_dict: is_running = request_get_dict.get('is_running') jobs = jobs.filter(is_running=valid.boolean(is_running)) if 'relevant' in request_get_dict: relevant = request_get_dict.get('relevant') jobs = jobs.filter(relevant=valid.boolean(relevant)) return [(job.id, job.user_name, job.status, job.job_type, job.is_running, job.description) for job in jobs.order_by('-id')]
[docs]def set_relevant(calc_id, flag): """ Set the `relevant` field of the given calculation record """ job = models.get(models.OqJob, pk=calc_id) job.relevant = flag job.save()
[docs]def log_to_json(log): """ Convert a log record into a list of strings """ return [log.timestamp.isoformat()[:22], log.level, log.process, log.message]
[docs]def get_log_slice(calc_id, start, stop): """ Get a slice of the calculation log as a JSON list of rows """ start = start or 0 stop = stop or None rows = models.Log.objects.filter(job_id=calc_id)[start:stop] return map(log_to_json, rows)
[docs]def get_log_size(calc_id): """ Get a slice of the calculation log as a JSON list of rows """ return models.Log.objects.filter(job_id=calc_id).count()
[docs]def get_traceback(calc_id): """ Return the traceback of the given calculation as a list of lines. The list is empty if the calculation was successful. """ # strange: understand why the filter returns two lines log = list(models.Log.objects.filter(job_id=calc_id, level='CRITICAL'))[-1] response_data = log.message.splitlines() return response_data
[docs]def get_result(result_id): """ :returns: (job_id, job_status, datadir, datastore_key) """ output = models.get(models.Output, pk=result_id) job = output.oq_job return job.id, job.status, os.path.dirname(job.ds_calc_dir), output.ds_key
[docs]def get_results(job_id): """ :returns: (datadir, datastore_keys) """ job = models.get(models.OqJob, pk=job_id) datadir = os.path.dirname(job.ds_calc_dir) return datadir, [output.ds_key for output in get_outputs(job_id)]