diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 2e85a2cf..4caf0455 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -15,7 +15,7 @@ anyhow = { version = "1.0.87", default-features = false } async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] } bytes = { version = "1", default-features = false } composefs = { workspace = true } -containers-image-proxy = { version = "0.9.0", default-features = false } +containers-image-proxy = { version = "0.9.2", default-features = false } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] } oci-spec = { version = "0.8.0", default-features = false } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index cc1e00eb..01dae7d6 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -14,7 +14,9 @@ use std::{iter::zip, sync::Arc}; use anyhow::{bail, Context, Result}; use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; -use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage}; +use containers_image_proxy::{ + ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport, +}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; @@ -36,6 +38,7 @@ struct ImageOp { proxy: ImageProxy, img: OpenedImage, progress: MultiProgress, + transport: Transport, } impl ImageOp { @@ -44,8 +47,11 @@ impl ImageOp { imgref: &str, img_proxy_config: Option, ) -> Result { + // Detect transport from image reference + let transport = Transport::try_from(imgref).context("Failed to get image transport")?; + // See https://github.com/containers/skopeo/issues/2563 - let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() { + let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() { let mut cmd = Command::new("podman"); cmd.args(["unshare", "skopeo"]); Some(cmd) @@ -86,10 +92,17 @@ impl ImageOp { proxy, img, progress, + transport, }) } - pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result { + pub async fn ensure_layer( + &self, + diff_id: &str, + descriptor: &Descriptor, + uncompressed_layer_info: Option>>, + layer_idx: usize, + ) -> Result { // We need to use the per_manifest descriptor to download the compressed layer but it gets // stored in the repository via the per_config descriptor. Our return value is the // fsverity digest for the corresponding splitstream. @@ -101,7 +114,29 @@ impl ImageOp { Ok(layer_id) } else { // Otherwise, we need to fetch it... - let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; + let descriptor = match self.transport { + Transport::ContainerStorage => { + let layers = uncompressed_layer_info + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Failed to get uncompressed layer info"))?; + + let layer = layers.get(layer_idx).ok_or_else(|| { + anyhow::anyhow!( + "Failed to get uncompressed layer info for layer index {layer_idx}. Total layers: {}", + layers.len() + ) + })?; + + &Descriptor::new(layer.media_type.clone(), layer.size, layer.digest.clone()) + } + + _ => descriptor, + }; + + let (blob_reader, driver) = self + .proxy + .get_blob(&self.img, descriptor.digest(), descriptor.size()) + .await?; // See https://github.com/containers/containers-image-proxy-rs/issues/71 let blob_reader = blob_reader.take(descriptor.size()); @@ -113,33 +148,19 @@ impl ImageOp { let progress = bar.wrap_async_read(blob_reader); self.progress.println(format!("Fetching layer {diff_id}"))?; - let object_id = match descriptor.media_type() { - MediaType::ImageLayer => { - split_async( - BufReader::new(progress), - self.repo.clone(), - TAR_LAYER_CONTENT_TYPE, - ) - .await? - } - MediaType::ImageLayerGzip => { - split_async( - BufReader::new(GzipDecoder::new(BufReader::new(progress))), - self.repo.clone(), - TAR_LAYER_CONTENT_TYPE, - ) - .await? - } - MediaType::ImageLayerZstd => { - split_async( - BufReader::new(ZstdDecoder::new(BufReader::new(progress))), - self.repo.clone(), - TAR_LAYER_CONTENT_TYPE, - ) - .await? - } - other => bail!("Unsupported layer media type {other:?}"), - }; + let reader: Box = + match descriptor.media_type() { + MediaType::ImageLayer => Box::new(BufReader::new(progress)), + MediaType::ImageLayerGzip => { + Box::new(BufReader::new(GzipDecoder::new(BufReader::new(progress)))) + } + MediaType::ImageLayerZstd => { + Box::new(BufReader::new(ZstdDecoder::new(BufReader::new(progress)))) + } + other => bail!("Unsupported layer media type {other:?}"), + }; + + let object_id = split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await?; // skopeo is doing data checksums for us to make sure the content we received is equal // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver. @@ -194,14 +215,32 @@ impl ImageOp { let threads = available_parallelism()?; let sem = Arc::new(Semaphore::new(threads.into())); let mut entries = vec![]; + + let uncompressed_layer_info = match self.transport { + Transport::ContainerStorage => { + self.proxy.get_layer_info(&self.img).await?.map(Arc::new) + } + _ => None, + }; + for (mld, diff_id) in layers { let diff_id_ = diff_id.clone(); let self_ = Arc::clone(self); let permit = Arc::clone(&sem).acquire_owned().await?; let descriptor = mld.clone(); + + let layer_idx = manifest_layers + .iter() + .position(|d| *d == descriptor) + .ok_or_else(|| anyhow::anyhow!("Layer descriptor not found in manifest"))?; + + let uncompressed_layer_info = uncompressed_layer_info.clone(); + let future = tokio::spawn(async move { let _permit = permit; - self_.ensure_layer(&diff_id_, &descriptor).await + self_ + .ensure_layer(&diff_id_, &descriptor, uncompressed_layer_info, layer_idx) + .await }); entries.push((diff_id, future)); }