Source code for openquake.server.db.models

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2010-2016 GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <>.

# Disable:
# - 'Maximum number of public methods for a class'
# - 'Missing docstring' (because of all of the model Meta)
# pylint: disable=R0904,C0111

Model representations of the OpenQuake DB tables.
import ast
import collections
from datetime import datetime
from openquake.commonlib.oqvalidation import OqParam, RISK_CALCULATORS
import django
if hasattr(django, 'setup'):
    django.setup()  # for Django >= 1.7
from django.db import models as djm
from django.core.exceptions import ObjectDoesNotExist
from django.db import connections

#: Kind of supported curve statistics
    (u'mean', u'Mean'),
    (u'quantile', u'Quantile'))

#: System Reference ID used for geometry objects

    (u"measured", u"Value obtained from on-site measurements"),
    (u"inferred", u"Estimated value"),

    (u'PGA', u'Peak Ground Acceleration'),
    (u'PGV', u'Peak Ground Velocity'),
    (u'PGD', u'Peak Ground Displacement'),
    (u'SA', u'Spectral Acceleration'),
    (u'IA', u'Arias Intensity'),
    (u'RSD', u'Relative Significant Duration'),
    (u'MMI', u'Modified Mercalli Intensity'),

#: Minimum value for a seed number
MIN_SINT_32 = -(2 ** 31)
#: Maximum value for a seed number
MAX_SINT_32 = (2 ** 31) - 1

#: Kind of supported type of loss outputs
LOSS_TYPES = ["structural", "nonstructural", "fatalities", "contents"]

#: relative tolerance to consider two risk outputs (almost) equal
RISK_RTOL = 0.05

#: absolute tolerance to consider two risk outputs (almost) equal
RISK_ATOL = 0.01

# TODO: these want to be dictionaries
    (u'unknown', u'Unknown'),
    (u'source', u'Source Model'),
    (u'source_model_logic_tree', u'Source Model Logic Tree'),
    (u'gsim_logic_tree', u'Ground Shaking Intensity Model Logic Tree'),
    (u'exposure', u'Exposure'),
    (u'fragility', u'Fragility'),
    (u'site_model', u'Site Model'),
    (u'rupture_model', u'Rupture Model'),

    # vulnerability models
    (u'structural_vulnerability', u'Structural Vulnerability'),
    (u'nonstructural_vulnerability', u'Non Structural Vulnerability'),
    (u'contents_vulnerability', u'Contents Vulnerability'),
     u'Business Interruption Vulnerability'),
    (u'occupants_vulnerability', u'Occupants Vulnerability'),
     u'Structural Vulnerability Retrofitted'))

RAISE_EXC = object()  # sentinel used in OqJob.get_param

