Skip to content

Commit e6c6ec4

Browse files
committed
Added check for cleanup policy. Added log message when removing file.
1 parent 3af4d58 commit e6c6ec4

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,12 @@ public void start(Map<String, String> settings) {
132132
this.config = config(settings);
133133

134134
checkDirectory(SpoolDirSourceConnectorConfig.INPUT_PATH_CONFIG, this.config.inputPath);
135-
checkDirectory(SpoolDirSourceConnectorConfig.FINISHED_PATH_CONFIG, this.config.finishedPath);
136135
checkDirectory(SpoolDirSourceConnectorConfig.ERROR_PATH_CONFIG, this.config.errorPath);
137136

137+
if (SpoolDirSourceConnectorConfig.CleanupPolicy.MOVE == this.config.cleanupPolicy) {
138+
checkDirectory(SpoolDirSourceConnectorConfig.FINISHED_PATH_CONFIG, this.config.finishedPath);
139+
}
140+
138141
this.parser = new Parser();
139142
Map<Schema, TypeParser> dateTypeParsers = ImmutableMap.of(
140143
Timestamp.SCHEMA, new TimestampTypeParser(this.config.parserTimestampTimezone, this.config.parserTimestampDateFormats),
@@ -288,6 +291,7 @@ private void closeAndDelete() throws IOException {
288291
log.info("Closing {}", this.inputFile);
289292
this.inputStream.close();
290293
this.inputStream = null;
294+
log.info("Removing file {}", this.inputFile);
291295
this.inputFile.delete();
292296
File processingFile = InputFileDequeue.processingFile(this.config.processingFileExtension, this.inputFile);
293297
if (processingFile.exists()) {

0 commit comments

Comments
 (0)