split platform

This commit is contained in:
j 2016-02-06 15:07:19 +05:30
commit a2291f58b8
278 changed files with 114251 additions and 0 deletions

View file

@ -0,0 +1,9 @@
from sqlalchemy.testing.suite.test_ddl import *
from sqlalchemy.testing.suite.test_insert import *
from sqlalchemy.testing.suite.test_sequence import *
from sqlalchemy.testing.suite.test_select import *
from sqlalchemy.testing.suite.test_results import *
from sqlalchemy.testing.suite.test_update_delete import *
from sqlalchemy.testing.suite.test_reflection import *
from sqlalchemy.testing.suite.test_types import *

View file

@ -0,0 +1,65 @@
from .. import fixtures, config, util
from ..config import requirements
from ..assertions import eq_
from sqlalchemy import Table, Column, Integer, String
class TableDDLTest(fixtures.TestBase):
__backend__ = True
def _simple_fixture(self):
return Table('test_table', self.metadata,
Column('id', Integer, primary_key=True,
autoincrement=False),
Column('data', String(50))
)
def _underscore_fixture(self):
return Table('_test_table', self.metadata,
Column('id', Integer, primary_key=True,
autoincrement=False),
Column('_data', String(50))
)
def _simple_roundtrip(self, table):
with config.db.begin() as conn:
conn.execute(table.insert().values((1, 'some data')))
result = conn.execute(table.select())
eq_(
result.first(),
(1, 'some data')
)
@requirements.create_table
@util.provide_metadata
def test_create_table(self):
table = self._simple_fixture()
table.create(
config.db, checkfirst=False
)
self._simple_roundtrip(table)
@requirements.drop_table
@util.provide_metadata
def test_drop_table(self):
table = self._simple_fixture()
table.create(
config.db, checkfirst=False
)
table.drop(
config.db, checkfirst=False
)
@requirements.create_table
@util.provide_metadata
def test_underscore_names(self):
table = self._underscore_fixture()
table.create(
config.db, checkfirst=False
)
self._simple_roundtrip(table)
__all__ = ('TableDDLTest', )

View file