[docs]def getcursor(route): """Return a cursor from a Django route""" return connections[route].cursor()
[docs]class MissingParameter(KeyError): """Raised by OqJob.get_param when a parameter is missing in the database"""
[docs]class LiteralField(djm.Field): """ Convert from Postgres TEXT to Python objects and viceversa by using `ast.literal_eval` and `repr`. """ __metaclass__ = djm.SubfieldBase
[docs] def db_type(self, connection=None): return 'text'
[docs] def to_python(self, value): if value is not None: return ast.literal_eval(value)
[docs] def get_prep_value(self, value): return repr(value)
# Tables in the 'uiapi' schema.
[docs]class OqJob(djm.Model): ''' An OpenQuake engine run started by the user ''' user_name = djm.TextField() hazard_calculation = djm.ForeignKey('OqJob', null=True) LOG_LEVEL_CHOICES = ( (u'debug', u'Debug'), (u'info', u'Info'), (u'progress', u'Progress'), (u'warn', u'Warn'), (u'error', u'Error'), (u'critical', u'Critical'), ) log_level = djm.TextField(choices=LOG_LEVEL_CHOICES, default='progress') STATUS_CHOICES = ( (u'created', u'Created'), (u'pre_executing', u'Pre-Executing'), (u'executing', u'Executing'), (u'post_executing', u'Post-Executing'), (u'post_processing', u'Post-Processing'), (u'export', u'Exporting results'), (u'clean_up', u'Cleaning up'), (u'complete', u'Complete'), ) status = djm.TextField(choices=STATUS_CHOICES, default='pre_executing') oq_version = djm.TextField(null=True, blank=True) hazardlib_version = djm.TextField(null=True, blank=True) commonlib_version = djm.TextField(null=True, blank=True) risklib_version = djm.TextField(null=True, blank=True) is_running = djm.BooleanField(default=True) duration = djm.IntegerField(default=0) job_pid = djm.IntegerField(default=0) supervisor_pid = djm.IntegerField(default=0) last_update = djm.DateTimeField(editable=False, default=datetime.utcnow) relevant = djm.BooleanField(null=False, default=True) ds_calc_dir = djm.TextField(null=True, blank=True) # datastore calc_dir class Meta: db_table = 'uiapi\".\"oq_job'
[docs] def risk_calculation(self): return self.get_oqparam()
@property def job_type(self): """ 'hazard' or 'risk' """ calcmode = self.get_param('calculation_mode', 'unknown') # the calculation mode can be unknown if the job parameters # have not been written on the database yet return 'risk' if calcmode in RISK_CALCULATORS else 'hazard'
[docs] def get_or_create_output(self, display_name, output_type): """ :param disp_name: display name of the output :param output_type: the output type :returns: an Output instance """ try: output = Output.objects.get( oq_job=self, display_name=display_name, output_type=output_type) except ObjectDoesNotExist: output = Output.objects.create_output( self, display_name, output_type) return output
[docs] def get_param(self, name, missing=RAISE_EXC): """ `job.get_param(name)` returns the value of the requested parameter or raise a MissingParameter exception if the parameter does not exist in the database. `job.get_param(name, missing)` returns the value of the requested parameter or the `missing` value if the parameter does not exist in the database. :param name: the name of the parameter :param missing: value returned if the parameter is missing NB: since job_param.value is NOT NULL, `.get_param(name)` can return None only if the parameter is missing. """ try: return JobParam.objects.get(job=self, name=name).value except ObjectDoesNotExist: if missing is RAISE_EXC: raise MissingParameter(name) return missing
[docs] def get_oqparam(self): """ Return an OqParam object as read from the database """ oqparam = object.__new__(OqParam) for row in JobParam.objects.filter(job=self): setattr(oqparam,, row.value) return oqparam
[docs] def save_params(self, params): """ Save on the database table job_params the given parameters. :param job: an :class:`OqJob` instance :param params: a dictionary {name: string} of parameters """ for name, value in params.iteritems(): if name == 'gsim': # special case value = str(value) JobParam.objects.create(job=self, name=name, value=repr(value))
def __repr__(self): return '<%s %d, %s>' % (self.__class__.__name__,, self.job_type)
[docs]def oqparam(job_id): """ :param job_id: ID of :class:`openquake.server.db.models.OqJob` :returns: instance of :class:`openquake.commonlib.oqvalidation.OqParam` """ return OqJob.objects.get(pk=job_id).get_oqparam()
[docs]class JobStats(djm.Model): ''' Capture various statistics about a job. ''' oq_job = djm.OneToOneField('OqJob') start_time = djm.DateTimeField(editable=False, default=datetime.utcnow) stop_time = djm.DateTimeField(editable=False) # The disk space occupation in bytes disk_space = djm.IntegerField(null=True) class Meta: db_table = 'uiapi\".\"job_stats'
[docs]class JobParam(djm.Model): ''' The parameters of a job ''' job = djm.ForeignKey('OqJob') name = djm.TextField(null=False) value = LiteralField(null=False) class Meta: db_table = 'uiapi\".\"job_param'
[docs]class Performance(djm.Model): ''' Contains performance information about the operations performed by a task launched by a job. ''' oq_job = djm.ForeignKey('OqJob') task_id = djm.TextField(null=True) task = djm.TextField(null=True) operation = djm.TextField(null=False) start_time = djm.DateTimeField(editable=False) duration = djm.FloatField(null=True) pymemory = djm.IntegerField(null=True) pgmemory = djm.IntegerField(null=True) class Meta: db_table = 'uiapi\".\"performance'
[docs]class Log(djm.Model): ''' Log table for calculations ''' job = djm.ForeignKey('OqJob', null=True) timestamp = djm.DateTimeField(editable=False, default=datetime.utcnow) level = djm.TextField(choices=OqJob.LOG_LEVEL_CHOICES) process = djm.TextField(null=False) message = djm.TextField(null=False) class Meta: db_table = 'uiapi\".\"log'
[docs]def extract_from(objlist, attr): """ Extract an attribute from a list of Django objects, by scanning them in order until a not None attribute is found. If nothing is found, or if an exception ObjectDoesNotExist is raised, return None. :param objlist: the list of Django objects :param str attr: the name of the attribute to look for """ for obj in objlist: try: value = getattr(obj, attr, None) except ObjectDoesNotExist: value = None if value is not None: return value
[docs]class OutputManager(djm.Manager): """ Manager class to filter and create Output objects """
[docs] def create_output(self, job, display_name, output_type): """ Create an output for the given `job`, `display_name` and `output_type` (default to hazard_curve) """ return self.create(oq_job=job, display_name=display_name, output_type=output_type)
[docs]class Output(djm.Model): ''' A single artifact which is a result of an OpenQuake job. The data may reside in a file or in the database. ''' #: Metadata of hazard outputs used by risk calculation. See #: `hazard_metadata` property for more details HazardMetadata = collections.namedtuple( 'hazard_metadata', 'investigation_time statistics quantile sm_path gsim_path') #: Hold the full paths in the model trees of ground shaking #: intensity models and of source models, respectively. LogicTreePath = collections.namedtuple( 'logic_tree_path', 'gsim_path sm_path') #: Hold the statistical params (statistics, quantile). StatisticalParams = collections.namedtuple( 'statistical_params', 'statistics quantile') oq_job = djm.ForeignKey('OqJob', null=False) display_name = djm.TextField() HAZARD_OUTPUT_TYPE_CHOICES = ( (u'disagg_matrix', u'Disaggregation Matrix'), (u'gmf', u'Ground Motion Field'), (u'gmf_scenario', u'Ground Motion Field'), (u'hazard_curve', u'Hazard Curve'), (u'hazard_curve_multi', u'Hazard Curve (multiple imts)'), (u'hazard_map', u'Hazard Map'), (u'ses', u'Stochastic Event Set'), (u'uh_spectra', u'Uniform Hazard Spectra'), ) RISK_OUTPUT_TYPE_CHOICES = ( (u'agg_loss_curve', u'Aggregate Loss Curve'), (u'aggregate_loss', u'Aggregate Losses'), (u'bcr_distribution', u'Benefit-cost ratio distribution'), (u'collapse_map', u'Collapse Map Distribution'), (u'dmg_dist_per_asset', u'Damage Distribution Per Asset'), (u'dmg_dist_per_taxonomy', u'Damage Distribution Per Taxonomy'), (u'dmg_dist_total', u'Total Damage Distribution'), (u'event_loss', u'Event Loss Table'), (u'event_loss_asset', u'Event Loss Asset'), (u'loss_curve', u'Loss Curve'), (u'event_loss_curve', u'Loss Curve'), (u'loss_fraction', u'Loss fractions'), (u'loss_map', u'Loss Map'), (u'dmg_per_asset', 'Damage Per Asset'), ) output_type = djm.TextField( choices=HAZARD_OUTPUT_TYPE_CHOICES + RISK_OUTPUT_TYPE_CHOICES) last_update = djm.DateTimeField(editable=False, default=datetime.utcnow) ds_key = djm.TextField(null=True, blank=True) # datastore key objects = OutputManager() def __str__(self): return "%d||%s||%s" % (, self.output_type, self.display_name) class Meta: db_table = 'uiapi\".\"output' ordering = ['id']
[docs] def is_hazard_curve(self): return self.output_type in ['hazard_curve', 'hazard_curve_multi']
@property def output_container(self): """ :returns: the output container associated with this output """ # FIXME(lp). Remove the following outstanding exceptions if self.output_type in ['agg_loss_curve', 'event_loss_curve']: return self.loss_curve elif self.output_type == 'hazard_curve_multi': return self.hazard_curve elif self.output_type == 'gmf_scenario': return self.gmf elif self.output_type == 'event_loss_asset': return self.event_loss return getattr(self, self.output_type) @property def lt_realization_paths(self): """ :returns: an instance of `LogicTreePath` the output is associated with. When the output is not associated with any logic tree branch then it returns a LogicTreePath namedtuple with a couple of None. """ hazard_output_types = [el[0] for el in self.HAZARD_OUTPUT_TYPE_CHOICES] risk_output_types = [el[0] for el in self.RISK_OUTPUT_TYPE_CHOICES] container = self.output_container if self.output_type in hazard_output_types: rlz = getattr(container, 'lt_realization_id', None) if rlz is not None: return self.LogicTreePath( tuple(container.lt_realization.gsim_lt_path), tuple(container.lt_realization.sm_lt_path)) else: return self.LogicTreePath(None, None) elif self.output_type in risk_output_types: if getattr(container, 'hazard_output_id', None): return container.hazard_output.lt_realization_paths else: return self.LogicTreePath(None, None) raise RuntimeError("unexpected output type %s" % self.output_type) @property def statistical_params(self): """ :returns: an instance of `StatisticalParams` the output is associated with """ if getattr(self.output_container, 'statistics', None) is not None: return self.StatisticalParams(self.output_container.statistics, self.output_container.quantile) elif getattr( self.output_container, 'hazard_output_id', None) is not None: return self.output_container.hazard_output.statistical_params else: return self.StatisticalParams(None, None) @property def hazard_metadata(self): """ Given an Output produced by a risk calculation it returns the corresponding hazard metadata. :returns: A `namedtuple` with the following attributes:: * investigation_time: the hazard investigation time (float) * statistics: the kind of hazard statistics (None, "mean" or "quantile") * quantile: quantile value (when `statistics` is "quantile") * sm_path: a list representing the source model path * gsim_path: a list representing the gsim logic tree path """ oq = self.oq_job.get_oqparam() statistics, quantile = self.statistical_params gsim_lt_path, sm_lt_path = self.lt_realization_paths return self.HazardMetadata(oq.investigation_time, statistics, quantile, sm_lt_path, gsim_lt_path)