Skip to content

Commit 03ff8fe

Browse files
authored
Merge pull request #30844 from benesch/copy-to-s3-empty
Reapply "storage/copy-to-s3: emit empty file even if input is empty"
Parents: dcd0741, 95031e5 · commit 03ff8fe

File tree

13 files changed

+320
-212
lines changed

13 files changed

+320
-212
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/src/coord.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,10 @@ pub enum PeekStage {
426426
/// Final stage for an explain.
427427
ExplainPlan(PeekStageExplainPlan),
428428
ExplainPushdown(PeekStageExplainPushdown),
429-
/// Final stage for a copy to.
430-
CopyTo(PeekStageCopyTo),
429+
/// Preflight checks for a copy to operation.
430+
CopyToPreflight(PeekStageCopyTo),
431+
/// Final stage for a copy to which involves shipping the dataflow.
432+
CopyToDataflow(PeekStageCopyTo),
431433
}
432434

433435
#[derive(Debug)]

src/adapter/src/coord/sequencer/inner/peek.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use std::sync::Arc;
1414
use http::Uri;
1515
use itertools::Either;
1616
use maplit::btreemap;
17+
use mz_compute_types::sinks::ComputeSinkConnection;
1718
use mz_controller_types::ClusterId;
1819
use mz_expr::{CollectionPlan, ResultSpec};
1920
use mz_ore::cast::CastFrom;
@@ -69,7 +70,8 @@ impl Staged for PeekStage {
6970
PeekStage::Finish(stage) => &mut stage.validity,
7071
PeekStage::ExplainPlan(stage) => &mut stage.validity,
7172
PeekStage::ExplainPushdown(stage) => &mut stage.validity,
72-
PeekStage::CopyTo(stage) => &mut stage.validity,
73+
PeekStage::CopyToPreflight(stage) => &mut stage.validity,
74+
PeekStage::CopyToDataflow(stage) => &mut stage.validity,
7375
}
7476
}
7577

@@ -94,7 +96,8 @@ impl Staged for PeekStage {
9496
PeekStage::ExplainPushdown(stage) => {
9597
coord.peek_explain_pushdown(ctx.session(), stage).await
9698
}
97-
PeekStage::CopyTo(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
99+
PeekStage::CopyToPreflight(stage) => coord.peek_copy_to_preflight(stage).await,
100+
PeekStage::CopyToDataflow(stage) => coord.peek_copy_to_dataflow(ctx, stage).await,
98101
}
99102
}
100103

