Skip to content

Commit b85ffb6

Browse files
Issue 50 (#51)
* Partial support for ELF. #50 * Full support for ELF. Added InputFileDeque the number of calls to list the directory contents. This will query only when needed to reduce the number of stat calls.
1 parent cac32a7 commit b85ffb6

31 files changed

+1125
-105
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
target
22
*.iml
33
.okhttpcache
4+
ELFTesting.properties

bin/debug.sh

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# you may not use this file except in compliance with the License.
77
# You may obtain a copy of the License at
88
#
9-
# http://www.apache.org/licenses/LICENSE-2.0
9+
# http://www.apache.org/licenses/LICENSE-2.0
1010
#
1111
# Unless required by applicable law or agreed to in writing, software
1212
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,25 +18,24 @@
1818
: ${INPUT_PATH:='/tmp/spooldir/input'}
1919
: ${ERROR_PATH:='/tmp/spooldir/error'}
2020
: ${FINISHED_PATH:='/tmp/spooldir/finished'}
21-
: ${DEBUG_SUSPEND_FLAG:='n'}
22-
export KAFKA_DEBUG='y'
23-
21+
: ${DEBUG_SUSPEND_FLAG:='y'}
22+
export KAFKA_DEBUG='n'
23+
export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000'
2424
set -e
2525

2626
mvn clean package
2727

28-
if [ ! -d "${INPUT_PATH}" ]; then
29-
mkdir -p "${INPUT_PATH}"
30-
fi
31-
32-
if [ ! -d "${ERROR_PATH}" ]; then
33-
mkdir -p "${ERROR_PATH}"
34-
fi
28+
#if [ ! -d "${INPUT_PATH}" ]; then
29+
# mkdir -p "${INPUT_PATH}"
30+
#fi
3531

36-
if [ ! -d "${FINISHED_PATH}" ]; then
37-
mkdir -p "${FINISHED_PATH}"
38-
fi
32+
#if [ ! -d "${ERROR_PATH}" ]; then
33+
# mkdir -p "${ERROR_PATH}"
34+
#fi
3935

40-
cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv"
36+
#if [ ! -d "${FINISHED_PATH}" ]; then
37+
# mkdir -p "${FINISHED_PATH}"
38+
#fi
4139

42-
connect-standalone config/connect-avro-docker.properties config/CSVExample.properties
40+
#cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv
41+
connect-standalone config/connect-avro-docker.properties config/ELFTesting.properties

config/CSVExample.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
77
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
8+
# http://www.apache.org/licenses/LICENSE-2.0
99
#
1010
# Unless required by applicable law or agreed to in writing, software
1111
# distributed under the License is distributed on an "AS IS" BASIS,

config/ELFTesting.properties

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# Copyright © 2016 Jeremy Custenborder ([email protected])
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
name=elftesting
18+
tasks.max=1
19+
connector.class=com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector
20+
input.file.pattern=^.*\.gz$
21+
finished.path=/Users/jeremy/data/confluent/logs/packages/finished
22+
input.path=/Users/jeremy/data/confluent/logs/packages
23+
error.path=/Users/jeremy/data/confluent/logs/packages/error
24+
halt.on.error=true
25+
topic=cloudfront
26+
schema.generation.enabled=true

config/connect-avro-docker.properties

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
77
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
8+
# http://www.apache.org/licenses/LICENSE-2.0
99
#
1010
# Unless required by applicable law or agreed to in writing, software
1111
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,14 +14,14 @@
1414
# limitations under the License.
1515
#
1616

17-
bootstrap.servers=confluent:9092
17+
bootstrap.servers=kafka:9092
1818
key.converter=io.confluent.connect.avro.AvroConverter
19-
key.converter.schema.registry.url=http://confluent:8081
19+
key.converter.schema.registry.url=http://schema-registry:8081
2020
value.converter=io.confluent.connect.avro.AvroConverter
21-
value.converter.schema.registry.url=http://confluent:8081
21+
value.converter.schema.registry.url=http://schema-registry:8081
2222
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
2323
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
2424
internal.key.converter.schemas.enable=false
2525
internal.value.converter.schemas.enable=false
2626
offset.storage.file.filename=/tmp/connect.offsets
27-
plugin.path=target/kafka-connect-target/usr/share/java
27+
plugin.path=target/kafka-connect-target/usr/share/kafka-connect

docker-compose.yml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
77
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
8+
# http://www.apache.org/licenses/LICENSE-2.0
99
#
1010
# Unless required by applicable law or agreed to in writing, software
1111
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,22 +17,23 @@
1717
version: "2"
1818
services:
1919
zookeeper:
20-
image: confluentinc/cp-zookeeper:3.3.0
20+
image: confluentinc/cp-zookeeper:4.1.0
2121
ports:
2222
- "2181:2181"
2323
environment:
2424
ZOOKEEPER_CLIENT_PORT: 2181
2525
kafka:
26-
image: confluentinc/cp-kafka:3.3.0
26+
image: confluentinc/cp-kafka:4.1.0
2727
depends_on:
2828
- zookeeper
2929
ports:
3030
- "9092:9092"
3131
environment:
3232
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
33-
KAFKA_ADVERTISED_LISTENERS: "plaintext://confluent:9092"
33+
KAFKA_ADVERTISED_LISTENERS: "plaintext://kafka:9092"
34+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3435
schema-registry:
35-
image: confluentinc/cp-schema-registry:3.3.0
36+
image: confluentinc/cp-schema-registry:4.1.0
3637
depends_on:
3738
- kafka
3839
- zookeeper

pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,21 @@
11
<?xml version="1.0"?>
2+
<!--
3+
4+
Copyright © 2016 Jeremy Custenborder ([email protected])
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
219
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
320
xmlns="http://maven.apache.org/POM/4.0.0"
421
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
@@ -57,6 +74,11 @@
5774
<artifactId>commons-compress</artifactId>
5875
<version>1.16.1</version>
5976
</dependency>
77+
<dependency>
78+
<groupId>com.github.jcustenborder.parsers</groupId>
79+
<artifactId>extended-log-format</artifactId>
80+
<version>[0.0.1.2, 0.0.1.1000)</version>
81+
</dependency>
6082
</dependencies>
6183
<build>
6284
<plugins>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright © 2016 Jeremy Custenborder ([email protected])
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.spooldir;
17+
18+
import com.google.common.collect.ForwardingDeque;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import java.io.File;
23+
import java.util.ArrayDeque;
24+
import java.util.ArrayList;
25+
import java.util.Arrays;
26+
import java.util.Comparator;
27+
import java.util.Deque;
28+
import java.util.List;
29+
30+
public class InputFileDequeue extends ForwardingDeque<File> {
31+
private static final Logger log = LoggerFactory.getLogger(InputFileDequeue.class);
32+
private final SpoolDirSourceConnectorConfig config;
33+
34+
public InputFileDequeue(SpoolDirSourceConnectorConfig config) {
35+
this.config = config;
36+
}
37+
38+
public static File processingFile(String processingFileExtension, File input) {
39+
String fileName = input.getName() + processingFileExtension;
40+
return new File(input.getParentFile(), fileName);
41+
}
42+
43+
44+
Deque<File> files;
45+
46+
@Override
47+
protected Deque<File> delegate() {
48+
if (null != files && !files.isEmpty()) {
49+
return files;
50+
}
51+
52+
log.info("Searching for file in {}", this.config.inputPath);
53+
File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
54+
if (null == input || input.length == 0) {
55+
log.info("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath);
56+
return new ArrayDeque<>();
57+
}
58+
Arrays.sort(input, Comparator.comparing(File::getName));
59+
List<File> files = new ArrayList<>(input.length);
60+
for (File f : input) {
61+
File processingFile = processingFile(this.config.processingFileExtension, f);
62+
log.trace("Checking for processing file: {}", processingFile);
63+
64+
if (processingFile.exists()) {
65+
log.debug("Skipping {} because processing file exists.", f);
66+
continue;
67+
}
68+
files.add(f);
69+
}
70+
71+
Deque<File> result = new ArrayDeque<>(files.size());
72+
73+
for (File file : files) {
74+
long fileAgeMS = System.currentTimeMillis() - file.lastModified();
75+
76+
if (fileAgeMS < 0L) {
77+
log.warn("File {} has a date in the future.", file);
78+
}
79+
80+
if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) {
81+
log.debug("Skipping {} because it does not meet the minimum age.", file);
82+
continue;
83+
}
84+
result.add(file);
85+
}
86+
87+
log.info("Found {} file(s) to process", result.size());
88+
return (this.files = result);
89+
}
90+
}

src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@ class SpoolDirCsvSourceConnectorConfig extends SpoolDirSourceConnectorConfig {
6464

6565
static final String CSV_SKIP_LINES_DOC = "Number of lines to skip in the beginning of the file.";
6666
static final int CSV_SKIP_LINES_DEFAULT = CSVReader.DEFAULT_SKIP_LINES;
67-
static final String CSV_SEPARATOR_CHAR_DOC = "The character that seperates each field. Typically in a CSV this is a , character. A TSV would use \\t.";
67+
static final String CSV_SEPARATOR_CHAR_DOC = "The character that separates each field in the form " +
68+
"of an integer. Typically in a CSV this is a ,(44) character. A TSV would use a tab(9) character.";
6869
static final int CSV_SEPARATOR_CHAR_DEFAULT = (int) CSVParser.DEFAULT_SEPARATOR;
6970
static final int CSV_QUOTE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_QUOTE_CHARACTER;
70-
static final String CSV_ESCAPE_CHAR_DOC = "Escape character.";
71+
static final String CSV_ESCAPE_CHAR_DOC = "The character as an integer to use when a special " +
72+
"character is encountered. The default escape character is typically a \\(92)";
7173
static final int CSV_ESCAPE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_ESCAPE_CHARACTER;
7274
static final String CSV_STRICT_QUOTES_DOC = "Sets the strict quotes setting - if true, characters outside the quotes are ignored.";
7375
static final boolean CSV_STRICT_QUOTES_DEFAULT = CSVParser.DEFAULT_STRICT_QUOTES;
@@ -288,6 +290,11 @@ public CSVReaderBuilder createCSVReaderBuilder(Reader reader, CSVParser parser)
288290
.withFieldAsNull(nullFieldIndicator);
289291
}
290292

293+
@Override
294+
public boolean schemasRequired() {
295+
return true;
296+
}
297+
291298
static class CharsetValidator implements ConfigDef.Validator {
292299
static CharsetValidator of() {
293300
return new CharsetValidator();

src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright © 2016 Jeremy Custenborder ([email protected])
3-
* <p>
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
* <p>
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
* <p>
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

0 commit comments

Comments
 (0)