Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/composefs-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ anyhow = { version = "1.0.87", default-features = false }
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
bytes = { version = "1", default-features = false }
composefs = { workspace = true }
containers-image-proxy = { version = "0.9.0", default-features = false }
containers-image-proxy = { version = "0.9.2", default-features = false }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] }
oci-spec = { version = "0.8.0", default-features = false }
Expand Down
103 changes: 71 additions & 32 deletions crates/composefs-oci/src/skopeo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use std::{iter::zip, sync::Arc};

use anyhow::{bail, Context, Result};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
use containers_image_proxy::{
ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport,
};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
use rustix::process::geteuid;
Expand All @@ -36,6 +38,7 @@ struct ImageOp<ObjectID: FsVerityHashValue> {
proxy: ImageProxy,
img: OpenedImage,
progress: MultiProgress,
transport: Transport,
}

impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
Expand All @@ -44,8 +47,11 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
imgref: &str,
img_proxy_config: Option<ImageProxyConfig>,
) -> Result<Self> {
// Detect transport from image reference
let transport = Transport::try_from(imgref).context("Failed to get image transport")?;

// See https://github.com/containers/skopeo/issues/2563
let skopeo_cmd = if imgref.starts_with("containers-storage:") && !geteuid().is_root() {
let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() {
let mut cmd = Command::new("podman");
cmd.args(["unshare", "skopeo"]);
Some(cmd)
Expand Down Expand Up @@ -86,10 +92,17 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
proxy,
img,
progress,
transport,
})
}

pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result<ObjectID> {
pub async fn ensure_layer(
&self,
diff_id: &str,
descriptor: &Descriptor,
uncompressed_layer_info: Option<Arc<Vec<ConvertedLayerInfo>>>,
layer_idx: usize,
) -> Result<ObjectID> {
// We need to use the per_manifest descriptor to download the compressed layer but it gets
// stored in the repository via the per_config descriptor. Our return value is the
// fsverity digest for the corresponding splitstream.
Expand All @@ -101,7 +114,29 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
Ok(layer_id)
} else {
// Otherwise, we need to fetch it...
let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
let descriptor = match self.transport {
Transport::ContainerStorage => {
let layers = uncompressed_layer_info
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Failed to get uncompressed layer info"))?;

let layer = layers.get(layer_idx).ok_or_else(|| {
anyhow::anyhow!(
"Failed to get uncompressed layer info for layer index {layer_idx}. Total layers: {}",
layers.len()
)
})?;

&Descriptor::new(layer.media_type.clone(), layer.size, layer.digest.clone())
}

_ => descriptor,
};

let (blob_reader, driver) = self
.proxy
.get_blob(&self.img, descriptor.digest(), descriptor.size())
.await?;

// See https://github.com/containers/containers-image-proxy-rs/issues/71
let blob_reader = blob_reader.take(descriptor.size());
Expand All @@ -113,33 +148,19 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
let progress = bar.wrap_async_read(blob_reader);
self.progress.println(format!("Fetching layer {diff_id}"))?;

let object_id = match descriptor.media_type() {
MediaType::ImageLayer => {
split_async(
BufReader::new(progress),
self.repo.clone(),
TAR_LAYER_CONTENT_TYPE,
)
.await?
}
MediaType::ImageLayerGzip => {
split_async(
BufReader::new(GzipDecoder::new(BufReader::new(progress))),
self.repo.clone(),
TAR_LAYER_CONTENT_TYPE,
)
.await?
}
MediaType::ImageLayerZstd => {
split_async(
BufReader::new(ZstdDecoder::new(BufReader::new(progress))),
self.repo.clone(),
TAR_LAYER_CONTENT_TYPE,
)
.await?
}
other => bail!("Unsupported layer media type {other:?}"),
};
let reader: Box<dyn tokio::io::AsyncBufRead + Unpin + Send> =
match descriptor.media_type() {
MediaType::ImageLayer => Box::new(BufReader::new(progress)),
MediaType::ImageLayerGzip => {
Box::new(BufReader::new(GzipDecoder::new(BufReader::new(progress))))
}
MediaType::ImageLayerZstd => {
Box::new(BufReader::new(ZstdDecoder::new(BufReader::new(progress))))
}
other => bail!("Unsupported layer media type {other:?}"),
};

let object_id = split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await?;

// skopeo is doing data checksums for us to make sure the content we received is equal
// to the claimed diff_id. We trust it, but we need to check it by awaiting the driver.
Expand Down Expand Up @@ -194,14 +215,32 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
let threads = available_parallelism()?;
let sem = Arc::new(Semaphore::new(threads.into()));
let mut entries = vec![];

let uncompressed_layer_info = match self.transport {
Transport::ContainerStorage => {
self.proxy.get_layer_info(&self.img).await?.map(Arc::new)
}
_ => None,
};

for (mld, diff_id) in layers {
let diff_id_ = diff_id.clone();
let self_ = Arc::clone(self);
let permit = Arc::clone(&sem).acquire_owned().await?;
let descriptor = mld.clone();

let layer_idx = manifest_layers
.iter()
.position(|d| *d == descriptor)
.ok_or_else(|| anyhow::anyhow!("Layer descriptor not found in manifest"))?;

let uncompressed_layer_info = uncompressed_layer_info.clone();

let future = tokio::spawn(async move {
let _permit = permit;
self_.ensure_layer(&diff_id_, &descriptor).await
self_
.ensure_layer(&diff_id_, &descriptor, uncompressed_layer_info, layer_idx)
.await
});
entries.push((diff_id, future));
}
Expand Down
Loading