-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathupload_manager_req_resp.go
257 lines (194 loc) · 9.62 KB
/
upload_manager_req_resp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Copyright (c) 2016, 2018, 2020, Oracle and/or its affiliates. All rights reserved.
// This software is dual-licensed to you under the Universal Permissive License (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose either license.
package transfer
import (
"errors"
"net/http"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
)
// requestValidator validate user's input and assign default values if not defined
type requestValidator interface {
// validate inputs, return error if request is not valid
validate() error
// assign default values
assignDefaultValues() error
}
// UploadRequest defines the input parameters for UploadFile method
type UploadRequest struct {
// The top-level namespace used for the request.
NamespaceName *string `mandatory:"true"`
// The name of the bucket. Avoid entering confidential information. Example: my-new-bucket1
BucketName *string `mandatory:"true"`
// The name of the object. Avoid entering confidential information. Example: test/object1.log
ObjectName *string `mandatory:"true"`
// [Optional] Override the default part size of 128 MiB, value is in bytes.
// The max part size is 50GiB
PartSize *int64 `mandatory:"false"`
// [Optional] Whether or not this UploadManager supports performing mulitpart uploads. Defaults to True.
AllowMultipartUploads *bool `mandatory:"false"`
// [Optional] Whether or not this UploadManager supports uploading individual parts of a multipart upload in parallel.
// This setting has no effect on uploads that are performed using a single put_object call. Defaults to True.
AllowParrallelUploads *bool `mandatory:"false"`
// The number of go routines for uploading individual parts of a multipart upload.
// This setting is only used if allow_parallel_uploads is set to True. Defaults to 5.
// The upper bounds of the number is 10,000.
NumberOfGoroutines *int `mandatory:"false"`
// A configured object storage client to use for interacting with the Object Storage service.
// Default timeout is 60s which includes the time for reading the body.
// Default timeout doesn't work for big file size and big part size(once upload each part longer than 60s), need to manually update timeout to support big file upload.
ObjectStorageClient *objectstorage.ObjectStorageClient `mandatory:"false"`
// [Optional] The entity tag of the object to match.
IfMatch *string `mandatory:"false"`
// [Optional] The entity tag of the object to avoid matching. The only valid value is ‘*’,
// which indicates that the request should fail if the object already exists.
IfNoneMatch *string `mandatory:"false"`
// [Optional] The base-64 encoded MD5 hash of the body. This parameter is only used if the object is uploaded in a single part.
ContentMD5 *string `mandatory:"false"`
// [Optional] The content type of the object to upload.
ContentType *string `mandatory:"false"`
// [Optional] The content language of the object to upload.
ContentLanguage *string `mandatory:"false"`
// [Optional] The content encoding of the object to upload.
ContentEncoding *string `mandatory:"false"`
// [Optional] Arbitrary string keys and values for the user-defined metadata for the object.
// Keys must be in "opc-meta-*" format.
Metadata map[string]string `mandatory:"false"`
// [Optional] The client request ID for tracing.
OpcClientRequestID *string `mandatory:"false"`
// Metadata about the request. This information will not be transmitted to the service, but
// represents information that the SDK will consume to drive retry behavior.
RequestMetadata common.RequestMetadata
// [Optional] The storage tier of the object to upload. If not specified, the storage tier is
// defaulted to 'Standard'
StorageTier objectstorage.PutObjectStorageTierEnum `mandatory:"false"`
// [Optional] Callback API that can be invoked during multiPartUploads
CallBack UploadCallBack `mandatory:"false"`
// [Optional] Whether or not this UploadManager supports performing multipart uploads md5 checksum verification. Defaults to False.
EnableMultipartChecksumVerification *bool `mandatory:"false"`
// The optional header that specifies "AES256" as the encryption algorithm. For more information, see
// Using Your Own Keys for Server-Side Encryption (https://docs.cloud.oracle.com/Content/Object/Tasks/usingyourencryptionkeys.htm).
OpcSseCustomerAlgorithm *string `mandatory:"false"`
// The optional header that specifies the base64-encoded 256-bit encryption key to use to encrypt or
// decrypt the data. For more information, see
// Using Your Own Keys for Server-Side Encryption (https://docs.cloud.oracle.com/Content/Object/Tasks/usingyourencryptionkeys.htm).
OpcSseCustomerKey *string `mandatory:"false"`
// The optional header that specifies the base64-encoded SHA256 hash of the encryption key. This
// value is used to check the integrity of the encryption key. For more information, see
// Using Your Own Keys for Server-Side Encryption (https://docs.cloud.oracle.com/Content/Object/Tasks/usingyourencryptionkeys.htm).
OpcSseCustomerKeySha256 *string `mandatory:"false"`
// The OCID (https://docs.cloud.oracle.com/Content/General/Concepts/identifiers.htm) of a master encryption key used to call the Key
// Management service to generate a data encryption key or to encrypt or decrypt a data encryption key.
OpcSseKmsKeyId *string `mandatory:"false"`
}
// RetryPolicy implements the OCIRetryableRequest interface. This retrieves the specified retry policy.
func (request UploadRequest) RetryPolicy() *common.RetryPolicy {
return request.RequestMetadata.RetryPolicy
}
var (
errorInvalidNamespace = errors.New("namespaceName is required")
errorInvalidBucketName = errors.New("bucketName is required")
errorInvalidObjectName = errors.New("objectName is required")
)
const defaultNumberOfGoroutines = 5 // increase the value might cause 409 error form service and client timeout
func (request UploadRequest) validate() error {
if request.NamespaceName == nil {
return errorInvalidNamespace
}
if request.BucketName == nil {
return errorInvalidBucketName
}
if request.ObjectName == nil {
return errorInvalidObjectName
}
return nil
}
func (request *UploadRequest) initDefaultValues() error {
if request.ObjectStorageClient == nil {
client, err := objectstorage.NewObjectStorageClientWithConfigurationProvider(common.DefaultConfigProvider())
// default timeout is 60s which includes the time for reading the body
// default timeout doesn't work for big file, here will use the default
// 0s which means no timeout
client.HTTPClient = &http.Client{}
if err != nil {
return err
}
request.ObjectStorageClient = &client
}
if request.NumberOfGoroutines == nil ||
*request.NumberOfGoroutines <= 0 {
request.NumberOfGoroutines = common.Int(defaultNumberOfGoroutines)
}
if request.AllowMultipartUploads == nil {
request.AllowMultipartUploads = common.Bool(true)
}
if request.AllowParrallelUploads == nil {
request.AllowParrallelUploads = common.Bool(true)
}
if !*request.AllowParrallelUploads {
request.NumberOfGoroutines = common.Int(1) // one go routine for upload
}
if request.RetryPolicy() == nil {
// default retry policy
request.RequestMetadata = common.RequestMetadata{RetryPolicy: getUploadManagerDefaultRetryPolicy()}
}
return nil
}
// UploadResponseType with underlying type: string
type UploadResponseType string
// Set of constants representing the allowable values for VolumeAttachmentLifecycleState
const (
MultipartUpload UploadResponseType = "MULTIPARTUPLOAD"
SinglepartUpload UploadResponseType = "SINGLEPARTUPLOAD"
)
// UploadResponse is the response from commitMultipart or the putObject API operation.
type UploadResponse struct {
// Polymorphic response type indicates the response type
Type UploadResponseType
// response for putObject API response (single part upload), will be nil if the operation is multiPart upload
*SinglepartUploadResponse
// response for commitMultipart API response (multipart upload), will be nil if the operation is singlePart upload
*MultipartUploadResponse
}
// IsResumable is a function to check is previous failed upload resumable or not
func (resp UploadResponse) IsResumable() bool {
if resp.Type == SinglepartUpload {
return false
}
return *resp.MultipartUploadResponse.isResumable
}
// SinglepartUploadResponse is the response from putObject API operation.
type SinglepartUploadResponse struct {
objectstorage.PutObjectResponse
}
// MultipartUploadResponse is the response from commitMultipart API operation.
type MultipartUploadResponse struct {
objectstorage.CommitMultipartUploadResponse
// The upload ID for a multipart upload.
UploadID *string
// The value indicates is the operation IsResumable or not, call the resume function if is true
isResumable *bool
}
func getUploadManagerDefaultRetryPolicy() *common.RetryPolicy {
attempts := uint(3)
retryOnAllNon200ResponseCodes := func(r common.OCIOperationResponse) bool {
return !(r.Error == nil && 199 < r.Response.HTTPResponse().StatusCode && r.Response.HTTPResponse().StatusCode < 300)
}
policy := common.NewRetryPolicyWithOptions(
common.WithMaximumNumberAttempts(attempts),
common.WithShouldRetryOperation(retryOnAllNon200ResponseCodes))
return &policy
}
// MultiPartUploadPart holds the details of Part that is uploaded
type MultiPartUploadPart struct {
PartNum int
TotalParts int
Size int64
Offset int64
Hash *string
OpcMD5 *string
Etag *string
Err error
}
// UploadCallBack API that gets invoked after a Part is successuly uploaded
type UploadCallBack func(multiPartUploadPart MultiPartUploadPart)