Skip to content

Commit

Permalink
Added support for a configurable delete policy. Fixes #52. (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustenborder authored May 2, 2018
1 parent b85ffb6 commit b8f1b19
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ public abstract class SpoolDirSourceConnectorConfig extends AbstractConfig {
public static final String SCHEMA_GENERATION_VALUE_NAME_CONF = "schema.generation.value.name";
public static final String SCHEMA_GENERATION_ENABLED_CONF = "schema.generation.enabled";
public static final String METADATA_SCHEMA_NAME = "com.github.jcustenborder.kafka.connect.spooldir.Metadata";
// Configuration key that selects which CleanupPolicy is applied to processed files.
public static final String CLEANUP_POLICY_CONF = "cleanup.policy";
// Documentation text attached to the cleanup.policy key when it is defined in config().
public static final String CLEANUP_POLICY_DOC = "Determines how the connector should cleanup the " +
"files that have been successfully processed. NONE leaves the files in place which could " +
"cause them to be reprocessed if the connector is restarted. DELETE removes the file from the " +
"filesystem. MOVE will move the file to a finished directory.";
// Group names passed to the .group(...) calls when configuration keys are defined in config().
public static final String GROUP_FILESYSTEM = "File System";
public static final String GROUP_SCHEMA_GENERATION = "Schema Generation";
public static final String GROUP_SCHEMA = "Schema";
public static final String GROUP_GENERAL = "General";
public static final String GROUP_TIMESTAMP = "Timestamps";
static final String TIMESTAMP_FIELD_DOC = "The field in the value schema that will contain the parsed timestamp for the record. " +
"This field cannot be marked as optional and must be a " +
"[Timestamp](https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.html)";
Expand Down Expand Up @@ -137,11 +147,18 @@ public abstract class SpoolDirSourceConnectorConfig extends AbstractConfig {
public final String schemaGenerationValueName;
public boolean hasKeyMetadataField;
public boolean hasvalueMetadataField;

public CleanupPolicy cleanupPolicy;
public SpoolDirSourceConnectorConfig(final boolean isTask, ConfigDef configDef, Map<String, ?> settings) {
super(configDef, settings);
this.inputPath = ConfigUtils.getAbsoluteFile(this, INPUT_PATH_CONFIG);
this.finishedPath = ConfigUtils.getAbsoluteFile(this, FINISHED_PATH_CONFIG);
this.cleanupPolicy = ConfigUtils.getEnum(CleanupPolicy.class, this, CLEANUP_POLICY_CONF);

if (CleanupPolicy.MOVE == this.cleanupPolicy) {
this.finishedPath = ConfigUtils.getAbsoluteFile(this, FINISHED_PATH_CONFIG);
} else {
this.finishedPath = null;
}

this.errorPath = ConfigUtils.getAbsoluteFile(this, ERROR_PATH_CONFIG);
this.haltOnError = this.getBoolean(HALT_ON_ERROR_CONF);
this.minimumFileAgeMS = this.getLong(FILE_MINIMUM_AGE_MS_CONF);
Expand Down Expand Up @@ -271,9 +288,6 @@ public SpoolDirSourceConnectorConfig(final boolean isTask, ConfigDef configDef,
this.inputFilenameFilter = new PatternFilenameFilter(inputPattern);
}

public abstract boolean schemasRequired();


private static final Field findMetadataField(Schema schema) {
Field result = null;
for (Field field : schema.fields()) {
Expand All @@ -287,12 +301,6 @@ private static final Field findMetadataField(Schema schema) {
return result;
}

public static final String GROUP_FILESYSTEM = "File System";
public static final String GROUP_SCHEMA_GENERATION = "Schema Generation";
public static final String GROUP_SCHEMA = "Schema";
public static final String GROUP_GENERAL = "General";
public static final String GROUP_TIMESTAMP = "Timestamps";

public static ConfigDef config() {

ConfigDef.Recommender schemaRecommender = new ConfigDef.Recommender() {
Expand Down Expand Up @@ -325,6 +333,23 @@ public boolean visible(String key, Map<String, Object> settings) {
}
};

// Recommender for the finished-path setting: it suggests no values and only
// controls visibility, showing the setting when the cleanup policy is MOVE.
ConfigDef.Recommender finishedPath = new ConfigDef.Recommender() {
  /**
   * Returns the recommended values for the setting. There are none, but an empty
   * list is returned instead of null so callers can iterate or filter the result
   * without a null check.
   */
  @Override
  public List<Object> validValues(String s, Map<String, Object> map) {
    // Mutable on purpose: the framework may filter the returned list in place.
    return new java.util.ArrayList<Object>();
  }

  /**
   * Decides whether the setting named {@code s} should be shown.
   *
   * @param s   name of the setting being evaluated
   * @param map the currently parsed settings
   * @return true for any setting other than the finished path; for the finished
   *         path, true only when the cleanup policy is MOVE
   */
  @Override
  public boolean visible(String s, Map<String, Object> map) {
    if (!FINISHED_PATH_CONFIG.equals(s)) {
      return true;
    }

    // Compare as Object so a missing or non-String value yields false
    // instead of a ClassCastException.
    final Object cleanupPolicy = map.get(CLEANUP_POLICY_CONF);
    return CleanupPolicy.MOVE.toString().equals(cleanupPolicy);
  }
};


return new ConfigDef()

Expand All @@ -351,7 +376,15 @@ public boolean visible(String key, Map<String, Object> settings) {
.group(GROUP_GENERAL)
.build()
)

.define(
ConfigKeyBuilder.of(CLEANUP_POLICY_CONF, ConfigDef.Type.STRING)
.documentation(CLEANUP_POLICY_DOC)
.importance(ConfigDef.Importance.MEDIUM)
.validator(ValidEnum.of(CleanupPolicy.class))
.defaultValue(CleanupPolicy.MOVE.toString())
.group(GROUP_FILESYSTEM)
.build()
)
// Filesystem
.define(
ConfigKeyBuilder.of(INPUT_PATH_CONFIG, ConfigDef.Type.STRING)
Expand All @@ -364,7 +397,8 @@ public boolean visible(String key, Map<String, Object> settings) {
ConfigKeyBuilder.of(FINISHED_PATH_CONFIG, ConfigDef.Type.STRING)
.documentation(FINISHED_PATH_DOC)
.importance(ConfigDef.Importance.HIGH)
.validator(ValidDirectoryWritable.of())
.defaultValue("")
.recommender(finishedPath)
.group(GROUP_FILESYSTEM)
.build()
).define(
Expand Down Expand Up @@ -503,6 +537,8 @@ public boolean visible(String key, Map<String, Object> settings) {
);
}

public abstract boolean schemasRequired();

Schema readSchema(final String key) {
String schema = this.getString(key);
Schema result;
Expand All @@ -525,4 +561,10 @@ public enum TimestampMode {
FILE_TIME,
PROCESS_TIME
}

/**
 * How the connector cleans up files that have been successfully processed.
 */
public enum CleanupPolicy {
/** Leave the file in place; it could be reprocessed if the connector is restarted. */
NONE,
/** Remove the file from the filesystem. */
DELETE,
/** Move the file to the finished directory. */
MOVE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,45 +201,6 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw
}
}


// File findNextInputFile() {
// File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
// if (null == input || input.length == 0) {
// log.debug("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath);
// return null;
// }
// List<File> files = new ArrayList<>(input.length);
// for (File f : input) {
// File processingFile = InputFileDequeue.processingFile(this.config, f);
// log.trace("Checking for processing file: {}", processingFile);
//
// if (processingFile.exists()) {
// log.debug("Skipping {} because processing file exists.", f);
// continue;
// }
// files.add(f);
// }
//
// File result = null;
//
// for (File file : files) {
// long fileAgeMS = System.currentTimeMillis() - file.lastModified();
//
// if (fileAgeMS < 0L) {
// log.warn("File {} has a date in the future.", file);
// }
//
// if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) {
// log.debug("Skipping {} because it does not meet the minimum age.", file);
// continue;
// }
// result = file;
// break;
// }
//
// return result;
// }

static final Map<String, String> SUPPORTED_COMPRESSION_TYPES = ImmutableMap.of(
"bz2", CompressorStreamFactory.BZIP2,
"gz", CompressorStreamFactory.GZIP,
Expand All @@ -251,7 +212,14 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw
public List<SourceRecord> read() {
try {
if (!hasRecords) {
closeAndMoveToFinished(this.config.finishedPath, false);
switch (this.config.cleanupPolicy) {
case MOVE:
closeAndMoveToFinished(this.config.finishedPath, false);
break;
case DELETE:
closeAndDelete();
break;
}

File nextFile = this.inputFileDequeue.poll();
if (null == nextFile) {
Expand Down Expand Up @@ -315,6 +283,21 @@ public List<SourceRecord> read() {
}
}

/**
 * Closes the current input stream, then deletes the input file and, if present,
 * its processing file. Does nothing when no input stream is open.
 *
 * <p>Deletion is best effort: a failed delete is logged as a warning rather than
 * thrown, so the caller can continue with the next file.
 *
 * @throws IOException if closing the input stream fails
 */
private void closeAndDelete() throws IOException {
  if (null == this.inputStream) {
    return;
  }

  log.info("Closing {}", this.inputFile);
  this.inputStream.close();
  this.inputStream = null;

  // File.delete() reports failure through its return value, not an exception.
  if (!this.inputFile.delete()) {
    log.warn("Unable to delete {}. The file may be processed again.", this.inputFile);
  }

  File processingFile = InputFileDequeue.processingFile(this.config.processingFileExtension, this.inputFile);
  if (processingFile.exists()) {
    log.info("Removing processing file {}", processingFile);
    if (!processingFile.delete()) {
      log.warn("Unable to delete processing file {}", processingFile);
    }
  }
}

protected void addRecord(List<SourceRecord> records, Struct keyStruct, Struct valueStruct) {
Map<String, ?> sourceOffset = ImmutableMap.of(
"offset",
Expand Down

0 comments on commit b8f1b19

Please sign in to comment.