Skip to content

Commit 4c8d92f

Browse files
authored
Merge pull request #101 from embulk/add_redshift_timestamp_incremental
Support Redshift timestamp/timestamptz incremental load
2 parents 1a1efc9 + eb9b10f commit 4c8d92f

31 files changed

+379
-0
lines changed

ci/travis_redshift.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
type: redshift
2+
host: localhost
3+
port: 5439
4+
database: travis_ci_test
5+
user: redshift
6+
password: ""
7+
fetch_rows: 1000

embulk-input-redshift/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,10 @@ in:
144144
```
145145
$ ./gradlew gem
146146
```
147+
148+
Running tests:
149+
150+
```
151+
$ cp ci/travis_redshift.yml ci/redshift.yml # edit this file if necessary
152+
$ EMBULK_INPUT_REDSHIFT_TEST_CONFIG=`pwd`/ci/redshift.yml ./gradlew :embulk-input-redshift:check --info
153+
```

embulk-input-redshift/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@ dependencies {
33
compile project(':embulk-input-postgresql')
44

55
testCompile project(':embulk-input-jdbc').sourceSets.test.output
6+
7+
testCompile 'org.embulk:embulk-standards:0.8.15'
68
}

embulk-input-redshift/src/main/java/org/embulk/input/RedshiftInputPlugin.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
import org.embulk.config.Config;
1010
import org.embulk.config.ConfigDefault;
1111
import org.embulk.input.jdbc.AbstractJdbcInputPlugin;
12+
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
1213
import org.embulk.input.postgresql.PostgreSQLInputConnection;
14+
import org.embulk.input.redshift.getter.RedshiftColumnGetterFactory;
15+
import org.embulk.spi.PageBuilder;
16+
import org.joda.time.DateTimeZone;
1317

