Write support for S3 (#8921)
- Closes #8809
radeusgd authored Feb 12, 2024
1 parent 0c3e8f1 commit eb59b47
Showing 36 changed files with 1,223 additions and 380 deletions.
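
For orientation, a minimal sketch of how the new write support is expected to be used from Enso (the `S3_File.new` constructor and the `write` defaults are assumptions based on the standard library API, not shown in this diff):

    file = S3_File.new "s3://my-bucket/reports/summary.txt"
    "Hello from Enso" . write file on_existing_file=Existing_File_Behavior.Overwrite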
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -614,6 +614,7 @@
- [Implemented `Table.replace` for the in-memory backend.][8935]
- [Allow removing rows using a Filter_Condition.][8861]
- [Added `Table.to_xml`.][8979]
- [Implemented Write support for `S3_File`.][8921]

[debug-shortcuts]:
https://github.com/enso-org/enso/blob/develop/app/gui/docs/product/shortcuts.md#debug
@@ -884,6 +885,7 @@
[8935]: https://github.com/enso-org/enso/pull/8935
[8861]: https://github.com/enso-org/enso/pull/8861
[8979]: https://github.com/enso-org/enso/pull/8979
[8921]: https://github.com/enso-org/enso/pull/8921

#### Enso Compiler

14 changes: 14 additions & 0 deletions distribution/lib/Standard/AWS/0.0.0-dev/src/Errors.enso
@@ -1,5 +1,7 @@
from Standard.Base import all

import project.AWS_Credential.AWS_Credential

polyglot java import software.amazon.awssdk.core.exception.SdkClientException

## An error in the core AWS SDK
@@ -38,6 +40,18 @@ type S3_Bucket_Not_Found
to_display_text : Text
to_display_text self = "Bucket '" + self.bucket + "' not found."

## An error when an S3 key is not found within a bucket.

It is a low-level error used by the raw methods.
`S3_File` will translate this error to a `File_Error.Not_Found`.
type S3_Key_Not_Found
## PRIVATE
Error bucket:Text key:Text

## PRIVATE
to_display_text : Text
to_display_text self = "Key '" + self.key + "' not found in '" + self.bucket + "'."

## A warning that more records are available
type More_Records_Available
## PRIVATE
@@ -0,0 +1,10 @@
private
from Standard.Base import all
from Standard.Base.System.File import file_as_java

polyglot java import software.amazon.awssdk.core.sync.RequestBody

## PRIVATE
from_local_file (file : File) =
java_file = file_as_java file
RequestBody.fromFile java_file
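
    # A rough illustration of the intended call site (the `Request_Body` module
    # name, the bucket/key literals and the required imports are assumptions,
    # not shown in this diff): upload a local file as the body of an S3 object.
    example_upload local_file credentials =
        request_body = Request_Body.from_local_file local_file
        S3.put_object "my-bucket" "path/to/key" credentials request_body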
@@ -0,0 +1,66 @@
private

from Standard.Base import all
import Standard.Base.Errors.Common.Forbidden_Operation
import Standard.Base.Errors.File_Error.File_Error
from Standard.Base.System.File.Generic.File_Write_Strategy import File_Write_Strategy, default_overwrite, default_append, default_raise_error, generic_remote_write_with_local_file

import project.Errors.S3_Error

## PRIVATE
instance =
File_Write_Strategy.Value default_overwrite default_append default_raise_error s3_backup create_dry_run_file remote_write_with_local_file

## PRIVATE
create_dry_run_file file copy_original =
_ = [file, copy_original]
Error.throw (Forbidden_Operation.Error "Currently dry-run is not supported for S3_File, so writing to an S3_File is forbidden if the Output context is disabled.")

## PRIVATE
remote_write_with_local_file file existing_file_behavior action =
if existing_file_behavior == Existing_File_Behavior.Append then Error.throw (S3_Error.Error "S3 does not support appending to a file. Instead you may read it, modify it, and then write the new contents." Nothing) else
generic_remote_write_with_local_file file existing_file_behavior action
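
    # A hedged sketch of the read-modify-write workaround suggested by the
    # error above (the `read_text` and `write` calls on an `S3_File` are
    # assumptions; S3 itself offers no append operation):
    example_append target new_line =
        existing = target.read_text
        updated = existing + '\n' + new_line
        updated.write target on_existing_file=Existing_File_Behavior.Overwrite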

## PRIVATE
A backup strategy tailored for S3.
Since S3 does not support a cheap 'move' operation, the standard backup
strategy, which mostly relies on it, does not work well here.
Instead, a simpler strategy is used:
1. If the destination file exists, copy it to a backup location
(overwriting a previous backup file, if it was present).
2. Write the new file.
i. If the write succeeded, that's it.
ii. If the write failed, 'restore' from the backup - copy the backup
back to the original location and delete the backup file (as it's no
longer needed because the original file is back with the old contents).
s3_backup file action = recover_errors <|
backup_location = file.parent / (file.name + ".bak")
has_backup = if file.exists.not then False else
Panic.rethrow <| file.copy_to backup_location replace_existing=True
True

revert_backup =
if has_backup then
Panic.rethrow <| backup_location.copy_to file
Panic.rethrow <| backup_location.delete

with_failure_handler revert_backup <|
file.with_output_stream [File_Access.Write, File_Access.Truncate_Existing] action

## PRIVATE
with_failure_handler ~failure_action ~action =
panic_handler caught_panic =
failure_action
Panic.throw caught_panic

result = Panic.catch Any handler=panic_handler action
if result.is_error.not then result else
failure_action
result


## PRIVATE
recover_errors ~action =
Panic.catch S3_Error handler=(.convert_to_dataflow_error) <|
Panic.catch File_Error handler=(.convert_to_dataflow_error) <|
action
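
    # A minimal sketch (hypothetical, not part of this commit) of the guarantee
    # `with_failure_handler` provides: the cleanup action runs both when the
    # wrapped action panics and when it returns a dataflow error, which is how
    # `s3_backup` restores the copied backup after a failed write.
    example_cleanup =
        with_failure_handler (IO.println "restoring backup") <|
            Error.throw (S3_Error.Error "simulated write failure" Nothing)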
67 changes: 56 additions & 11 deletions distribution/lib/Standard/AWS/0.0.0-dev/src/S3/S3.enso
@@ -1,5 +1,4 @@
from Standard.Base import all
import Standard.Base.Errors.No_Such_Key.No_Such_Key
import Standard.Base.Network.HTTP.Response_Body.Response_Body
import Standard.Base.System.File_Format_Metadata.File_Format_Metadata
import Standard.Base.System.Input_Stream.Input_Stream
@@ -9,17 +8,21 @@ import project.AWS_Credential.AWS_Credential
import project.Errors.AWS_SDK_Error
import project.Errors.More_Records_Available
import project.Errors.S3_Bucket_Not_Found
import project.Errors.S3_Key_Not_Found
import project.Errors.S3_Error

polyglot java import java.io.IOException
polyglot java import org.enso.aws.ClientBuilder
polyglot java import software.amazon.awssdk.core.exception.SdkClientException
polyglot java import software.amazon.awssdk.services.s3.model.CopyObjectRequest
polyglot java import software.amazon.awssdk.services.s3.model.DeleteObjectRequest
polyglot java import software.amazon.awssdk.services.s3.model.GetObjectRequest
polyglot java import software.amazon.awssdk.services.s3.model.HeadBucketRequest
polyglot java import software.amazon.awssdk.services.s3.model.HeadObjectRequest
polyglot java import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
polyglot java import software.amazon.awssdk.services.s3.model.NoSuchBucketException
polyglot java import software.amazon.awssdk.services.s3.model.NoSuchKeyException
polyglot java import software.amazon.awssdk.services.s3.model.PutObjectRequest
polyglot java import software.amazon.awssdk.services.s3.model.S3Exception
polyglot java import software.amazon.awssdk.services.s3.S3Client

@@ -87,18 +90,28 @@ read_bucket bucket prefix="" credentials:(AWS_Credential | Nothing)=Nothing deli
be used.
head : Text -> Text -> AWS_Credential | Nothing -> Map Text Any ! S3_Error
head bucket key="" credentials:(AWS_Credential | Nothing)=Nothing =
response = raw_head bucket key credentials
pairs = response.sdkFields.map f-> [f.memberName, f.getValueOrDefault response]
Map.from_vector pairs

## PRIVATE
Gets the raw metadata of a bucket or object.

Arguments:
- bucket: the name of the bucket.
- key: the key of the object.
- credentials: AWS credentials.
raw_head : Text -> Text -> AWS_Credential | Nothing -> Map Text Any ! S3_Error
raw_head bucket key credentials =
client = make_client credentials
response = case key == "" of
case key == "" of
True ->
request = HeadBucketRequest.builder.bucket bucket . build
handle_s3_errors bucket=bucket <| client.headBucket request
False ->
request = HeadObjectRequest.builder.bucket bucket . key key . build
handle_s3_errors bucket=bucket key=key <| client.headObject request

pairs = response.sdkFields.map f-> [f.memberName, f.getValueOrDefault response]
Map.from_vector pairs

## ADVANCED
Gets an object from an S3 bucket.
Returns a raw stream which can be read once.
@@ -108,15 +121,15 @@ head bucket key="" credentials:(AWS_Credential | Nothing)=Nothing =
- key: the key of the object.
- credentials: AWS credentials. If not provided, the default credentials will
be used.
get_object : Text -> Text -> AWS_Credential | Nothing -> Any ! S3_Error
get_object : Text -> Text -> AWS_Credential | Nothing -> Response_Body ! S3_Error
get_object bucket key credentials:(AWS_Credential | Nothing)=Nothing = handle_s3_errors bucket=bucket key=key <|
request = GetObjectRequest.builder.bucket bucket . key key . build

client = make_client credentials
response = client.getObject request

inner_response = response.response
s3_uri = URI.parse ("s3://" + bucket + "/" + key)
s3_uri = URI.parse (uri_prefix + bucket + "/") / key
content_type = inner_response.contentType
name = filename_from_content_disposition inner_response.contentDisposition . if_nothing <|
key.split "/" . last
@@ -125,23 +138,49 @@ get_object bucket key credentials:(AWS_Credential | Nothing)=Nothing = handle_s3
input_stream = Input_Stream.new response (handle_io_errors s3_uri)
Response_Body.Raw_Stream input_stream metadata s3_uri

## PRIVATE
put_object (bucket : Text) (key : Text) credentials:(AWS_Credential | Nothing)=Nothing request_body = handle_s3_errors bucket=bucket key=key <|
client = make_client credentials
request = PutObjectRequest.builder.bucket bucket . key key . build
client.putObject request request_body
Nothing

## PRIVATE
delete_object (bucket : Text) (key : Text) credentials:(AWS_Credential | Nothing)=Nothing = handle_s3_errors bucket=bucket key=key <|
client = make_client credentials
request = DeleteObjectRequest.builder . bucket bucket . key key . build
client.deleteObject request
Nothing

## PRIVATE
copy_object (source_bucket : Text) (source_key : Text) (target_bucket : Text) (target_key : Text) credentials:(AWS_Credential | Nothing)=Nothing = handle_s3_errors bucket=source_bucket key=source_key <|
client = make_client credentials
request = CopyObjectRequest.builder
. destinationBucket target_bucket
. destinationKey target_key
. sourceBucket source_bucket
. sourceKey source_key
. build
client.copyObject request
Nothing

## PRIVATE
Splits a S3 URI into bucket and key.
parse_uri : Text -> Pair Text Text | Nothing
parse_uri uri =
if uri.starts_with "s3://" . not then Nothing else
no_prefix = uri.drop 5
if uri.starts_with uri_prefix . not then Nothing else
no_prefix = uri.drop uri_prefix.length
index_of = no_prefix.index_of "/"
if index_of == 0 then Nothing else
if index_of.is_nothing then Pair.new no_prefix "" else
Pair.new (no_prefix.take index_of) (no_prefix.drop index_of+1)
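
    # Expected behaviour of `parse_uri`, for illustration (derived from the
    # logic above, not from tests in this commit):
    #
    #     parse_uri "s3://my-bucket/path/to/key"   # -> Pair "my-bucket" "path/to/key"
    #     parse_uri "s3://my-bucket"                # -> Pair "my-bucket" ""
    #     parse_uri "s3:///key-without-bucket"      # -> Nothing
    #     parse_uri "https://example.com"           # -> Nothing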

## PRIVATE
handle_s3_errors : Text -> Text -> Function -> Any ! S3_Error | AWS_SDK_Error
handle_s3_errors : Any -> Text -> Text -> Any ! S3_Error | AWS_SDK_Error
handle_s3_errors ~action bucket="" key="" =
s3_inner_handler caught_panic =
error = if bucket!="" && caught_panic.payload.is_a NoSuchBucketException then S3_Bucket_Not_Found.Error bucket else
if bucket!="" && key!="" && caught_panic.payload.is_a NoSuchKeyException then No_Such_Key.Error bucket key else
if bucket!="" && key!="" && caught_panic.payload.is_a NoSuchKeyException then S3_Key_Not_Found.Error bucket key else
aws_error = caught_panic.payload.awsErrorDetails
S3_Error.Error aws_error.errorMessage aws_error.errorCode
Error.throw error
@@ -159,3 +198,9 @@ make_client credentials:(AWS_Credential | Nothing) =
handle_io_errors uri:URI ~action =
Panic.catch IOException action caught_panic->
Error.throw (S3_Error.Error ("An IO error has occurred: " + caught_panic.payload.to_text) uri.to_text)

## PRIVATE
scheme = "s3"

## PRIVATE
uri_prefix = scheme + "://"