Skip to content

Commit 86ae927

Browse files
[Fix][Connector-V2] Fix file binary format sync convert directory to file (#7942)
1 parent cb9c257 commit 86ae927

File tree

5 files changed

+72
-2
lines changed

5 files changed

+72
-2
lines changed

.github/workflows/backend.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ jobs:
364364
matrix:
365365
java: [ '8', '11' ]
366366
os: [ 'ubuntu-latest' ]
367-
timeout-minutes: 120
367+
timeout-minutes: 180
368368
steps:
369369
- uses: actions/checkout@v2
370370
- name: Set up JDK ${{ matrix.java }}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public boolean fileExist(@NonNull String filePath) throws IOException {
6464
return execute(() -> getFileSystem().exists(new Path(filePath)));
6565
}
6666

67+
public boolean isFile(@NonNull String filePath) throws IOException {
68+
return execute(() -> getFileSystem().getFileStatus(new Path(filePath)).isFile());
69+
}
70+
6771
public void createFile(@NonNull String filePath) throws IOException {
6872
execute(
6973
() -> {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
5555
throws IOException, FileConnectorException {
5656
try (InputStream inputStream = hadoopFileSystemProxy.getInputStream(path)) {
5757
String relativePath;
58-
if (basePath.isFile()) {
58+
if (hadoopFileSystemProxy.isFile(basePath.getAbsolutePath())) {
5959
relativePath = basePath.getName();
6060
} else {
6161
relativePath =

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,25 @@ public void testFtpFileReadAndWriteForPassive(TestContainer container)
187187
deleteFileFromContainer(homePath);
188188
}
189189

190+
@TestTemplate
191+
public void testFtpToFtpForBinary(TestContainer container)
192+
throws IOException, InterruptedException {
193+
194+
Container.ExecResult execResult = container.executeJob("/text/ftp_to_ftp_for_binary.conf");
195+
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
196+
197+
String homePath = "/home/vsftpd/seatunnel/uploads/seatunnel";
198+
Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
199+
200+
// Confirm data is written correctly
201+
Container.ExecResult resultExecResult =
202+
ftpContainer.execInContainer(
203+
"sh", "-c", "awk 'END {print NR}' " + homePath + "/e2e.txt");
204+
Assertions.assertEquals("5", resultExecResult.getStdout().trim());
205+
206+
deleteFileFromContainer(homePath);
207+
}
208+
190209
private void assertJobExecution(TestContainer container, String configPath, List<String> params)
191210
throws IOException, InterruptedException {
192211
Container.ExecResult execResult = container.executeJob(configPath, params);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
}
22+
23+
source {
24+
FtpFile {
25+
host = "ftp"
26+
port = 21
27+
user = seatunnel
28+
password = pass
29+
path= "/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"
30+
file_format_type= "binary"
31+
encoding = "UTF-8"
32+
}
33+
}
34+
35+
36+
sink {
37+
FtpFile {
38+
host = "ftp"
39+
port = 21
40+
user = seatunnel
41+
password = pass
42+
tmp_path = "/upload-tmp/seatunnel"
43+
path= "/uploads/seatunnel"
44+
file_format_type= "binary"
45+
encoding="UTF-8"
46+
}
47+
}

0 commit comments

Comments
 (0)