Skip to content

Commit 796f669

Browse files
authored
Merge pull request #337 from rajyan/swap-table-replace-2
Swap name + drop instead of drop + rename in MySQL replace mode
2 parents feae8a0 + fcd0998 commit 796f669

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

embulk-output-mysql/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ MySQL output plugin for Embulk loads records to MySQL.
5353
* Transactional: Yes.
5454
* Resumable: No.
5555
* **replace**:
56-
* Behavior: This mode writes rows to an intermediate table first. If all those tasks run correctly, drops the target table and alters the name of the intermediate table into the target table name.
57-
* Transactional: No. If fails, the target table could be dropped (because MySQL can't rollback DDL).
56+
* Behavior: This mode writes rows to an intermediate table first. If all those tasks run correctly, swaps the intermediate table with target table by altering the table names atomically, and then drops the intermediate table.
57+
* Transactional: No. If fails, the intermediate table could remain (because MySQL can't rollback DDL)
5858
* Resumable: No.
5959
* **merge**:
6060
* Behavior: This mode writes rows to some intermediate tables first. If all those tasks run correctly, runs `INSERT INTO <target_table> SELECT * FROM <intermediate_table_1> UNION ALL SELECT * FROM <intermediate_table_2> UNION ALL ... ON DUPLICATE KEY UPDATE ...` query. Namely, if primary keys of a record in the intermediate tables already exist in the target table, the target record is updated by the intermediate record, otherwise the intermediate record is inserted. If the target table doesn't exist, it is created automatically.

embulk-output-mysql/src/main/java/org/embulk/output/mysql/MySQLOutputConnection.java

+52
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package org.embulk.output.mysql;
22

3+
import java.sql.Statement;
34
import java.util.List;
45
import java.sql.Connection;
56
import java.sql.SQLException;
7+
import java.util.Optional;
68

79
import org.embulk.output.MySQLTimeZoneComparison;
810
import org.embulk.output.jdbc.JdbcColumn;
@@ -101,6 +103,56 @@ protected String buildCollectMergeSql(List<TableIdentifier> fromTables, JdbcSche
101103
return sb.toString();
102104
}
103105

106+
private String buildSwapTableSql(TableIdentifier fromTable, TableIdentifier toTable)
107+
{
108+
String suffix = "_embulk_swap_tmp";
109+
String uniqueName = String.format("%016x", System.currentTimeMillis()) + suffix;
110+
// NOTE: The table name should be always shorter than 64 characters
111+
// See also: https://dev.mysql.com/doc/refman/8.0/en/identifier-length.html
112+
TableIdentifier tmpTable = new TableIdentifier(fromTable.getDatabase(), fromTable.getSchemaName(), uniqueName);
113+
114+
StringBuilder sb = new StringBuilder();
115+
sb.append("RENAME TABLE ");
116+
quoteTableIdentifier(sb, fromTable);
117+
sb.append(" TO ");
118+
quoteTableIdentifier(sb, tmpTable);
119+
120+
sb.append(", ");
121+
quoteTableIdentifier(sb, toTable);
122+
sb.append(" TO ");
123+
quoteTableIdentifier(sb, fromTable);
124+
125+
sb.append(", ");
126+
quoteTableIdentifier(sb, tmpTable);
127+
sb.append(" TO ");
128+
quoteTableIdentifier(sb, toTable);
129+
130+
return sb.toString();
131+
}
132+
133+
@Override
134+
public void replaceTable(TableIdentifier fromTable, JdbcSchema schema, TableIdentifier toTable, Optional<String> postSql) throws SQLException
135+
{
136+
Statement stmt = connection.createStatement();
137+
try {
138+
// "DROP TABLE" causes an implicit commit in MySQL, so we rename the table at first.
139+
// See also: https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
140+
executeUpdate(stmt, buildSwapTableSql(fromTable, toTable));
141+
142+
dropTableIfExists(stmt, fromTable);
143+
144+
if (postSql.isPresent()) {
145+
execute(stmt, postSql.get());
146+
}
147+
148+
commitIfNecessary(connection);
149+
} catch (SQLException ex) {
150+
throw safeRollback(connection, ex);
151+
} finally {
152+
stmt.close();
153+
}
154+
}
155+
104156
@Override
105157
protected String buildColumnTypeName(JdbcColumn c)
106158
{

0 commit comments

Comments
 (0)