# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2016-2023 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportsysimporttimeimportloggingimportgetpassimportthreadingimportsubprocessfromopenquake.baselibimport(config,zeromqasz,workerpoolasw,parallelasp)fromopenquake.baselib.generalimportsocket_ready,detach_processfromopenquake.hazardlibimportvalidfromopenquake.commonlibimportlogsfromopenquake.server.dbimportactionsfromopenquake.commonlib.dbapiimportdbfromopenquake.serverimport__file__asserver_path
[docs]classDbServer(object):""" A server collecting the received commands into a queue """def__init__(self,db,address,num_workers=5):self.db=dbself.frontend='tcp://%s:%s'%addressself.backend='inproc://dbworkers'self.num_workers=num_workersself.pid=os.getpid()
[docs]defdworker(self,sock):# a database worker responding to commandswithsock:forcmd_insock:cmd,args=cmd_[0],cmd_[1:]ifcmd=='getpid':sock.send(self.pid)continueelifcmd.startswith('workers_'):master=w.WorkerMaster(args[0])# zworkersmsg=getattr(master,cmd[8:])()sock.send(msg)continuetry:func=getattr(actions,cmd)exceptAttributeError:# SQL stringsock.send(p.safely_call(self.db,(cmd,)+args))else:# actionsock.send(p.safely_call(func,(self.db,)+args))
[docs]defstart(self):""" Start database worker threads """# give a nice name to the processw.setproctitle('oq-dbserver')dworkers=[]for_inrange(self.num_workers):sock=z.Socket(self.backend,z.zmq.REP,'connect')threading.Thread(target=self.dworker,args=(sock,)).start()dworkers.append(sock)logging.warning('DB server started with %s on %s, pid %d',sys.executable,self.frontend,self.pid)# start frontend->backend proxy for the database workerstry:z.zmq.proxy(z.bind(self.frontend,z.zmq.ROUTER),z.bind(self.backend,z.zmq.DEALER))except(KeyboardInterrupt,z.zmq.ContextTerminated):forsockindworkers:sock.running=Falseifhasattr(sock,'zsocket'):# actually usedsock.zsocket.close()logging.warning('DB server stopped')finally:self.stop()
[docs]defstop(self):""" Stop the DbServer """self.db.close()
[docs]defdifferent_paths(path1,path2):path1=os.path.realpath(path1)# expand symlinkspath2=os.path.realpath(path2)# expand symlinks# don't care about the extension (it may be .py or .pyc)returnos.path.splitext(path1)[0]!=os.path.splitext(path2)[0]
[docs]defget_status(address=None):""" Check if the DbServer is up. :param address: pair (hostname, port) :returns: 'running' or 'not-running' """address=addressorvalid.host_port()return'running'ifsocket_ready(address)else'not-running'
[docs]defcheck_foreign():""" Check if we the DbServer is the right one """ifnotconfig.multi_userandnotos.environ.get('OQ_DATABASE'):remote_server_path=logs.dbcmd('get_path')ifdifferent_paths(server_path,remote_server_path):return('You are trying to contact a DbServer from another'' instance (got %s, expected %s)\n''Check the configuration or stop the foreign'' DbServer instance')%(remote_server_path,server_path)
[docs]defensure_on():""" Start the DbServer if it is off """if(os.environ.get('OQ_DATABASE',config.dbserver.host)=='127.0.0.1'andgetpass.getuser()!='openquake'):print('Using local database')actions.upgrade_db(db)returnifget_status()=='not-running':ifconfig.multi_userandgetpass.getuser()!='openquake':sys.exit('Please start the DbServer: ''see the documentation for details')# otherwise start the DbServer automatically; NB: I tried to use# multiprocessing.Process(target=run_server).start() and apparently# it works, but then run-demos.sh hangs after the end of the first# calculation, but only if the DbServer is started by oq engine (!?)subprocess.Popen([sys.executable,'-m','openquake.commands','dbserver','start'])# wait for the dbserver to startwaiting_seconds=30whileget_status()=='not-running':ifwaiting_seconds==0:sys.exit('The DbServer cannot be started after 30 seconds. ''Please check the configuration')time.sleep(1)waiting_seconds-=1
[docs]defrun_server(dbhostport=None,loglevel='WARN',foreground=False):""" Run the DbServer on the given database file and port. If not given, use the settings in openquake.cfg. """# configure the logging first of alllogging.basicConfig(level=getattr(logging,loglevel.upper()))ifdbhostport:# assume a string of the form "dbhost:port"dbhost,port=dbhostport.split(':')addr=(dbhost,int(port))else:addr=(config.dbserver.host,config.dbserver.port)# create the db directory if neededdirname=os.path.dirname(os.path.expanduser(config.dbserver.file))ifnotos.path.exists(dirname):os.makedirs(dirname)# create and upgrade the db if neededdb('PRAGMA foreign_keys = ON')# honor ON DELETE CASCADEactions.upgrade_db(db)# the line below is needed to work around a very subtle bug of sqlite;# we need new connections, see https://github.com/gem/oq-engine/pull/3002db.close()# start the dbserverifhasattr(os,'fork')andnot(config.multi_userorforeground):# needed for https://github.com/gem/oq-engine/issues/3211# but only if multi_user = False, otherwise init/supervisor# will loose control of the processdetach_process()DbServer(db,addr).start()# expects to be killed with CTRL-C