Skip to content

Commit 9121b60

Browse files
committed
Added max_s3_upload_threads_per_task option to Redshift
1 parent 6aba538 commit 9121b60

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

embulk-output-redshift/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Redshift output plugin for Embulk loads records to Redshift.
8282
- **s3_bucket**: S3 bucket name for temporary files
8383
- **s3_key_prefix**: S3 key prefix for temporary files (string, default: "")
8484
- **delete_s3_temp_file**: whether to delete temporary files uploaded on S3 (boolean, default: true)
85+
- **max_s3_upload_threads_per_task**: The maximum number of threads per task which upload and copy data to Redshift via S3 (integer, optional). For example, if this option is 5 and the number of tasks is 8, 40 threads are created. If this option is increased, it may shorten the transfer time, but cause too many connections error. If this option is not specified, create as many new threads as needed, by default.
8586
- **copy_iam_role_name**: IAM Role for COPY credential(https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html), if this is set, IAM Role is used instead of aws access key and aws secret access key(string, optional)
8687
- **copy_aws_account_id**: IAM Role's account ID for multi account COPY. If this is set, the ID is used instead of authenticated user's account ID. This is enabled only if copy_iam_role_name is set.(string, optional)
8788
- **options**: extra connection properties (hash, default: {})

embulk-output-redshift/src/main/java/org/embulk/output/RedshiftOutputPlugin.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public interface RedshiftPluginTask extends AwsCredentialsTaskWithPrefix, Plugin
8383
@ConfigDefault("true")
8484
public boolean getDeleteS3TempFile();
8585

86+
@Config("max_s3_upload_threads_per_task")
87+
@ConfigDefault("null")
88+
public Optional<Integer> getMaxS3UploadThreadsPerTask();
89+
8690
@Config("ssl")
8791
@ConfigDefault("\"disable\"")
8892
public Ssl getSsl();
@@ -205,6 +209,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
205209
RedshiftPluginTask t = (RedshiftPluginTask) task;
206210
setAWSCredentialsBackwardCompatibility(t);
207211
return new RedshiftCopyBatchInsert(getConnector(task, true),
208-
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(), t.getIamUserName(), t.getDeleteS3TempFile(), t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
212+
getAWSCredentialsProvider(t), t.getS3Bucket(), t.getS3KeyPrefix(),
213+
t.getIamUserName(), t.getDeleteS3TempFile(), t.getMaxS3UploadThreadsPerTask().orElse(null),
214+
t.getCopyIamRoleName().orElse(null), t.getCopyAwsAccountId().orElse(null));
209215
}
210216
}

embulk-output-redshift/src/main/java/org/embulk/output/redshift/RedshiftCopyBatchInsert.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public class RedshiftCopyBatchInsert
6767

6868
public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
6969
AWSCredentialsProvider credentialsProvider, String s3BucketName, String s3KeyPrefix,
70-
String iamReaderUserName, boolean deleteS3TempFile, String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
70+
String iamReaderUserName, boolean deleteS3TempFile, Integer maxS3UploadThreadsPerTask,
71+
String copyIamRoleName, String copyAwsAccountId) throws IOException, SQLException
7172
{
7273
super();
7374
this.connector = connector;
@@ -82,7 +83,9 @@ public RedshiftCopyBatchInsert(JdbcOutputConnector connector,
8283
this.credentialsProvider = credentialsProvider;
8384
this.s3 = new AmazonS3Client(credentialsProvider); // TODO options
8485
this.sts = new AWSSecurityTokenServiceClient(credentialsProvider); // options
85-
this.executorService = Executors.newCachedThreadPool();
86+
this.executorService = maxS3UploadThreadsPerTask != null
87+
? Executors.newFixedThreadPool(maxS3UploadThreadsPerTask)
88+
: Executors.newCachedThreadPool();
8689
this.uploadAndCopyFutures = new ArrayList<Future<Void>>();
8790

8891
String s3RegionName = null;

0 commit comments

Comments
 (0)