Skip to content

Commit c137948

Browse files
committed
override replaceTable to make replace mode safer for MySQL
1 parent 6aba538 commit c137948

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

embulk-output-mysql/src/main/java/org/embulk/output/mysql/MySQLOutputConnection.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.embulk.output.mysql;
22

3+
import java.sql.Statement;
34
import java.util.List;
45
import java.sql.Connection;
56
import java.sql.SQLException;
7+
import java.util.Optional;
68

79
import org.embulk.output.MySQLTimeZoneComparison;
810
import org.embulk.output.jdbc.JdbcColumn;
@@ -101,6 +103,52 @@ protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSche
101103
return sb.toString();
102104
}
103105

106+
private String buildSwapTableSql(TableIdentifier fromTable, TableIdentifier toTable)
107+
{
108+
String suffix = "_embulk_swap_tmp";
109+
String uniqueName = String.format("%016x", System.currentTimeMillis()) + suffix;
110+
TableIdentifier tmpTable = new TableIdentifier(fromTable.getDatabase(), fromTable.getSchemaName(), uniqueName);
111+
112+
StringBuilder sb = new StringBuilder();
113+
sb.append("RENAME TABLE ");
114+
quoteTableIdentifier(sb, fromTable);
115+
sb.append(" TO ");
116+
quoteTableIdentifier(sb, tmpTable);
117+
118+
sb.append(", ");
119+
quoteTableIdentifier(sb, toTable);
120+
sb.append(" TO ");
121+
quoteTableIdentifier(sb, fromTable);
122+
123+
sb.append(", ");
124+
quoteTableIdentifier(sb, tmpTable);
125+
sb.append(" TO ");
126+
quoteTableIdentifier(sb, toTable);
127+
128+
return sb.toString();
129+
}
130+
131+
@Override
132+
public void replaceTable(TableIdentifier fromTable, JdbcSchema schema, TableIdentifier toTable, Optional<String> postSql) throws SQLException
133+
{
134+
Statement stmt = connection.createStatement();
135+
try {
136+
executeUpdate(stmt, buildSwapTableSql(fromTable, toTable));
137+
138+
dropTableIfExists(stmt, fromTable);
139+
140+
if (postSql.isPresent()) {
141+
execute(stmt, postSql.get());
142+
}
143+
144+
commitIfNecessary(connection);
145+
} catch (SQLException ex) {
146+
throw safeRollback(connection, ex);
147+
} finally {
148+
stmt.close();
149+
}
150+
}
151+
104152
@Override
105153
protected String buildColumnTypeName(JdbcColumn c)
106154
{

0 commit comments

Comments
 (0)