Initial support for pg8000

The major difference from psycopg2/psycopg2cffi is that pg8000 has no
native lobject support. That is easily worked around, but this draft is
incomplete because the workaround doesn't support chunked blobs yet.
Some refactoring will be required to handle chunking elegantly.
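
As a rough sketch, the workaround looks something like this (assuming
PostgreSQL 9.4+ for `lo_get`/`lo_from_bytea`, and the `pyformat`
paramstyle that this commit sets globally; the helper names here are
hypothetical):

    import pg8000
    pg8000.paramstyle = 'pyformat'  # as set in the driver module below

    def download_blob(cursor, loid, filename):
        # Fetch the whole large object in one round trip; no chunking yet.
        cursor.execute("SELECT lo_get(%(oid)s)", {'oid': loid})
        data = cursor.fetchone()[0]
        with open(filename, 'wb') as f:
            f.write(data)
        return len(data)

    def upload_blob(cursor, zoid, filename):
        # Create the large object server-side from the file's bytes.
        with open(filename, 'rb') as f:
            data = f.read()
        cursor.execute(
            "INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)"
            " SELECT %(zoid)s, 0, lo_from_bytea(0, %(data)s)",
            {'zoid': zoid, 'data': pg8000.Binary(data)})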

Two minor differences:

- It doesn't support multiple statements in a single `cursor.execute()`
  call. Apparently it turns every statement into a prepared statement,
  because the error comes from the server. This is easily worked around
  (see the sketch after this list) and unlikely to make a performance
  difference.
- It handles transaction isolation levels the way most other drivers
  do, in SQL (also sketched below). This could be handled more
  elegantly too.
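
Rough sketches of both workarounds (the helper names are hypothetical;
the real versions are in the diff below):

    # 1. Execute a multi-statement script one statement at a time,
    #    since the server rejects multi-statement prepared statements.
    def execute_script(cursor, script):
        for line in script.splitlines():
            if line.strip():
                cursor.execute(line)

    # 2. Set the isolation level in SQL rather than through a driver
    #    API like psycopg2's connection.set_isolation_level().
    def set_isolation(conn, cursor, level='ISOLATION LEVEL READ COMMITTED'):
        cursor.execute('SET SESSION CHARACTERISTICS AS TRANSACTION %s' % level)
        conn.commit()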

One thing: the server spits out lots of "WARNING: not in a transaction"
messages. With statement logging enabled, I see every `commit`
immediately followed by a `rollback`, and the `rollback` generates the
warning. I'm not sure whether that's just us or something the driver is
doing differently (e.g., maybe psycopg2/psycopg2cffi always follow
`commit` with `begin`, so that by the time we `rollback` we're already
in a new transaction?). Annoying but harmless; still, I'd like to
figure it out.
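
For reference, one way to enable statement logging for this kind of
diagnosis (assuming a superuser connection and PostgreSQL 9.4+ for
`ALTER SYSTEM`):

    import pg8000

    admin = pg8000.connect(user='postgres', database='postgres')
    admin.autocommit = True  # ALTER SYSTEM can't run in a transaction block
    cur = admin.cursor()
    cur.execute("ALTER SYSTEM SET log_statement = 'all'")
    cur.execute("SELECT pg_reload_conf()")  # apply without a server restart
    admin.close()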

Testing this pure-Python driver on Python 3 also revealed a lot of
connection leaks; many of them are fixed here (see the `_ConnWrapper`
debugging aid and the `TransactionIterator.close` changes below), but
there are probably more. This should be a win for PyPy. See also
zopefoundation/ZODB#78.

The testing matrix is expanded to run the pure-Python drivers on
CPython too and to test pg8000. (Travis will be the first time I run
the psycopg2 tests; I hope I didn't break anything!)

Fixes #85.
Jason Madden 2016-07-01 17:10:36 -05:00
parent 76168254d9
commit 4ba2b252d5
18 changed files with 224 additions and 43 deletions

.travis.yml

@@ -15,11 +15,14 @@ env:
  matrix:
    - ENV=mysql
    - ENV=postgres
    - ENV=pypostgres
    - ENV=pymysql
matrix:
  fast_finish: true
script:
  # coverage slows PyPy down from 2 minutes to 12+.
  - if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then python -m relstorage.tests.alltests -v; fi
  # But don't run the pymysql/pypy tests twice.
  - if [[ $TRAVIS_PYTHON_VERSION == 'pypy' && $ENV != 'pymysql' ]]; then python -m relstorage.tests.alltests -v; fi
  - if [[ $TRAVIS_PYTHON_VERSION != 'pypy' ]]; then coverage run -m relstorage.tests.alltests -v; fi
after_success:
  - coveralls

.travis/mysql.sh Executable file

@@ -0,0 +1,10 @@
mysql -uroot -e "CREATE USER 'relstoragetest'@'localhost' IDENTIFIED BY 'relstoragetest';"
mysql -uroot -e "CREATE DATABASE relstoragetest;"
mysql -uroot -e "GRANT ALL ON relstoragetest.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest2;"
mysql -uroot -e "GRANT ALL ON relstoragetest2.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest_hf;"
mysql -uroot -e "GRANT ALL ON relstoragetest_hf.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest2_hf;"
mysql -uroot -e "GRANT ALL ON relstoragetest2_hf.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "FLUSH PRIVILEGES;"

.travis/postgres.sh Executable file

@@ -0,0 +1,5 @@
psql -U postgres -c "CREATE USER relstoragetest WITH PASSWORD 'relstoragetest';"
psql -U postgres -c "CREATE DATABASE relstoragetest OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest2 OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest_hf OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest2_hf OWNER relstoragetest;"

.travis/setup-mysql.sh

@@ -1,11 +1,2 @@
pip install -U -e ".[mysql]"
mysql -uroot -e "CREATE USER 'relstoragetest'@'localhost' IDENTIFIED BY 'relstoragetest';"
mysql -uroot -e "CREATE DATABASE relstoragetest;"
mysql -uroot -e "GRANT ALL ON relstoragetest.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest2;"
mysql -uroot -e "GRANT ALL ON relstoragetest2.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest_hf;"
mysql -uroot -e "GRANT ALL ON relstoragetest_hf.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "CREATE DATABASE relstoragetest2_hf;"
mysql -uroot -e "GRANT ALL ON relstoragetest2_hf.* TO 'relstoragetest'@'localhost';"
mysql -uroot -e "FLUSH PRIVILEGES;"
`dirname $0`/mysql.sh

.travis/setup-oracle.sh Normal file → Executable file

.travis/setup-postgres.sh

@@ -1,6 +1,2 @@
pip install -U -e ".[postgresql]"
psql -U postgres -c "CREATE USER relstoragetest WITH PASSWORD 'relstoragetest';"
psql -U postgres -c "CREATE DATABASE relstoragetest OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest2 OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest_hf OWNER relstoragetest;"
psql -U postgres -c "CREATE DATABASE relstoragetest2_hf OWNER relstoragetest;"
`dirname $0`/postgres.sh

.travis/setup-pymysql.sh Executable file

@@ -0,0 +1,2 @@
pip install -U pymysql
`dirname $0`/mysql.sh

.travis/setup-pypostgres.sh Executable file

@@ -0,0 +1,2 @@
pip install -U pg8000
`dirname $0`/postgres.sh


@@ -30,7 +30,8 @@ PostgreSQL Adapter Options
The PostgreSQL adapter accepts:
driver
Either "psycopg2" or "psycopg2cffi".
Either "psycopg2" or "psycopg2cffi" for the native libpg based
drivers. "pg8000" is a pure-python driver suitable for use with gevent.
dsn
Specifies the data source name for connecting to PostgreSQL.
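
For illustration, selecting the new driver through ZConfig might look
like this (the database name and credentials are placeholders):

    %import relstorage
    <zodb main>
      <relstorage>
        <postgresql>
          driver pg8000
          dsn dbname='zodb' user='zodbuser' host='localhost' password='secret'
        </postgresql>
      </relstorage>
    </zodb>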


@@ -32,8 +32,8 @@ database.
On CPython2, install psycopg2 2.6.1+, mysqlclient 1.3.7+, or cx_Oracle
5.2+; PyMySQL and umysql are also known to work. For CPython3, install
psycopg2, mysqlclient 1.3.7+ or cx_Oracle. On PyPy, install
5.2+; PyMySQL 0.7 and umysql are also known to work. For CPython3, install
psycopg2, mysqlclient 1.3.7+ or cx_Oracle; PyMySQL is also known to work. On PyPy, install
psycopg2cffi 2.7.4+ or PyMySQL 0.6.6+ (PyPy will generally work with
psycopg2 and mysqlclient, but it will be *much* slower; cx_Oracle is
untested on PyPy).
@@ -45,12 +45,13 @@ adapter:
Platform MySQL            PostgreSQL       Oracle
======== ================ ================ ======
CPython2 MySQL-python;    **psycopg2**;    **cx_Oracle**
         **mysqlclient**; psycopg2cffi
         PyMySQL;
         **mysqlclient**; psycopg2cffi;
         PyMySQL;         pg8000
         umysql
CPython3 **mysqlclient**; **psycopg2**     **cx_Oracle**
         PyMySQL
         PyMySQL          pg8000
PyPy     **PyMySQL**      **psycopg2cffi** <untested>
                          pg8000
======== ================ ================ ======
Memcache Integration


@@ -56,7 +56,10 @@ else:
        Binary = psycopg2.Binary
        connect = _create_connection(psycopg2)
        extensions = psycopg2.extensions
        # extensions
        ISOLATION_LEVEL_READ_COMMITTED = psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED
        ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE

    driver = Psycopg2Driver()
    driver_map[driver.__name__] = driver
@@ -79,7 +82,9 @@ else: # pragma: no cover
        Binary = psycopg2cffi.Binary
        connect = _create_connection(psycopg2cffi)
        extensions = psycopg2cffi.extensions
        # extensions
        ISOLATION_LEVEL_READ_COMMITTED = psycopg2cffi.extensions.ISOLATION_LEVEL_READ_COMMITTED
        ISOLATION_LEVEL_SERIALIZABLE = psycopg2cffi.extensions.ISOLATION_LEVEL_SERIALIZABLE

    driver = Psycopg2cffiDriver()
@@ -91,6 +96,86 @@ else: # pragma: no cover
    del driver
    del psycopg2cffi
try:
    import pg8000
except ImportError:
    pass
else:
    import traceback

    class _ConnWrapper(object): # pragma: no cover
        def __init__(self, conn):
            self.__conn = conn
            self.__type = type(conn)
            self.__at = ''.join(traceback.format_stack())

        def __getattr__(self, name):
            return getattr(self.__conn, name)

        def __setattr__(self, name, value):
            if name in ('_ConnWrapper__conn', '_ConnWrapper__at', '_ConnWrapper__type'):
                object.__setattr__(self, name, value)
                return
            return setattr(self.__conn, name, value)

        def cursor(self):
            return _ConnWrapper(self.__conn.cursor())

        def __iter__(self):
            return self.__conn.__iter__()

        def close(self):
            if self.__conn is None:
                return
            try:
                self.__conn.close()
            finally:
                self.__conn = None

        def __del__(self):
            if self.__conn is not None:
                print("Failed to close", self, self.__type, " from:", self.__at, file=sys.stderr)
                print("Deleted at", ''.join(traceback.format_stack()))

    @implementer(IDBDriver)
    class PG8000Driver(object):
        __name__ = 'pg8000'

        disconnected_exceptions, close_exceptions, lock_exceptions = _standard_exceptions(pg8000)
        # XXX: Testing only.
        disconnected_exceptions += (AttributeError,)

        use_replica_exceptions = (pg8000.OperationalError,)
        Binary = staticmethod(pg8000.Binary)
        _connect = staticmethod(pg8000.connect)

        _wrap = False

        def connect(self, dsn):
            # Parse the DSN into parts to pass as keywords.
            # TODO: We can do this in psycopg2 as well.
            kwds = {}
            parts = dsn.split(' ')
            for part in parts:
                key, value = part.split('=')
                value = value.strip("'\"")
                if key == 'dbname':
                    key = 'database'
                kwds[key] = value
            conn = self._connect(**kwds)
            return _ConnWrapper(conn) if self._wrap else conn

        ISOLATION_LEVEL_READ_COMMITTED = 'ISOLATION LEVEL READ COMMITTED'
        ISOLATION_LEVEL_SERIALIZABLE = 'ISOLATION LEVEL SERIALIZABLE'

    # XXX: global side-effect!
    pg8000.paramstyle = 'pyformat'

    driver = PG8000Driver()
    driver_map[driver.__name__] = driver

    if not preferred_driver_name:
        preferred_driver_name = driver.__name__

if os.environ.get("RS_PG_DRIVER"): # pragma: no cover
    preferred_driver_name = os.environ["RS_PG_DRIVER"]
    print("Forcing postgres driver to ", preferred_driver_name)


@@ -64,10 +64,20 @@ class PostgreSQLLocker(Locker):
                LOCK TABLE commit_lock IN EXCLUSIVE MODE%s;
                LOCK TABLE object_state IN SHARE MODE
                """ % (timeout_stmt, nowait and ' NOWAIT' or '',)
                cursor.execute(stmt)
                for s in stmt.splitlines():
                    if not s.strip():
                        continue
                    cursor.execute(s)
            else:
                cursor.execute("%sLOCK TABLE commit_lock IN EXCLUSIVE MODE%s" %
                               (timeout_stmt, nowait and ' NOWAIT' or '',))
                stmt = """
                %s
                LOCK TABLE commit_lock IN EXCLUSIVE MODE%s
                """ % (timeout_stmt, ' NOWAIT' if nowait else '')
                for s in stmt.splitlines():
                    if not s.strip():
                        continue
                    cursor.execute(s)
        except self.lock_exceptions:
            if nowait:
                return False


@@ -391,23 +391,26 @@ class ObjectMover(object):
    def postgresql_on_store_opened(self, cursor, restart=False):
        """Create the temporary tables for storing objects"""
        # note that the md5 column is not used if self.keep_history == False.
        stmt = """
        stmts = ["""
        CREATE TEMPORARY TABLE temp_store (
            zoid        BIGINT NOT NULL,
            prev_tid    BIGINT NOT NULL,
            md5         CHAR(32),
            state       BYTEA
        ) ON COMMIT DROP;""",
        """
        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid);
        """,
        """
        CREATE TEMPORARY TABLE temp_blob_chunk (
            zoid        BIGINT NOT NULL,
            chunk_num   BIGINT NOT NULL,
            chunk       OID
        ) ON COMMIT DROP;""",
        """
        CREATE UNIQUE INDEX temp_blob_chunk_key
        ON temp_blob_chunk (zoid, chunk_num);
        ON temp_blob_chunk (zoid, chunk_num);""",
        """
        -- This trigger removes blobs that get replaced before being
        -- moved to blob_chunk. Note that it is never called when
        -- the temp_blob_chunk table is being dropped or truncated.
@@ -415,8 +418,9 @@ class ObjectMover(object):
        BEFORE DELETE ON temp_blob_chunk
        FOR EACH ROW
        EXECUTE PROCEDURE temp_blob_chunk_delete_trigger();
        """
        cursor.execute(stmt)
        """,]
        for stmt in stmts:
            cursor.execute(stmt)

    @metricmethod_sampled
    def mysql_on_store_opened(self, cursor, restart=False):
@metricmethod_sampled
def mysql_on_store_opened(self, cursor, restart=False):
@@ -949,13 +953,15 @@ class ObjectMover(object):
            # nothing needs to be updated
            return
        params = {'tid': tid}
        cursor.execute("""
        -- Insert objects created in this transaction into current_object.
        INSERT INTO current_object (zoid, tid)
        SELECT zoid, tid FROM object_state
        WHERE tid = %(tid)s
            AND prev_tid = 0;
            AND prev_tid = 0;""", params)
        cursor.execute("""
        -- Change existing objects. To avoid deadlocks,
        -- update in OID order.
        UPDATE current_object SET tid = %(tid)s
@@ -965,7 +971,7 @@ class ObjectMover(object):
            AND prev_tid != 0
            ORDER BY zoid
        )
        """, {'tid': tid})
        """, params)
@metricmethod_sampled
def mysql_update_current(self, cursor, tid):
@@ -1033,6 +1039,17 @@ class ObjectMover(object):
        try:
            cursor.execute(stmt, (oid, tid))
            for chunk_num, loid in cursor.fetchall():
                if not hasattr(cursor.connection, 'lobject'):
                    # pg8000, etc.
                    # TODO: We're not chunking these.
                    assert chunk_num == 0
                    cursor.execute("SELECT lo_get(%(oid)s)", {'oid': loid})
                    row = cursor.fetchone()
                    data = row[0]
                    with open(filename, 'wb') as f:
                        f.write(data)
                    return len(data)

                blob = cursor.connection.lobject(loid, 'rb')
                if chunk_num == 0:
if chunk_num == 0:
@@ -1191,6 +1208,28 @@ class ObjectMover(object):
            VALUES (%(oid)s, %(chunk_num)s, %(loid)s)
            """

        if not hasattr(cursor.connection, 'lobject'):
            # Not on pg8000.
            # TODO: Optimize this and chunk it.
            # TODO: We can easily emulate the psycopg2 interface.
            with open(filename, 'rb') as f:
                data = f.read()
            params = {'oid': oid, 'data': self.Binary(data)}
            if use_tid:
                insert_stmt = """
                INSERT INTO blob_chunk (zoid, tid, chunk_num, chunk)
                SELECT %(oid)s, %(tid)s, 0, lo_from_bytea(0, %(data)s)
                """
                params['tid'] = tid
            else:
                insert_stmt = """
                INSERT INTO temp_blob_chunk (zoid, chunk_num, chunk)
                SELECT %(oid)s, 0, lo_from_bytea(0, %(data)s)
                """
            cursor.execute(insert_stmt, params)
            return

        blob = None
        maxsize = self.postgresql_blob_chunk_maxsize


@@ -147,8 +147,8 @@ class Psycopg2ConnectionManager(AbstractConnectionManager):
        self.disconnected_exceptions = driver.disconnected_exceptions
        self.close_exceptions = driver.close_exceptions
        self.use_replica_exceptions = driver.use_replica_exceptions
        self.isolation_read_committed = driver.extensions.ISOLATION_LEVEL_READ_COMMITTED
        self.isolation_serializable = driver.extensions.ISOLATION_LEVEL_SERIALIZABLE
        self.isolation_read_committed = driver.ISOLATION_LEVEL_READ_COMMITTED
        self.isolation_serializable = driver.ISOLATION_LEVEL_SERIALIZABLE
        self.keep_history = options.keep_history
        self._db_connect = driver.connect
        super(Psycopg2ConnectionManager, self).__init__(options)
@@ -183,10 +183,21 @@ class Psycopg2ConnectionManager(AbstractConnectionManager):
        while True:
            try:
                conn = self._db_connect(dsn)
                conn.set_isolation_level(isolation)
                isolation_set = False
                if hasattr(conn, 'set_isolation_level'):
                    # psycopg2 family
                    conn.set_isolation_level(isolation)
                    isolation_set = True
                cursor = conn.cursor()
                if not isolation_set:
                    cursor.execute('SET TRANSACTION %s' % isolation)
                    cursor.execute("SET SESSION CHARACTERISTICS AS TRANSACTION %s" % isolation)
                    conn.commit()
                cursor.arraysize = 64
                conn.replica = replica
                assert not conn.autocommit
                #print("Opened", conn, cursor)
                return conn, cursor
            except self.use_replica_exceptions as e:
                if replica is not None:


@@ -290,7 +290,12 @@ class RelStorage(UndoLogCompatible,
        # Implement this method of MVCCAdapterInstance
        # (possibly destined for IMVCCStorage) as a small optimization
        # in ZODB5 that can eventually simplify ZODB.Connection.Connection
        return self.HistoricalStorageAdapter(self.new_instance(), before)
        # XXX: 5.0a2 doesn't forward the release method, so we leak
        # open connections.
        i = self.new_instance()
        x = self.HistoricalStorageAdapter(i, before)
        x.release = i.release
        return x

    @property
@@ -327,6 +332,7 @@ class RelStorage(UndoLogCompatible,
        The first two function parameters are the load connection and cursor.
        """
        if self._load_cursor is None:
            assert self._load_conn is None
            need_restart = False
            self._open_load_connection()
        else:
@@ -350,6 +356,7 @@ class RelStorage(UndoLogCompatible,
    def _open_store_connection(self):
        """Open the store connection to the database. Return nothing."""
        assert self._store_conn is None
        conn, cursor = self._adapter.connmanager.open_for_store()
        self._drop_store_connection()
        self._store_conn, self._store_cursor = conn, cursor
@@ -363,6 +370,7 @@ class RelStorage(UndoLogCompatible,
    def _restart_store(self):
        """Restart the store connection, creating a new connection if needed"""
        if self._store_cursor is None:
            assert self._store_conn is None
            self._open_store_connection()
            return
        try:
@@ -417,6 +425,7 @@ class RelStorage(UndoLogCompatible,
        connections.
        """
        with self._lock:
            #print("Dropping", self._load_conn, self._store_conn)
            self._drop_load_connection()
            self._drop_store_connection()
            self._cache.release()
@@ -1506,8 +1515,16 @@ class TransactionIterator(object):
        self._index = 0

    def close(self):
        if self._closed:
            return
        self._adapter.connmanager.close(self._conn, self._cursor)
        self._closed = True
        self._conn = None
        self._cursor = None

    def __del__(self):
        # belt-and-suspenders, effective on CPython
        self.close()

    def iterator(self):
        return self
@@ -1523,10 +1540,11 @@ class TransactionIterator(object):
            return next(self)

    def next(self):
        if self._index >= len(self._transactions):
            self.close() # Don't leak our connection
            raise StorageStopIteration()
        if self._closed:
            raise IOError("TransactionIterator already closed")
        if self._index >= len(self._transactions):
            raise StorageStopIteration()
        params = self._transactions[self._index]
        res = RelStorageTransactionRecord(self, *params)
        self._index += 1


@@ -362,3 +362,6 @@ class UndoableRecoveryStorage(BasicRecoveryStorage):
        # part of the test.
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

        c.close()
        db.close()


@@ -286,6 +286,8 @@ class HistoryPreservingRelStorageTests(
        transaction1.abort()
        eq(historical_conn.root()['first']['count'], 0)
        historical_conn.close()
        conn.close()
        db.close()

    def checkImplementsExternalGC(self):


@@ -291,6 +291,8 @@ class GenericRelStorageTests(
            inst = zodb_unpickle(data)
            self.assertEqual(inst._value, 5)
        finally:
            storage1.close()
            storage2.close()
            self._storage = root_storage

    def check16KObject(self):