1418
public class RedshiftInputPlugin
1519
extends AbstractJdbcInputPlugin
@@ -90,4 +94,10 @@ protected PostgreSQLInputConnection newConnection(PluginTask task) throws SQLExc
9094
}
9195
}
9296
}
97+
98+
@Override
99+
protected ColumnGetterFactory newColumnGetterFactory(PageBuilder pageBuilder, DateTimeZone dateTimeZone)
100+
{
101+
return new RedshiftColumnGetterFactory(pageBuilder, dateTimeZone);
102+
}
93103
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.embulk.input.redshift.getter;
2+
3+
import org.embulk.input.jdbc.AbstractJdbcInputPlugin.PluginTask;
4+
import org.embulk.input.jdbc.JdbcColumn;
5+
import org.embulk.input.jdbc.JdbcColumnOption;
6+
import org.embulk.input.jdbc.JdbcInputConnection;
7+
import org.embulk.input.jdbc.getter.ColumnGetter;
8+
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
9+
import org.embulk.input.jdbc.getter.TimestampWithTimeZoneIncrementalHandler;
10+
import org.embulk.input.jdbc.getter.TimestampWithoutTimeZoneIncrementalHandler;
11+
import org.embulk.spi.PageBuilder;
12+
import org.joda.time.DateTimeZone;
13+
14+
public class RedshiftColumnGetterFactory extends ColumnGetterFactory
15+
{
16+
public RedshiftColumnGetterFactory(PageBuilder to, DateTimeZone defaultTimeZone)
17+
{
18+
super(to, defaultTimeZone);
19+
}
20+
21+
@Override
22+
public ColumnGetter newColumnGetter(JdbcInputConnection con, PluginTask task, JdbcColumn column, JdbcColumnOption option)
23+
{
24+
ColumnGetter getter = super.newColumnGetter(con, task, column, option);
25+
26+
// incremental loading wrapper
27+
switch (column.getTypeName()) {
28+
case "timestamptz":
29+
return new TimestampWithTimeZoneIncrementalHandler(getter);
30+
case "timestamp":
31+
return new TimestampWithoutTimeZoneIncrementalHandler(getter);
32+
default:
33+
return getter;
34+
}
35+
}
36+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package org.embulk.input.redshift;
2+
3+
import org.embulk.config.ConfigDiff;
4+
import org.embulk.config.ConfigSource;
5+
import org.embulk.input.RedshiftInputPlugin;
6+
import org.embulk.spi.InputPlugin;
7+
import org.embulk.test.EmbulkTests;
8+
import org.embulk.test.TestingEmbulk;
9+
import org.embulk.test.TestingEmbulk.RunResult;
10+
import org.junit.Before;
11+
import org.junit.Rule;
12+
import org.junit.Test;
13+
14+
import java.nio.file.Path;
15+
16+
import static org.embulk.input.redshift.RedshiftTests.execute;
17+
import static org.embulk.test.EmbulkTests.readSortedFile;
18+
import static org.hamcrest.Matchers.is;
19+
import static org.junit.Assert.assertThat;
20+
21+
public class IncrementalTest
22+
{
23+
private static final String BASIC_RESOURCE_PATH = "org/embulk/input/redshift/test/expect/incremental/";
24+
25+
private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileName)
26+
{
27+
return embulk.loadYamlResource(BASIC_RESOURCE_PATH + fileName);
28+
}
29+
30+
private static String readResource(String fileName)
31+
{
32+
return EmbulkTests.readResource(BASIC_RESOURCE_PATH + fileName);
33+
}
34+
35+
@Rule
36+
public TestingEmbulk embulk = TestingEmbulk.builder()
37+
.registerPlugin(InputPlugin.class, "redshift", RedshiftInputPlugin.class)
38+
.build();
39+
40+
private ConfigSource baseConfig;
41+
42+
@Before
43+
public void setup()
44+
{
45+
baseConfig = RedshiftTests.baseConfig();
46+
}
47+
48+
@Test
49+
public void simpleInt() throws Exception
50+
{
51+
// setup first rows
52+
execute(readResource("int/setup.sql"));
53+
54+
Path out1 = embulk.createTempFile("csv");
55+
RunResult result1 = embulk.runInput(
56+
baseConfig.merge(loadYamlResource(embulk, "int/config_1.yml")),
57+
out1);
58+
assertThat(
59+
readSortedFile(out1),
60+
is(readResource("int/expected_1.csv")));
61+
assertThat(
62+
result1.getConfigDiff(),
63+
is((ConfigDiff) loadYamlResource(embulk, "int/expected_1.diff")));
64+
65+
// insert more rows
66+
execute(readResource("int/insert_more.sql"));
67+
68+
Path out2 = embulk.createTempFile("csv");
69+
RunResult result2 = embulk.runInput(
70+
baseConfig.merge(loadYamlResource(embulk, "int/config_2.yml")),
71+
out2);
72+
assertThat(
73+
readSortedFile(out2),
74+
is(readResource("int/expected_2.csv")));
75+
assertThat(
76+
result2.getConfigDiff(),
77+
is((ConfigDiff) loadYamlResource(embulk, "int/expected_2.diff")));
78+
}
79+
80+
@Test
81+
public void simpleTimestampWithoutTimeZone() throws Exception
82+
{
83+
// setup first rows
84+
execute(readResource("timestamp/setup.sql"));
85+
86+
Path out1 = embulk.createTempFile("csv");
87+
RunResult result1 = embulk.runInput(
88+
baseConfig.merge(loadYamlResource(embulk, "timestamp/config_1.yml")),
89+
out1);
90+
assertThat(
91+
readSortedFile(out1),
92+
is(readResource("timestamp/expected_1.csv")));
93+
assertThat(
94+
result1.getConfigDiff(),
95+
is((ConfigDiff) loadYamlResource(embulk, "timestamp/expected_1.diff")));
96+
97+
// insert more rows
98+
execute(readResource("timestamp/insert_more.sql"));
99+
100+
Path out2 = embulk.createTempFile("csv");
101+
RunResult result2 = embulk.runInput(
102+
baseConfig.merge(loadYamlResource(embulk, "timestamp/config_2.yml")),
103+
out2);
104+
assertThat(
105+
readSortedFile(out2),
106+
is(readResource("timestamp/expected_2.csv")));
107+
assertThat(
108+
result2.getConfigDiff(),
109+
is((ConfigDiff) loadYamlResource(embulk, "timestamp/expected_2.diff")));
110+
}
111+
112+
@Test
113+
public void simpleTimestampWithTimeZone() throws Exception
114+
{
115+
// setup first rows
116+
execute(readResource("timestamptz/setup.sql"));
117+
118+
Path out1 = embulk.createTempFile("csv");
119+
RunResult result1 = embulk.runInput(
120+
baseConfig.merge(loadYamlResource(embulk, "timestamptz/config_1.yml")),
121+
out1);
122+
assertThat(
123+
readSortedFile(out1),
124+
is(readResource("timestamptz/expected_1.csv")));
125+
assertThat(
126+
result1.getConfigDiff(),
127+
is((ConfigDiff) loadYamlResource(embulk, "timestamptz/expected_1.diff")));
128+
129+
// insert more rows
130+
execute(readResource("timestamptz/insert_more.sql"));
131+
132+
Path out2 = embulk.createTempFile("csv");
133+
RunResult result2 = embulk.runInput(
134+
baseConfig.merge(loadYamlResource(embulk, "timestamptz/config_2.yml")),
135+
out2);
136+
assertThat(
137+
readSortedFile(out2),
138+
is(readResource("timestamptz/expected_2.csv")));
139+
assertThat(
140+
result2.getConfigDiff(),
141+
is((ConfigDiff) loadYamlResource(embulk, "timestamptz/expected_2.diff")));
142+
}
143+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.embulk.input.redshift;
2+
3+
import com.google.common.base.Throwables;
4+
import com.google.common.io.ByteStreams;
5+
import org.embulk.config.ConfigSource;
6+
import org.embulk.test.EmbulkTests;
7+
8+
import java.io.IOException;
9+
10+
import static java.util.Locale.ENGLISH;
11+
12+
public class RedshiftTests
13+
{
14+
public static ConfigSource baseConfig()
15+
{
16+
return EmbulkTests.config("EMBULK_INPUT_REDSHIFT_TEST_CONFIG");
17+
}
18+
19+
public static void execute(String sql)
20+
{
21+
ConfigSource config = baseConfig();
22+
ProcessBuilder pb = new ProcessBuilder("psql", "-w", "--set", "ON_ERROR_STOP=1", "-c", sql);
23+
pb.environment().put("PGUSER", config.get(String.class, "user"));
24+
pb.environment().put("PGPASSWORD", config.get(String.class, "password"));
25+
pb.environment().put("PGDATABASE", config.get(String.class, "database"));
26+
pb.environment().put("PGHOST", config.get(String.class, "host", "localhost"));
27+
pb.environment().put("PGPORT", config.get(String.class, "port", "5439"));
28+
pb.redirectErrorStream(true);
29+
30+
int code;
31+
try {
32+
Process process = pb.start();
33+
ByteStreams.copy(process.getInputStream(), System.out);
34+
code = process.waitFor();
35+
} catch (IOException | InterruptedException ex) {
36+
throw Throwables.propagate(ex);
37+
}
38+
if (code != 0) {
39+
throw new RuntimeException(String.format(ENGLISH,
40+
"Command finished with non-zero exit code. Exit code is %d.", code));
41+
}
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
table: int_load
2+
incremental: true
3+
incremental_columns: [num]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
table: int_load
2+
last_record: [4]
3+
incremental: true
4+
incremental_columns: [num]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
1,first
2+
2,first
3+
3,first
4+
4,first
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in:
2+
last_record: [4]
3+
out: {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
5,more_load
2+
9,more_load
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in:
2+
last_record: [9]
3+
out: {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
insert into int_load (num, note) values
3+
(0, 'more_skip'),
4+
(4, 'more_skip'),
5+
(9, 'more_load'),
6+
(5, 'more_load');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
drop table if exists int_load;
2+
3+
create table int_load (
4+
num int not null,
5+
note text
6+
);
7+
8+
insert into int_load (num, note) values
9+
(3, 'first'),
10+
(4, 'first'),
11+
(2, 'first'),
12+
(1, 'first');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
table: load
2+
default_time_zone: +0300
3+
incremental: true
4+
incremental_columns: [time]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
table: load
2+
last_record: ['2016-11-02T04:00:05.333003']
3+
incremental: true
4+
incremental_columns: [time]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2016-11-01 23:00:01.000000 +0000,first
2+
2016-11-02 00:00:02.000000 +0000,first
3+
2016-11-02 01:00:03.000000 +0000,first
4+
2016-11-02 02:00:04.000000 +0000,first
5+
2016-11-02 02:00:05.111001 +0000,first
6+
2016-11-02 02:00:05.222002 +0000,first
7+
2016-11-02 02:00:05.333003 +0000,first
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in:
2+
last_record: ['2016-11-02T04:00:05.333003']
3+
out: {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2016-11-02 02:00:05.333004 +0000,more_load
2+
2016-11-02 02:00:06.000000 +0000,more_load
3+
2016-11-02 02:00:06.000000 +0000,more_load
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in:
2+
last_record: ['2016-11-02T04:00:06.000000']
3+
out: {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
insert into load (time, note) values
3+
('2016-11-02 04:00:00', 'more_skip'),
4+
('2016-11-02 04:00:05.333000', 'more_skip'),
5+
('2016-11-02 04:00:05.333003', 'more_skip'),
6+
('2016-11-02 04:00:05.333004', 'more_load'),
7+
('2016-11-02 04:00:06', 'more_load'),
8+
('2016-11-02 04:00:06', 'more_load');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
drop table if exists load;
2+
3+
create table load (
4+
time timestamp without time zone not null,
5+
note text
6+
);
7+
8+
insert into load (time, note) values
9+
('2016-11-02 01:00:01', 'first'),
10+
('2016-11-02 02:00:02', 'first'),
11+
('2016-11-02 03:00:03', 'first'),
12+
('2016-11-02 04:00:04', 'first'),
13+
('2016-11-02 04:00:05.111001', 'first'),
14+
('2016-11-02 04:00:05.222002', 'first'),
15+
('2016-11-02 04:00:05.333003', 'first');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
table: load
2+
default_time_zone: +0300
3+
incremental: true
4+
incremental_columns: [time]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
table: load
2+
last_record: ['2016-11-02T04:00:05.333003Z']
3+
incremental: true
4+
incremental_columns: [time]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2016-11-02 01:00:01.000000 +0000,first
2+
2016-11-02 02:00:02.000000 +0000,first
3+
2016-11-02 03:00:03.000000 +0000,first
4+
2016-11-02 04:00:04.000000 +0000,first
5+
2016-11-02 04:00:05.111001 +0000,first
6+
2016-11-02 04:00:05.222002 +0000,first
7+
2016-11-02 04:00:05.333003 +0000,first
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
in:
2+
last_record: ['2016-11-02T04:00:05.333003Z']
3+
out: {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2016-11-02 04:00:05.333004 +0000,more_load
2+
2016-11-02 04:00:06.000000 +0000,more_load
3+
2016-11-02 04:00:06.000000 +0000,more_load

0 commit comments

Comments
 (0)