@ -0,0 +1,231 @@
from .. import fixtures, config
from ..config import requirements
from .. import exclusions
from ..assertions import eq_
from .. import engines
from sqlalchemy import Integer, String, select, util
from ..schema import Table, Column
class LastrowidTest(fixtures.TablesTest):
run_deletes = 'each'
__backend__ = True
__requires__ = 'implements_get_lastrowid', 'autoincrement_insert'
__engine_options__ = {"implicit_returning": False}
@classmethod
def define_tables(cls, metadata):
Table('autoinc_pk', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(50))
)
Table('manual_pk', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
Column('data', String(50))
)
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(
row,
(config.db.dialect.default_sequence_base, "some data")
)
def test_autoincrement_on_insert(self):
config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(
r.inserted_primary_key,
[pk]
)
# failed on pypy1.9 but seems to be OK on pypy 2.1
# @exclusions.fails_if(lambda: util.pypy,
# "lastrowid not maintained after "
# "connection close")
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
lastrowid = r.lastrowid
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(
lastrowid, pk
)
class InsertBehaviorTest(fixtures.TablesTest):
run_deletes = 'each'
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('autoinc_pk', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(50))
)
Table('manual_pk', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
Column('data', String(50))
)
def test_autoclose_on_insert(self):
if requirements.returning.enabled:
engine = engines.testing_engine(
options={'implicit_returning': False})
else:
engine = config.db
r = engine.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
assert r.closed
assert r.is_insert
assert not r.returns_rows
@requirements.returning
def test_autoclose_on_insert_implicit_returning(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
assert r.closed
assert r.is_insert
assert not r.returns_rows
@requirements.empty_inserts
def test_empty_insert(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(),
)
assert r.closed
r = config.db.execute(
self.tables.autoinc_pk.select().
where(self.tables.autoinc_pk.c.id != None)
)
assert len(r.fetchall())
@requirements.insert_from_select
def test_insert_from_select(self):
table = self.tables.manual_pk
config.db.execute(
table.insert(),
[
dict(id=1, data="data1"),
dict(id=2, data="data2"),
dict(id=3, data="data3"),
]
)
config.db.execute(
table.insert(inline=True).
from_select(("id", "data",),
select([table.c.id + 5, table.c.data]).
where(table.c.data.in_(["data2", "data3"]))
),
)
eq_(
config.db.execute(
select([table.c.data]).order_by(table.c.data)
).fetchall(),
[("data1", ), ("data2", ), ("data2", ),
("data3", ), ("data3", )]
)
class ReturningTest(fixtures.TablesTest):
run_create_tables = 'each'
__requires__ = 'returning', 'autoincrement_insert'
__backend__ = True
__engine_options__ = {"implicit_returning": True}
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(
row,
(config.db.dialect.default_sequence_base, "some data")
)
@classmethod
def define_tables(cls, metadata):
Table('autoinc_pk', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('data', String(50))
)
@requirements.fetch_rows_post_commit
def test_explicit_returning_pk_autocommit(self):
engine = config.db
table = self.tables.autoinc_pk
r = engine.execute(
table.insert().returning(
table.c.id),
data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
def test_explicit_returning_pk_no_autocommit(self):
engine = config.db
table = self.tables.autoinc_pk
with engine.begin() as conn:
r = conn.execute(
table.insert().returning(
table.c.id),
data="some data"
)
pk = r.first()[0]
fetched_pk = config.db.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
def test_autoincrement_on_insert_implcit_returning(self):
config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
self._assert_round_trip(self.tables.autoinc_pk, config.db)
def test_last_inserted_id_implicit_returning(self):
r = config.db.execute(
self.tables.autoinc_pk.insert(),
data="some data"
)
pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(
r.inserted_primary_key,
[pk]
)
__all__ = ('LastrowidTest', 'InsertBehaviorTest', 'ReturningTest')

View file

@ -0,0 +1,536 @@
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import inspect
from sqlalchemy import MetaData, Integer, String
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import engines, fixtures
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.testing import eq_, assert_raises_message
from sqlalchemy import testing
from .. import config
import operator
from sqlalchemy.schema import DDL, Index
from sqlalchemy import event
metadata, users = None, None
class HasTableTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('test_table', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(50))
)
def test_has_table(self):
with config.db.begin() as conn:
assert config.db.dialect.has_table(conn, "test_table")
assert not config.db.dialect.has_table(conn, "nonexistent_table")
class ComponentReflectionTest(fixtures.TablesTest):
run_inserts = run_deletes = None
__backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.define_reflected_tables(metadata, None)
if testing.requires.schemas.enabled:
cls.define_reflected_tables(metadata, "test_schema")
@classmethod
def define_reflected_tables(cls, metadata, schema):
if schema:
schema_prefix = schema + "."
else:
schema_prefix = ""
if testing.requires.self_referential_foreign_keys.enabled:
users = Table('users', metadata,
Column('user_id', sa.INT, primary_key=True),
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('%susers.user_id' %
schema_prefix)),
schema=schema,
test_needs_fk=True,
)
else:
users = Table('users', metadata,
Column('user_id', sa.INT, primary_key=True),
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
schema=schema,
test_needs_fk=True,
)
Table("dingalings", metadata,
Column('dingaling_id', sa.Integer, primary_key=True),
Column('address_id', sa.Integer,
sa.ForeignKey('%semail_addresses.address_id' %
schema_prefix)),
Column('data', sa.String(30)),
schema=schema,
test_needs_fk=True,
)
Table('email_addresses', metadata,
Column('address_id', sa.Integer),
Column('remote_user_id', sa.Integer,
sa.ForeignKey(users.c.user_id)),
Column('email_address', sa.String(20)),
sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
schema=schema,
test_needs_fk=True,
)
if testing.requires.index_reflection.enabled:
cls.define_index(metadata, users)
if testing.requires.view_column_reflection.enabled:
cls.define_views(metadata, schema)
@classmethod
def define_index(cls, metadata, users):
Index("users_t_idx", users.c.test1, users.c.test2)
Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
@classmethod
def define_views(cls, metadata, schema):
for table_name in ('users', 'email_addresses'):
fullname = table_name
if schema:
fullname = "%s.%s" % (schema, table_name)
view_name = fullname + '_v'
query = "CREATE VIEW %s AS SELECT * FROM %s" % (
view_name, fullname)
event.listen(
metadata,
"after_create",
DDL(query)
)
event.listen(
metadata,
"before_drop",
DDL("DROP VIEW %s" % view_name)
)
@testing.requires.schema_reflection
def test_get_schema_names(self):
insp = inspect(testing.db)
self.assert_('test_schema' in insp.get_schema_names())
@testing.requires.schema_reflection
def test_dialect_initialize(self):
engine = engines.testing_engine()
assert not hasattr(engine.dialect, 'default_schema_name')
inspect(engine)
assert hasattr(engine.dialect, 'default_schema_name')
@testing.requires.schema_reflection
def test_get_default_schema_name(self):
insp = inspect(testing.db)
eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
@testing.provide_metadata
def _test_get_table_names(self, schema=None, table_type='table',
order_by=None):
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
insp = inspect(meta.bind)
if table_type == 'view':
table_names = insp.get_view_names(schema)
table_names.sort()
answer = ['email_addresses_v', 'users_v']
eq_(sorted(table_names), answer)
else:
table_names = insp.get_table_names(schema,
order_by=order_by)
if order_by == 'foreign_key':
answer = ['users', 'email_addresses', 'dingalings']
eq_(table_names, answer)
else:
answer = ['dingalings', 'email_addresses', 'users']
eq_(sorted(table_names), answer)
@testing.requires.table_reflection
def test_get_table_names(self):
self._test_get_table_names()
@testing.requires.table_reflection
@testing.requires.foreign_key_constraint_reflection
def test_get_table_names_fks(self):
self._test_get_table_names(order_by='foreign_key')
@testing.requires.table_reflection
@testing.requires.schemas
def test_get_table_names_with_schema(self):
self._test_get_table_names('test_schema')
@testing.requires.view_column_reflection
def test_get_view_names(self):
self._test_get_table_names(table_type='view')
@testing.requires.view_column_reflection
@testing.requires.schemas
def test_get_view_names_with_schema(self):
self._test_get_table_names('test_schema', table_type='view')
@testing.requires.table_reflection
@testing.requires.view_column_reflection
def test_get_tables_and_views(self):
self._test_get_table_names()
self._test_get_table_names(table_type='view')
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
table_names = ['users', 'email_addresses']
if table_type == 'view':
table_names = ['users_v', 'email_addresses_v']
insp = inspect(meta.bind)
for table_name, table in zip(table_names, (users,
addresses)):
schema_name = schema
cols = insp.get_columns(table_name, schema=schema_name)
self.assert_(len(cols) > 0, len(cols))
# should be in order
for i, col in enumerate(table.columns):
eq_(col.name, cols[i]['name'])
ctype = cols[i]['type'].__class__
ctype_def = col.type
if isinstance(ctype_def, sa.types.TypeEngine):
ctype_def = ctype_def.__class__
# Oracle returns Date for DateTime.
if testing.against('oracle') and ctype_def \
in (sql_types.Date, sql_types.DateTime):
ctype_def = sql_types.Date
# assert that the desired type and return type share
# a base within one of the generic types.
self.assert_(len(set(ctype.__mro__).
intersection(ctype_def.__mro__).
intersection([
sql_types.Integer,
sql_types.Numeric,
sql_types.DateTime,
sql_types.Date,
sql_types.Time,
sql_types.String,
sql_types._Binary,
])) > 0, '%s(%s), %s(%s)' %
(col.name, col.type, cols[i]['name'], ctype))
if not col.primary_key:
assert cols[i]['default'] is None
@testing.requires.table_reflection
def test_get_columns(self):
self._test_get_columns()
@testing.provide_metadata
def _type_round_trip(self, *types):
t = Table('t', self.metadata,
*[
Column('t%d' % i, type_)
for i, type_ in enumerate(types)
]
)
t.create()
return [
c['type'] for c in
inspect(self.metadata.bind).get_columns('t')
]
@testing.requires.table_reflection
def test_numeric_reflection(self):
for typ in self._type_round_trip(
sql_types.Numeric(18, 5),
):
assert isinstance(typ, sql_types.Numeric)
eq_(typ.precision, 18)
eq_(typ.scale, 5)
@testing.requires.table_reflection
def test_varchar_reflection(self):
typ = self._type_round_trip(sql_types.String(52))[0]
assert isinstance(typ, sql_types.String)
eq_(typ.length, 52)
@testing.requires.table_reflection
@testing.provide_metadata
def test_nullable_reflection(self):
t = Table('t', self.metadata,
Column('a', Integer, nullable=True),
Column('b', Integer, nullable=False))
t.create()
eq_(
dict(
(col['name'], col['nullable'])
for col in inspect(self.metadata.bind).get_columns('t')
),
{"a": True, "b": False}
)
@testing.requires.table_reflection
@testing.requires.schemas
def test_get_columns_with_schema(self):
self._test_get_columns(schema='test_schema')
@testing.requires.view_column_reflection
def test_get_view_columns(self):
self._test_get_columns(table_type='view')
@testing.requires.view_column_reflection
@testing.requires.schemas
def test_get_view_columns_with_schema(self):
self._test_get_columns(schema='test_schema', table_type='view')
@testing.provide_metadata
def _test_get_pk_constraint(self, schema=None):
meta = self.metadata
users, addresses = self.tables.users, self.tables.email_addresses
insp = inspect(meta.bind)
users_cons = insp.get_pk_constraint(users.name, schema=schema)
users_pkeys = users_cons['constrained_columns']
eq_(users_pkeys, ['user_id'])
addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
addr_pkeys = addr_cons['constrained_columns']
eq_(addr_pkeys, ['address_id'])
with testing.requires.reflects_pk_names.fail_if():
eq_(addr_cons['name'], 'email_ad_pk')
@testing.requires.primary_key_constraint_reflection
def test_get_pk_constraint(self):
self._test_get_pk_constraint()
@testing.requires.table_reflection
@testing.requires.primary_key_constraint_reflection
@testing.requires.schemas
def test_get_pk_constraint_with_schema(self):
self._test_get_pk_constraint(schema='test_schema')
@testing.requires.table_reflection
@testing.provide_metadata
def test_deprecated_get_primary_keys(self):
meta = self.metadata
users = self.tables.users
insp = Inspector(meta.bind)
assert_raises_message(
sa_exc.SADeprecationWarning,
"Call to deprecated method get_primary_keys."
" Use get_pk_constraint instead.",
insp.get_primary_keys, users.name
)
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
insp = inspect(meta.bind)
expected_schema = schema
# users
if testing.requires.self_referential_foreign_keys.enabled:
users_fkeys = insp.get_foreign_keys(users.name,
schema=schema)
fkey1 = users_fkeys[0]
with testing.requires.named_constraints.fail_if():
self.assert_(fkey1['name'] is not None)
eq_(fkey1['referred_schema'], expected_schema)
eq_(fkey1['referred_table'], users.name)
eq_(fkey1['referred_columns'], ['user_id', ])
if testing.requires.self_referential_foreign_keys.enabled:
eq_(fkey1['constrained_columns'], ['parent_user_id'])
# addresses
addr_fkeys = insp.get_foreign_keys(addresses.name,
schema=schema)
fkey1 = addr_fkeys[0]
with testing.requires.named_constraints.fail_if():
self.assert_(fkey1['name'] is not None)
eq_(fkey1['referred_schema'], expected_schema)
eq_(fkey1['referred_table'], users.name)
eq_(fkey1['referred_columns'], ['user_id', ])
eq_(fkey1['constrained_columns'], ['remote_user_id'])
@testing.requires.foreign_key_constraint_reflection
def test_get_foreign_keys(self):
self._test_get_foreign_keys()
@testing.requires.foreign_key_constraint_reflection
@testing.requires.schemas
def test_get_foreign_keys_with_schema(self):
self._test_get_foreign_keys(schema='test_schema')
@testing.provide_metadata
def _test_get_indexes(self, schema=None):
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = inspect(meta.bind)
indexes = insp.get_indexes('users', schema=schema)
expected_indexes = [
{'unique': False,
'column_names': ['test1', 'test2'],
'name': 'users_t_idx'},
{'unique': False,
'column_names': ['user_id', 'test2', 'test1'],
'name': 'users_all_idx'}
]
index_names = [d['name'] for d in indexes]
for e_index in expected_indexes:
assert e_index['name'] in index_names
index = indexes[index_names.index(e_index['name'])]
for key in e_index:
eq_(e_index[key], index[key])
@testing.requires.index_reflection
def test_get_indexes(self):
self._test_get_indexes()
@testing.requires.index_reflection
@testing.requires.schemas
def test_get_indexes_with_schema(self):
self._test_get_indexes(schema='test_schema')
@testing.requires.unique_constraint_reflection
def test_get_unique_constraints(self):
self._test_get_unique_constraints()
@testing.requires.unique_constraint_reflection
@testing.requires.schemas
def test_get_unique_constraints_with_schema(self):
self._test_get_unique_constraints(schema='test_schema')
@testing.provide_metadata
def _test_get_unique_constraints(self, schema=None):
uniques = sorted(
[
{'name': 'unique_a', 'column_names': ['a']},
{'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
{'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
{'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
],
key=operator.itemgetter('name')
)
orig_meta = self.metadata
table = Table(
'testtbl', orig_meta,
Column('a', sa.String(20)),
Column('b', sa.String(30)),
Column('c', sa.Integer),
# reserved identifiers
Column('asc', sa.String(30)),
Column('key', sa.String(30)),
schema=schema
)
for uc in uniques:
table.append_constraint(
sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
)
orig_meta.create_all()
inspector = inspect(orig_meta.bind)
reflected = sorted(
inspector.get_unique_constraints('testtbl', schema=schema),
key=operator.itemgetter('name')
)
for orig, refl in zip(uniques, reflected):
eq_(orig, refl)
@testing.provide_metadata
def _test_get_view_definition(self, schema=None):
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
view_name1 = 'users_v'
view_name2 = 'email_addresses_v'
insp = inspect(meta.bind)
v1 = insp.get_view_definition(view_name1, schema=schema)
self.assert_(v1)
v2 = insp.get_view_definition(view_name2, schema=schema)
self.assert_(v2)
@testing.requires.view_reflection
def test_get_view_definition(self):
self._test_get_view_definition()
@testing.requires.view_reflection
@testing.requires.schemas
def test_get_view_definition_with_schema(self):
self._test_get_view_definition(schema='test_schema')
@testing.only_on("postgresql", "PG specific feature")
@testing.provide_metadata
def _test_get_table_oid(self, table_name, schema=None):
meta = self.metadata
users, addresses, dingalings = self.tables.users, \
self.tables.email_addresses, self.tables.dingalings
insp = inspect(meta.bind)
oid = insp.get_table_oid(table_name, schema)
self.assert_(isinstance(oid, int))
def test_get_table_oid(self):
self._test_get_table_oid('users')
@testing.requires.schemas
def test_get_table_oid_with_schema(self):
self._test_get_table_oid('users', schema='test_schema')
@testing.requires.table_reflection
@testing.provide_metadata
def test_autoincrement_col(self):
"""test that 'autoincrement' is reflected according to sqla's policy.
Don't mark this test as unsupported for any backend !
(technically it fails with MySQL InnoDB since "id" comes before "id2")
A backend is better off not returning "autoincrement" at all,
instead of potentially returning "False" for an auto-incrementing
primary key column.
"""
meta = self.metadata
insp = inspect(meta.bind)
for tname, cname in [
('users', 'user_id'),
('email_addresses', 'address_id'),
('dingalings', 'dingaling_id'),
]:
cols = insp.get_columns(tname)
id_ = dict((c['name'], c) for c in cols)[cname]
assert id_.get('autoincrement', True)
__all__ = ('ComponentReflectionTest', 'HasTableTest')

View file

@ -0,0 +1,220 @@
from .. import fixtures, config
from ..config import requirements
from .. import exclusions
from ..assertions import eq_
from .. import engines
from sqlalchemy import Integer, String, select, util, sql, DateTime
import datetime
from ..schema import Table, Column
class RowFetchTest(fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('plain_pk', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(50))
)
Table('has_dates', metadata,
Column('id', Integer, primary_key=True),
Column('today', DateTime)
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.plain_pk.insert(),
[
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
]
)
config.db.execute(
cls.tables.has_dates.insert(),
[
{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}
]
)
def test_via_string(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row['id'], 1
)
eq_(
row['data'], "d1"
)
def test_via_int(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row[0], 1
)
eq_(
row[1], "d1"
)
def test_via_col_object(self):
row = config.db.execute(
self.tables.plain_pk.select().
order_by(self.tables.plain_pk.c.id)
).first()
eq_(
row[self.tables.plain_pk.c.id], 1
)
eq_(
row[self.tables.plain_pk.c.data], "d1"
)
@requirements.duplicate_names_in_cursor_description
def test_row_with_dupe_names(self):
result = config.db.execute(
select([self.tables.plain_pk.c.data,
self.tables.plain_pk.c.data.label('data')]).
order_by(self.tables.plain_pk.c.id)
)
row = result.first()
eq_(result.keys(), ['data', 'data'])
eq_(row, ('d1', 'd1'))
def test_row_w_scalar_select(self):
"""test that a scalar select as a column is returned as such
and that type conversion works OK.
(this is half a SQLAlchemy Core test and half to catch database
backends that may have unusual behavior with scalar selects.)
"""
datetable = self.tables.has_dates
s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
row = config.db.execute(s2).first()
eq_(row['somelabel'], datetime.datetime(2006, 5, 12, 12, 0, 0))
class PercentSchemaNamesTest(fixtures.TablesTest):
"""tests using percent signs, spaces in table and column names.
This is a very fringe use case, doesn't work for MySQL
or Postgresql. the requirement, "percent_schema_names",
is marked "skip" by default.
"""
__requires__ = ('percent_schema_names', )
__backend__ = True
@classmethod
def define_tables(cls, metadata):
cls.tables.percent_table = Table('percent%table', metadata,
Column("percent%", Integer),
Column(
"spaces % more spaces", Integer),
)
cls.tables.lightweight_percent_table = sql.table(
'percent%table', sql.column("percent%"),
sql.column("spaces % more spaces")
)
def test_single_roundtrip(self):
percent_table = self.tables.percent_table
for params in [
{'percent%': 5, 'spaces % more spaces': 12},
{'percent%': 7, 'spaces % more spaces': 11},
{'percent%': 9, 'spaces % more spaces': 10},
{'percent%': 11, 'spaces % more spaces': 9}
]:
config.db.execute(percent_table.insert(), params)
self._assert_table()
def test_executemany_roundtrip(self):
percent_table = self.tables.percent_table
config.db.execute(
percent_table.insert(),
{'percent%': 5, 'spaces % more spaces': 12}
)
config.db.execute(
percent_table.insert(),
[{'percent%': 7, 'spaces % more spaces': 11},
{'percent%': 9, 'spaces % more spaces': 10},
{'percent%': 11, 'spaces % more spaces': 9}]
)
self._assert_table()
def _assert_table(self):
percent_table = self.tables.percent_table
lightweight_percent_table = self.tables.lightweight_percent_table
for table in (
percent_table,
percent_table.alias(),
lightweight_percent_table,
lightweight_percent_table.alias()):
eq_(
list(
config.db.execute(
table.select().order_by(table.c['percent%'])
)
),
[
(5, 12),
(7, 11),
(9, 10),
(11, 9)
]
)
eq_(
list(
config.db.execute(
table.select().
where(table.c['spaces % more spaces'].in_([9, 10])).
order_by(table.c['percent%']),
)
),
[
(9, 10),
(11, 9)
]
)
row = config.db.execute(table.select().
order_by(table.c['percent%'])).first()
eq_(row['percent%'], 5)
eq_(row['spaces % more spaces'], 12)
eq_(row[table.c['percent%']], 5)
eq_(row[table.c['spaces % more spaces']], 12)
config.db.execute(
percent_table.update().values(
{percent_table.c['spaces % more spaces']: 15}
)
)
eq_(
list(
config.db.execute(
percent_table.
select().
order_by(percent_table.c['percent%'])
)
),
[(5, 15), (7, 15), (9, 15), (11, 15)]
)

View file

@ -0,0 +1,87 @@
from .. import fixtures, config
from ..assertions import eq_
from sqlalchemy import util
from sqlalchemy import Integer, String, select, func
from ..schema import Table, Column
class OrderByLabelTest(fixtures.TablesTest):
"""Test the dialect sends appropriate ORDER BY expressions when
labels are used.
This essentially exercises the "supports_simple_order_by_label"
setting.
"""
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table("some_table", metadata,
Column('id', Integer, primary_key=True),
Column('x', Integer),
Column('y', Integer),
Column('q', String(50)),
Column('p', String(50))
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.some_table.insert(),
[
{"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
{"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
{"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
]
)
def _assert_result(self, select, result):
eq_(
config.db.execute(select).fetchall(),
result
)
def test_plain(self):
table = self.tables.some_table
lx = table.c.x.label('lx')
self._assert_result(
select([lx]).order_by(lx),
[(1, ), (2, ), (3, )]
)
def test_composed_int(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label('lx')
self._assert_result(
select([lx]).order_by(lx),
[(3, ), (5, ), (7, )]
)
def test_composed_multiple(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label('lx')
ly = (func.lower(table.c.q) + table.c.p).label('ly')
self._assert_result(
select([lx, ly]).order_by(lx, ly.desc()),
[(3, util.u('q1p3')), (5, util.u('q2p2')), (7, util.u('q3p1'))]
)
def test_plain_desc(self):
table = self.tables.some_table
lx = table.c.x.label('lx')
self._assert_result(
select([lx]).order_by(lx.desc()),
[(3, ), (2, ), (1, )]
)
def test_composed_int_desc(self):
table = self.tables.some_table
lx = (table.c.x + table.c.y).label('lx')
self._assert_result(
select([lx]).order_by(lx.desc()),
[(7, ), (5, ), (3, )]
)

View file

@ -0,0 +1,126 @@
from .. import fixtures, config
from ..config import requirements
from ..assertions import eq_
from ... import testing
from ... import Integer, String, Sequence, schema
from ..schema import Table, Column
class SequenceTest(fixtures.TablesTest):
__requires__ = ('sequences',)
__backend__ = True
run_create_tables = 'each'
@classmethod
def define_tables(cls, metadata):
Table('seq_pk', metadata,
Column('id', Integer, Sequence('tab_id_seq'), primary_key=True),
Column('data', String(50))
)
Table('seq_opt_pk', metadata,
Column('id', Integer, Sequence('tab_id_seq', optional=True),
primary_key=True),
Column('data', String(50))
)
def test_insert_roundtrip(self):
config.db.execute(
self.tables.seq_pk.insert(),
data="some data"
)
self._assert_round_trip(self.tables.seq_pk, config.db)
def test_insert_lastrowid(self):
r = config.db.execute(
self.tables.seq_pk.insert(),
data="some data"
)
eq_(
r.inserted_primary_key,
[1]
)
def test_nextval_direct(self):
r = config.db.execute(
self.tables.seq_pk.c.id.default
)
eq_(
r, 1
)
@requirements.sequences_optional
def test_optional_seq(self):
r = config.db.execute(
self.tables.seq_opt_pk.insert(),
data="some data"
)
eq_(
r.inserted_primary_key,
[1]
)
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(
row,
(1, "some data")
)
class HasSequenceTest(fixtures.TestBase):
__requires__ = 'sequences',
__backend__ = True
def test_has_sequence(self):
s1 = Sequence('user_id_seq')
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(testing.db.dialect.has_sequence(testing.db,
'user_id_seq'), True)
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_schema(self):
s1 = Sequence('user_id_seq', schema="test_schema")
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(testing.db.dialect.has_sequence(
testing.db, 'user_id_seq', schema="test_schema"), True)
finally:
testing.db.execute(schema.DropSequence(s1))
def test_has_sequence_neg(self):
eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
False)
@testing.requires.schemas
def test_has_sequence_schemas_neg(self):
eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
schema="test_schema"),
False)
@testing.requires.schemas
def test_has_sequence_default_not_in_remote(self):
s1 = Sequence('user_id_seq')
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
schema="test_schema"),
False)
finally:
testing.db.execute(schema.DropSequence(s1))
@testing.requires.schemas
def test_has_sequence_remote_not_in_default(self):
s1 = Sequence('user_id_seq', schema="test_schema")
testing.db.execute(schema.CreateSequence(s1))
try:
eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
False)
finally:
testing.db.execute(schema.DropSequence(s1))

View file

@ -0,0 +1,594 @@
# coding: utf-8
from .. import fixtures, config
from ..assertions import eq_
from ..config import requirements
from sqlalchemy import Integer, Unicode, UnicodeText, select
from sqlalchemy import Date, DateTime, Time, MetaData, String, \
Text, Numeric, Float, literal, Boolean
from ..schema import Table, Column
from ... import testing
import decimal
import datetime
from ...util import u
from ... import util
class _LiteralRoundTripFixture(object):
@testing.provide_metadata
def _literal_round_trip(self, type_, input_, output, filter_=None):
"""test literal rendering """
# for literal, we test the literal render in an INSERT
# into a typed column. we can then SELECT it back as its
# official type; ideally we'd be able to use CAST here
# but MySQL in particular can't CAST fully
t = Table('t', self.metadata, Column('x', type_))
t.create()
for value in input_:
ins = t.insert().values(x=literal(value)).compile(
dialect=testing.db.dialect,
compile_kwargs=dict(literal_binds=True)
)
testing.db.execute(ins)
for row in t.select().execute():
value = row[0]
if filter_ is not None:
value = filter_(value)
assert value in output
class _UnicodeFixture(_LiteralRoundTripFixture):
__requires__ = 'unicode_data',
data = u("Alors vous imaginez ma surprise, au lever du jour, "
"quand une drôle de petite voix ma réveillé. Elle "
"disait: « Sil vous plaît… dessine-moi un mouton! »")
@classmethod
def define_tables(cls, metadata):
Table('unicode_table', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('unicode_data', cls.datatype),
)
def test_round_trip(self):
unicode_table = self.tables.unicode_table
config.db.execute(
unicode_table.insert(),
{
'unicode_data': self.data,
}
)
row = config.db.execute(
select([
unicode_table.c.unicode_data,
])
).first()
eq_(
row,
(self.data, )
)
assert isinstance(row[0], util.text_type)
def test_round_trip_executemany(self):
unicode_table = self.tables.unicode_table
config.db.execute(
unicode_table.insert(),
[
{
'unicode_data': self.data,
}
for i in range(3)
]
)
rows = config.db.execute(
select([
unicode_table.c.unicode_data,
])
).fetchall()
eq_(
rows,
[(self.data, ) for i in range(3)]
)
for row in rows:
assert isinstance(row[0], util.text_type)
def _test_empty_strings(self):
unicode_table = self.tables.unicode_table
config.db.execute(
unicode_table.insert(),
{"unicode_data": u('')}
)
row = config.db.execute(
select([unicode_table.c.unicode_data])
).first()
eq_(row, (u(''),))
def test_literal(self):
self._literal_round_trip(self.datatype, [self.data], [self.data])
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data',
__backend__ = True
datatype = Unicode(255)
@requirements.empty_strings_varchar
def test_empty_strings_varchar(self):
self._test_empty_strings()
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data', 'text_type'
__backend__ = True
datatype = UnicodeText()
@requirements.empty_strings_text
def test_empty_strings_text(self):
self._test_empty_strings()
class TextTest(_LiteralRoundTripFixture, fixtures.TablesTest):
__requires__ = 'text_type',
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('text_table', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('text_data', Text),
)
def test_text_roundtrip(self):
text_table = self.tables.text_table
config.db.execute(
text_table.insert(),
{"text_data": 'some text'}
)
row = config.db.execute(
select([text_table.c.text_data])
).first()
eq_(row, ('some text',))
def test_text_empty_strings(self):
text_table = self.tables.text_table
config.db.execute(
text_table.insert(),
{"text_data": ''}
)
row = config.db.execute(
select([text_table.c.text_data])
).first()
eq_(row, ('',))
def test_literal(self):
self._literal_round_trip(Text, ["some text"], ["some text"])
def test_literal_quoting(self):
data = '''some 'text' hey "hi there" that's text'''
self._literal_round_trip(Text, [data], [data])
def test_literal_backslashes(self):
data = r'backslash one \ backslash two \\ end'
self._literal_round_trip(Text, [data], [data])
class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
@requirements.unbounded_varchar
def test_nolength_string(self):
metadata = MetaData()
foo = Table('foo', metadata,
Column('one', String)
)
foo.create(config.db)
foo.drop(config.db)
def test_literal(self):
self._literal_round_trip(String(40), ["some text"], ["some text"])
def test_literal_quoting(self):
data = '''some 'text' hey "hi there" that's text'''
self._literal_round_trip(String(40), [data], [data])
def test_literal_backslashes(self):
data = r'backslash one \ backslash two \\ end'
self._literal_round_trip(String(40), [data], [data])
class _DateFixture(_LiteralRoundTripFixture):
compare = None
@classmethod
def define_tables(cls, metadata):
Table('date_table', metadata,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('date_data', cls.datatype),
)
def test_round_trip(self):
date_table = self.tables.date_table
config.db.execute(
date_table.insert(),
{'date_data': self.data}
)
row = config.db.execute(
select([
date_table.c.date_data,
])
).first()
compare = self.compare or self.data
eq_(row,
(compare, ))
assert isinstance(row[0], type(compare))
def test_null(self):
date_table = self.tables.date_table
config.db.execute(
date_table.insert(),
{'date_data': None}
)
row = config.db.execute(
select([
date_table.c.date_data,
])
).first()
eq_(row, (None,))
@testing.requires.datetime_literals
def test_literal(self):
compare = self.compare or self.data
self._literal_round_trip(self.datatype, [self.data], [compare])
class DateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime',
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_microseconds',
__backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time',
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18)
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time_microseconds',
__backend__ = True
datatype = Time
data = datetime.time(12, 57, 18, 396)
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
__backend__ = True
datatype = Date
data = datetime.date(2012, 10, 15)
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date', 'date_coerces_from_datetime'
__backend__ = True
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_historic',
__backend__ = True
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date_historic',
__backend__ = True
datatype = Date
data = datetime.date(1727, 4, 1)
class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
def test_literal(self):
self._literal_round_trip(Integer, [5], [5])
class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
__backend__ = True
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
@testing.provide_metadata
def _do_test(self, type_, input_, output,
filter_=None, check_scale=False):
metadata = self.metadata
t = Table('t', metadata, Column('x', type_))
t.create()
t.insert().execute([{'x': x} for x in input_])
result = set([row[0] for row in t.select().execute()])
output = set(output)
if filter_:
result = set(filter_(x) for x in result)
output = set(filter_(x) for x in output)
eq_(result, output)
if check_scale:
eq_(
[str(x) for x in result],
[str(x) for x in output],
)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_render_literal_numeric(self):
self._literal_round_trip(
Numeric(precision=8, scale=4),
[15.7563, decimal.Decimal("15.7563")],
[decimal.Decimal("15.7563")],
)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_render_literal_numeric_asfloat(self):
self._literal_round_trip(
Numeric(precision=8, scale=4, asdecimal=False),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
)
def test_render_literal_float(self):
self._literal_round_trip(
Float(4),
[15.7563, decimal.Decimal("15.7563")],
[15.7563, ],
filter_=lambda n: n is not None and round(n, 5) or None
)
@testing.requires.precision_generic_float_type
def test_float_custom_scale(self):
self._do_test(
Float(None, decimal_return_scale=7, asdecimal=True),
[15.7563827, decimal.Decimal("15.7563827")],
[decimal.Decimal("15.7563827"), ],
check_scale=True
)
def test_numeric_as_decimal(self):
self._do_test(
Numeric(precision=8, scale=4),
[15.7563, decimal.Decimal("15.7563")],
[decimal.Decimal("15.7563")],
)
def test_numeric_as_float(self):
self._do_test(
Numeric(precision=8, scale=4, asdecimal=False),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
)
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_decimal(self):
self._do_test(
Numeric(precision=8, scale=4),
[None],
[None],
)
@testing.requires.fetch_null_from_numeric
def test_numeric_null_as_float(self):
self._do_test(
Numeric(precision=8, scale=4, asdecimal=False),
[None],
[None],
)
@testing.requires.floats_to_four_decimals
def test_float_as_decimal(self):
self._do_test(
Float(precision=8, asdecimal=True),
[15.7563, decimal.Decimal("15.7563"), None],
[decimal.Decimal("15.7563"), None],
)
def test_float_as_float(self):
self._do_test(
Float(precision=8),
[15.7563, decimal.Decimal("15.7563")],
[15.7563],
filter_=lambda n: n is not None and round(n, 5) or None
)
@testing.requires.precision_numerics_general
def test_precision_decimal(self):
numbers = set([
decimal.Decimal("54.234246451650"),
decimal.Decimal("0.004354"),
decimal.Decimal("900.0"),
])
self._do_test(
Numeric(precision=18, scale=12),
numbers,
numbers,
)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal(self):
"""test exceedingly small decimals.
Decimal reports values with E notation when the exponent
is greater than 6.
"""
numbers = set([
decimal.Decimal('1E-2'),
decimal.Decimal('1E-3'),
decimal.Decimal('1E-4'),
decimal.Decimal('1E-5'),
decimal.Decimal('1E-6'),
decimal.Decimal('1E-7'),
decimal.Decimal('1E-8'),
decimal.Decimal("0.01000005940696"),
decimal.Decimal("0.00000005940696"),
decimal.Decimal("0.00000000000696"),
decimal.Decimal("0.70000000000696"),
decimal.Decimal("696E-12"),
])
self._do_test(
Numeric(precision=18, scale=14),
numbers,
numbers
)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
"""test exceedingly large decimals.
"""
numbers = set([
decimal.Decimal('4E+8'),
decimal.Decimal("5748E+15"),
decimal.Decimal('1.521E+15'),
decimal.Decimal('00000000000000.1E+12'),
])
self._do_test(
Numeric(precision=25, scale=2),
numbers,
numbers
)
@testing.requires.precision_numerics_many_significant_digits
def test_many_significant_digits(self):
numbers = set([
decimal.Decimal("31943874831932418390.01"),
decimal.Decimal("319438950232418390.273596"),
decimal.Decimal("87673.594069654243"),
])
self._do_test(
Numeric(precision=38, scale=12),
numbers,
numbers
)
@testing.requires.precision_numerics_retains_significant_digits
def test_numeric_no_decimal(self):
numbers = set([
decimal.Decimal("1.000")
])
self._do_test(
Numeric(precision=5, scale=3),
numbers,
numbers,
check_scale=True
)
class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('boolean_table', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
Column('value', Boolean),
Column('unconstrained_value', Boolean(create_constraint=False)),
)
def test_render_literal_bool(self):
self._literal_round_trip(
Boolean(),
[True, False],
[True, False]
)
def test_round_trip(self):
boolean_table = self.tables.boolean_table
config.db.execute(
boolean_table.insert(),
{
'id': 1,
'value': True,
'unconstrained_value': False
}
)
row = config.db.execute(
select([
boolean_table.c.value,
boolean_table.c.unconstrained_value
])
).first()
eq_(
row,
(True, False)
)
assert isinstance(row[0], bool)
def test_null(self):
boolean_table = self.tables.boolean_table
config.db.execute(
boolean_table.insert(),
{
'id': 1,
'value': None,
'unconstrained_value': None
}
)
row = config.db.execute(
select([
boolean_table.c.value,
boolean_table.c.unconstrained_value
])
).first()
eq_(
row,
(None, None)
)
__all__ = ('UnicodeVarcharTest', 'UnicodeTextTest',
'DateTest', 'DateTimeTest', 'TextTest',
'NumericTest', 'IntegerTest',
'DateTimeHistoricTest', 'DateTimeCoercedToDateTimeTest',
'TimeMicrosecondsTest', 'TimeTest', 'DateTimeMicrosecondsTest',
'DateHistoricTest', 'StringTest', 'BooleanTest')

View file

@ -0,0 +1,63 @@
from .. import fixtures, config
from ..assertions import eq_
from sqlalchemy import Integer, String
from ..schema import Table, Column
class SimpleUpdateDeleteTest(fixtures.TablesTest):
run_deletes = 'each'
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('plain_pk', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(50))
)
@classmethod
def insert_data(cls):
config.db.execute(
cls.tables.plain_pk.insert(),
[
{"id": 1, "data": "d1"},
{"id": 2, "data": "d2"},
{"id": 3, "data": "d3"},
]
)
def test_update(self):
t = self.tables.plain_pk
r = config.db.execute(
t.update().where(t.c.id == 2),
data="d2_new"
)
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
[
(1, "d1"),
(2, "d2_new"),
(3, "d3")
]
)
def test_delete(self):
t = self.tables.plain_pk
r = config.db.execute(
t.delete().where(t.c.id == 2)
)
assert not r.is_insert
assert not r.returns_rows
eq_(
config.db.execute(t.select().order_by(t.c.id)).fetchall(),
[
(1, "d1"),
(3, "d3")
]
)
__all__ = ('SimpleUpdateDeleteTest', )