# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2016-2017 GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import sys
import time
import socket
import sqlite3
import os.path
import logging
import subprocess
from multiprocessing.connection import Listener
from concurrent.futures import ThreadPoolExecutor
from openquake.baselib import sap
from openquake.baselib.parallel import safely_call
from openquake.hazardlib import valid
from openquake.commonlib import config, logs
from openquake.server.db import actions
from openquake.server import dbapi
from openquake.server import __file__ as server_path
from openquake.server.settings import DATABASE
# using a ThreadPool because SQLite3 isn't fork-safe on macOS Sierra
# ref: https://bugs.python.org/issue27126
executor = ThreadPoolExecutor(1)
[docs]class DbServer(object):
"""
A server collecting the received commands into a queue
"""
def __init__(self, db, address, authkey):
self.db = db
self.address = address
self.authkey = authkey
[docs] def loop(self):
listener = Listener(self.address, backlog=5, authkey=self.authkey)
logging.warn('DB server started with %s, listening on %s:%d...',
sys.executable, *self.address)
try:
while True:
try:
conn = listener.accept()
except KeyboardInterrupt:
break
except:
# unauthenticated connection, for instance by a port
# scanner such as the one in manage.py
continue
cmd_ = conn.recv() # a tuple (name, arg1, ... argN)
cmd, args = cmd_[0], cmd_[1:]
logging.debug('Got ' + str(cmd_))
if cmd == 'stop':
conn.send((None, None))
conn.close()
break
func = getattr(actions, cmd)
fut = executor.submit(safely_call, func, (self.db,) + args)
def sendback(fut, conn=conn):
res, etype, _mon = fut.result()
if etype:
logging.error(res)
# send back the result and the exception class
conn.send((res, etype))
conn.close()
fut.add_done_callback(sendback)
finally:
listener.close()
[docs]def different_paths(path1, path2):
path1 = os.path.realpath(path1) # expand symlinks
path2 = os.path.realpath(path2) # expand symlinks
# don't care about the extension (it may be .py or .pyc)
return os.path.splitext(path1)[0] != os.path.splitext(path2)[0]
[docs]def get_status(address=None):
"""
Check if the DbServer is up.
:param address: pair (hostname, port)
:returns: 'running' or 'not-running'
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
err = sock.connect_ex(address or config.DBS_ADDRESS)
finally:
sock.close()
return 'not-running' if err else 'running'
[docs]def check_foreign():
"""
Check if we the DbServer is the right one
"""
if not config.flag_set('dbserver', 'multi_user'):
remote_server_path = logs.dbcmd('get_path')
if different_paths(server_path, remote_server_path):
return('You are trying to contact a DbServer from another'
' instance (got %s, expected %s)\n'
'Check the configuration or stop the foreign'
' DbServer instance') % (remote_server_path, server_path)
[docs]def ensure_on():
"""
Start the DbServer if it is off
"""
if get_status() == 'not-running':
if valid.boolean(config.get('dbserver', 'multi_user')):
sys.exit('Please start the DbServer: '
'see the documentation for details')
# otherwise start the DbServer automatically
subprocess.Popen([sys.executable, '-m', 'openquake.server.dbserver',
'-l', 'INFO'])
# wait for the dbserver to start
waiting_seconds = 10
while get_status() == 'not-running':
if waiting_seconds == 0:
sys.exit('The DbServer cannot be started after 10 seconds. '
'Please check the configuration')
time.sleep(1)
waiting_seconds -= 1
@sap.Script
def run_server(dbhostport=None, dbpath=None, logfile=DATABASE['LOG'],
loglevel='WARN'):
"""
Run the DbServer on the given database file and port. If not given,
use the settings in openquake.cfg.
"""
if dbhostport: # assume a string of the form "dbhost:port"
dbhost, port = dbhostport.split(':')
addr = (dbhost, int(port))
DATABASE['PORT'] = int(port)
else:
addr = config.DBS_ADDRESS
if dbpath:
DATABASE['NAME'] = dbpath
# create the db directory if needed
dirname = os.path.dirname(DATABASE['NAME'])
if not os.path.exists(dirname):
os.makedirs(dirname)
# create and upgrade the db if needed
db = dbapi.Db(sqlite3.connect, DATABASE['NAME'], isolation_level=None,
detect_types=sqlite3.PARSE_DECLTYPES)
db('PRAGMA foreign_keys = ON') # honor ON DELETE CASCADE
actions.upgrade_db(db)
db.conn.close()
# configure logging and start the server
logging.basicConfig(level=getattr(logging, loglevel), filename=logfile)
DbServer(db, addr, config.DBS_AUTHKEY).loop()
run_server.arg('dbhostport', 'dbhost:port')
run_server.arg('dbpath', 'dbpath')
run_server.arg('logfile', 'log file')
run_server.opt('loglevel', 'WARN or INFO')
if __name__ == '__main__':
run_server.callfunc()