Source code for sqlobject.tests.test_select

import pytest
from sqlobject import IntCol, LIKE, RLIKE, SQLObject, \
    SQLObjectIntegrityError, SQLObjectNotFound, StringCol
from sqlobject.sqlbuilder import func
from .dbtest import raises, setupClass, supports, setSQLiteConnectionFactory


[docs]class IterTest(SQLObject): name = StringCol(dbName='name_col', length=200)
names = ('a', 'b', 'c')
[docs]def setupIter(): setupClass(IterTest) for n in names: IterTest(name=n)
[docs]def test_00_normal(): setupIter() count = 0 for test in IterTest.select(): count += 1 assert count == len(names)
[docs]def test_00b_lazy(): setupIter() count = 0 for test in IterTest.select(lazyColumns=True): count += 1 assert count == len(names)
[docs]def test_01_turn_to_list(): count = 0 for test in list(IterTest.select()): count += 1 assert count == len(names)
[docs]def test_02_generator(): all = IterTest.select() count = 0 for i, test in enumerate(all): count += 1 assert count == len(names)
[docs]def test_03_ranged_indexed(): all = IterTest.select() count = 0 for i in range(all.count()): all[i] # test it's there count += 1 assert count == len(names)
[docs]def test_04_indexed_ended_by_exception(): if not supports('limitSelect'): pytest.skip("limitSelect isn't supported") all = IterTest.select() count = 0 try: while 1: all[count] count += 1 # Stop the test if it's gone on too long if count > len(names): break except IndexError: pass assert count == len(names)
[docs]def test_05_select_limit(): setupIter() assert len(list(IterTest.select(limit=2))) == 2 raises(AssertionError, IterTest.select(limit=2).count)
[docs]def test_06_contains(): setupIter() assert len(list(IterTest.select(IterTest.q.name.startswith('a')))) == 1 assert len(list(IterTest.select(IterTest.q.name.contains('a')))) == 1 assert len(list(IterTest.select( IterTest.q.name.contains(func.lower('A'))))) == 1 assert len(list(IterTest.select(IterTest.q.name.contains("a'b")))) == 0 assert len(list(IterTest.select(IterTest.q.name.endswith('a')))) == 1
[docs]def test_07_contains_special(): setupClass(IterTest) a = IterTest(name='\\test') b = IterTest(name='100%') c = IterTest(name='test_') assert list(IterTest.select(IterTest.q.name.startswith('\\'))) == [a] assert list(IterTest.select(IterTest.q.name.contains('%'))) == [b] assert list(IterTest.select(IterTest.q.name.endswith('_'))) == [c]
[docs]def test_select_getOne(): setupClass(IterTest) a = IterTest(name='a') b = IterTest(name='b') assert IterTest.selectBy(name='a').getOne() == a assert IterTest.select(IterTest.q.name == 'b').getOne() == b assert IterTest.selectBy(name='c').getOne(None) is None raises(SQLObjectNotFound, IterTest.selectBy(name="c").getOne) IterTest(name='b') raises(SQLObjectIntegrityError, IterTest.selectBy(name="b").getOne) raises(SQLObjectIntegrityError, IterTest.selectBy(name="b").getOne, None)
[docs]def test_selectBy(): setupClass(IterTest) IterTest(name='a') IterTest(name='b') assert IterTest.selectBy().count() == 2
[docs]def test_selectBy_kwargs(): setupClass(IterTest) raises(TypeError, IterTest, nonexistant='b')
[docs]class UniqTest(SQLObject): name = StringCol(dbName='name_col', unique=True, length=100)
[docs]def test_by_uniq(): setupClass(UniqTest) a = UniqTest(name='a') b = UniqTest(name='b') assert UniqTest.byName('a') is a assert UniqTest.byName('b') is b
[docs]class Counter2(SQLObject): n1 = IntCol(notNull=True) n2 = IntCol(notNull=True)
[docs]class TestSelect:
[docs] def setup_method(self, meth): setupClass(Counter2) for i in range(10): for j in range(10): Counter2(n1=i, n2=j)
[docs] def counterEqual(self, counters, value): assert [(c.n1, c.n2) for c in counters] == value
[docs] def accumulateEqual(self, func, counters, value): assert func([c.n1 for c in counters]) == value
[docs] def test_1(self): self.accumulateEqual(sum, Counter2.select(orderBy='n1'), sum(range(10)) * 10)
[docs] def test_2(self): self.accumulateEqual(len, Counter2.select('all'), 100)
[docs]def test_select_LIKE(): setupClass(IterTest) IterTest(name='sqlobject') IterTest(name='sqlbuilder') assert IterTest.select(LIKE(IterTest.q.name, "sql%")).count() == 2 assert IterTest.select(LIKE(IterTest.q.name, "sqlb%")).count() == 1 assert IterTest.select(LIKE(IterTest.q.name, "sqlb%")).count() == 1 assert IterTest.select(LIKE(IterTest.q.name, "sqlx%")).count() == 0
[docs]def test_select_RLIKE(): if not supports('rlike'): pytest.skip("rlike isn't supported") setupClass(IterTest) if IterTest._connection.dbName == "sqlite": # Implement regexp() function for SQLite import re def regexp(regexp, test): return bool(re.search(regexp, test)) def SQLiteConnectionFactory(sqlite): class MyConnection(sqlite.Connection): def __init__(self, *args, **kwargs): super(MyConnection, self).__init__(*args, **kwargs) self.create_function("regexp", 2, regexp) return MyConnection setSQLiteConnectionFactory(IterTest, SQLiteConnectionFactory) IterTest(name='sqlobject') IterTest(name='sqlbuilder') assert IterTest.select(RLIKE(IterTest.q.name, "^sql.*$")).count() == 2 assert IterTest.select(RLIKE(IterTest.q.name, "^sqlb.*$")).count() == 1 assert IterTest.select(RLIKE(IterTest.q.name, "^sqlb.*$")).count() == 1 assert IterTest.select(RLIKE(IterTest.q.name, "^sqlx.*$")).count() == 0
[docs]def test_select_sqlbuilder(): setupClass(IterTest) IterTest(name='sqlobject') IterTest.select(IterTest.q.name == u'sqlobject')
[docs]def test_select_perConnection(): setupClass(IterTest) IterTest(name='a') assert not IterTest.select().getOne().sqlmeta._perConnection