Source code for openquake.server.utils
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import getpass
import requests
import logging
import django
import numpy
from time import sleep
from django.conf import settings
from openquake.engine import __version__ as oqversion
if settings.LOCKDOWN:
django.setup()
from django.contrib.auth.models import User
[docs]def is_superuser(request):
if settings.LOCKDOWN and hasattr(request, 'user'):
if request.user.is_superuser:
return True
return False
[docs]def get_user(request):
"""
Returns the users from `request` if authentication is enabled, otherwise
returns the default user (from settings, or as reported by the OS).
"""
if settings.LOCKDOWN and hasattr(request, 'user'):
if request.user.is_authenticated:
user = request.user.username
else:
# This may happen with crafted requests
user = ''
else:
user = getattr(settings, 'DEFAULT_USER', getpass.getuser())
return user
[docs]def get_valid_users(request):
""""
Returns a list of `users` based on groups membership.
Returns a list made of a single user when it is not member of any group.
"""
users = [get_user(request)]
if settings.LOCKDOWN and hasattr(request, 'user'):
if request.user.is_authenticated:
groups = request.user.groups.all()
if groups:
users = list(User.objects.filter(groups__in=groups)
.values_list('username', flat=True))
else:
# This may happen with crafted requests
users = []
return users
[docs]def get_acl_on(request):
"""
Returns `True` if ACL should be honorated, returns otherwise `False`.
"""
acl_on = settings.ACL_ON
if is_superuser(request):
# ACL is always disabled for superusers
acl_on = False
return acl_on
[docs]def user_has_permission(request, owner):
"""
Returns `True` if user coming from the request has the permission
to view a resource, returns `false` otherwise.
"""
return owner in get_valid_users(request) or not get_acl_on(request)
[docs]def oq_server_context_processor(request):
"""
A custom context processor which allows injection of additional
context variables.
"""
context = {}
context['oq_engine_server_url'] = ('//' +
request.META.get('HTTP_HOST',
'localhost:8800'))
# this context var is also evaluated by the STANDALONE_APPS to identify
# the running environment. Keep it as it is
context['oq_engine_version'] = oqversion
context['server_name'] = settings.SERVER_NAME
return context
[docs]def check_webserver_running(url="http://localhost:8800", max_retries=30):
"""
Returns True if a given URL is responding within a given timeout.
"""
retry = 0
response = ''
success = False
while response != requests.codes.ok and retry < max_retries:
try:
response = requests.head(url, allow_redirects=True).status_code
success = True
except Exception:
sleep(1)
retry += 1
if not success:
logging.warning('Unable to connect to %s within %s retries'
% (url, max_retries))
return success
[docs]def array_of_strings_to_bytes(arr, key):
"""
:param arr: array or array-like object
:param key: string associated to the error (appear in the error message)
If `arr` is a numpy array with dtype object containing strings, convert
it into a numpy array containing bytes, unless it has more than 2
dimensions or contains non-strings (these are errors). Return `arr`
unchanged in the other cases.
"""
if arr is None:
return ()
if not isinstance(arr, numpy.ndarray) or arr.dtype != numpy.dtype('O'):
return arr
if arr.ndim == 1:
return numpy.array([s.encode('utf8') for s in arr])
elif arr.ndim == 2:
return numpy.array([[col.encode('utf8') for col in row]
for row in arr])
else:
raise NotImplementedError('The array for %s has shape %s' %
(key, arr.shape))
return arr