# Copyright (c) 2010-2014, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""Custom Django field and formfield types (for models and forms."""
import ast
import numpy
import re
import zlib
import json
import cPickle as pickle
from django.utils.encoding import smart_unicode
from django.contrib.gis import forms
from django.contrib.gis.db import models as djm
#: regex for splitting string lists on whitespace and/or commas
ARRAY_RE = re.compile(r'[\s,]+')
# Disable pylint for 'Too many public methods'
# pylint: disable=R0904
[docs]class FloatArrayField(djm.Field):
"""This field models a postgres `float` array."""
[docs] def db_type(self, connection):
return 'float[]'
[docs] def get_prep_value(self, value):
if value is None:
return None
# Normally, the value passed in here will be a list.
# It could also be a string list, each value separated by
# comma/whitespace.
if isinstance(value, str):
if len(value) == 0:
# It's an empty string list
value = []
else:
# try to coerce the string to a list of floats
value = [float(x) for x in ARRAY_RE.split(value)]
# If there's an exception here, just let it be raised.
return "{" + ', '.join(str(v) for v in value) + "}"
[docs]class IntArrayField(djm.Field):
"""This field models a postgresql `int` array"""
# TODO(lp) implement the corresponding form field
[docs] def db_type(self, connection):
return 'int[]'
[docs] def get_prep_value(self, value):
if value is None:
return
return "{%s}" % ','.join(str(v) for v in value)
[docs]class CharArrayField(djm.Field):
"""This field models a postgres `varchar` array."""
[docs] def db_type(self, _connection=None):
return 'varchar[]'
[docs] def to_python(self, value):
"""
Split strings on whitespace or commas and return the list.
If the input ``value`` is not a string, just return the value (for
example, if ``value`` is already a list).
"""
if isinstance(value, basestring):
return list(ARRAY_RE.split(value))
return value
[docs] def get_prep_value(self, value):
"""Return data in a format that has been prepared for use as a
parameter in a query.
:param value: sequence of string values to be saved in a varchar[]
field
:type value: list or tuple
>>> caf = CharArrayField()
>>> caf.get_prep_value(['foo', 'bar', 'baz123'])
'{"foo", "bar", "baz123"}'
"""
if value is None:
return None
# Normally, the value passed in here will be a list.
# It could also be a string list, each value separated by
# comma/whitespace.
if isinstance(value, str):
if len(value) == 0:
# It's an empty string list
value = []
else:
# try to coerce the string to a list of strings
value = list(ARRAY_RE.split(value))
return '{' + ', '.join('"%s"' % str(v) for v in value) + '}'
[docs]class LiteralField(djm.Field):
"""
Convert from Postgres TEXT to Python objects and viceversa by using
`ast.literal_eval` and `repr`.
"""
__metaclass__ = djm.SubfieldBase
[docs] def db_type(self, _connection=None):
return 'text'
[docs] def to_python(self, value):
if value is not None:
return ast.literal_eval(value)
[docs] def get_prep_value(self, value):
return repr(value)
[docs]class PickleField(djm.Field):
"""Field for transparent pickling and unpickling of python objects."""
__metaclass__ = djm.SubfieldBase
SUPPORTED_BACKENDS = set((
'django.contrib.gis.db.backends.postgis',
'django.db.backends.postgresql_psycopg2'
))
[docs] def db_type(self, connection):
"""Return "bytea" as postgres' column type."""
assert connection.settings_dict['ENGINE'] in self.SUPPORTED_BACKENDS
return 'bytea'
[docs] def to_python(self, value):
"""Unpickle the value."""
if isinstance(value, (buffer, str, bytearray)) and value:
return pickle.loads(str(value))
else:
return value
[docs] def get_prep_value(self, value):
"""Pickle the value."""
return bytearray(pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
[docs]class GzippedField(djm.Field):
"""
Automatically stores gzipped text as a bytearray
"""
__metaclass__ = djm.SubfieldBase
[docs] def db_type(self, _connection):
return 'bytea'
[docs] def get_prep_value(self, value):
"""Compress the value"""
return bytearray(zlib.compress(value))
[docs] def to_python(self, value):
"""Decompress the value"""
return zlib.decompress(value)
[docs]class DictField(PickleField):
"""Field for storing Python `dict` objects (or a JSON text representation.
"""
[docs] def to_python(self, value):
"""The value of a DictField can obviously be a `dict`. The value can
also be specified as a JSON string. If it is, convert it to a `dict`.
"""
if isinstance(value, str):
try:
value = json.loads(value)
except ValueError:
# This string is not JSON.
value = super(DictField, self).to_python(value)
else:
value = super(DictField, self).to_python(value)
return value
[docs]class NumpyListField(PickleField):
"""
Field for storing numpy arrays as pickled blobs. The actual blob stored in
the database is simply a pickled `list`. When the field is instantiated,
the value is converted back to a numpy array.
"""
[docs] def to_python(self, value):
"""
Try to reconstruct a `numpy.ndarray` from the given ``value``.
:param value:
The pickled representation of an object which can be reconstituted
using `pickle.loads`.
"""
if value is None:
return None
# first, unpickle:
value = super(NumpyListField, self).to_python(value)
if isinstance(value, (list, tuple, numpy.ndarray)):
return numpy.array(value)
# NOTE: If the value is not a list or tuple, raise an exception.
# The reason we do this is because this field type is intended to be
# used only for storing list-like data. (Technically any object can be
# wrapped in a numpy array, like `numpy.array(object())`, but this is
# not our use case.
raise ValueError(
"Unexpected value of type '%s'. Expected 'list' or 'tuple'."
% type(value)
)
[docs] def get_prep_value(self, value):
"""
Convert the ``value`` to the pickled representation of a `list`. If
``value`` is a `numpy.ndarray`, it will be converted to a list of the
same size and shape before being pickled.
:param value:
A `list`, `tuple`, or `numpy.ndarray`.
"""
# convert to list first before pickling, if it's a numpy array
if isinstance(value, numpy.ndarray):
return super(NumpyListField, self).get_prep_value(value.tolist())
else:
if not isinstance(value, (list, tuple)):
raise ValueError(
"Unexpected value of type '%s'. Expected 'list', 'tuple', "
"or 'numpy.ndarray'"
% type(value)
)
[docs]class OqNullBooleanField(djm.NullBooleanField):
"""
A `NullBooleanField` that can convert meaningful strings to boolean
values (in the case of config file parameters).
"""
[docs] def to_python(self, value):
"""
If ``value`` is a `str`, try to extract some boolean value from it.
"""
if isinstance(value, str):
if value.lower() in ('t', 'true', 'y', 'yes'):
value = True
elif value.lower() in ('f', 'false', 'n', 'no'):
value = False
# otherwise, it will just get cast to a bool, which could produce some
# strange results
value = super(OqNullBooleanField, self).to_python(value)
return value
[docs]class NullFloatField(djm.FloatField):
"""
A nullable float field that handles blank input values properly.
"""
[docs] def get_prep_value(self, value):
if isinstance(value, basestring):
if value.strip():
return super(NullFloatField, self).get_prep_value(value)
else:
return None
else:
return value
[docs]class NullTextField(djm.TextField):
def __init__(self, **kwargs):
kwargs.update(dict(blank=True, null=True))
super(NullTextField, self).__init__(**kwargs)
[docs] def formfield(self, **kwargs):
"""
Specify a custom form field type so forms know how to handle fields of
this type.
"""
defaults = {'form_class': NullCharField}
defaults.update(kwargs)
return super(NullTextField, self).formfield(**defaults)
[docs]class NullCharField(forms.CharField):
[docs] def to_python(self, value):
"Returns a Unicode object."
if value is None:
return None
else:
return smart_unicode(value)