-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[DO NOT MERGE] TransferManager: DirectoryUploader & DirectoryDownloader #3288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: version-3
Are you sure you want to change the base?
Changes from 34 commits
6cb37fe
d31ae4d
74ed189
4e8db17
098049d
8f387d2
7749ba5
99f0de6
441fa82
c792439
adce496
012c2bc
ee9c9da
75df844
e5d3245
173f5e4
cf88ff2
2758c4d
b92d3b3
6afb495
86b53e8
d587ae1
eae3814
14010ef
8ab4edc
face84d
36a1e87
7dd9f98
77ab1ba
e843137
009127d
39912fd
f9fb117
d307555
a14649a
b9231e7
d991128
bc533a0
9efc77f
1cc3fcf
7b6b220
45d2f5d
64d481e
2ab63fb
7af3e32
747965f
2230478
cb145a0
0cb35cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
# frozen_string_literal: true

module Aws
  module S3
    # @api private
    # A minimal lazily-growing thread pool. Worker threads are spawned on
    # demand, up to +max_threads+, and pull jobs from a shared queue.
    class DefaultExecutor
      # Default cap on the number of worker threads.
      DEFAULT_MAX_THREADS = 10

      # @param [Hash] options
      # @option options [Integer] :max_threads (DEFAULT_MAX_THREADS)
      #   maximum number of concurrent worker threads
      def initialize(options = {})
        @queue = Queue.new
        @max_threads = options[:max_threads] || DEFAULT_MAX_THREADS
        @pool = []
        @running = true
        @mutex = Mutex.new
      end

      # Enqueues a job for asynchronous execution.
      #
      # @param [Array] args arguments yielded to the block
      # @raise [RuntimeError] when the executor has been shut down
      def post(*args, &block)
        # Check-and-enqueue under the mutex so a concurrent #shutdown cannot
        # interleave between the running check and the push.
        @mutex.synchronize do
          raise 'Executor is not running' unless @running

          @queue << [args, block]
        end
        ensure_worker_available
      end

      # Stops accepting new work, signals every worker to exit and waits for
      # them to finish.
      #
      # @return [true]
      def shutdown
        @mutex.synchronize do
          @running = false
          # One token per potential worker so every thread wakes up and exits.
          @max_threads.times { @queue << :shutdown }
          @pool.each(&:join)
          @pool.clear
        end
        true
      end

      # @return [Boolean] whether the executor accepts new work
      def running?
        @running
      end

      private

      # Prunes dead threads and spawns a new worker when the pool has spare
      # capacity. No-op after shutdown so a late caller cannot spawn a worker
      # that would never receive a :shutdown token.
      def ensure_worker_available
        @mutex.synchronize do
          if @running
            @pool.select!(&:alive?)
            @pool << spawn_worker if @pool.size < @max_threads
          end
        end
      end

      # Worker loop: pop jobs until a :shutdown token arrives.
      def spawn_worker
        Thread.new do
          while (job = @queue.shift)
            break if job == :shutdown

            args, block = job
            block.call(*args)
          end
        end
      end
    end
  end
end
# frozen_string_literal: true

