1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('test')
import sys
PyVER = (sys.version_info.major, sys.version_info.minor)
PyV3 = PyVER >= (3, 1)
PyV2 = PyVER == (2, 7)
import sqlalchemy
assert sqlalchemy.__version__.startswith('0.8')
import inspect
def trace():
frames = inspect.getouterframes(inspect.currentframe())
for f in frames:
log.debug(str(f))
if PyV2:
import sqlalchemy.types as types
class MyString(types.TypeDecorator):
impl = types.String
def process_bind_param(self, value, dialect):
if value:
log.debug('*** process_bind_param: got type=%s' % type(value))
assert isinstance(value, str)
value = value.decode('utf-8')
trace()
return value
def process_result_value(self, value, dialect):
if value:
log.debug('*** process_result_value: got type=%s' % type(value))
assert isinstance(value, unicode)
value = value.encode('utf-8')
return value
if PyV3:
import sqlalchemy.dialects.sqlite as dialect
class MyString(dialect.VARCHAR):
def bind_processor(self, dialect):
impl = super().bind_processor(dialect) or (lambda value: value)
def process(value):
log.debug('*** bind_processor: got type=%s' % type(value))
value = impl(value)
log.debug('*** bind_processor: returns type=%s' % type(value))
return value
return process
def result_processor(self, dialect, coltype):
impl = super().result_processor(dialect, coltype) or (lambda value: value)
def process(value):
log.debug('*** result_processor: got type=%s' % type(value))
value = impl(value)
log.debug('*** result_processor: return type=%s' % type(value))
return value
return process
if __name__ == '__main__':
import os
from sqlalchemy import MetaData, Table, create_engine, Column, select, String
path = None
meta = MetaData()
if PyV2:
test_table = Table('test_table', meta,
Column('name', MyString(50)),
Column('data', String(255)))
elif PyV3:
test_table = Table('test_table', meta,
Column('name', MyString(50, convert_unicode='force')),
Column('data', String(255)))
if path is not None and os.path.exists(path):
os.remove(path)
else:
path = ':memory:'
engine = create_engine('sqlite:///%s' % path)
meta.bind = engine
test_table.create()
test_table.insert().execute(name='foo', data='some data')
rows = select(
[ test_table.c.name, test_table.c.data ]
).execute(
).fetchall()
for name, data in rows:
log.debug ('name: %s, data=%s' % (name, data)) |
Partager