Source code for openquake.server.dbserver

#  -*- coding: utf-8 -*-
#  vim: tabstop=4 shiftwidth=4 softtabstop=4

#  Copyright (C) 2016 GEM Foundation

#  OpenQuake is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  OpenQuake is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU Affero General Public License
#  along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import socket
import os.path
import logging
from Queue import Queue
from threading import Thread
from multiprocessing import Process
from multiprocessing.connection import Listener

from openquake.commonlib import sap
from openquake.commonlib.parallel import safely_call
from openquake.engine import config
from openquake.server.db import actions
from openquake.server.settings import DATABASE
from django.db import connection
import django
if hasattr(django, 'setup'):  # >= 1.7
    django.setup()

queue = Queue()


[docs]def run_command(cmd, args, conn): """ Execute the received command. Errors are trapped and a pair (result, exctype) is sent back. `exctype` is None if there is no exception, otherwise it is an exception class and `result` is an error string containing the traceback. """ try: logging.info('Processing %s%s', cmd, args) func = getattr(actions, cmd) # execute the function by trapping any possible exception res, etype, _ = safely_call(func, args) if etype: logging.error(res) # send back the result and the exception class conn.send((res, etype)) finally: conn.close()
[docs]def run_commands(): """ Execute the received commands in a queue. """ while True: conn, cmd, args = queue.get() if cmd == 'stop': conn.send((None, None)) conn.close() break run_command(cmd, args, conn)
[docs]class DbServer(object): """ A server collecting the received commands into a queue """ def __init__(self, address, authkey): self.address = address self.authkey = authkey self.thread = Thread(target=run_commands)
[docs] def loop(self): listener = Listener(self.address, backlog=5, authkey=self.authkey) logging.warn('DB server listening on %s:%d...' % self.address) self.thread.start() cmd = None try: while cmd != 'stop': try: conn = listener.accept() except KeyboardInterrupt: break except: # unauthenticated connection, for instance by a port # scanner such as the one in manage.py continue cmd_ = conn.recv() # a tuple (name, arg1, ... argN) cmd, args = cmd_[0], cmd_[1:] if cmd.startswith('@'): # slow command, run in process cmd = cmd[1:] # strip @ proc = Process( target=run_command, name=cmd, args=(cmd, args, conn)) proc.start() logging.warn('Started %s%s in process %d', cmd, args, proc.pid) else: queue.put((conn, cmd, args)) finally: listener.close() self.thread.join()
[docs]def get_status(address=None): """ Check if the DbServer is up. :param address: pair (hostname, port) :returns: 'running' or 'not-running' """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: err = sock.connect_ex(address or config.DBS_ADDRESS) finally: sock.close() return 'not-running' if err else 'running'
[docs]def run_server(dbpathport=None, logfile=DATABASE['LOG'], loglevel='WARN'): """ Run the DbServer on the given database file and port. If not given, use the settings in openquake.cfg. """ if dbpathport: # assume a string of the form "dbpath:port" dbpath, port = dbpathport.split(':') addr = (DATABASE['HOST'], int(port)) DATABASE['NAME'] = dbpath DATABASE['PORT'] = int(port) else: addr = config.DBS_ADDRESS # create the db directory if needed dirname = os.path.dirname(DATABASE['NAME']) if not os.path.exists(dirname): os.makedirs(dirname) # create and upgrade the db if needed connection.cursor() # bind the db actions.upgrade_db() # configure logging and start the server logging.basicConfig(level=getattr(logging, loglevel), filename=logfile) DbServer(addr, config.DBS_AUTHKEY).loop()
parser = sap.Parser(run_server) parser.arg('dbpathport', 'dbpath:port') parser.arg('logfile', 'log file') parser.opt('loglevel', 'WARN or INFO') if __name__ == '__main__': parser.callfunc()