-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathSQLQuery.java
219 lines (197 loc) · 7.6 KB
/
SQLQuery.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - [email protected]
*/
package sirius.db.jdbc;
import sirius.kernel.commons.Context;
import sirius.kernel.commons.Limit;
import sirius.kernel.commons.Streams;
import sirius.kernel.commons.Watch;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.function.Predicate;
/**
* Represents a flexible way of executing parameterized SQL queries without
* thinking too much about resource management.
* <p>
* Supports named parameters in form of ${name}. Also #{name} can be used in LIKE expressions and will be
* appended with a % sign (if not empty), also all "*" will be replaced by a "%".
* <p>
* Optional blocks can be surrounded with angular braces: <tt>SELECT * FROM x WHERE test = 1[ AND test2=${val}]</tt>
* The surrounded block will only be added to the query, if the parameter within has a non-null value.
* <p>
* Additionally, block can be marked with conditions like <tt>SELECT * FROM x WHERE test = 1[:check AND test2= 2]</tt>.
* These blocks are only emitted, if the condition (<tt>check</tt> in this case) is a parameter which is set to
* <tt>true</tt>. Note that if the block contains parameters (like <tt>${param}</tt>), these also have to be present
* and non-null for the block to be emitted.
*/
public class SQLQuery extends BaseSQLQuery {
/**
* Specifies the default fetch size (number of rows to fetch at once) for a query.
* <p>
* Note that some databases (MySQL / Maria DB) does not support this. If possible ({@link Capability#STREAMING})
* these are set to a minimal fetch size to enable streaming of large results.
*/
public static final int DEFAULT_FETCH_SIZE = 1000;
private static final String MICROTIMING_KEY = "SQL";
private final Database ds;
private final String sql;
private final Context params = Context.create();
private boolean longRunning;
/**
* Creates a new instance for the given datasource and query.
* <p>
* Create a new instance using {@code Databases.createQuery(sql)}.
*
* @param ds the datasource to query
* @param sql the query to execute
*/
protected SQLQuery(Database ds, String sql) {
this.ds = ds;
this.sql = sql;
}
/**
* Adds a parameter.
*
* @param parameter the name of the parameter as referenced in the SQL statement (${name} or #{name}).
* @param value the value of the parameter
* @return the query itself to support fluent calls
*/
public SQLQuery set(String parameter, Object value) {
params.put(parameter, value);
return this;
}
/**
* Sets all parameters of the given context.
*
* @param ctx the containing pairs of names and values to add to the query
* @return the query itself to support fluent calls
*/
public SQLQuery set(Map<String, Object> ctx) {
params.putAll(ctx);
return this;
}
/**
* Marks the connection and its statements as potentially long running.
* <p>
* These connections and statements won't contribute to the query duration metrics and will not report slow queries.
* Note however, that each query is still wrapped in an {@link sirius.kernel.async.Operation} which is considered
* as hanging after 5 minutes.
*
* @return the query itself for fluent method calls
*/
public SQLQuery markAsLongRunning() {
this.longRunning = true;
return this;
}
@Override
protected void doIterate(Predicate<Row> handler, @Nullable Limit limit) throws SQLException {
Watch watch = Watch.start();
fieldNames = null;
try (Connection connection = longRunning ? ds.getLongRunningConnection() : ds.getConnection()) {
try (PreparedStatement statement = createPreparedStatement(connection)) {
if (statement == null) {
return;
}
Limit effectiveLimit = limit != null ? limit : Limit.UNLIMITED;
applyMaxRows(statement, effectiveLimit);
applyFetchSize(statement, effectiveLimit);
try (ResultSet resultSet = statement.executeQuery()) {
watch.submitMicroTiming(MICROTIMING_KEY, "ITERATE: " + sql);
processResultSet(handler, effectiveLimit, resultSet);
}
}
}
}
protected void applyMaxRows(PreparedStatement stmt, Limit effectiveLimit) throws SQLException {
if (effectiveLimit.getTotalItems() > 0) {
stmt.setMaxRows(effectiveLimit.getTotalItems());
}
}
protected void applyFetchSize(PreparedStatement stmt, Limit effectiveLimit) throws SQLException {
if (effectiveLimit.getTotalItems() > DEFAULT_FETCH_SIZE || effectiveLimit.getTotalItems() <= 0) {
if (ds.hasCapability(Capability.STREAMING)) {
stmt.setFetchSize(1);
} else {
stmt.setFetchSize(DEFAULT_FETCH_SIZE);
}
}
}
protected PreparedStatement createPreparedStatement(Connection c) throws SQLException {
StatementCompiler compiler = new StatementCompiler(c, false);
compiler.buildParameterizedStatement(sql, params);
return compiler.getStatement();
}
@Override
protected void writeBlobToParameter(String name, Blob blob) throws SQLException {
OutputStream out = (OutputStream) params.get(name);
if (out == null) {
return;
}
try (InputStream in = blob.getBinaryStream()) {
Streams.transfer(in, out);
} catch (IOException e) {
throw new SQLException(e);
}
}
/**
* Executes the query as update.
* <p>
* Requires the SQL statement to be an UPDATE or DELETE statement.
*
* @return the number of rows changed
* @throws SQLException in case of a database error
*/
public int executeUpdate() throws SQLException {
Watch w = Watch.start();
try (Connection c = ds.getConnection()) {
try (PreparedStatement stmt = createPreparedStatement(c)) {
if (stmt == null) {
return 0;
}
return stmt.executeUpdate();
}
} finally {
w.submitMicroTiming(MICROTIMING_KEY, "UPDATE: " + sql);
}
}
/**
* Executes the update and returns the generated keys.
* <p>
* Requires the SQL statement to be an UPDATE or DELETE statement.
*
* @return a row representing all generated keys
* @throws SQLException in case of a database error
*/
public Row executeUpdateReturnKeys() throws SQLException {
Watch w = Watch.start();
try (Connection c = ds.getConnection()) {
StatementCompiler compiler = new StatementCompiler(c, true);
compiler.buildParameterizedStatement(sql, params);
try (PreparedStatement stmt = compiler.getStatement()) {
if (stmt == null) {
return new Row();
}
stmt.executeUpdate();
return dbs.fetchGeneratedKeys(stmt);
}
} finally {
w.submitMicroTiming(MICROTIMING_KEY, "UPDATE: " + sql);
}
}
@Override
public String toString() {
return "SQLQuery [" + sql + "]";
}
}