Skip to content

Commit

Permalink
Add the number of records imported.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustenborder committed May 9, 2018
1 parent 95d384f commit 82aeb6c
Showing 1 changed file with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ public List<SourceRecord> poll() throws InterruptedException {
return results;
}

private void recordProcessingTime() {
log.info(
"Finished processing {} record(s) in {} second(s).",
this.recordCount,
processingTime.elapsed(TimeUnit.SECONDS)
);
}

private void closeAndMoveToFinished(File outputDirectory, boolean errored) throws IOException {
if (null != inputStream) {
log.info("Closing {}", this.inputFile);
Expand All @@ -190,7 +198,12 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw
if (errored) {
log.error("Error during processing, moving {} to {}.", this.inputFile, outputDirectory);
} else {
log.info("Finished processing {} in {} second(s). Moving to {}.", this.inputFile, processingTime.elapsed(TimeUnit.SECONDS), outputDirectory);
recordProcessingTime();
log.info(
"Moving to {} to {}.",
this.inputFile,
outputDirectory
);
}

Files.move(this.inputFile, finishedFile);
Expand All @@ -215,6 +228,8 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw
public List<SourceRecord> read() {
try {
if (!hasRecords) {


switch (this.config.cleanupPolicy) {
case MOVE:
closeAndMoveToFinished(this.config.finishedPath, false);
Expand Down Expand Up @@ -260,6 +275,7 @@ public List<SourceRecord> read() {
} else {
this.inputStream = inputStream;
}
this.recordCount = 0;
configure(this.inputStream, this.metadata, lastOffset);
} catch (Exception ex) {
throw new ConnectException(ex);
Expand Down Expand Up @@ -291,6 +307,7 @@ private void closeAndDelete() throws IOException {
log.info("Closing {}", this.inputFile);
this.inputStream.close();
this.inputStream = null;
recordProcessingTime();
log.info("Removing file {}", this.inputFile);
this.inputFile.delete();
File processingFile = InputFileDequeue.processingFile(this.config.processingFileExtension, this.inputFile);
Expand All @@ -302,6 +319,8 @@ private void closeAndDelete() throws IOException {
}
}

long recordCount;

protected void addRecord(List<SourceRecord> records, Struct keyStruct, Struct valueStruct) {
Map<String, ?> sourceOffset = ImmutableMap.of(
"offset",
Expand Down Expand Up @@ -350,6 +369,7 @@ protected void addRecord(List<SourceRecord> records, Struct keyStruct, Struct va
valueStruct,
timestamp
);
recordCount++;
records.add(sourceRecord);
}
}

0 comments on commit 82aeb6c

Please sign in to comment.