from array import array
import datetime
from decimal import Decimal
import time
import sys
from .compat import PY2, buffer_type
if PY2:
from types import ClassType, InstanceType, NoneType
else:
# Use suitable aliases for now
ClassType = type
NoneType = type(None)
# This may not be what we want in all cases, but will do for now
InstanceType = object
try:
from mx.DateTime import DateTimeType as mxDateTimeType, \
DateTimeDeltaType as mxDateTimeDeltaType
except ImportError:
mxDateTimeType = None
mxDateTimeDeltaType = None
try:
import pendulum
except ImportError:
pendulumDateTimeType = None
else:
pendulumDateTimeType = pendulum.DateTime
try:
from DateTime import DateTime as zopeDateTime
except ImportError:
zopeDateTimeType = None
else:
zopeDateTimeType = type(zopeDateTime())
try:
import Sybase
NumericType = Sybase.NumericType
except ImportError:
NumericType = None
########################################
# Quoting
########################################
# Ordered (original, replacement) pairs that StringLikeConverter applies one
# after another for the 'mysql' and 'postgres' backends.
# Order matters: the backslash pair has to run before the control-character
# pairs, because those replacements introduce backslashes of their own that
# must not be doubled afterwards.
sqlStringReplace = [
    ("'", "''"),
    ('\\', '\\\\'),
    ('\000', '\\0'),
    ('\b', '\\b'),
    ('\n', '\\n'),
    ('\r', '\\r'),
    ('\t', '\\t'),
]
class ConverterRegistry:
    """Map Python types to the functions that render their values as SQL.

    Converters are kept in two dictionaries:

    * ``klass`` -- types whose own type is exactly ``ClassType``
    * ``basic`` -- every other registered type
    """

    def __init__(self):
        self.basic = {}
        self.klass = {}

    def registerConverter(self, typ, func):
        """Register ``func`` as the converter for values of type ``typ``.

        ``func`` is later called as ``func(value, db)``.
        """
        if type(typ) is ClassType:
            self.klass[typ] = func
        else:
            self.basic[typ] = func

    if PY2:
        def lookupConverter(self, value, default=None):
            """Return the converter registered for ``value``'s type.

            ``default`` is returned when no converter is registered.
            """
            if type(value) is InstanceType:
                # lookup on klasses dict
                return self.klass.get(value.__class__, default)
            return self.basic.get(type(value), default)
    else:
        def lookupConverter(self, value, default=None):
            """Return the converter registered for ``value``'s type.

            ``default`` is returned when no converter is registered.
            """
            # Python 3 doesn't have classic classes, so ordinary classes
            # land in self.klass due to the comparison in registerConverter.
            cls = value.__class__
            if cls in self.klass:
                return self.klass[cls]
            # A class built by a custom metaclass fails the
            # ``type(typ) is ClassType`` test and is stored in self.basic,
            # so that dictionary has to be consulted as well.
            return self.basic.get(cls, default)
# The single module-wide registry; its bound methods are exposed as plain
# module-level functions for the registrations that follow.
converters = ConverterRegistry()
registerConverter, lookupConverter = (
    converters.registerConverter,
    converters.lookupConverter,
)
def StringLikeConverter(value, db):
    """Return ``value`` as a quoted, escaped SQL string literal.

    :param value: a string, an ``array`` or a ``buffer_type`` instance.
    :param db: backend name; one of 'mysql', 'postgres', 'sqlite',
        'firebird', 'sybase', 'maxdb' or 'mssql'.
    :returns: the literal wrapped in single quotes; for 'postgres' the
        ``E'...'`` form is used when the escaped text contains a backslash.
    :raises AssertionError: if ``db`` is not one of the known backends.
    """
    if isinstance(value, array):
        try:
            value = value.tounicode()
        except ValueError:
            # array.tostring() was removed in Python 3.9; tobytes() is its
            # replacement.  Fall back to tostring() where tobytes() does
            # not exist.
            # NOTE(review): on Python 3 the result is bytes, which the
            # str.replace() calls below do not accept -- confirm how
            # non-unicode arrays are meant to be decoded.
            to_bytes = getattr(value, 'tobytes', None) or value.tostring
            value = to_bytes()
    elif isinstance(value, buffer_type):
        value = str(value)

    if db in ('mysql', 'postgres'):
        for orig, repl in sqlStringReplace:
            value = value.replace(orig, repl)
    elif db in ('sqlite', 'firebird', 'sybase', 'maxdb', 'mssql'):
        value = value.replace("'", "''")
    else:
        # Raised explicitly rather than via ``assert``: an assert statement
        # is dropped under ``python -O`` and an unknown backend would then
        # fall through and return the value without any escaping.
        raise AssertionError("Database %s unknown" % db)
    if (db == 'postgres') and ('\\' in value):
        return "E'%s'" % value
    return "'%s'" % value
