-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathConnectionValidatorTest.java
128 lines (111 loc) · 4.67 KB
/
ConnectionValidatorTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.github.pgasync.impl;
import com.github.pgasync.Connection;
import com.github.pgasync.ResultSet;
import com.github.pgasync.SqlException;
import org.junit.Test;
import rx.Observable;
import java.util.function.Consumer;
import static org.junit.Assert.*;
public class ConnectionValidatorTest {
@Test
public void shouldBeTheSamePidOnSuccessiveCalls() {
withDbr(null, true, dbr -> {
// Simple sanity check for our PID assumptions
assertEquals(selectPid(dbr).toBlocking().single().intValue(),
selectPid(dbr).toBlocking().single().intValue());
});
}
@Test
public void shouldBeSamePidWhenValidationQuerySucceeds() {
withDbr("SELECT 1", false, dbr -> {
// Just compare PIDs
assertEquals(selectPid(dbr).toBlocking().single().intValue(),
selectPid(dbr).toBlocking().single().intValue());
});
}
@Test
public void shouldFailValidationQueryFailsAndReconnectAfterSuccess() throws Exception {
String errSql =
"DO language plpgsql $$\n" +
" BEGIN\n" +
" IF (SELECT COUNT(1) FROM VSTATE) = 1 THEN\n" +
" RAISE 'ERR';\n" +
" END IF;\n" +
" EXCEPTION\n" +
" WHEN undefined_table THEN\n" +
" END\n" +
"$$;";
withDbr(errSql, false, dbr -> {
// Add the VSTATE table
dbr.query("DROP TABLE IF EXISTS VSTATE; CREATE TABLE VSTATE (ID VARCHAR(255) PRIMARY KEY)");
try {
// Grab the pid
int pid = selectPid(dbr).toBlocking().single();
// Break it
runFromOutside(dbr, "INSERT INTO VSTATE VALUES('A')");
// Make sure it is broken
try {
selectPid(dbr).toBlocking().single();
fail("Should be broken");
} catch (SqlException e) { }
// Fix it, and go ahead and expect the same PID
runFromOutside(dbr, "TRUNCATE TABLE VSTATE");
assertEquals(pid, selectPid(dbr).toBlocking().single().intValue());
} finally {
runFromOutside(dbr, "DROP TABLE IF EXISTS VSTATE");
}
});
}
@Test
public void shouldErrorWhenNotValidatingSocket() {
withDbr(null, false, dbr -> {
// Simple check, kill from outside, confirm failure
assertNotNull(selectPid(dbr).toBlocking().single());
killConnectionFromOutside(dbr);
try {
selectPid(dbr).toBlocking().single();
fail("Should not succeed after killing connection");
} catch (IllegalStateException e) { }
});
}
@Test
public void shouldNotErrorWhenValidatingSocket() {
withDbr(null, true, dbr -> {
// Grab pid, kill from outside, confirm different pid
int pid = selectPid(dbr).toBlocking().single();
killConnectionFromOutside(dbr);
assertNotEquals(pid, selectPid(dbr).toBlocking().single().intValue());
});
}
private static Observable<Integer> selectPid(DatabaseRule dbr) {
return dbr.db().queryRows("SELECT pg_backend_pid()").map(r -> r.getInt(0));
}
private static void killConnectionFromOutside(DatabaseRule dbr) {
ResultSet rs = runFromOutside(dbr, "SELECT pg_terminate_backend(pid) FROM pg_stat_activity " +
"WHERE pid <> pg_backend_pid() AND datname = '" + ((PgConnectionPool) dbr.pool).database + "'");
assertEquals(1, rs.size());
// Unfortunately, it appears we have to wait a tiny bit after
// killing the connection for netty to know
try { Thread.sleep(300); } catch (Exception e) { }
}
private static ResultSet runFromOutside(DatabaseRule dbr, String query) {
PgConnectionPool pool = (PgConnectionPool) dbr.pool;
try(Connection conn = new PgConnection(pool.openStream(pool.address), pool.dataConverter).
connect(pool.username, pool.password, pool.database).toBlocking().single()) {
return conn.querySet(query).toBlocking().single();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void withDbr(String validationQuery, boolean validateSocket, Consumer<DatabaseRule> fn) {
DatabaseRule rule = new DatabaseRule();
rule.builder.validationQuery(validationQuery);
rule.builder.validateSocket(validateSocket);
rule.before();
try {
fn.accept(rule);
} finally {
rule.after();
}
}
}