@@ -14,7 +14,9 @@ use std::{iter::zip, sync::Arc};
1414
1515use anyhow:: { bail, Context , Result } ;
1616use async_compression:: tokio:: bufread:: { GzipDecoder , ZstdDecoder } ;
17- use containers_image_proxy:: { ImageProxy , ImageProxyConfig , OpenedImage } ;
17+ use containers_image_proxy:: {
18+ ConvertedLayerInfo , ImageProxy , ImageProxyConfig , OpenedImage , Transport ,
19+ } ;
1820use indicatif:: { MultiProgress , ProgressBar , ProgressStyle } ;
1921use oci_spec:: image:: { Descriptor , ImageConfiguration , ImageManifest , MediaType } ;
2022use rustix:: process:: geteuid;
@@ -36,6 +38,7 @@ struct ImageOp<ObjectID: FsVerityHashValue> {
3638 proxy : ImageProxy ,
3739 img : OpenedImage ,
3840 progress : MultiProgress ,
41+ transport : Transport ,
3942}
4043
4144impl < ObjectID : FsVerityHashValue > ImageOp < ObjectID > {
@@ -44,8 +47,11 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
4447 imgref : & str ,
4548 img_proxy_config : Option < ImageProxyConfig > ,
4649 ) -> Result < Self > {
50+ // Detect transport from image reference
51+ let transport = Transport :: try_from ( imgref) . context ( "Failed to get image transport" ) ?;
52+
4753 // See https://github.com/containers/skopeo/issues/2563
48- let skopeo_cmd = if imgref . starts_with ( "containers-storage:" ) && !geteuid ( ) . is_root ( ) {
54+ let skopeo_cmd = if transport == Transport :: ContainerStorage && !geteuid ( ) . is_root ( ) {
4955 let mut cmd = Command :: new ( "podman" ) ;
5056 cmd. args ( [ "unshare" , "skopeo" ] ) ;
5157 Some ( cmd)
@@ -86,10 +92,17 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
8692 proxy,
8793 img,
8894 progress,
95+ transport,
8996 } )
9097 }
9198
92- pub async fn ensure_layer ( & self , diff_id : & str , descriptor : & Descriptor ) -> Result < ObjectID > {
99+ pub async fn ensure_layer (
100+ & self ,
101+ diff_id : & str ,
102+ descriptor : & Descriptor ,
103+ uncompressed_layer_info : Option < Arc < Vec < ConvertedLayerInfo > > > ,
104+ layer_idx : usize ,
105+ ) -> Result < ObjectID > {
93106 // We need to use the per_manifest descriptor to download the compressed layer but it gets
94107 // stored in the repository via the per_config descriptor. Our return value is the
95108 // fsverity digest for the corresponding splitstream.
@@ -101,7 +114,29 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
101114 Ok ( layer_id)
102115 } else {
103116 // Otherwise, we need to fetch it...
104- let ( blob_reader, driver) = self . proxy . get_descriptor ( & self . img , descriptor) . await ?;
117+ let descriptor = match self . transport {
118+ Transport :: ContainerStorage => {
119+ let layers = uncompressed_layer_info
120+ . as_ref ( )
121+ . ok_or_else ( || anyhow:: anyhow!( "Failed to get uncompressed layer info" ) ) ?;
122+
123+ let layer = layers. get ( layer_idx) . ok_or_else ( || {
124+ anyhow:: anyhow!(
125+ "Failed to get uncompressed layer info for layer index {layer_idx}. Total layers: {}" ,
126+ layers. len( )
127+ )
128+ } ) ?;
129+
130+ & Descriptor :: new ( layer. media_type . clone ( ) , layer. size , layer. digest . clone ( ) )
131+ }
132+
133+ _ => descriptor,
134+ } ;
135+
136+ let ( blob_reader, driver) = self
137+ . proxy
138+ . get_blob ( & self . img , descriptor. digest ( ) , descriptor. size ( ) )
139+ . await ?;
105140
106141 // See https://github.com/containers/containers-image-proxy-rs/issues/71
107142 let blob_reader = blob_reader. take ( descriptor. size ( ) ) ;
@@ -113,33 +148,19 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
113148 let progress = bar. wrap_async_read ( blob_reader) ;
114149 self . progress . println ( format ! ( "Fetching layer {diff_id}" ) ) ?;
115150
116- let object_id = match descriptor. media_type ( ) {
117- MediaType :: ImageLayer => {
118- split_async (
119- BufReader :: new ( progress) ,
120- self . repo . clone ( ) ,
121- TAR_LAYER_CONTENT_TYPE ,
122- )
123- . await ?
124- }
125- MediaType :: ImageLayerGzip => {
126- split_async (
127- BufReader :: new ( GzipDecoder :: new ( BufReader :: new ( progress) ) ) ,
128- self . repo . clone ( ) ,
129- TAR_LAYER_CONTENT_TYPE ,
130- )
131- . await ?
132- }
133- MediaType :: ImageLayerZstd => {
134- split_async (
135- BufReader :: new ( ZstdDecoder :: new ( BufReader :: new ( progress) ) ) ,
136- self . repo . clone ( ) ,
137- TAR_LAYER_CONTENT_TYPE ,
138- )
139- . await ?
140- }
141- other => bail ! ( "Unsupported layer media type {other:?}" ) ,
142- } ;
151+ let reader: Box < dyn tokio:: io:: AsyncBufRead + Unpin + Send > =
152+ match descriptor. media_type ( ) {
153+ MediaType :: ImageLayer => Box :: new ( BufReader :: new ( progress) ) ,
154+ MediaType :: ImageLayerGzip => {
155+ Box :: new ( BufReader :: new ( GzipDecoder :: new ( BufReader :: new ( progress) ) ) )
156+ }
157+ MediaType :: ImageLayerZstd => {
158+ Box :: new ( BufReader :: new ( ZstdDecoder :: new ( BufReader :: new ( progress) ) ) )
159+ }
160+ other => bail ! ( "Unsupported layer media type {other:?}" ) ,
161+ } ;
162+
163+ let object_id = split_async ( reader, self . repo . clone ( ) , TAR_LAYER_CONTENT_TYPE ) . await ?;
143164
144165 // skopeo is doing data checksums for us to make sure the content we received is equal
145166 // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver.
@@ -194,14 +215,32 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
194215 let threads = available_parallelism ( ) ?;
195216 let sem = Arc :: new ( Semaphore :: new ( threads. into ( ) ) ) ;
196217 let mut entries = vec ! [ ] ;
218+
219+ let uncompressed_layer_info = match self . transport {
220+ Transport :: ContainerStorage => {
221+ self . proxy . get_layer_info ( & self . img ) . await ?. map ( Arc :: new)
222+ }
223+ _ => None ,
224+ } ;
225+
197226 for ( mld, diff_id) in layers {
198227 let diff_id_ = diff_id. clone ( ) ;
199228 let self_ = Arc :: clone ( self ) ;
200229 let permit = Arc :: clone ( & sem) . acquire_owned ( ) . await ?;
201230 let descriptor = mld. clone ( ) ;
231+
232+ let layer_idx = manifest_layers
233+ . iter ( )
234+ . position ( |d| * d == descriptor)
235+ . ok_or_else ( || anyhow:: anyhow!( "Layer descriptor not found in manifest" ) ) ?;
236+
237+ let uncompressed_layer_info = uncompressed_layer_info. clone ( ) ;
238+
202239 let future = tokio:: spawn ( async move {
203240 let _permit = permit;
204- self_. ensure_layer ( & diff_id_, & descriptor) . await
241+ self_
242+ . ensure_layer ( & diff_id_, & descriptor, uncompressed_layer_info, layer_idx)
243+ . await
205244 } ) ;
206245 entries. push ( ( diff_id, future) ) ;
207246 }
0 commit comments