# Every string-like type is rendered by StringLikeConverter.
registerConverter(str, StringLikeConverter)
registerConverter(array, StringLikeConverter)
if PY2:
    # noqa for flake8 & python3
    registerConverter(unicode, StringLikeConverter)  # noqa
    registerConverter(bytearray, StringLikeConverter)
    registerConverter(buffer_type, StringLikeConverter)
else:
    registerConverter(memoryview, StringLikeConverter)
def IntConverter(value, db):
    """Return ``value`` coerced to an integer and rendered as a SQL literal.

    ``db`` is accepted for signature compatibility and is not used.
    """
    # str() rather than repr(): on Python 2 int() may return a long, whose
    # repr() carries a trailing 'L' that is not valid SQL.  This matches
    # what LongConverter does.
    return str(int(value))
# Plain ``int`` values are rendered by IntConverter.
registerConverter(int, IntConverter)
def LongConverter(value, db):
    """Return the ``str()`` form of ``value``; ``db`` is not used."""
    rendered = str(value)
    return rendered
# ``long`` only exists before Python 3.
if sys.version_info < (3,):
    # noqa for flake8 & python3
    registerConverter(long, LongConverter)  # noqa

# NumericType is None when the Sybase module could not be imported.
if NumericType:
    registerConverter(NumericType, IntConverter)
def BoolConverter(value, db):
    """Return the SQL literal for the truthiness of ``value``.

    'postgres' gets ``'t'`` / ``'f'`` (quotes included); every other
    backend gets ``1`` / ``0``.
    """
    if db == 'postgres':
        return "'t'" if value else "'f'"
    return '1' if value else '0'
# ``bool`` values are rendered by BoolConverter.
registerConverter(bool, BoolConverter)
def FloatConverter(value, db):
    """Return the ``repr()`` of ``value``; ``db`` is not used."""
    return "%r" % (value,)
# ``float`` values are rendered by FloatConverter.
registerConverter(float, FloatConverter)
def NoneConverter(value, db):
    """Return the SQL ``NULL`` keyword; both arguments are ignored."""
    sql_null = "NULL"
    return sql_null
# ``None`` is rendered by NoneConverter.
registerConverter(NoneType, NoneConverter)
def SequenceConverter(value, db):
    """Return a parenthesised, comma-separated SQL list.

    Each item obtained by iterating ``value`` is rendered with ``sqlrepr``
    for the given ``db``.
    """
    rendered_items = ", ".join(sqlrepr(item, db) for item in value)
    return "(%s)" % rendered_items
# All of these built-in containers are rendered by SequenceConverter.
for _container_type in (tuple, list, dict, set, frozenset):
    registerConverter(_container_type, SequenceConverter)
del _container_type
if hasattr(time, 'struct_time'):
    def StructTimeConverter(value, db):
        """Format a ``time.struct_time`` as a quoted date-time string.

        ``db`` is not used.
        """
        quoted_format = "'%Y-%m-%d %H:%M:%S'"
        return time.strftime(quoted_format, value)

    registerConverter(time.struct_time, StructTimeConverter)
def DateTimeConverter(value, db):
    """Return ``value`` as a quoted 'YYYY-MM-DD HH:MM:SS' literal.

    Only the year, month, day, hour, minute and second attributes are
    read; ``db`` is not used.
    """
    fields = (
        value.year, value.month, value.day,
        value.hour, value.minute, value.second,
    )
    return "'%04d-%02d-%02d %02d:%02d:%02d'" % fields
def DateTimeConverterMS(value, db):
    """Return ``value`` as a quoted 'YYYY-MM-DD HH:MM:SS.ffffff' literal.

    Same as the second-precision form plus a six-digit microsecond
    field; ``db`` is not used.
    """
    fields = (
        value.year, value.month, value.day,
        value.hour, value.minute, value.second,
        value.microsecond,
    )
    return "'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % fields
# ``datetime.datetime`` values use the microsecond-precision converter.
registerConverter(datetime.datetime, DateTimeConverterMS)
def DateConverter(value, db):
    """Return ``value`` as a quoted 'YYYY-MM-DD' literal; ``db`` is not used."""
    fields = (value.year, value.month, value.day)
    return "'%04d-%02d-%02d'" % fields
