forked from orafce/orafce
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbms_pipe_session_B.sql
184 lines (153 loc) · 6.28 KB
/
dbms_pipe_session_B.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
\set ECHO none
-- Rendezvous with the peer session through advisory lock 1.
-- Whichever session grabs the lock first polls pg_locks (every 0.1 s, at
-- most 1000 times, i.e. roughly 100 s) until exactly one ungranted advisory
-- request on objid 1 shows up; the other session simply blocks on the lock.
do $$
declare
  waiters int;
begin
  -- Lock already taken: queue up behind the holder and finish once granted.
  if not pg_try_advisory_xact_lock(1) then
    perform pg_advisory_xact_lock(1);
    return;
  end if;

  -- Lock obtained: hold it until a single waiter is visible or the
  -- polling budget is exhausted.
  for attempt in 1..1000 loop
    perform pg_sleep(0.1);
    select count(*)
      into waiters
      from pg_locks
     where locktype = 'advisory'
       and objid = 1
       and not granted;
    exit when waiters = 1;
  end loop;
end;
$$;
-- Terse verbosity: psql omits DETAIL, HINT and CONTEXT from reported messages.
\set VERBOSITY terse
-- Wait for the 'pipe_test_owner created' notification that session A is
-- expected to send on this pipe (no explicit timeout is passed).
SELECT dbms_pipe.receive_message('pipe_test_owner_created_notifier');
-- Continue this session under the identity of 'pipe_test_owner'
-- (the connection itself is unchanged; only the session user is switched).
SET SESSION AUTHORIZATION pipe_test_owner;
/*
 * receiveFrom(pipename)
 *
 * Reads the given pipe one message at a time and reports the first item of
 * each message as a NOTICE. Covers receive_message(text,integer),
 * next_item_type(), every unpack_message_<type>() variant and purge(text).
 */
CREATE OR REPLACE FUNCTION receiveFrom(pipename text) RETURNS void AS $$
DECLARE
    item_type INTEGER;
BEGIN
    LOOP
        -- Second argument is the timeout (presumably in seconds); the status
        -- returned by receive_message is intentionally not inspected here.
        PERFORM dbms_pipe.receive_message(pipename, 2);
        item_type := dbms_pipe.next_item_type();

        -- Item type 0 ends the loop: nothing is left to unpack.
        EXIT WHEN item_type = 0;

        -- Dispatch on the item type code to the matching unpack function.
        CASE item_type
            WHEN 9 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_number();
            WHEN 11 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_text();
            WHEN 12 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_date();
            WHEN 13 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_timestamp();
            WHEN 23 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, encode(dbms_pipe.unpack_message_bytea(), 'escape');
            WHEN 24 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_record();
            ELSE
                -- Unknown type codes are skipped silently.
                NULL;
        END CASE;
    END LOOP;

    -- Leave the pipe empty for whatever runs next.
    PERFORM dbms_pipe.purge(pipename);
END;
$$ LANGUAGE plpgsql;
-- bulkReceive()
-- Fetches a single message from 'named_pipe_2' and unpacks every item it
-- carries, reporting each one as a NOTICE. If the first receive returns 1
-- (reported as 'Timeout'), it sleeps 2 s and tries exactly once more.
CREATE OR REPLACE FUNCTION bulkReceive() RETURNS void AS $$
DECLARE
    item_type INTEGER;
    status INTEGER;
BEGIN
    status := dbms_pipe.receive_message('named_pipe_2', 2);
    IF status = 1 THEN
        RAISE NOTICE 'Timeout';
        PERFORM pg_sleep(2);
        -- Outcome of the retry is not checked.
        PERFORM dbms_pipe.receive_message('named_pipe_2', 2);
    END IF;

    -- Walk the items of the received message until type 0 is reported.
    LOOP
        item_type := dbms_pipe.next_item_type();
        EXIT WHEN item_type = 0;

        CASE item_type
            WHEN 9 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_number();
            WHEN 11 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_text();
            WHEN 12 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_date();
            WHEN 13 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_timestamp();
            WHEN 23 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, encode(dbms_pipe.unpack_message_bytea(), 'escape');
            WHEN 24 THEN
                RAISE NOTICE 'RECEIVE %: %', item_type, dbms_pipe.unpack_message_record();
            ELSE
                -- Unknown type codes are skipped silently.
                NULL;
        END CASE;
    END LOOP;

    PERFORM dbms_pipe.purge('named_pipe_2');
END;
$$ LANGUAGE plpgsql;
-- checkReceive1(pipename)
-- Exercises the single-argument receive_message(text): receives one message
-- from the pipe and reports its text item as a NOTICE.
CREATE OR REPLACE FUNCTION checkReceive1(pipename text) RETURNS void AS $$
DECLARE
    payload text;
