Skip to content

Commit 6e3f731

Browse files
committed
Merge pull request #2 from civitaspo/direct_sftp_upload
Direct sftp upload
2 parents b15b5ec + ce3f70d commit 6e3f731

File tree

3 files changed

+38
-82
lines changed

3 files changed

+38
-82
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,7 @@ $ embulk run -Ilib example/sample.yml
5353
```
5454
$ ./gradlew gem # -t to watch change of files and rebuild continuously
5555
```
56+
57+
## Note
58+
59+
This plugin uses "org.apache.commons:commons-vfs" and the library uses the logger "org.apache.commons.logging.Log". So, this plugin suppress the logger's message except when embulk log level is debug.

src/main/java/org/embulk/output/sftp/SftpFileOutput.java

Lines changed: 33 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,21 @@ public class SftpFileOutput
3535
private final String userInfo;
3636
private final String host;
3737
private final int port;
38-
private final String workingFileScheme;
3938
private final String pathPrefix;
4039
private final String sequenceFormat;
4140
private final String fileNameExtension;
4241

4342
private final int taskIndex;
4443
private int fileIndex = 0;
45-
private FileObject currentWorkingFile;
46-
private OutputStream currentWorkingFileOutputStream;
44+
private FileObject currentFile;
45+
private OutputStream currentFileOutputStream;
4746

