from sqlobject import ForeignKey, IntCol, SQLMultipleJoin, SQLObject, \
StringCol, func
from sqlobject.views import ViewSQLObject
from sqlobject.tests.dbtest import inserts, setupClass
class PhoneNumber(SQLObject):
    """Table class holding a phone number and its joins to PhoneCall."""

    # The number itself, stored as a string column.
    number = StringCol()
    # PhoneCall rows joined to this row; no explicit joinColumn is given.
    calls = SQLMultipleJoin('PhoneCall')
    # PhoneCall rows joined to this row through their ``toID`` column.
    incoming = SQLMultipleJoin('PhoneCall', joinColumn='toID')
class PhoneCall(SQLObject):
    """Table class for a call: two PhoneNumber references and a duration."""

    # Both foreign keys reference PhoneNumber; PhoneNumber.calls joins on
    # the first one and PhoneNumber.incoming joins on ``toID``.
    phoneNumber = ForeignKey('PhoneNumber')
    to = ForeignKey('PhoneNumber')
    # Integer column holding the call's minutes.
    minutes = IntCol()
class ViewPhoneCall(ViewSQLObject):
    """View keyed on PhoneCall.id that joins each call to its PhoneNumber."""

    class sqlmeta:
        # The view's id is the PhoneCall id.
        idName = PhoneCall.q.id
        # Join condition between the two underlying tables.
        clause = PhoneCall.q.phoneNumberID == PhoneNumber.q.id

    # Each view column names the underlying expression through ``dbName``.
    minutes = IntCol(dbName=PhoneCall.q.minutes)
    number = StringCol(dbName=PhoneNumber.q.number)
    phoneNumber = ForeignKey('PhoneNumber', dbName=PhoneNumber.q.id)
    call = ForeignKey('PhoneCall', dbName=PhoneCall.q.id)
class ViewPhone(ViewSQLObject):
    """View keyed on PhoneNumber.id with SUM/COUNT columns over PhoneCall."""

    class sqlmeta:
        # The view's id is the PhoneNumber id.
        idName = PhoneNumber.q.id
        # Join condition between PhoneCall and PhoneNumber.
        clause = PhoneCall.q.phoneNumberID == PhoneNumber.q.id

    # Aggregate columns built from SQL functions over the joined calls.
    minutes = IntCol(dbName=func.SUM(PhoneCall.q.minutes))
    numberOfCalls = IntCol(dbName=func.COUNT(PhoneCall.q.phoneNumberID))
    # Plain columns taken from PhoneNumber.
    number = StringCol(dbName=PhoneNumber.q.number)
    phoneNumber = ForeignKey('PhoneNumber', dbName=PhoneNumber.q.id)
    # Joins from the view to the table class and to another view.
    calls = SQLMultipleJoin('PhoneCall', joinColumn='phoneNumberID')
    vCalls = SQLMultipleJoin('ViewPhoneCall', joinColumn='phoneNumberID',
                             orderBy='id')
class ViewPhoneMore(ViewSQLObject):
    """View on top of view.

    Keyed on ViewPhone.id and joined to PhoneCall through ``toID``; its
    columns count and sum the matched PhoneCall rows.
    """

    class sqlmeta:
        idName = ViewPhone.q.id
        clause = ViewPhone.q.id == PhoneCall.q.toID

    number = StringCol(dbName=ViewPhone.q.number)
    timesCalled = IntCol(dbName=func.COUNT(PhoneCall.q.toID))
    timesCalledLong = IntCol(dbName=func.COUNT(PhoneCall.q.toID))
    # Same COUNT as timesCalled, but with a per-column aggregate clause;
    # testGetVPM expects it to match the count of calls with minutes > 10.
    timesCalledLong.aggregateClause = PhoneCall.q.minutes > 10
    minutesCalled = IntCol(dbName=func.SUM(PhoneCall.q.minutes))
class ViewPhoneMore2(ViewPhoneMore):
    """ViewPhoneMore subclass that overrides ``sqlmeta.table``."""

    class sqlmeta:
        # testAliasOverride expects this name to prefix the view's columns.
        table = 'vpm'
class ViewPhoneInnerAggregate(ViewPhone):
    """ViewPhone subclass adding a column whose aggregate is inside an expression."""

    # The SUM aggregate is nested in an arithmetic expression (times two).
    twiceMinutes = IntCol(dbName=func.SUM(PhoneCall.q.minutes) * 2)
def setup_module(mod):
    """Prepare the tables, wire the views to their connection, load fixtures.

    Publishes three module globals used by the tests: ``phones`` and
    ``calls`` (the inserted rows) and ``sqlrepr`` (the connection's
    ``sqlrepr`` method).

    :param mod: module object supplied by the caller; unused here.
    """
    global calls, phones, sqlrepr
    setupClass([PhoneNumber, PhoneCall])

    # Only the two table classes go through setupClass; the views are
    # pointed at the same connection by hand.
    for view in (ViewPhoneCall, ViewPhone, ViewPhoneMore):
        view._connection = PhoneNumber._connection

    phones = inserts(PhoneNumber, [('1234567890',), ('1111111111',)], 'number')
    # Each tuple is (phoneNumber, to, minutes): two calls in each direction,
    # one of which has minutes > 10.
    calls = inserts(PhoneCall, [(phones[0], phones[1], 5),
                                (phones[0], phones[1], 20),
                                (phones[1], phones[0], 10),
                                (phones[1], phones[0], 25)],
                    'phoneNumber to minutes')
    sqlrepr = PhoneNumber._connection.sqlrepr