BEGIN
    PERFORM dbms_pipe.receive_message(pipename);
    payload := dbms_pipe.unpack_message_text();
    RAISE NOTICE 'RECEIVE %', payload;
END;
$$ LANGUAGE plpgsql;
-- dropTempTable()
-- Keeps receiving on 'pipe_name_3' for as long as the status is non-zero,
-- then drops the table named TEMP.
CREATE OR REPLACE FUNCTION dropTempTable() RETURNS void AS $$
DECLARE
    status INTEGER;
BEGIN
    status := dbms_pipe.receive_message('pipe_name_3');
    WHILE status <> 0 LOOP
        -- Not received yet: try again.
        status := dbms_pipe.receive_message('pipe_name_3');
    END LOOP;

    DROP TABLE TEMP;
END;
$$ LANGUAGE plpgsql;
-- checkUniqueSessionNameB()
-- Receives a message from 'pipe_name_4' and returns whether its text item
-- equals this session's dbms_pipe.unique_session_name().
CREATE OR REPLACE FUNCTION checkUniqueSessionNameB() RETURNS bool AS $$
BEGIN
    PERFORM dbms_pipe.receive_message('pipe_name_4');
    RETURN dbms_pipe.unpack_message_text() = dbms_pipe.unique_session_name();
END;
$$ LANGUAGE plpgsql;
-- Test script proper. Once ECHO is switched to 'all' below, psql prints every
-- input line (comments included) as it is read, so the text of the statements
-- that follow becomes part of this script's output.
-- NOTE(review): presumably that output is compared against an expected-output
-- file; confirm before rewording anything after this point.
\set ECHO all
-- Receives messages sent via an implicit pipe
SELECT receiveFrom('named_pipe');
-- Bulk receive messages
SELECT bulkReceive();
-- Receives messages sent via an explicit private pipe under the same user
-- 'pipe_test_owner'
SELECT dbms_pipe.receive_message('recv_private1_notifier');
SELECT receiveFrom('private_pipe_1');
-- Switch user to 'pipe_test_other'
DROP USER IF EXISTS pipe_test_other;
CREATE USER pipe_test_other;
SET SESSION AUTHORIZATION pipe_test_other;
-- Try to receive messages sent via an explicit private pipe under the user
-- 'pipe_test_other' who is not the owner of pipe.
-- insufficient privileges in case of 'private_pipe_2'.
SELECT dbms_pipe.receive_message('recv_private2_notifier');
SELECT receiveFrom('private_pipe_2');
-- These are explicit private pipes created using create_pipe(text,integer)
-- and create_pipe(text)
SELECT dbms_pipe.receive_message('recv_public1_notifier');
SELECT receiveFrom('public_pipe_3');
SELECT dbms_pipe.receive_message('recv_public2_notifier');
SELECT receiveFrom('public_pipe_4');
-- Switch back to user 'pipe_test_owner'
SET SESSION AUTHORIZATION pipe_test_owner;
DROP USER pipe_test_other;
-- Tests receive_message(text)
SELECT checkReceive1('pipe_name_1');
SELECT checkReceive1('pipe_name_2');
-- Tests dbms_pipe.db_pipes view
SELECT name, items, "limit", private, owner
FROM dbms_pipe.db_pipes
WHERE name LIKE 'private%'
ORDER BY name;
-- Tests dbms_pipe.__list_pipes(); attribute size is not included
-- since it can be different across runs.
SELECT name, items, "limit", private, owner
FROM dbms_pipe.__list_pipes() AS (name varchar, items int4, siz int4, "limit" int4, private bool, owner varchar)
WHERE name <> 'pipe_name_4'
ORDER BY 1;
-- Tests remove_pipe(text)
SELECT dbms_pipe.remove_pipe('private_pipe_1');
SELECT dbms_pipe.remove_pipe('private_pipe_2');
SELECT dbms_pipe.remove_pipe('public_pipe_3');
SELECT dbms_pipe.remove_pipe('public_pipe_4');
SELECT dbms_pipe.purge('pipe_name_1');
SELECT dbms_pipe.purge('pipe_name_2');
-- Receives drop table notification from session A via 'pipe_name_3'
SELECT dropTempTable();
SELECT dbms_pipe.purge('pipe_name_3');
-- tests unique_session_name() (uses 'pipe_name_4')
SELECT checkUniqueSessionNameB();
SELECT dbms_pipe.purge('pipe_name_4');
DROP FUNCTION receiveFrom(text);
DROP FUNCTION checkReceive1(text);
DROP FUNCTION checkUniqueSessionNameB();
DROP FUNCTION bulkReceive();
DROP FUNCTION dropTempTable();
-- Perform a recieve on removed pipe resulting on timeout
SELECT dbms_pipe.receive_message('public_pipe_4',2);
SELECT dbms_pipe.purge('public_pipe_4');
SET SESSION AUTHORIZATION DEFAULT;
DROP USER pipe_test_owner;