forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path0022.yml
291 lines (282 loc) · 10.2 KB
/
0022.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
version: 22
description: "queue task dependencies phase 2 step 1 (slow migration: 10μs per row in table `queue_task_dependency_entities`)"
migrationScript: 0022-migration.sql
downgradeScript: 0022-downgrade.sql
methods:
####
# queue_task_dependency_entities
queue_task_dependency_entities_load:
deprecated: true
description: See taskcluster-lib-entities
mode: read
serviceName: queue
args: partition_key text, row_key text
returns: table (partition_key_out text, row_key_out text, value jsonb, version integer, etag uuid)
body: |-
begin
return query
select
required_task_id,
dependent_task_id,
jsonb_build_object(
'PartitionKey', required_task_id,
'RowKey', dependent_task_id,
'taskId', slugid_to_uuid(required_task_id),
'dependentTaskId', slugid_to_uuid(dependent_task_id),
'require', replace(requires::text, 'all-', ''),
'expires', expires) as value,
1 as version,
task_dependencies.etag as etag
from task_dependencies
where
required_task_id = partition_key and dependent_task_id = row_key;
end
queue_task_dependency_entities_create:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: pk text, rk text, properties jsonb, overwrite boolean, version integer
returns: uuid
body: |-
declare
new_etag uuid;
begin
-- note that this function always overwrites (queue always calls it that way anyway)
new_etag = public.gen_random_uuid();
insert into task_dependencies select
rk,
pk,
('all-' || (properties ->> 'require'))::task_requires,
true,
(properties ->> 'expires')::timestamptz,
new_etag
-- if the row already exists, that's because it was created by
-- queue_task_requirement_entities_create, so just update the requires value
on conflict (required_task_id, dependent_task_id) do
update
set requires = ('all-' || (properties ->> 'require'))::task_requires
where
task_dependencies.required_task_id = pk and
task_dependencies.dependent_task_id = rk;
return new_etag;
end
queue_task_dependency_entities_remove:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: partition_key text, row_key text
returns: table (etag uuid)
body: |-
begin
return query delete from task_dependencies
where
task_dependencies.required_task_id = partition_key and
task_dependencies.dependent_task_id = row_key
returning task_dependencies.etag;
end
queue_task_dependency_entities_modify:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: partition_key text, row_key text, properties jsonb, version integer, old_etag uuid
returns: table (etag uuid)
body: |-
begin
raise exception 'not implemented';
end
queue_task_dependency_entities_scan:
deprecated: true
description: See taskcluster-lib-entities
mode: read
serviceName: queue
args: pk text, rk text, condition text, size integer, page integer
returns: table (partition_key text, row_key text, value jsonb, version integer, etag uuid)
body: |-
declare
cond text[];
exp_cond_field text;
exp_cond_operator text;
exp_cond_operand timestamptz;
begin
if not condition is null then
cond := regexp_split_to_array(condition, '\s+');
exp_cond_field := trim(cond[3], '''');
exp_cond_operator := cond[4];
exp_cond_operand := cond[5] :: timestamptz;
if not exp_cond_field || exp_cond_operator in ('expires<', 'expires>=') then
raise exception 'scan only supports filtering for expired rows, not on %', exp_cond_field || exp_cond_operator;
end if;
end if;
return query select
required_task_id as partition_key,
dependent_task_id as row_key,
jsonb_build_object(
'PartitionKey', required_task_id,
'RowKey', dependent_task_id,
'taskId', slugid_to_uuid(required_task_id),
'dependentTaskId', slugid_to_uuid(dependent_task_id),
'require', replace(requires::text, 'all-', ''),
'expires', expires) as value,
1 as version,
task_dependencies.etag as etag from task_dependencies
where
(pk is null or required_task_id = pk) and
case
when exp_cond_field = 'expires' and exp_cond_operator = '<' then expires < exp_cond_operand
when exp_cond_field = 'expires' and exp_cond_operator = '>=' then expires >= exp_cond_operand
else true
end
order by task_dependencies.required_task_id, task_dependencies.dependent_task_id
limit case
when (size is not null and size > 0) then size + 1
else null
end
offset case
when (page is not null and page > 0) then page
else 0
end;
end
####
# queue_task_dependency_entities
#
# This is synthesized from the task_dependencies table
queue_task_requirement_entities_load:
deprecated: true
description: See taskcluster-lib-entities
mode: read
serviceName: queue
args: partition_key text, row_key text
returns: table (partition_key_out text, row_key_out text, value jsonb, version integer, etag uuid)
body: |-
begin
return query
select
dependent_task_id,
required_task_id,
jsonb_build_object(
'PartitionKey', dependent_task_id,
'RowKey', required_task_id,
'taskId', slugid_to_uuid(dependent_task_id),
'requiredTaskId', slugid_to_uuid(required_task_id),
'expires', expires) as value,
1 as version,
task_dependencies.etag as etag
from task_dependencies
where
dependent_task_id = partition_key and required_task_id = row_key and not satisfied;
end
queue_task_requirement_entities_create:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: pk text, rk text, properties jsonb, overwrite boolean, version integer
returns: uuid
body: |-
declare
new_etag uuid;
begin
-- note that this function always overwrites (queue always calls it that way anyway)
new_etag = public.gen_random_uuid();
insert into task_dependencies select
pk,
rk,
-- arbitrary value; in practice TaskDependency.create is called shortly
-- after this and will set the value correctly
'all-completed',
-- if a TaskRequirement exists, then the dependency is not satisified
false,
(properties ->> 'expires')::timestamptz,
new_etag
-- if the dependency was created already, just update satisfied
on conflict (required_task_id, dependent_task_id) do
update
set satisfied = false
where
task_dependencies.required_task_id = rk and
task_dependencies.dependent_task_id = pk;
return new_etag;
end
queue_task_requirement_entities_remove:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: partition_key text, row_key text
returns: table (etag uuid)
body: |-
begin
-- removing the requirement means that this dep is satisfied
return query
update task_dependencies
set satisfied = true
where
task_dependencies.dependent_task_id = partition_key and
task_dependencies.required_task_id = row_key and
not task_dependencies.satisfied
returning task_dependencies.etag;
end
queue_task_requirement_entities_modify:
deprecated: true
serviceName: queue
description: See taskcluster-lib-entities
mode: write
args: partition_key text, row_key text, properties jsonb, version integer, old_etag uuid
returns: table (etag uuid)
body: |-
begin
raise exception 'not implemented';
end
queue_task_requirement_entities_scan:
deprecated: true
description: See taskcluster-lib-entities
mode: read
serviceName: queue
args: pk text, rk text, condition text, size integer, page integer
returns: table (partition_key text, row_key text, value jsonb, version integer, etag uuid)
body: |-
declare
cond text[];
exp_cond_field text;
exp_cond_operator text;
exp_cond_operand timestamptz;
begin
if not condition is null then
cond := regexp_split_to_array(condition, '\s+');
exp_cond_field := trim(cond[3], '''');
exp_cond_operator := cond[4];
exp_cond_operand := cond[5] :: timestamptz;
if exp_cond_field || exp_cond_operator != 'expires<' then
raise exception 'scan only supports filtering for expired rows';
end if;
-- if this is the expiration crontask, return an empty set -- expiration of TaskDependency
-- is sufficient
return;
end if;
return query select
dependent_task_id as partition_key,
required_task_id as row_key,
jsonb_build_object(
'PartitionKey', dependent_task_id,
'RowKey', required_task_id,
'taskId', slugid_to_uuid(dependent_task_id),
'requiredTaskId', slugid_to_uuid(required_task_id),
'expires', expires) as value,
1 as version,
task_dependencies.etag as etag from task_dependencies
where
(pk is null or dependent_task_id = pk) and
not satisfied
order by task_dependencies.dependent_task_id, task_dependencies.required_task_id
limit case
when (size is not null and size > 0) then size + 1
else null
end
offset case
when (page is not null and page > 0) then page
else 0
end;
end