# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
# Disable:
# - 'Maximum number of public methods for a class'
# - 'Missing docstring' (because of all of the model Meta)
# pylint: disable=R0904,C0111
'''
Model representations of the OpenQuake DB tables.
'''
import ast
import collections
from datetime import datetime
from openquake.commonlib.oqvalidation import OqParam, RISK_CALCULATORS
import django
if hasattr(django, 'setup'):
django.setup() # for Django >= 1.7
from django.db import models as djm
from django.core.exceptions import ObjectDoesNotExist
from django.db import connections
#: Kind of supported curve statistics
STAT_CHOICES = (
(u'mean', u'Mean'),
(u'quantile', u'Quantile'))
#: System Reference ID used for geometry objects
DEFAULT_SRID = 4326
VS30_TYPE_CHOICES = (
(u"measured", u"Value obtained from on-site measurements"),
(u"inferred", u"Estimated value"),
)
IMT_CHOICES = (
(u'PGA', u'Peak Ground Acceleration'),
(u'PGV', u'Peak Ground Velocity'),
(u'PGD', u'Peak Ground Displacement'),
(u'SA', u'Spectral Acceleration'),
(u'IA', u'Arias Intensity'),
(u'RSD', u'Relative Significant Duration'),
(u'MMI', u'Modified Mercalli Intensity'),
)
#: Minimum value for a seed number
MIN_SINT_32 = -(2 ** 31)
#: Maximum value for a seed number
MAX_SINT_32 = (2 ** 31) - 1
#: Kind of supported type of loss outputs
LOSS_TYPES = ["structural", "nonstructural", "fatalities", "contents"]
#: relative tolerance to consider two risk outputs (almost) equal
RISK_RTOL = 0.05
#: absolute tolerance to consider two risk outputs (almost) equal
RISK_ATOL = 0.01
# TODO: these want to be dictionaries
INPUT_TYPE_CHOICES = (
(u'unknown', u'Unknown'),
(u'source', u'Source Model'),
(u'source_model_logic_tree', u'Source Model Logic Tree'),
(u'gsim_logic_tree', u'Ground Shaking Intensity Model Logic Tree'),
(u'exposure', u'Exposure'),
(u'fragility', u'Fragility'),
(u'site_model', u'Site Model'),
(u'rupture_model', u'Rupture Model'),
# vulnerability models
(u'structural_vulnerability', u'Structural Vulnerability'),
(u'nonstructural_vulnerability', u'Non Structural Vulnerability'),
(u'contents_vulnerability', u'Contents Vulnerability'),
(u'business_interruption_vulnerability',
u'Business Interruption Vulnerability'),
(u'occupants_vulnerability', u'Occupants Vulnerability'),
(u'structural_vulnerability_retrofitted',
u'Structural Vulnerability Retrofitted'))
RAISE_EXC = object() # sentinel used in OqJob.get_param
[docs]def getcursor(route):
"""Return a cursor from a Django route"""
return connections[route].cursor()
[docs]class MissingParameter(KeyError):
"""Raised by OqJob.get_param when a parameter is missing in the database"""
[docs]class LiteralField(djm.Field):
"""
Convert from Postgres TEXT to Python objects and viceversa by using
`ast.literal_eval` and `repr`.
"""
__metaclass__ = djm.SubfieldBase
[docs] def db_type(self, connection=None):
return 'text'
[docs] def to_python(self, value):
if value is not None:
return ast.literal_eval(value)
[docs] def get_prep_value(self, value):
return repr(value)
# Tables in the 'uiapi' schema.
[docs]class OqJob(djm.Model):
'''
An OpenQuake engine run started by the user
'''
user_name = djm.TextField()
hazard_calculation = djm.ForeignKey('OqJob', null=True)
LOG_LEVEL_CHOICES = (
(u'debug', u'Debug'),
(u'info', u'Info'),
(u'progress', u'Progress'),
(u'warn', u'Warn'),
(u'error', u'Error'),
(u'critical', u'Critical'),
)
log_level = djm.TextField(choices=LOG_LEVEL_CHOICES, default='progress')
STATUS_CHOICES = (
(u'created', u'Created'),
(u'pre_executing', u'Pre-Executing'),
(u'executing', u'Executing'),
(u'post_executing', u'Post-Executing'),
(u'post_processing', u'Post-Processing'),
(u'export', u'Exporting results'),
(u'clean_up', u'Cleaning up'),
(u'complete', u'Complete'),
)
status = djm.TextField(choices=STATUS_CHOICES, default='pre_executing')
oq_version = djm.TextField(null=True, blank=True)
hazardlib_version = djm.TextField(null=True, blank=True)
commonlib_version = djm.TextField(null=True, blank=True)
risklib_version = djm.TextField(null=True, blank=True)
is_running = djm.BooleanField(default=True)
duration = djm.IntegerField(default=0)
job_pid = djm.IntegerField(default=0)
supervisor_pid = djm.IntegerField(default=0)
last_update = djm.DateTimeField(editable=False, default=datetime.utcnow)
relevant = djm.BooleanField(null=False, default=True)
ds_calc_dir = djm.TextField(null=True, blank=True) # datastore calc_dir
class Meta:
db_table = 'uiapi\".\"oq_job'
[docs] def risk_calculation(self):
return self.get_oqparam()
@property
def job_type(self):
"""
'hazard' or 'risk'
"""
calcmode = self.get_param('calculation_mode', 'unknown')
# the calculation mode can be unknown if the job parameters
# have not been written on the database yet
return 'risk' if calcmode in RISK_CALCULATORS else 'hazard'
[docs] def get_or_create_output(self, display_name, output_type):
"""
:param disp_name: display name of the output
:param output_type: the output type
:returns: an Output instance
"""
try:
output = Output.objects.get(
oq_job=self, display_name=display_name,
output_type=output_type)
except ObjectDoesNotExist:
output = Output.objects.create_output(
self, display_name, output_type)
return output
[docs] def get_param(self, name, missing=RAISE_EXC):
"""
`job.get_param(name)` returns the value of the requested parameter
or raise a MissingParameter exception if the parameter does not
exist in the database.
`job.get_param(name, missing)` returns the value of the requested
parameter or the `missing` value if the parameter does not
exist in the database.
:param name: the name of the parameter
:param missing: value returned if the parameter is missing
NB: since job_param.value is NOT NULL, `.get_param(name)`
can return None only if the parameter is missing.
"""
try:
return JobParam.objects.get(job=self, name=name).value
except ObjectDoesNotExist:
if missing is RAISE_EXC:
raise MissingParameter(name)
return missing
[docs] def get_oqparam(self):
"""
Return an OqParam object as read from the database
"""
oqparam = object.__new__(OqParam)
for row in JobParam.objects.filter(job=self):
setattr(oqparam, row.name, row.value)
return oqparam
[docs] def save_params(self, params):
"""
Save on the database table job_params the given parameters.
:param job: an :class:`OqJob` instance
:param params: a dictionary {name: string} of parameters
"""
for name, value in params.iteritems():
if name == 'gsim': # special case
value = str(value)
JobParam.objects.create(job=self, name=name, value=repr(value))
def __repr__(self):
return '<%s %d, %s>' % (self.__class__.__name__,
self.id, self.job_type)
[docs]def oqparam(job_id):
"""
:param job_id: ID of :class:`openquake.server.db.models.OqJob`
:returns: instance of :class:`openquake.commonlib.oqvalidation.OqParam`
"""
return OqJob.objects.get(pk=job_id).get_oqparam()
[docs]class JobStats(djm.Model):
'''
Capture various statistics about a job.
'''
oq_job = djm.OneToOneField('OqJob')
start_time = djm.DateTimeField(editable=False, default=datetime.utcnow)
stop_time = djm.DateTimeField(editable=False)
# The disk space occupation in bytes
disk_space = djm.IntegerField(null=True)
class Meta:
db_table = 'uiapi\".\"job_stats'
[docs]class JobParam(djm.Model):
'''
The parameters of a job
'''
job = djm.ForeignKey('OqJob')
name = djm.TextField(null=False)
value = LiteralField(null=False)
class Meta:
db_table = 'uiapi\".\"job_param'
[docs]class Log(djm.Model):
'''
Log table for calculations
'''
job = djm.ForeignKey('OqJob', null=True)
timestamp = djm.DateTimeField(editable=False, default=datetime.utcnow)
level = djm.TextField(choices=OqJob.LOG_LEVEL_CHOICES)
process = djm.TextField(null=False)
message = djm.TextField(null=False)
class Meta:
db_table = 'uiapi\".\"log'
[docs]class OutputManager(djm.Manager):
"""
Manager class to filter and create Output objects
"""
[docs] def create_output(self, job, display_name, output_type):
"""
Create an output for the given `job`, `display_name` and
`output_type` (default to hazard_curve)
"""
return self.create(oq_job=job,
display_name=display_name,
output_type=output_type)
[docs]class Output(djm.Model):
'''
A single artifact which is a result of an OpenQuake job.
The data may reside in a file or in the database.
'''
#: Metadata of hazard outputs used by risk calculation. See
#: `hazard_metadata` property for more details
HazardMetadata = collections.namedtuple(
'hazard_metadata',
'investigation_time statistics quantile sm_path gsim_path')
#: Hold the full paths in the model trees of ground shaking
#: intensity models and of source models, respectively.
LogicTreePath = collections.namedtuple(
'logic_tree_path',
'gsim_path sm_path')
#: Hold the statistical params (statistics, quantile).
StatisticalParams = collections.namedtuple(
'statistical_params',
'statistics quantile')
oq_job = djm.ForeignKey('OqJob', null=False)
display_name = djm.TextField()
HAZARD_OUTPUT_TYPE_CHOICES = (
(u'disagg_matrix', u'Disaggregation Matrix'),
(u'gmf', u'Ground Motion Field'),
(u'gmf_scenario', u'Ground Motion Field'),
(u'hazard_curve', u'Hazard Curve'),
(u'hazard_curve_multi', u'Hazard Curve (multiple imts)'),
(u'hazard_map', u'Hazard Map'),
(u'ses', u'Stochastic Event Set'),
(u'uh_spectra', u'Uniform Hazard Spectra'),
)
RISK_OUTPUT_TYPE_CHOICES = (
(u'agg_loss_curve', u'Aggregate Loss Curve'),
(u'aggregate_loss', u'Aggregate Losses'),
(u'bcr_distribution', u'Benefit-cost ratio distribution'),
(u'collapse_map', u'Collapse Map Distribution'),
(u'dmg_dist_per_asset', u'Damage Distribution Per Asset'),
(u'dmg_dist_per_taxonomy', u'Damage Distribution Per Taxonomy'),
(u'dmg_dist_total', u'Total Damage Distribution'),
(u'event_loss', u'Event Loss Table'),
(u'event_loss_asset', u'Event Loss Asset'),
(u'loss_curve', u'Loss Curve'),
(u'event_loss_curve', u'Loss Curve'),
(u'loss_fraction', u'Loss fractions'),
(u'loss_map', u'Loss Map'),
(u'dmg_per_asset', 'Damage Per Asset'),
)
output_type = djm.TextField(
choices=HAZARD_OUTPUT_TYPE_CHOICES + RISK_OUTPUT_TYPE_CHOICES)
last_update = djm.DateTimeField(editable=False, default=datetime.utcnow)
ds_key = djm.TextField(null=True, blank=True) # datastore key
objects = OutputManager()
def __str__(self):
return "%d||%s||%s" % (self.id, self.output_type, self.display_name)
class Meta:
db_table = 'uiapi\".\"output'
ordering = ['id']
[docs] def is_hazard_curve(self):
return self.output_type in ['hazard_curve', 'hazard_curve_multi']
@property
def output_container(self):
"""
:returns: the output container associated with this output
"""
# FIXME(lp). Remove the following outstanding exceptions
if self.output_type in ['agg_loss_curve', 'event_loss_curve']:
return self.loss_curve
elif self.output_type == 'hazard_curve_multi':
return self.hazard_curve
elif self.output_type == 'gmf_scenario':
return self.gmf
elif self.output_type == 'event_loss_asset':
return self.event_loss
return getattr(self, self.output_type)
@property
def lt_realization_paths(self):
"""
:returns: an instance of `LogicTreePath` the output is
associated with. When the output is not associated with any
logic tree branch then it returns a LogicTreePath namedtuple
with a couple of None.
"""
hazard_output_types = [el[0] for el in self.HAZARD_OUTPUT_TYPE_CHOICES]
risk_output_types = [el[0] for el in self.RISK_OUTPUT_TYPE_CHOICES]
container = self.output_container
if self.output_type in hazard_output_types:
rlz = getattr(container, 'lt_realization_id', None)
if rlz is not None:
return self.LogicTreePath(
tuple(container.lt_realization.gsim_lt_path),
tuple(container.lt_realization.sm_lt_path))
else:
return self.LogicTreePath(None, None)
elif self.output_type in risk_output_types:
if getattr(container, 'hazard_output_id', None):
return container.hazard_output.lt_realization_paths
else:
return self.LogicTreePath(None, None)
raise RuntimeError("unexpected output type %s" % self.output_type)
@property
def statistical_params(self):
"""
:returns: an instance of `StatisticalParams` the output is
associated with
"""
if getattr(self.output_container, 'statistics', None) is not None:
return self.StatisticalParams(self.output_container.statistics,
self.output_container.quantile)
elif getattr(
self.output_container, 'hazard_output_id', None) is not None:
return self.output_container.hazard_output.statistical_params
else:
return self.StatisticalParams(None, None)
@property
def hazard_metadata(self):
"""
Given an Output produced by a risk calculation it returns the
corresponding hazard metadata.
:returns:
A `namedtuple` with the following attributes::
* investigation_time: the hazard investigation time (float)
* statistics: the kind of hazard statistics (None, "mean" or
"quantile")
* quantile: quantile value (when `statistics` is "quantile")
* sm_path: a list representing the source model path
* gsim_path: a list representing the gsim logic tree path
"""
oq = self.oq_job.get_oqparam()
statistics, quantile = self.statistical_params
gsim_lt_path, sm_lt_path = self.lt_realization_paths
return self.HazardMetadata(oq.investigation_time,
statistics, quantile,
sm_lt_path, gsim_lt_path)