def testSimpleVPC():
    """ViewPhoneCall exposes its declared columns as class attributes."""
    # 'phoneNumberID' is not declared directly; it is expected to exist
    # alongside the ``phoneNumber`` foreign key.
    for name in ('minutes', 'number', 'phoneNumberID'):
        assert hasattr(ViewPhoneCall, name), name
def testColumnSQLVPC():
    """Check the SQL rendered for ViewPhoneCall columns and for the view."""
    assert str(sqlrepr(ViewPhoneCall.q.id)) == 'view_phone_call.id'
    assert str(sqlrepr(ViewPhoneCall.q.minutes)) == 'view_phone_call.minutes'
    # The rendered view must alias each underlying column to its view name.
    q = sqlrepr(ViewPhoneCall.q)
    assert 'phone_call.minutes AS minutes' in q
    assert 'phone_number.number AS number' in q
def testAliasOverride():
    """The ``sqlmeta.table`` override on ViewPhoneMore2 names its columns."""
    assert str(sqlrepr(ViewPhoneMore2.q.id)) == 'vpm.id'
def checkAttr(cls, id, attr, value):
    """Assert that attribute ``attr`` of ``cls.get(id)`` equals ``value``.

    :param cls: object providing a ``get(id)`` lookup.
    :param id: key passed to ``cls.get``.
    :param attr: name of the attribute to read from the fetched object.
    :param value: expected attribute value.
    :raises AssertionError: if the attribute differs from ``value``.
    """
    actual = getattr(cls.get(id), attr)
    # Carry both values in the message so a failure is self-explanatory.
    assert actual == value, (
        '%r.get(%r).%s is %r, expected %r' % (cls, id, attr, actual, value))
def testGetVPC():
    """ViewPhoneCall rows mirror the underlying call and its phone number."""
    # calls[0] and calls[2] have different ``phoneNumber`` values in the
    # fixtures, so both join targets are exercised.
    for call in (calls[0], calls[2]):
        checkAttr(ViewPhoneCall, call.id, 'number', call.phoneNumber.number)
        checkAttr(ViewPhoneCall, call.id, 'minutes', call.minutes)
        checkAttr(ViewPhoneCall, call.id, 'phoneNumber', call.phoneNumber)
def testGetVP():
    """ViewPhone columns match the number, its call-minute sum and itself."""
    phone = phones[0]
    checkAttr(ViewPhone, phone.id, 'number', phone.number)
    checkAttr(ViewPhone, phone.id, 'minutes',
              phone.calls.sum(PhoneCall.q.minutes))
    checkAttr(ViewPhone, phone.id, 'phoneNumber', phone)
def testGetVPM():
    """ViewPhoneMore aggregates match values computed from ``incoming``."""
    phone = phones[0]
    checkAttr(ViewPhoneMore, phone.id, 'number', phone.number)
    checkAttr(ViewPhoneMore, phone.id, 'minutesCalled',
              phone.incoming.sum(PhoneCall.q.minutes))
    checkAttr(ViewPhoneMore, phone.id, 'timesCalled',
              phone.incoming.count())
    # timesCalledLong carries an aggregateClause of minutes > 10.
    checkAttr(ViewPhoneMore, phone.id, 'timesCalledLong',
              phone.incoming.filter(PhoneCall.q.minutes > 10).count())
def testJoinView():
    """Joins declared on a view reach both table rows and view rows."""
    p = ViewPhone.get(phones[0].id)
    assert p.calls.count() == 2
    assert p.vCalls.count() == 2
    # vCalls is declared with orderBy='id'; its first row is expected to be
    # the view of the first inserted call.
    assert p.vCalls[0] == ViewPhoneCall.get(calls[0].id)
def testInnerAggregate():
    """An aggregate nested inside an expression yields the expected value."""
    checkAttr(ViewPhoneInnerAggregate, phones[0].id, 'twiceMinutes',
              phones[0].calls.sum(PhoneCall.q.minutes) * 2)
def testSelect():
    """Selecting a view returns as many rows as there are fixture rows."""
    s = ViewPhone.select()
    assert s.count() == len(phones)
    s = ViewPhoneCall.select()
    assert s.count() == len(calls)
def testSelect2():
    """Filtering a view by one of its columns finds the matching row."""
    s = ViewPhone.select(ViewPhone.q.number == phones[0].number)
    assert s.getOne().phoneNumber == phones[0]
def testDistinctCount():
    """A distinct count on a view works and matches the fixture size."""
    # This test is for SelectResults non-* based count when distinct.
    # We're really just checking this doesn't raise anything
    # due to lack of sqlrepr'ing.
    # The expected value is tied to the fixtures (two phones), as in
    # testSelect, instead of a hard-coded literal.
    assert ViewPhone.select(distinct=True).count() == len(phones)