# ``datetime.date`` values are rendered by DateConverter.
registerConverter(datetime.date, DateConverter)
def TimeConverter(value, db):
    """Return ``value`` as a quoted 'HH:MM:SS' literal; ``db`` is not used."""
    fields = (value.hour, value.minute, value.second)
    return "'%02d:%02d:%02d'" % fields
def TimeConverterMS(value, db):
    """Return ``value`` as a quoted 'HH:MM:SS.ffffff' literal.

    ``db`` is not used.
    """
    fields = (value.hour, value.minute, value.second, value.microsecond)
    return "'%02d:%02d:%02d.%06d'" % fields
# ``datetime.time`` values use the microsecond-precision converter.
registerConverter(datetime.time, TimeConverterMS)
# Converters for optional third-party date/time types.  Each group is only
# defined and registered when the corresponding type could be imported.
if mxDateTimeType:
    def mxDateTimeConverter(value, db):
        """Quote ``value.strftime("%Y-%m-%d %H:%M:%S")``; ``db`` is not used."""
        stamp = value.strftime("%Y-%m-%d %H:%M:%S")
        return "'%s'" % stamp

    def mxTimeConverter(value, db):
        """Quote ``value.strftime("%H:%M:%S")``; ``db`` is not used."""
        stamp = value.strftime("%H:%M:%S")
        return "'%s'" % stamp

    registerConverter(mxDateTimeType, mxDateTimeConverter)
    registerConverter(mxDateTimeDeltaType, mxTimeConverter)

if pendulumDateTimeType:
    def pendulumConverter(value, db):
        """Quote ``value.to_datetime_string()``; ``db`` is not used."""
        stamp = value.to_datetime_string()
        return "'%s'" % stamp

    registerConverter(pendulumDateTimeType, pendulumConverter)

if zopeDateTimeType:
    def zopeDateTimeConverter(value, db):
        """Quote ``value.strftime("%Y-%m-%d %H:%M:%S")``; ``db`` is not used."""
        stamp = value.strftime("%Y-%m-%d %H:%M:%S")
        return "'%s'" % stamp

    registerConverter(zopeDateTimeType, zopeDateTimeConverter)
def DecimalConverter(value, db):
    """Return ``value.to_eng_string()``; ``db`` is not used."""
    rendered = value.to_eng_string()
    return rendered
# ``Decimal`` values are rendered by DecimalConverter.
registerConverter(Decimal, DecimalConverter)
def TimedeltaConverter(value, db):
    """Return ``value`` as an ``INTERVAL '<days> days <seconds> seconds'`` literal.

    Only the ``days`` and ``seconds`` attributes are read; ``db`` is not
    used.
    """
    parts = (value.days, value.seconds)
    return "INTERVAL '%d days %d seconds'" % parts
# ``datetime.timedelta`` values are rendered by TimedeltaConverter.
registerConverter(datetime.timedelta, TimedeltaConverter)
def sqlrepr(obj, db=None):
    """Return the SQL representation of ``obj`` for backend ``db``.

    An object that has a ``__sqlrepr__`` attribute renders itself through
    ``obj.__sqlrepr__(db)``.  Otherwise the converter registered for the
    object's type is looked up and called as ``converter(obj, db)``.

    :raises ValueError: if the object has no ``__sqlrepr__`` and no
        converter is registered for its type.
    """
    missing = object()
    own_repr = getattr(obj, '__sqlrepr__', missing)
    if own_repr is not missing:
        return own_repr(db)

    converter = lookupConverter(obj)
    if converter is not None:
        return converter(obj, db)
    raise ValueError("Unknown SQL builtin type: %s for %s" %
                     (type(obj), repr(obj)))
def quote_str(s, db):
    """Wrap ``s`` in single quotes without escaping it.

    For 'postgres' the ``E'...'`` form is used when ``s`` contains a
    backslash.
    """
    wants_escape_prefix = (db == 'postgres') and ('\\' in s)
    template = "E'%s'" if wants_escape_prefix else "'%s'"
    return template % s
def unquote_str(s):
    """Strip one layer of SQL string quoting from ``s``.

    Recognises ``'...'`` and ``E'...'`` (the ``E`` may be lower case) and
    returns the text between the quotes.  Anything else is returned
    unchanged.
    """
    # The length checks make sure the opening and the closing quote are two
    # different characters; without them a lone "'" or "E'" would be
    # treated as a quoted string and collapse to "".
    if (len(s) >= 3 and s[:2].upper().startswith("E'")
            and s.endswith("'")):
        return s[2:-1]
    if len(s) >= 2 and s.startswith("'") and s.endswith("'"):
        return s[1:-1]
    return s