Source code for sqlobject.converters

from array import array
import datetime
from decimal import Decimal
import time
import sys
from .compat import PY2, buffer_type

if PY2:
    from types import ClassType, InstanceType, NoneType
else:
    # Use suitable aliases for now
    ClassType = type
    NoneType = type(None)
    # This is may not be what we want in all cases, but will do for now
    InstanceType = object


try:
    from mx.DateTime import DateTimeType, DateTimeDeltaType
except ImportError:
    try:
        from DateTime import DateTimeType, DateTimeDeltaType
    except ImportError:
        DateTimeType = None
        DateTimeDeltaType = None

try:
    import Sybase
    NumericType = Sybase.NumericType
except ImportError:
    NumericType = None


########################################
# Quoting
########################################

sqlStringReplace = [
    ("'", "''"),
    ('\\', '\\\\'),
    ('\000', '\\0'),
    ('\b', '\\b'),
    ('\n', '\\n'),
    ('\r', '\\r'),
    ('\t', '\\t'),
]


[docs]class ConverterRegistry: def __init__(self): self.basic = {} self.klass = {}
[docs] def registerConverter(self, typ, func): if type(typ) is ClassType: self.klass[typ] = func else: self.basic[typ] = func
if PY2: def lookupConverter(self, value, default=None): if type(value) is InstanceType: # lookup on klasses dict return self.klass.get(value.__class__, default) return self.basic.get(type(value), default) else:
[docs] def lookupConverter(self, value, default=None): # python 3 doesn't have classic classes, so everything's # in self.klass due to comparison order in registerConvertor return self.klass.get(value.__class__, default)
converters = ConverterRegistry() registerConverter = converters.registerConverter lookupConverter = converters.lookupConverter
[docs]def StringLikeConverter(value, db): if isinstance(value, array): try: value = value.tounicode() except ValueError: value = value.tostring() elif isinstance(value, buffer_type): value = str(value) if db in ('mysql', 'postgres', 'rdbhost'): for orig, repl in sqlStringReplace: value = value.replace(orig, repl) elif db in ('sqlite', 'firebird', 'sybase', 'maxdb', 'mssql'): value = value.replace("'", "''") else: assert 0, "Database %s unknown" % db if db in ('postgres', 'rdbhost') and ('\\' in value): return "E'%s'" % value return "'%s'" % value
registerConverter(str, StringLikeConverter) if PY2: # noqa for flake8 & python3 registerConverter(unicode, StringLikeConverter) # noqa registerConverter(array, StringLikeConverter) if PY2: registerConverter(bytearray, StringLikeConverter) registerConverter(buffer_type, StringLikeConverter) else: registerConverter(memoryview, StringLikeConverter)
[docs]def IntConverter(value, db): return repr(int(value))
registerConverter(int, IntConverter)
[docs]def LongConverter(value, db): return str(value)
if sys.version_info[0] < 3: # noqa for flake8 & python3 registerConverter(long, LongConverter) # noqa if NumericType: registerConverter(NumericType, IntConverter)
[docs]def BoolConverter(value, db): if db in ('postgres', 'rdbhost'): if value: return "'t'" else: return "'f'" else: if value: return '1' else: return '0'
registerConverter(bool, BoolConverter)
[docs]def FloatConverter(value, db): return repr(value)
registerConverter(float, FloatConverter) if DateTimeType:
[docs] def mxDateTimeConverter(value, db): return "'%s'" % value.strftime("%Y-%m-%d %H:%M:%S")
registerConverter(DateTimeType, mxDateTimeConverter)
[docs] def mxTimeConverter(value, db): return "'%s'" % value.strftime("%H:%M:%S")
registerConverter(DateTimeDeltaType, mxTimeConverter)
[docs]def NoneConverter(value, db): return "NULL"
registerConverter(NoneType, NoneConverter)
[docs]def SequenceConverter(value, db): return "(%s)" % ", ".join([sqlrepr(v, db) for v in value])
registerConverter(tuple, SequenceConverter) registerConverter(list, SequenceConverter) registerConverter(dict, SequenceConverter) registerConverter(set, SequenceConverter) registerConverter(frozenset, SequenceConverter) if hasattr(time, 'struct_time'):
[docs] def StructTimeConverter(value, db): return time.strftime("'%Y-%m-%d %H:%M:%S'", value)
registerConverter(time.struct_time, StructTimeConverter)
[docs]def DateTimeConverter(value, db): return "'%04d-%02d-%02d %02d:%02d:%02d'" % ( value.year, value.month, value.day, value.hour, value.minute, value.second)
[docs]def DateTimeConverterMS(value, db): return "'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % ( value.year, value.month, value.day, value.hour, value.minute, value.second, value.microsecond)
registerConverter(datetime.datetime, DateTimeConverterMS)
[docs]def DateConverter(value, db): return "'%04d-%02d-%02d'" % (value.year, value.month, value.day)
registerConverter(datetime.date, DateConverter)
[docs]def TimeConverter(value, db): return "'%02d:%02d:%02d'" % (value.hour, value.minute, value.second)
[docs]def TimeConverterMS(value, db): return "'%02d:%02d:%02d.%06d'" % (value.hour, value.minute, value.second, value.microsecond)
registerConverter(datetime.time, TimeConverterMS)
[docs]def DecimalConverter(value, db): return value.to_eng_string()
registerConverter(Decimal, DecimalConverter)
[docs]def TimedeltaConverter(value, db): return """INTERVAL '%d days %d seconds'""" % \ (value.days, value.seconds)
registerConverter(datetime.timedelta, TimedeltaConverter)
[docs]def sqlrepr(obj, db=None): try: reprFunc = obj.__sqlrepr__ except AttributeError: converter = lookupConverter(obj) if converter is None: raise ValueError("Unknown SQL builtin type: %s for %s" % (type(obj), repr(obj))) return converter(obj, db) else: return reprFunc(db)
[docs]def quote_str(s, db): if db in ('postgres', 'rdbhost') and ('\\' in s): return "E'%s'" % s return "'%s'" % s
[docs]def unquote_str(s): if s[:2].upper().startswith("E'") and s.endswith("'"): return s[2:-1] elif s.startswith("'") and s.endswith("'"): return s[1:-1] else: return s