8
8
from allocation .domain import model
9
9
from allocation .service_layer import unit_of_work
10
10
from ..random_refs import random_sku , random_batchref , random_orderid
11
+ from sqlalchemy .sql import text
11
12
12
13
pytestmark = pytest .mark .usefixtures ("mappers" )
13
14
14
15
15
16
def insert_batch (session , ref , sku , qty , eta , product_version = 1 ):
16
17
session .execute (
17
- "INSERT INTO products (sku, version_number) VALUES (:sku, :version)" ,
18
+ text ( "INSERT INTO products (sku, version_number)" " VALUES (:sku, :version)") ,
18
19
dict (sku = sku , version = product_version ),
19
20
)
20
21
session .execute (
21
- "INSERT INTO batches (reference, sku, _purchased_quantity, eta)"
22
- " VALUES (:ref, :sku, :qty, :eta)" ,
22
+ text (
23
+ "INSERT INTO batches (reference, sku, _purchased_quantity, eta)"
24
+ " VALUES (:ref, :sku, :qty, :eta)"
25
+ ),
23
26
dict (ref = ref , sku = sku , qty = qty , eta = eta ),
24
27
)
25
28
26
29
27
30
def get_allocated_batch_ref (session , orderid , sku ):
28
31
[[orderlineid ]] = session .execute (
29
- "SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku" ,
32
+ text ( "SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku" ) ,
30
33
dict (orderid = orderid , sku = sku ),
31
34
)
32
35
[[batchref ]] = session .execute (
33
- "SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id"
34
- " WHERE orderline_id=:orderlineid" ,
36
+ text (
37
+ "SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id"
38
+ " WHERE orderline_id=:orderlineid"
39
+ ),
35
40
dict (orderlineid = orderlineid ),
36
41
)
37
42
return batchref
@@ -59,7 +64,7 @@ def test_rolls_back_uncommitted_work_by_default(sqlite_session_factory):
59
64
insert_batch (uow .session , "batch1" , "MEDIUM-PLINTH" , 100 , None )
60
65
61
66
new_session = sqlite_session_factory ()
62
- rows = list (new_session .execute ('SELECT * FROM "batches"' ))
67
+ rows = list (new_session .execute (text ( 'SELECT * FROM "batches"' ) ))
63
68
assert rows == []
64
69
65
70
@@ -74,7 +79,7 @@ class MyException(Exception):
74
79
raise MyException ()
75
80
76
81
new_session = sqlite_session_factory ()
77
- rows = list (new_session .execute ('SELECT * FROM "batches"' ))
82
+ rows = list (new_session .execute (text ( 'SELECT * FROM "batches"' ) ))
78
83
assert rows == []
79
84
80
85
@@ -113,20 +118,22 @@ def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory)
113
118
thread2 .join ()
114
119
115
120
[[version ]] = session .execute (
116
- "SELECT version_number FROM products WHERE sku=:sku" ,
121
+ text ( "SELECT version_number FROM products WHERE sku=:sku" ) ,
117
122
dict (sku = sku ),
118
123
)
119
124
assert version == 2
120
125
[exception ] = exceptions
121
126
assert "could not serialize access due to concurrent update" in str (exception )
122
127
123
128
orders = session .execute (
124
- "SELECT orderid FROM allocations"
125
- " JOIN batches ON allocations.batch_id = batches.id"
126
- " JOIN order_lines ON allocations.orderline_id = order_lines.id"
127
- " WHERE order_lines.sku=:sku" ,
129
+ text (
130
+ "SELECT orderid FROM allocations"
131
+ " JOIN batches ON allocations.batch_id = batches.id"
132
+ " JOIN order_lines ON allocations.orderline_id = order_lines.id"
133
+ " WHERE order_lines.sku=:sku"
134
+ ),
128
135
dict (sku = sku ),
129
136
)
130
137
assert orders .rowcount == 1
131
138
with unit_of_work .SqlAlchemyUnitOfWork (postgres_session_factory ) as uow :
132
- uow .session .execute ("select 1" )
139
+ uow .session .execute (text ( "select 1" ) )
0 commit comments