4847
private StandardFileSystemManager initializeStandardFileSystemManager()
4948
{
50-
// TODO: change logging format: org.apache.commons.logging.Log
51-
// System.setProperty("org.apache.commons.logging.Log",
52-
// "org.apache.commons.logging.impl.NoOpLog");
49+
if (logger.isDebugEnabled()) {
50+
// TODO: change logging format: org.apache.commons.logging.Log
51+
System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog");
52+
}
5353
StandardFileSystemManager manager = new StandardFileSystemManager();
5454
try {
5555
manager.init();
@@ -100,7 +100,6 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
100100
this.host = task.getHost();
101101
this.port = task.getPort();
102102
this.pathPrefix = task.getPathPrefix();
103-
this.workingFileScheme = task.getWorkingFileScheme();
104103
this.sequenceFormat = task.getSequenceFormat();
105104
this.fileNameExtension = task.getFileNameExtension();
106105
this.taskIndex = taskIndex;
@@ -109,32 +108,28 @@ private FileSystemOptions initializeFsOptions(PluginTask task)
109108
@Override
110109
public void nextFile()
111110
{
112-
closeCurrentWithUpload();
111+
closeCurrentFile();
113112

114113
try {
115-
currentWorkingFile = newWorkingFile(getWorkingFileUri(getOutputFilePath()));
116-
currentWorkingFileOutputStream = currentWorkingFile.getContent().getOutputStream();
117-
logger.info("new working file: {}", currentWorkingFile.getPublicURIString());
114+
currentFile = newSftpFile(getSftpFileUri(getOutputFilePath()));
115+
currentFileOutputStream = currentFile.getContent().getOutputStream();
116+
logger.info("new sftp file: {}", currentFile.getPublicURIString());
118117
}
119118
catch (FileSystemException e) {
120119
logger.error(e.getMessage());
121120
Throwables.propagate(e);
122121
}
123-
catch (URISyntaxException e) {
124-
logger.error(e.getMessage());
125-
Throwables.propagate(e);
126-
}
127122
}
128123

129124
@Override
130125
public void add(Buffer buffer)
131126
{
132-
if (currentWorkingFile == null) {
127+
if (currentFile == null) {
133128
throw new IllegalStateException("nextFile() must be called before poll()");
134129
}
135130

136131
try {
137-
currentWorkingFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
132+
currentFileOutputStream.write(buffer.array(), buffer.offset(), buffer.limit());
138133
}
139134
catch (IOException e) {
140135
logger.error(e.getMessage());
@@ -146,13 +141,13 @@ public void add(Buffer buffer)
146141
@Override
147142
public void finish()
148143
{
149-
closeCurrentWithUpload();
144+
closeCurrentFile();
150145
}
151146

152147
@Override
153148
public void close()
154149
{
155-
closeCurrentWithUpload();
150+
closeCurrentFile();
156151
manager.close();
157152
}
158153

@@ -167,69 +162,38 @@ public TaskReport commit()
167162
return null;
168163
}
169164

170-
private void closeCurrentWithUpload()
165+
166+
private void closeCurrentFile()
171167
{
172-
try {
173-
closeCurrentWorkingFileContent();
174-
uploadCurrentWorkingFileToSftp();
175-
closeCurrentWorkingFile();
168+
if (currentFile == null) {
169+
return;
176170
}
177-
catch (URISyntaxException e) {
178-
logger.error(e.getMessage());
179-
Throwables.propagate(e);
171+
172+
try {
173+
currentFileOutputStream.close();
174+
currentFile.getContent().close();
175+
currentFile.close();
180176
}
181177
catch (IOException e) {
182178
logger.error(e.getMessage());
183179
Throwables.propagate(e);
184180
}
185-
fileIndex++;
186-
currentWorkingFile = null;
187-
}
188-
189-
private void closeCurrentWorkingFileContent()
190-
throws IOException
191-
{
192-
if (currentWorkingFile == null) {
193-
return;
181+
finally {
182+
fileIndex++;
183+
currentFile = null;
184+
currentFileOutputStream = null;
194185
}
195-
currentWorkingFileOutputStream.close();
196-
currentWorkingFile.getContent().close();
197186
}
198187

199-
private void uploadCurrentWorkingFileToSftp()
200-
throws FileSystemException, URISyntaxException
188+
private URI getSftpFileUri(String remoteFilePath)
201189
{
202-
if (currentWorkingFile == null) {
203-
return;
204-
}
205-
206-
try (FileObject remoteSftpFile = newSftpFile(getSftpFileUri(getOutputFilePath()))) {
207-
remoteSftpFile.copyFrom(currentWorkingFile, Selectors.SELECT_SELF);
208-
logger.info("Upload: {}", remoteSftpFile.getPublicURIString());
190+
try {
191+
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
209192
}
210-
}
211-
212-
private void closeCurrentWorkingFile()
213-
throws FileSystemException
214-
{
215-
if (currentWorkingFile == null) {
216-
return;
193+
catch (URISyntaxException e) {
194+
logger.error(e.getMessage());
195+
throw new RuntimeException(e);
217196
}
218-
219-
currentWorkingFile.close();
220-
currentWorkingFile.delete();
221-
}
222-
223-
private URI getSftpFileUri(String remoteFilePath)
224-
throws URISyntaxException
225-
{
226-
return new URI("sftp", userInfo, host, port, remoteFilePath, null, null);
227-
}
228-
229-
private URI getWorkingFileUri(String remoteFilePath)
230-
throws URISyntaxException
231-
{
232-
return new URI(workingFileScheme, null, remoteFilePath, null);
233197
}
234198

235199
private String getOutputFilePath()
@@ -242,12 +206,4 @@ private FileObject newSftpFile(URI sftpUri)
242206
{
243207
return manager.resolveFile(sftpUri.toString(), fsOptions);
244208
}
245-
246-
private FileObject newWorkingFile(URI workingFileUri)
247-
throws FileSystemException
248-
{
249-
FileObject workingFile = manager.resolveFile(workingFileUri);
250-
workingFile.createFile();
251-
return workingFile;
252-
}
253209
}

src/main/java/org/embulk/output/sftp/SftpFileOutputPlugin.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,9 @@ public interface PluginTask
5050
public Boolean getUserDirIsRoot();
5151

5252
@Config("timeout")
53-
@ConfigDefault("600") // 10 munites
53+
@ConfigDefault("600") // 10 minutes
5454
public int getSftpConnectionTimeout();
5555

56-
@Config("working_file_schema")
57-
@ConfigDefault("ram")
58-
public String getWorkingFileScheme();
59-
6056
@Config("path_prefix")
6157
public String getPathPrefix();
6258

0 commit comments

Comments
 (0)