# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2016-2023 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""\One of the worst thing about Python is the `DB API 2.0`_specification, which is unusable except for building frameworks. Itshould have been a stepping stone for an usable DB API 3.0 that neverhappened. So, instead of a good low level API, we had a proliferationof Object Relational Mappers making our lives a lot harder. Fortunately,there has always been good Pythonistas in the anti-ORM camp.This module is heavily inspired by the dbapiext_ module byMartin Blais, which is part of the antiorm_ package. The main (only)difference is that I am using the question mark (?) for the placeholdersinstead of the percent sign (%) to avoid confusions with other usagesof the %s, in particular in LIKE queries and in expressions like`strftime('%s', time)` used in SQLite.In less than 200 lines of code there is enough supportto build dynamic SQL queries and to make an ORM unneeded, sincewe do not need database independence... _DB API 2.0: https://www.python.org/dev/peps/pep-0249.. _dbapiext: http://furius.ca/pubcode/pub/antiorm/lib/python/dbapiext.py.html.. _antiorm: https://bitbucket.org/blais/antiormdbiapi tutorial--------------------------The only thing you must to know is the `Db` class, which is lazy wrapperover a database connection. You instantiate it by passing a connectionfunction and its arguments:>>> import sqlite3>>> db = Db(sqlite3.connect, ':memory:')Now you have an interface to your database, the `db` object. This objectis lazy, i.e. the connection is not yet instantiated, but it will bewhen you will access its `.conn` attribute. This attribute is automaticallyaccessed when you call the interface to run a query, for instance to createan empty table:>>> curs = db('CREATE TABLE job ('... 'id INTEGER PRIMARY KEY AUTOINCREMENT, value INTEGER)')You can populate the table by using the `.insert` method:>>> db.insert('job', ['value'], [(42,), (43,)]) # doctest: +ELLIPSIS<sqlite3.Cursor object at ...>Notice that this method returns a standard DB API 2.0 cursor andyou have access to all of its features: for instance here you could extractthe lastrowid.Then you can run SELECT queries:>>> rows = db('SELECT * FROM job')The dbapi provides a `Row` class which is used to hold the results ofSELECT queries and is working as one would expect:>>> rows[<Row(id=1, value=42)>, <Row(id=2, value=43)>]>>> tuple(rows[0])(1, 42)>>> rows[0].id1>>> rows[0].value42>>> rows[0]._fields['id', 'value']The queries can have different kind of `?` parameters:- `?s` is for interpolated string parameters: >>> db('SELECT * FROM ?s', 'job') # ?s is replaced by 'job' [<Row(id=1, value=42)>, <Row(id=2, value=43)>]- `?x` is for escaped parameters (to avoid SQL injection): >>> db('SELECT * FROM job WHERE id=?x', 1) # ?x is replaced by 1 [<Row(id=1, value=42)>]- `?s` and `?x` are for scalar parameters; `?S` and `?X` are for sequences: >>> db('INSERT INTO job (?S) VALUES (?X)', ['id', 'value'], (3, 44)) # doctest: +ELLIPSIS <sqlite3.Cursor object at ...>You can see how the interpolation works by calling the `expand` methodthat returns the interpolated template (alternatively, there is a`debug=True` flag when calling `db` that prints the same info). In this case>>> db.expand('INSERT INTO job (?S) VALUES (?X)', ['id', 'value'], [3, 44])'INSERT INTO job (id, value) VALUES (?, ?)'As you see, `?S` parameters work by replacing a list of strings with a commaseparated string, where `?X` parameters are replaced by a comma separatedsequence of question marks, i.e. the low level placeholder for SQLite.The interpolation performs a regular search and replace,so if you have a `?-` string in your template that must not be escaped,you can run into issues. This is an error:>>> match("SELECT * FROM job WHERE id=?x AND description='Lots of ?s'", 1)Traceback (most recent call last): ...ValueError: Incorrect number of ?-parameters in SELECT * FROM job WHERE id=?x AND description='Lots of ?s', expected 1This is correct:>>> match("SELECT * FROM job WHERE id=?x AND description=?x", 1, 'Lots of ?s')('SELECT * FROM job WHERE id=? AND description=?', (1, 'Lots of ?s'))There are three other `?` parameters:- `?D` is for dictionaries and it is used mostly in UPDATE queries: >>> match('UPDATE mytable SET ?D WHERE id=?x', dict(value=33, other=5), 1) ('UPDATE mytable SET other=?, value=? WHERE id=?', (5, 33, 1))- `?A` is for dictionaries and it is used in AND queries: >>> match('SELECT * FROM job WHERE ?A', dict(value=33, id=5)) ('SELECT * FROM job WHERE id=? AND value=?', (5, 33))- `?O` is for dictionaries and it is used in OR queries: >>> match('SELECT * FROM job WHERE ?O', dict(value=33, id=5)) ('SELECT * FROM job WHERE id=? OR value=?', (5, 33))The dictionary parameters are ordered per field name, just to makethe templates reproducible. `?A` and `?O` are smart enough totreat specially `None` parameters, that are turned into `NULL`: >>> match('SELECT * FROM job WHERE ?A', dict(value=None, id=5)) ('SELECT * FROM job WHERE id=? AND value IS NULL', (5,))The `?` parameters are matched positionally; it is also possible topass to the `db` object a few keyword arguments to tune the standardbehavior. In particular, if you know that a query must return asingle row you can do the following:>>> db('SELECT * FROM job WHERE id=?x', 1, one=True)<Row(id=1, value=42)>Without the `one=True` the query would have returned a list with a singleelement. If you know that the query must return a scalar you can do thefollowing:>>> db('SELECT value FROM job WHERE id=?x', 1, scalar=True)42If a query that should return a scalar returns something else, or if aquery that should return a row returns a different number of rows,appropriate errors are raised:>>> db('SELECT * FROM job WHERE id=?x', 1, scalar=True) # doctest: +IGNORE_EXCEPTION_DETAILTraceback (most recent call last): ...TooManyColumns: 2, expected 1>>> db('SELECT * FROM job', None, one=True) # doctest: +IGNORE_EXCEPTION_DETAILTraceback (most recent call last): ...TooManyRows: 3, expected 1If a row is expected but not found, a NotFound exception is raised:>>> db('SELECT * FROM job WHERE id=?x', None, one=True) # doctest: +IGNORE_EXCEPTION_DETAILTraceback (most recent call last): ...NotFound"""importosimportreimportsqlite3importthreadingimportcollectionsfromopenquake.baselibimportconfig
[docs]classNotFound(Exception):"""Raised when a scalar query has not output"""
[docs]classTooManyRows(Exception):"""Raised when a scalar query produces more than one row"""
[docs]classTooManyColumns(Exception):"""Raised when a scalar query has more than one column"""
class_Replacer(object):# helper class for the match function belowrx=re.compile(r'\?S|\?X|\?D|\?A|\?O|\?s|\?x')ph='?'def__init__(self,all_args):self.all_args=list(all_args)self.xargs=[]self.sargs=[]def__call__(self,mo):arg=self.all_args[0]# can raise an IndexErrordelself.all_args[0]placeholder=mo.group()ifplaceholder=='?X':self.xargs.extend(arg)return', '.join([self.ph]*len(arg))elifplaceholder=='?S':self.sargs.extend(arg)return', '.join(['{}']*len(arg))elifplaceholder=='?D':keys,values=zip(*sorted(arg.items()))self.sargs.extend(keys)self.xargs.extend(values)return', '.join(['{}='+self.ph]*len(arg))elifplaceholder=='?A':returnself.join(' AND ',arg)or'1'elifplaceholder=='?O':returnself.join(' OR ',arg)or'1'elifplaceholder=='?x':self.xargs.append(arg)returnself.phelifplaceholder=='?s':self.sargs.append(arg)return'{}'defjoin(self,sep,arg):ls=[]fornameinsorted(arg):self.sargs.append(name)value=arg[name]ifvalueisNone:ls.append('{} IS NULL')else:self.xargs.append(value)ls.append('{}='+self.ph)returnsep.join(ls)defmatch(self,m_templ):templ=self.rx.sub(self,m_templ)returntempl.format(*self.sargs),tuple(self.xargs)
[docs]defmatch(m_templ,*m_args):""" :param m_templ: a meta template string :param m_args: all arguments :returns: template, args Here is an example of usage: >>> match('SELECT * FROM job WHERE id=?x', 1) ('SELECT * FROM job WHERE id=?', (1,)) """# strip commented linesm_templ='\n'.join(lineforlineinm_templ.splitlines()ifnotline.lstrip().startswith('--'))ifnotm_args:returnm_templ,()try:return_Replacer(m_args).match(m_templ)exceptIndexError:raiseValueError('Incorrect number of ?-parameters in %s, expected %s'%(m_templ,len(m_args)))
[docs]classDb(object):""" A wrapper over a DB API 2 connection. See the tutorial. """def__init__(self,connect,*args,**kw):self.connect=connectself.args=argsself.kw=kwself.local=threading.local()
[docs]@classmethoddefexpand(cls,m_templ,*m_args):""" Performs partial interpolation of the template. Used for debugging. """returnmatch(m_templ,*m_args)[0]
@propertydefconn(self):try:returnself.local.connexceptAttributeError:dname=os.path.dirname(self.args[0])# empty for :memory:ifdnameandnotos.path.exists(dname):os.makedirs(dname)self.local.conn=self.connect(*self.args,**self.kw)# honor ON DELETE CASCADEself.local.conn.execute('PRAGMA foreign_keys = ON')returnself.local.conn@propertydefpath(self):"""Path to the underlying sqlite file"""returnself.args[0]def__enter__(self):returnselfdef__exit__(self,etype,exc,tb):ifetype:self.conn.rollback()else:self.conn.commit()def__call__(self,m_templ,*m_args,**kw):cursor=self.conn.cursor()templ,args=match(m_templ,*m_args)ifkw.get('debug'):print(templ)print('args = %s'%repr(args))try:ifargs:cursor.execute(templ,args)else:cursor.execute(templ)exceptExceptionasexc:raiseexc.__class__('%s: %s%s'%(exc,templ,args))iftempl.lstrip().lower().startswith(('select','pragma')):rows=cursor.fetchall()ifkw.get('scalar'):# scalar queryifnotrows:raiseNotFoundeliflen(rows)>1:raiseTooManyRows('%s, expected 1'%len(rows))eliflen(rows[0])>1:raiseTooManyColumns('%s, expected 1'%len(rows[0]))returnrows[0][0]elifkw.get('one'):# query returning a single rowifnotrows:raiseNotFound(args)eliflen(rows)>1:raiseTooManyRows('%s, expected 1'%len(rows))elifcursor.descriptionisNone:returncursorcolnames=[r[0]forrincursor.description]ifkw.get('one'):returnRow(colnames,rows[0])else:returnTable(colnames,rows)else:returncursor
[docs]definsert(self,table,columns,rows):""" Insert several rows with executemany. Return a cursor. """cursor=self.conn.cursor()iflen(rows):templ,_args=match('INSERT INTO ?s (?S) VALUES (?X)',table,columns,rows[0])cursor.executemany(templ,rows)returncursor
[docs]defclose(self):""" Close the main thread connection and refresh the threadlocal object """self.conn.close()self.local=threading.local()
[docs]classTable(list):"""Just a list of Rows with an attribute _fields"""def__init__(self,fields,rows):self._fields=fieldsforrowinrows:self.append(Row(fields,row))
# we cannot use a namedtuple here because one would get a PicklingError:# Can't pickle <class 'openquake.commonlib.dbapi.Row'>: attribute lookup# openquake.commonlib.dbapi.Row failed
[docs]classRow(collections.abc.Sequence):""" A pickleable row, working both as a tuple and an object: >>> row = Row(['id', 'value'], (1, 2)) >>> tuple(row) (1, 2) >>> assert row[0] == row.id and row[1] == row.value :param fields: a sequence of field names :param values: a sequence of values (one per field) """def__init__(self,fields,values):iflen(values)!=len(fields):raiseValueError('Got %d values, expected %d'%(len(values),len(fields)))self._fields=fieldsself._values=valuesforf,vinzip(fields,values):setattr(self,f,v)def__getitem__(self,i):returnself._values[i]def__len__(self):returnlen(self._values)def__repr__(self):items=['%s=%s'%(f,getattr(self,f))forfinself._fields]return'<Row(%s)>'%', '.join(items)
db=Db(sqlite3.connect,os.path.expanduser(config.dbserver.file),isolation_level=None,detect_types=sqlite3.PARSE_DECLTYPES,timeout=20)# NB: I am increasing the timeout from 5 to 20 seconds and the random# OperationalError: "database is locked" disappears in the WebUI tests