module Aws
  module S3
    # Raised when DirectoryDownloader fails to download objects from an S3 bucket
    class DirectoryDownloadError < StandardError
      # @param [String] message
      # @param [Array<StandardError>] errors errors encountered during the download
      def initialize(message, errors = [])
        @errors = errors
        super(message)
      end

      # @return [Array<StandardError>] The list of errors encountered when downloading objects
      attr_reader :errors
    end

    # @api private
    class DirectoryDownloader
      def initialize(options = {})
        @client = options[:client] || Client.new
        @executor = options[:executor] || DefaultExecutor.new
        @options = options
        @abort_download = false
      end

      attr_reader :client

      # Writable so the producer thread can signal a fatal listing error to
      # the consumer loop (the reader alone was previously exposed, and the
      # producer set its own dead ivar instead).
      attr_accessor :abort_download

      # TODO: need to add progress tracker
      #
      # Downloads the contents of an S3 bucket (optionally under a prefix)
      # into +destination+, creating the directory if needed.
      #
      # @param [String] destination local directory to download into
      # @param [String] bucket source bucket name
      # @raise [ArgumentError] when destination exists but is not a directory
      # @raise [DirectoryDownloadError] when the download is aborted by errors
      def download(destination, bucket:, **options)
        if File.exist?(destination)
          # FIX: original was `raise ArgumentError '…'` (missing comma), which
          # invoked ArgumentError() as a method and raised NoMethodError.
          raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
        else
          FileUtils.mkdir_p(destination)
        end

        download_opts = options.dup
        @bucket = bucket
        # FIX: @destination was never assigned but is read in
        # #process_download_queue when deciding whether to mkdir.
        @destination = destination
        @ignore_failure = download_opts.delete(:ignore_failure) || false
        @errors = []

        downloader = FileDownloader.new(client: client, executor: @executor)
        producer = ObjectProducer.new(destination, build_producer_opts(download_opts))
        producer.run

        downloads = process_download_queue(producer, downloader, download_opts)
        build_result(downloads)
      ensure
        # Only shut down executors we created ourselves.
        @executor.shutdown unless @options[:executor]
      end

      # Options passed through to the listing producer.
      def build_producer_opts(opts)
        {
          directory_downloader: self,
          client: @client,
          bucket: @bucket,
          s3_prefix: opts.delete(:s3_prefix),
          ignore_failure: @ignore_failure,
          filter_callback: opts.delete(:filter_callback),
          errors: @errors
        }
      end

      # Builds the result hash, or raises when the download was aborted.
      #
      # @param [Integer] download_count number of attempted downloads
      # @return [Hash] :completed_downloads, :failed_downloads, optional :errors
      def build_result(download_count)
        downloads = [download_count - @errors.count, 0].max

        if @abort_download
          msg = "failed to download directory: downloaded #{downloads} files " \
                "and failed to download #{@errors.count} files."
          raise DirectoryDownloadError.new(msg, @errors)
        else
          result = { completed_downloads: downloads, failed_downloads: @errors.count }
          result[:errors] = @errors if @errors.any?
          result
        end
      end

      # Consumes the producer's queue, dispatching each object download to a
      # worker thread, and waits for all attempts to finish.
      #
      # @return [Integer] the number of download attempts
      def process_download_queue(producer, downloader, opts)
        download_attempts = 0
        completion_queue = Queue.new
        queue_executor = DefaultExecutor.new
        while (object = producer.object_queue.shift) != :done
          break if @abort_download

          download_attempts += 1
          queue_executor.post(object) do |o|
            dir_path = File.dirname(o[:path])
            FileUtils.mkdir_p(dir_path) unless dir_path == @destination || Dir.exist?(dir_path)

            downloader.download(o[:path], opts.merge(bucket: @bucket, key: o[:key]))
          rescue StandardError => e
            @errors << e
            @abort_download = true unless @ignore_failure
          ensure
            completion_queue << :done
          end
        end
        download_attempts.times { completion_queue.pop }

        download_attempts
      ensure
        queue_executor.shutdown
      end

      # @api private
      # Lists bucket objects on a background thread and feeds them to the
      # consumer through a bounded queue (backpressure via SizedQueue).
      class ObjectProducer
        def initialize(destination_dir, options = {})
          @destination_dir = destination_dir
          @client = options[:client]
          @bucket = options[:bucket]
          @s3_prefix = options[:s3_prefix]
          @ignore_failure = options[:ignore_failure]
          @filter_callback = options[:filter_callback]
          @errors = options[:errors]
          @directory_downloader = options[:directory_downloader]
          @object_queue = SizedQueue.new(100)
        end

        attr_reader :object_queue

        # Starts the producer thread. The :done sentinel is pushed in an
        # ensure block so that a listing failure cannot leave the consumer
        # blocked forever on the queue.
        def run
          Thread.new do
            stream_objects
          rescue StandardError => e
            @errors << e
            @directory_downloader.abort_download = true unless @ignore_failure
          ensure
            @object_queue << :done
          end
        end

        private

        def build_object_entry(key)
          { path: File.join(@destination_dir, normalize_key(key)), key: key }
        end

        # TODO: need to add filter callback, double check handling of objects that ends with /
        #
        # Iterative pagination (the original recursed per page, growing the
        # stack on large buckets).
        def stream_objects
          continuation_token = nil
          loop do
            resp = @client.list_objects_v2(bucket: @bucket, continuation_token: continuation_token)
            resp.contents.each do |o|
              break if @directory_downloader.abort_download
              next if o.key.end_with?('/')

              @object_queue << build_object_entry(o.key)
            rescue StandardError => e
              @errors << e
              # FIX: original set the producer's own @abort_download, which
              # nothing reads; the consumer checks the downloader's flag.
              @directory_downloader.abort_download = true unless @ignore_failure
            end
            continuation_token = resp.next_continuation_token
            break unless continuation_token
          end
        end

        # Strips the configured prefix and maps '/' to the platform separator.
        def normalize_key(key)
          key = key.delete_prefix(@s3_prefix) if @s3_prefix
          return key if File::SEPARATOR == '/'

          key.tr('/', File::SEPARATOR)
        end
      end
    end
  end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does running state default to true in the initializer for other implementations of this in concurrent-ruby?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually yes — concurrent-ruby's implementation goes a bit deeper, but I simplified ours. Links below (the most common executors eventually inherit from
RubyExecutorService):