forked from embulk/embulk-input-jdbc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMySQLInputPlugin.java
177 lines (151 loc) · 6.23 KB
/
MySQLInputPlugin.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package org.embulk.input;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Properties;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import com.google.common.base.Throwables;
import com.mysql.jdbc.TimeUtil;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.input.jdbc.AbstractJdbcInputPlugin;
import org.embulk.input.jdbc.JdbcInputConnection;
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
import org.embulk.input.mysql.MySQLInputConnection;
import org.embulk.input.mysql.getter.MySQLColumnGetterFactory;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.Schema;
import org.joda.time.DateTimeZone;
public class MySQLInputPlugin
extends AbstractJdbcInputPlugin
{
public interface MySQLPluginTask
extends PluginTask
{
@Config("host")
public String getHost();
@Config("port")
@ConfigDefault("3306")
public int getPort();
@Config("user")
public String getUser();
@Config("password")
@ConfigDefault("\"\"")
public String getPassword();
@Config("database")
public String getDatabase();
}
@Override
protected Class<? extends PluginTask> getTaskClass()
{
return MySQLPluginTask.class;
}
@Override
protected MySQLInputConnection newConnection(PluginTask task) throws SQLException
{
MySQLPluginTask t = (MySQLPluginTask) task;
String url = String.format("jdbc:mysql://%s:%d/%s",
t.getHost(), t.getPort(), t.getDatabase());
Properties props = new Properties();
props.setProperty("user", t.getUser());
props.setProperty("password", t.getPassword());
// convert 0000-00-00 to NULL to avoid this exceptoin:
// java.sql.SQLException: Value '0000-00-00' can not be represented as java.sql.Date
props.setProperty("zeroDateTimeBehavior", "convertToNull");
props.setProperty("useCompression", "true");
props.setProperty("connectTimeout", String.valueOf(t.getConnectTimeout() * 1000)); // milliseconds
props.setProperty("socketTimeout", String.valueOf(t.getSocketTimeout() * 1000)); // milliseconds
// Enable keepalive based on tcp_keepalive_time, tcp_keepalive_intvl and tcp_keepalive_probes kernel parameters.
// Socket options TCP_KEEPCNT, TCP_KEEPIDLE, and TCP_KEEPINTVL are not configurable.
props.setProperty("tcpKeepAlive", "true");
// TODO
//switch task.getSssl() {
//when "disable":
// break;
//when "enable":
// props.setProperty("useSSL", "true");
// props.setProperty("requireSSL", "false");
// props.setProperty("verifyServerCertificate", "false");
// break;
//when "verify":
// props.setProperty("useSSL", "true");
// props.setProperty("requireSSL", "true");
// props.setProperty("verifyServerCertificate", "true");
// break;
//}
if (t.getFetchRows() == 1) {
logger.info("Fetch size is 1. Fetching rows one by one.");
} else if (t.getFetchRows() <= 0) {
logger.info("Fetch size is set to -1. Fetching all rows at once.");
} else {
logger.info("Fetch size is {}. Using server-side prepared statement.", t.getFetchRows());
props.setProperty("useCursorFetch", "true");
}
props.putAll(t.getOptions());
// load timezone mappings
loadTimeZoneMappings();
Driver driver;
try {
driver = new com.mysql.jdbc.Driver(); // new com.mysql.jdbc.Driver throws SQLException
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
Connection con = driver.connect(url, props);
try {
MySQLInputConnection c = new MySQLInputConnection(con);
con = null;
return c;
} finally {
if (con != null) {
con.close();
}
}
}
@Override
protected ColumnGetterFactory newColumnGetterFactory(PageBuilder pageBuilder, DateTimeZone dateTimeZone)
{
return new MySQLColumnGetterFactory(pageBuilder, dateTimeZone);
}
private void loadTimeZoneMappings()
{
// Here initializes com.mysql.jdbc.TimeUtil.timeZoneMappings static field by calling
// static timeZoneMappings method using reflection.
// The field is usually initialized when Driver#connect method is called. But the field
// initialization fails when a) useLegacyDatetimeCode=false is set AND b) mysql server's
// default_time_zone is not SYSTEM (default). According to the stacktrace, that's because
// the com.mysql.jdbc.TimeUtil.loadTimeZoneMappings can't find TimeZoneMapping.properties
// from the classloader. It seems like a bug of JDBC Driver where it should use the class loader
// that loaded com.mysql.jdbc.TimeUtil class rather than system class loader to read the
// property file because the file should be in the same classpath with the class.
// Here implements a workaround as as workaround.
Field f = null;
try {
f = TimeUtil.class.getDeclaredField("timeZoneMappings");
f.setAccessible(true);
Properties timeZoneMappings = (Properties) f.get(null);
if (timeZoneMappings == null) {
timeZoneMappings = new Properties();
synchronized (TimeUtil.class) {
timeZoneMappings.load(this.getClass().getResourceAsStream("/com/mysql/jdbc/TimeZoneMapping.properties"));
}
f.set(null, timeZoneMappings);
}
}
catch (IllegalAccessException | NoSuchFieldException | IOException e) {
throw Throwables.propagate(e);
}
finally {
if (f != null) {
f.setAccessible(false);
}
}
}
@Override
protected Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException
{
MySQLInputConnection mySQLCon = (MySQLInputConnection)con;
mySQLCon.compareTimeZone();
return super.setupTask(con,task);
}
}