Skip to content

Commit 695c6f2

Browse files
rouellet99fhussonnois
authored andcommitted
Update LocalFSDirectoryListing.java
fix: (plugin): change the code for the configuration to delete the compressed file after extraction LocalFSDirectoryListing.java:[104,5] (metrics) CyclomaticComplexity: Cyclomatic Complexity is 16 (max allowed is 15). When using zipped file, is there any reason , why the compressed file is not removed from the directory once extracted? #603
1 parent 974e83e commit 695c6f2

File tree

1 file changed

+57
-32
lines changed

1 file changed

+57
-32
lines changed

connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -103,22 +103,54 @@ public void setFilter(final FileListFilter filter) {
103103

104104
private List<File> listEligibleFiles(final Path input) {
105105
final List<File> listingLocalFiles = new LinkedList<>();
106-
if (!Files.isReadable(input)) {
107-
LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", input);
106+
if (!isPathReadable(input)) {
108107
return listingLocalFiles;
109108
}
110109

111-
if (!Files.isDirectory(input)) {
112-
LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", input);
113-
return listingLocalFiles;
114-
}
115-
116-
if (isHidden(input)) {
110+
if (!isPathDirectory(input) || isHidden(input)) {
117111
return listingLocalFiles;
118112
}
119113

120114
final List<Path> decompressedDirs = new LinkedList<>();
121115
final List<Path> directories = new LinkedList<>();
116+
processFiles(input, listingLocalFiles, directories, decompressedDirs);
117+
118+
if (config.isRecursiveScanEnable() && !directories.isEmpty()) {
119+
listingLocalFiles.addAll(scanRecursiveDirectories(directories, decompressedDirs));
120+
}
121+
return listingLocalFiles;
122+
}
123+
124+
private boolean isPathReadable(Path path) {
125+
if (!Files.isReadable(path)) {
126+
LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", path);
127+
return false;
128+
}
129+
return true;
130+
}
131+
132+
private boolean isPathDirectory(Path path) {
133+
if (!Files.isDirectory(path)) {
134+
LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", path);
135+
return false;
136+
}
137+
return true;
138+
}
139+
140+
private boolean isHidden(final Path input) {
141+
try {
142+
return Files.isHidden(input);
143+
} catch (IOException e) {
144+
LOG.warn(
145+
"Error while checking if input file is hidden '{}': {}",
146+
input,
147+
e.getLocalizedMessage());
148+
return false;
149+
}
150+
}
151+
152+
private void processFiles(Path input, List<File> listingLocalFiles, List<Path> directories,
153+
List<Path> decompressedDirs) {
122154
try (DirectoryStream<Path> stream = Files.newDirectoryStream(input)) {
123155
for (Path path : stream) {
124156
if (Files.isDirectory(path)) {
@@ -137,10 +169,8 @@ private List<File> listEligibleFiles(final Path input) {
137169
final Path decompressed = codec.decompress(file).toPath();
138170
listingLocalFiles.addAll(listEligibleFiles(decompressed));
139171
decompressedDirs.add(decompressed);
140-
if (config.isDeleteCompressFileEnable() && decompressed.toFile().exists()) {
141-
file.delete();
142-
}
143-
LOG.debug("Compressed file deleted successfully : {}", path);
172+
LOG.debug("Compressed file extracted successfully : {}", path);
173+
handleFileDeletion(file, path);
144174
} catch (IOException | SecurityException e) {
145175
if (e instanceof IOException) {
146176
LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
@@ -158,34 +188,29 @@ private List<File> listEligibleFiles(final Path input) {
158188
}
159189
}
160190
} catch (IOException e) {
161-
LOG.error(
162-
"Error while getting directory listing for {}: {}",
163-
input,
164-
e.getLocalizedMessage());
191+
LOG.error("Error while getting directory listing for {}: {}", input, e.getLocalizedMessage());
165192
throw new ConnectException(e);
166193
}
167194

168-
if (config.isRecursiveScanEnable() && !directories.isEmpty()) {
169-
listingLocalFiles.addAll(directories.stream()
170-
.filter(f -> !decompressedDirs.contains(f))
171-
.flatMap(f -> listEligibleFiles(f).stream())
172-
.collect(Collectors.toList()));
173-
}
174-
return listingLocalFiles;
175195
}
176196

177-
private boolean isHidden(final Path input) {
178-
try {
179-
return Files.isHidden(input);
180-
} catch (IOException e) {
181-
LOG.warn(
182-
"Error while checking if input file is hidden '{}': {}",
183-
input,
184-
e.getLocalizedMessage());
185-
return false;
197+
private void handleFileDeletion(File file, Path path) {
198+
if (config.isDeleteCompressFileEnable() && file.exists()) {
199+
if (file.delete()) {
200+
LOG.debug("Compressed file deleted successfully : {}", path);
201+
} else {
202+
LOG.warn("Error while deleting input file '{}'. Skip and continue.", path);
203+
}
186204
}
187205
}
188206

207+
private List<File> scanRecursiveDirectories(List<Path> directories, List<Path> decompressedDirs) {
208+
return directories.stream()
209+
.filter(f -> !decompressedDirs.contains(f))
210+
.flatMap(f -> listEligibleFiles(f).stream())
211+
.collect(Collectors.toList());
212+
}
213+
189214
/**
190215
* {@inheritDoc}
191216
*/

0 commit comments

Comments
 (0)