diff --git a/embulk-input-jdbc/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java b/embulk-input-jdbc/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java index b0605f51..6b3ee9ac 100644 --- a/embulk-input-jdbc/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java +++ b/embulk-input-jdbc/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java @@ -193,7 +193,7 @@ public ConfigDiff transaction(ConfigSource config, return buildNextConfigDiff(task, control.run(task.dump(), schema, 1)); } - private Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException + protected Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException { if (task.getTable().isPresent()) { String actualTableName = normalizeTableNameCase(con, task.getTable().get()); diff --git a/embulk-input-mysql/src/main/java/org/embulk/input/MySQLInputPlugin.java b/embulk-input-mysql/src/main/java/org/embulk/input/MySQLInputPlugin.java index 5d72716c..73c16d0d 100644 --- a/embulk-input-mysql/src/main/java/org/embulk/input/MySQLInputPlugin.java +++ b/embulk-input-mysql/src/main/java/org/embulk/input/MySQLInputPlugin.java @@ -12,10 +12,12 @@ import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.input.jdbc.AbstractJdbcInputPlugin; +import org.embulk.input.jdbc.JdbcInputConnection; import org.embulk.input.jdbc.getter.ColumnGetterFactory; import org.embulk.input.mysql.MySQLInputConnection; import org.embulk.input.mysql.getter.MySQLColumnGetterFactory; import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; import org.joda.time.DateTimeZone; public class MySQLInputPlugin @@ -163,4 +165,13 @@ private void loadTimeZoneMappings() } } } + + @Override + protected Schema setupTask(JdbcInputConnection con, PluginTask task) throws SQLException + { + MySQLInputConnection mySQLCon = (MySQLInputConnection)con; + mySQLCon.compareTimeZone(); + return super.setupTask(con,task); + } + } diff --git a/embulk-input-mysql/src/main/java/org/embulk/input/MySQLTimeZoneComparison.java b/embulk-input-mysql/src/main/java/org/embulk/input/MySQLTimeZoneComparison.java new file mode 100644 index 00000000..95829c24 --- /dev/null +++ b/embulk-input-mysql/src/main/java/org/embulk/input/MySQLTimeZoneComparison.java @@ -0,0 +1,104 @@ +package org.embulk.input; + +import org.embulk.spi.Exec; +import org.slf4j.Logger; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; + +public class MySQLTimeZoneComparison +{ + private static final int ONE_HOUR_SEC = 3600; + private static final int ONE_MIN_SEC = 60; + + private Connection connection; + + private final Logger logger = Exec.getLogger(getClass()); + + public MySQLTimeZoneComparison(Connection connection) + { + this.connection = connection; + } + + public void compareTimeZone() + throws SQLException + { + TimeZone serverTimeZone = null; + try { + serverTimeZone = getServerTimeZone(); + } + catch (SQLException ex) { + logger.warn("Can't get server TimeZone."); + logger.warn(String.format(Locale.ENGLISH, "SQLException raised %s", ex.toString())); + } + + TimeZone clientTimeZone = TimeZone.getDefault(); + Date today = new Date(); + int clientOffset = clientTimeZone.getRawOffset(); + + if (clientTimeZone.inDaylightTime(today)) { + clientOffset += clientTimeZone.getDSTSavings(); + } + + // + // Compare offset only. Although I expect to return true, the following code return false, + // + // TimeZone tz_jst = TimeZone.getTimeZone("JST"); + // TimeZone tz_gmt9 = TimeZone.getTimeZone("GMT+9"); + // tz_jst.hasSameRules(tz_gmt9) // return false. + // + if (clientOffset != serverTimeZone.getRawOffset()) { + logger.warn(String.format(Locale.ENGLISH, + "The client timezone(%s) is different from the server timezone(%s). The plugin will fetch wrong datetime values.", + clientTimeZone.getID(), serverTimeZone.getID())); + logger.warn(String.format(Locale.ENGLISH, + "You may need to set options `useLegacyDatetimeCode` and `serverTimezone`")); + logger.warn(String.format(Locale.ENGLISH, + "Example. `options: { useLegacyDatetimeCode: false, serverTimezone: UTC }`")); + } + logger.warn(String.format(Locale.ENGLISH, "The plugin will set `useLegacyDatetimeCode=false` by default in future.")); + } + + private TimeZone getServerTimeZone() + throws SQLException + { + // + // First, I used `@@system_time_zone`. but It return non Time Zone Abbreviations name on a specific platform. + // So, This method calculate GMT offset with query. + // + String query = "select TIME_TO_SEC(timediff(now(),utc_timestamp()));"; + Statement stmt = connection.createStatement(); + + try { + ResultSet rs = stmt.executeQuery(query); + if (rs.next()) { + int offsetSeconds = rs.getInt(1); + return fromGMTOffsetSeconds(offsetSeconds); + } + throw new SQLException(String.format(Locale.ENGLISH, + "The timezone comparison query(%s) doesn't return the result.",query)); + } + finally { + stmt.close(); + } + } + + private TimeZone fromGMTOffsetSeconds(int offsetSeconds) + { + if (offsetSeconds == 0) { + return TimeZone.getTimeZone("UTC"); + } + + String sign = offsetSeconds > 0 ? "+" : "-"; + int absOffsetSec = Math.abs(offsetSeconds); + int tzHour = absOffsetSec / ONE_HOUR_SEC; + int tzMin = absOffsetSec % ONE_HOUR_SEC / ONE_MIN_SEC; + String tzName = String.format(Locale.ENGLISH, "GMT%s%02d:%02d", sign, tzHour, tzMin); + return TimeZone.getTimeZone(tzName); + } +} diff --git a/embulk-input-mysql/src/main/java/org/embulk/input/mysql/MySQLInputConnection.java b/embulk-input-mysql/src/main/java/org/embulk/input/mysql/MySQLInputConnection.java index b783db5e..c77cb6ab 100644 --- a/embulk-input-mysql/src/main/java/org/embulk/input/mysql/MySQLInputConnection.java +++ b/embulk-input-mysql/src/main/java/org/embulk/input/mysql/MySQLInputConnection.java @@ -9,6 +9,7 @@ import com.mysql.jdbc.ConnectionImpl; import com.mysql.jdbc.ConnectionProperties; +import org.embulk.input.MySQLTimeZoneComparison; import org.embulk.input.jdbc.JdbcInputConnection; import org.embulk.input.jdbc.JdbcLiteral; import org.embulk.input.jdbc.getter.ColumnGetter; @@ -59,4 +60,11 @@ public TimeZone getServerTimezoneTZ() { return ((ConnectionImpl) connection).getServerTimezoneTZ(); } + + public void compareTimeZone() throws SQLException + { + MySQLTimeZoneComparison timeZoneComparison = new MySQLTimeZoneComparison(connection); + timeZoneComparison.compareTimeZone(); + } + }