Skip to content

Commit a36080a

Browse files
committed
Add Redshift timestamp/timestamptz incremental support
1 parent 1a1efc9 commit a36080a

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

embulk-input-redshift/src/main/java/org/embulk/input/RedshiftInputPlugin.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
import org.embulk.config.Config;
1010
import org.embulk.config.ConfigDefault;
1111
import org.embulk.input.jdbc.AbstractJdbcInputPlugin;
12+
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
1213
import org.embulk.input.postgresql.PostgreSQLInputConnection;
14+
import org.embulk.input.redshift.getter.RedshiftColumnGetterFactory;
15+
import org.embulk.spi.PageBuilder;
16+
import org.joda.time.DateTimeZone;
1317

1418
public class RedshiftInputPlugin
1519
extends AbstractJdbcInputPlugin
@@ -90,4 +94,10 @@ protected PostgreSQLInputConnection newConnection(PluginTask task) throws SQLExc
9094
}
9195
}
9296
}
97+
98+
@Override
99+
protected ColumnGetterFactory newColumnGetterFactory(PageBuilder pageBuilder, DateTimeZone dateTimeZone)
100+
{
101+
return new RedshiftColumnGetterFactory(pageBuilder, dateTimeZone);
102+
}
93103
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.embulk.input.redshift.getter;
2+
3+
import org.embulk.input.jdbc.AbstractJdbcInputPlugin.PluginTask;
4+
import org.embulk.input.jdbc.JdbcColumn;
5+
import org.embulk.input.jdbc.JdbcColumnOption;
6+
import org.embulk.input.jdbc.JdbcInputConnection;
7+
import org.embulk.input.jdbc.getter.ColumnGetter;
8+
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
9+
import org.embulk.input.jdbc.getter.TimestampWithTimeZoneIncrementalHandler;
10+
import org.embulk.input.jdbc.getter.TimestampWithoutTimeZoneIncrementalHandler;
11+
import org.embulk.spi.PageBuilder;
12+
import org.joda.time.DateTimeZone;
13+
14+
public class RedshiftColumnGetterFactory extends ColumnGetterFactory
15+
{
16+
public RedshiftColumnGetterFactory(PageBuilder to, DateTimeZone defaultTimeZone)
17+
{
18+
super(to, defaultTimeZone);
19+
}
20+
21+
@Override
22+
public ColumnGetter newColumnGetter(JdbcInputConnection con, PluginTask task, JdbcColumn column, JdbcColumnOption option)
23+
{
24+
ColumnGetter getter = super.newColumnGetter(con, task, column, option);
25+
26+
// incremental loading wrapper
27+
switch (column.getTypeName()) {
28+
case "timestamptz":
29+
return new TimestampWithTimeZoneIncrementalHandler(getter);
30+
case "timestamp":
31+
return new TimestampWithoutTimeZoneIncrementalHandler(getter);
32+
default:
33+
return getter;
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)