@@ -687,7 +690,7 @@ impl Coordinator {
687690
}
688691
Ok(Either::Right(global_lir_plan)) => {
689692
let optimizer = optimizer.unwrap_right();
690-
PeekStage::CopyTo(PeekStageCopyTo {
693+
PeekStage::CopyToPreflight(PeekStageCopyTo {
691694
validity,
692695
optimizer,
693696
global_lir_plan,
@@ -947,6 +950,44 @@ impl Coordinator {
947950
Ok(StageResult::Response(resp))
948951
}
949952

953+
#[instrument]
954+
async fn peek_copy_to_preflight(
955+
&mut self,
956+
copy_to: PeekStageCopyTo,
957+
) -> Result<StageResult<Box<PeekStage>>, AdapterError> {
958+
let connection_context = self.connection_context().clone();
959+
Ok(StageResult::Handle(mz_ore::task::spawn(
960+
|| "peek copy to preflight",
961+
async {
962+
let sinks = &copy_to.global_lir_plan.df_desc().sink_exports;
963+
if sinks.len() != 1 {
964+
return Err(AdapterError::Internal(
965+
"expected exactly one copy to s3 sink".into(),
966+
));
967+
}
968+
let (sink_id, sink_desc) = sinks
969+
.first_key_value()
970+
.expect("known to be exactly one copy to s3 sink");
971+
match &sink_desc.connection {
972+
ComputeSinkConnection::CopyToS3Oneshot(conn) => {
973+
mz_storage_types::sinks::s3_oneshot_sink::preflight(
974+
connection_context,
975+
&conn.aws_connection,
976+
&conn.upload_info,
977+
conn.connection_id,
978+
*sink_id,
979+
)
980+
.await?;
981+
Ok(Box::new(PeekStage::CopyToDataflow(copy_to)))
982+
}
983+
_ => Err(AdapterError::Internal(
984+
"expected copy to s3 oneshot sink".into(),
985+
)),
986+
}
987+
},
988+
)))
989+
}
990+
950991
#[instrument]
951992
async fn peek_copy_to_dataflow(
952993
&mut self,

src/adapter/src/optimize/copy_to.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ pub struct GlobalLirPlan {
139139
}
140140

141141
impl GlobalLirPlan {
142+
pub fn df_desc(&self) -> &LirDataflowDescription {
143+
&self.df_desc
144+
}
145+
142146
pub fn sink_id(&self) -> GlobalId {
143147
let sink_exports = &self.df_desc.sink_exports;
144148
let sink_id = sink_exports.keys().next().expect("valid sink");

src/aws-util/src/s3_uploader.rs

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,6 @@ pub struct S3MultiPartUploader {
5151
upload_handles: Vec<JoinHandle<Result<(Option<String>, i32), S3MultiPartUploadError>>>,
5252
}
5353

54-
/// The smallest allowable part number (inclusive).
55-
///
56-
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
57-
const AWS_S3_MIN_PART_COUNT: i32 = 1;
5854
/// The largest allowable part number (inclusive).
5955
///
6056
/// From <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
@@ -218,18 +214,12 @@ impl S3MultiPartUploader {
218214
}
219215
}
220216

221-
/// Method to finish the multi part upload. If the buffer is not empty,
222-
/// it flushes the buffer first and then makes a call to `complete_multipart_upload`.
217+
/// Finishes the multi part upload.
218+
///
223219
/// Returns the number of parts and number of bytes uploaded.
224220
pub async fn finish(mut self) -> Result<CompletedUpload, S3MultiPartUploadError> {
225-
if self.buffer.len() > 0 {
226-
let remaining = self.buffer.split();
227-
self.upload_part_internal(remaining.freeze())?;
228-
}
229-
230-
if self.part_count < AWS_S3_MIN_PART_COUNT {
231-
return Err(S3MultiPartUploadError::AtLeastMinPartNumber);
232-
}
221+
let remaining = self.buffer.split();
222+
self.upload_part_internal(remaining.freeze())?;
233223

234224
let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
235225
for handle in self.upload_handles {
@@ -336,11 +326,6 @@ pub enum S3MultiPartUploadError {
336326
AWS_S3_MAX_PART_COUNT
337327
)]
338328
ExceedsMaxPartNumber,
339-
#[error(
340-
"multi-part upload should have at least {} part",
341-
AWS_S3_MIN_PART_COUNT
342-
)]
343-
AtLeastMinPartNumber,
344329
#[error("multi-part upload will exceed configured file_size_limit: {} bytes", .0)]
345330
UploadExceedsMaxFileLimit(u64),
346331
#[error("{}", .0.display_with_causes())]
@@ -503,7 +488,7 @@ mod tests {
503488
#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
504489
#[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
505490
#[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
506-
async fn multi_part_upload_error() -> Result<(), S3MultiPartUploadError> {
491+
async fn multi_part_upload_no_data() -> Result<(), S3MultiPartUploadError> {
507492
let sdk_config = defaults().load().await;
508493
let (bucket, key) = match s3_bucket_key_for_test() {
509494
Some(tuple) => tuple,
@@ -514,12 +499,20 @@ mod tests {
514499
let uploader =
515500
S3MultiPartUploader::try_new(&sdk_config, bucket.clone(), key.clone(), config).await?;
516501

517-
// Calling finish without adding any data should error
518-
let err = uploader.finish().await.unwrap_err();
519-
assert_eq!(
520-
err.to_string(),
521-
"multi-part upload should have at least 1 part"
522-
);
502+
// Calling finish without adding any data should succeed.
503+
uploader.finish().await.unwrap();
504+
505+
// The file should exist but have no content.
506+
let s3_client = s3::new_client(&sdk_config);
507+
let uploaded_object = s3_client
508+
.get_object()
509+
.bucket(bucket)
510+
.key(key)
511+
.send()
512+
.await
513+
.unwrap();
514+
515+
assert_eq!(uploaded_object.content_length(), Some(0));
523516

524517
Ok(())
525518
}

src/compute/src/sink/copy_to_s3_oneshot.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ where
124124
.get(&compute_state.worker_config),
125125
};
126126

127-
mz_storage_operators::s3_oneshot_sink::copy_to(
127+
let token = mz_storage_operators::s3_oneshot_sink::copy_to(
128128
input,
129129
error_stream,
130130
sink.up_to.clone(),
@@ -138,6 +138,7 @@ where
138138
);
139139

140140
Some(Rc::new(scopeguard::guard((), move |_| {
141+
let _token = token;
141142
if let Some(protocol_handle) = response_protocol_weak.upgrade() {
142143
std::mem::drop(protocol_handle.borrow_mut().take())
143144
}

0 commit comments

Comments (0)