Skip to content

Commit bd22faf

Browse files
authored
Merge branch 'streamthoughts:master' into master
2 parents cc0eb2c + d8ca5ba commit bd22faf

File tree

11 files changed

+504
-12
lines changed

11 files changed

+504
-12
lines changed

.github/workflows/github-page.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
git submodule update --init --recursive
2929
3030
- name: Setup Hugo
31-
uses: peaceiris/actions-hugo@v2
31+
uses: peaceiris/actions-hugo@v3
3232
with:
3333
hugo-version: '0.96.0'
3434
extended: true

connect-file-pulse-expression/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
2626
<checkstyle.config.location>${project.parent.basedir}</checkstyle.config.location>
2727
<license.header.file>${project.parent.basedir}/header</license.header.file>
28-
<antlr4.version>4.13.1</antlr4.version>
28+
<antlr4.version>4.13.2</antlr4.version>
2929
</properties>
3030

3131
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

/**
 * Base fixture for tests reading bzip-compressed objects from S3: starts an
 * in-memory S3 mock on a random local port and exposes a pre-configured
 * client plus the common connector properties (compression.method=bzip).
 */
public class BaseBzipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Pick a port in [10000, 20000) to reduce clashes between suites.
        final int port = new Random().nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + port;

        final Map<String, String> props = new HashMap<>();
        props.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        props.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        props.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        props.put(AWS_S3_REGION_CONFIG, "us-west-2");
        props.put("compression.method", "bzip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(props);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

/**
 * Base fixture for tests reading gzip-compressed objects from S3: starts an
 * in-memory S3 mock on a random local port and exposes a pre-configured
 * client plus the common connector properties (compression.method=gzip).
 */
public class BaseGzipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Pick a port in [10000, 20000) to reduce clashes between suites.
        final int port = new Random().nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + port;

        final Map<String, String> props = new HashMap<>();
        props.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        props.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        props.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        props.put(AWS_S3_REGION_CONFIG, "us-west-2");
        props.put("compression.method", "gzip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(props);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*;

import com.amazonaws.services.s3.AmazonS3;
import io.findify.s3mock.S3Mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.junit.After;
import org.junit.Before;

/**
 * Base fixture for tests reading zip-compressed objects from S3: starts an
 * in-memory S3 mock on a random local port and exposes a pre-configured
 * client plus the common connector properties (compression.method=zip).
 */
public class BaseZipAmazonS3Test {

    public static final String S3_TEST_BUCKET = "testbucket";

    protected S3Mock s3Mock;
    protected AmazonS3 client;
    protected String endpointConfiguration;
    protected Map<String, String> unmodifiableCommonsProperties;

    @Before
    public void setUp() throws Exception {
        // Pick a port in [10000, 20000) to reduce clashes between suites.
        final int port = new Random().nextInt(10000) + 10000;
        s3Mock = new S3Mock.Builder().withPort(port).withInMemoryBackend().build();
        s3Mock.start();

        endpointConfiguration = "http://localhost:" + port;

        final Map<String, String> props = new HashMap<>();
        props.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id");
        props.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key");
        props.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET);
        props.put(AWS_S3_REGION_CONFIG, "us-west-2");
        props.put("compression.method", "zip");
        unmodifiableCommonsProperties = Collections.unmodifiableMap(props);

        client = AmazonS3ClientUtils.createS3Client(
                new AmazonS3ClientConfig(unmodifiableCommonsProperties),
                endpointConfiguration
        );
    }

    @After
    public void tearDown() throws Exception {
        client.shutdown();
        s3Mock.shutdown();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
import io.streamthoughts.kafka.connect.filepulse.fs.BaseBzipAmazonS3Test;
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Verifies that {@link AmazonS3RowFileInputReader} reads every line of a
 * bzip2-compressed object stored in the (mocked) S3 bucket.
 */
public class AmazonS3RowBzipFileInputReaderTest extends BaseBzipAmazonS3Test {

    private static final String LF = "\n";

    private static final int NLINES = 10;

    @Rule
    public TemporaryFolder testFolder = new TemporaryFolder();

    private File objectFile;

    private AmazonS3RowFileInputReader reader;

    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
        objectFile = testFolder.newFile();
        // try-with-resources guarantees the stream is closed (and the bzip2
        // trailer flushed to disk) even if line generation fails.
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(
                        new BZip2CompressorOutputStream(new FileOutputStream(objectFile)),
                        StandardCharsets.UTF_8))) {
            generateLines(writer);
        }

        reader = new AmazonS3RowFileInputReader();
        reader.setStorage(new AmazonS3Storage(client));
        reader.configure(unmodifiableCommonsProperties);
    }

    // NOTE: the @After annotation must be repeated here. Java method
    // annotations are not inherited, and JUnit 4 ignores the shadowed parent
    // method, so without it neither the reader nor the S3 mock would ever be
    // shut down.
    @After
    @Override
    public void tearDown() throws Exception {
        // Close the reader before the base class shuts down the S3 client.
        reader.close();
        super.tearDown();
    }

    @Test
    public void should_read_all_bzip_lines() {
        client.createBucket(S3_TEST_BUCKET);
        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);

        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
                .build();

        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
        final List<FileRecord<TypedStruct>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
            results.addAll(next.collect());
        }
        Assert.assertEquals(NLINES, results.size());
    }

    /**
     * Writes {@code NLINES} LF-separated lines (no trailing newline) to the
     * given writer; the caller is responsible for closing it.
     */
    private void generateLines(final BufferedWriter writer) throws IOException {
        for (int i = 0; i < NLINES; i++) {
            writer.write("00000000-" + i);
            if (i + 1 < NLINES) {
                writer.write(LF);
            }
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright (c) StreamThoughts
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.streamthoughts.kafka.connect.filepulse.fs.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
import io.streamthoughts.kafka.connect.filepulse.fs.BaseGzipAmazonS3Test;
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Verifies that {@link AmazonS3RowFileInputReader} reads every line of a
 * gzip-compressed object stored in the (mocked) S3 bucket.
 */
public class AmazonS3RowGzipFileInputReaderTest extends BaseGzipAmazonS3Test {

    private static final String LF = "\n";

    private static final int NLINES = 10;

    @Rule
    public TemporaryFolder testFolder = new TemporaryFolder();

    private File objectFile;

    private AmazonS3RowFileInputReader reader;

    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();
        objectFile = testFolder.newFile();
        // try-with-resources guarantees the stream is closed (and the gzip
        // trailer flushed to disk) even if line generation fails.
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(
                        new GZIPOutputStream(new FileOutputStream(objectFile)),
                        StandardCharsets.UTF_8))) {
            generateLines(writer);
        }

        reader = new AmazonS3RowFileInputReader();
        reader.setStorage(new AmazonS3Storage(client));
        reader.configure(unmodifiableCommonsProperties);
    }

    // NOTE: the @After annotation must be repeated here. Java method
    // annotations are not inherited, and JUnit 4 ignores the shadowed parent
    // method, so without it neither the reader nor the S3 mock would ever be
    // shut down.
    @After
    @Override
    public void tearDown() throws Exception {
        // Close the reader before the base class shuts down the S3 client.
        reader.close();
        super.tearDown();
    }

    @Test
    public void should_read_all_gzip_lines() {
        client.createBucket(S3_TEST_BUCKET);
        client.putObject(S3_TEST_BUCKET, "my-key", objectFile);

        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI())
                .build();

        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
        final List<FileRecord<TypedStruct>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
            results.addAll(next.collect());
        }
        Assert.assertEquals(NLINES, results.size());
    }

    /**
     * Writes {@code NLINES} LF-separated lines (no trailing newline) to the
     * given writer; the caller is responsible for closing it.
     */
    private void generateLines(final BufferedWriter writer) throws IOException {
        for (int i = 0; i < NLINES; i++) {
            writer.write("00000000-" + i);
            if (i + 1 < NLINES) {
                writer.write(LF);
            }
        }
    }
}

0 commit comments

Comments
 (0)