Skip to content

feat(copy): introduces CopyGraphOptions with events support #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6817504
feat(copy): support mounting existing descriptors from other reposito…
leonardochaia Sep 26, 2024
b453fbc
fix: standard header
leonardochaia Sep 26, 2024
6a7a1e9
chore: adds repository tests
leonardochaia Oct 3, 2024
88c6793
fix: copy test
leonardochaia Oct 17, 2024
4c52467
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Oct 17, 2024
36342c9
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 11, 2024
dbe3509
chore: removes mounting support to be implemented in separate PR
leonardochaia Nov 11, 2024
571820c
refactor: renames class and events as per review
leonardochaia Nov 11, 2024
138120e
chore: adds copy tests
leonardochaia Nov 11, 2024
77e8079
chore: adds overload to prevent breaking change
leonardochaia Nov 11, 2024
06fd442
chore: introduces overloads to keep previous signature on CopyGraphOp…
leonardochaia Nov 11, 2024
5787c04
fix: copy signature
leonardochaia Nov 12, 2024
c9be0a2
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 12, 2024
77ba179
refactor: introduces CopyOptions
leonardochaia Nov 13, 2024
ecaf1b1
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
de9ee0d
Merge branch 'main' into feat/improve-copy-performance
leonardochaia Nov 13, 2024
6ef7e74
chore: adds license
leonardochaia Nov 13, 2024
be7b1c5
Merge remote-tracking branch 'origin/feat/improve-copy-performance' i…
leonardochaia Nov 13, 2024
a12ff78
chore: adds more tests from oras-go
leonardochaia Nov 13, 2024
a7a1baf
chore: adds tests from oras-go
leonardochaia Nov 13, 2024
493a8d9
doc: adds comments to new option structs
leonardochaia Nov 25, 2024
9e40655
refactor: makes copy events async
leonardochaia Nov 25, 2024
68220bd
feat: introduces `InvokeAsync` extension method to support asynchrono…
leonardochaia Nov 26, 2024
e35fc7e
refactor: delegate InvokeAsync to execute handlers in parallel
leonardochaia Nov 29, 2024
827f62b
refactor: Copy events to include sync and async variants.
leonardochaia Nov 29, 2024
6d1b0ba
Merge branch 'main' into feat/improve-copy-performance
Wwwsylvia Apr 30, 2025
2372ac6
address comments
Wwwsylvia May 14, 2025
f7b326d
fix tests
Wwwsylvia May 14, 2025
fe10a1a
rename
Wwwsylvia May 14, 2025
c1c7182
nit
Wwwsylvia May 14, 2025
7b31ec3
Merge branch 'main' into feat/improve-copy-performance
Wwwsylvia May 14, 2025
b9f93d0
refactor
Wwwsylvia May 15, 2025
20ac068
update tests
Wwwsylvia May 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 94 additions & 10 deletions src/OrasProject.Oras/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using OrasProject.Oras.Oci;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;
using OrasProject.Oras.Registry;
using static OrasProject.Oras.Content.Extensions;

namespace OrasProject.Oras;

public struct CopyOptions
{
// public int Concurrency { get; set; }

public event Action<Descriptor> OnPreCopy;
public event Action<Descriptor> OnPostCopy;
public event Action<Descriptor> OnCopySkipped;
public event Action<Descriptor, string> OnMounted;

public Func<Descriptor, string[]> MountFrom { get; set; }

internal void PreCopy(Descriptor descriptor)
{

Check warning on line 36 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L36

Added line #L36 was not covered by tests
OnPreCopy?.Invoke(descriptor);
}

Check warning on line 38 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L38

Added line #L38 was not covered by tests

internal void PostCopy(Descriptor descriptor)
{

Check warning on line 41 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L41

Added line #L41 was not covered by tests
OnPostCopy?.Invoke(descriptor);
}

Check warning on line 43 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L43

Added line #L43 was not covered by tests

internal void CopySkipped(Descriptor descriptor)
{

Check warning on line 46 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L46

Added line #L46 was not covered by tests
OnCopySkipped?.Invoke(descriptor);
}

Check warning on line 48 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L48

Added line #L48 was not covered by tests

internal void Mounted(Descriptor descriptor, string sourceRepository)
{
OnMounted?.Invoke(descriptor, sourceRepository);
}
}
public static class Extensions
{

Expand All @@ -36,38 +69,89 @@
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default)
public static async Task<Descriptor> CopyAsync(this ITarget src, string srcRef, ITarget dst, string dstRef, CancellationToken cancellationToken = default, CopyOptions? copyOptions = default)
{
if (string.IsNullOrEmpty(dstRef))
{
dstRef = srcRef;
}
var root = await src.ResolveAsync(srcRef, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken).ConfigureAwait(false);
await src.CopyGraphAsync(dst, root, cancellationToken, copyOptions).ConfigureAwait(false);
await dst.TagAsync(root, dstRef, cancellationToken).ConfigureAwait(false);
return root;
}

public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken)
public static async Task CopyGraphAsync(this ITarget src, ITarget dst, Descriptor node, CancellationToken cancellationToken, CopyOptions? copyOptions = default)
{
// check if node exists in target
if (await dst.ExistsAsync(node, cancellationToken).ConfigureAwait(false))
{
copyOptions?.CopySkipped(node);
return;
}

// retrieve successors
var successors = await src.GetSuccessorsAsync(node, cancellationToken).ConfigureAwait(false);
// obtain data stream
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);

// check if the node has successors
if (successors != null)
foreach (var childNode in successors)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken, copyOptions).ConfigureAwait(false);
}

var sourceRepositories = copyOptions?.MountFrom(node) ?? [];
if (dst is IMounter mounter && sourceRepositories.Length > 0)
{
foreach (var childNode in successors)
for (var i = 0; i < sourceRepositories.Length; i++)
{
await src.CopyGraphAsync(dst, childNode, cancellationToken).ConfigureAwait(false);
var sourceRepository = sourceRepositories[i];
var mountFailed = false;

async Task<Stream> GetContents(CancellationToken token)
{

Check warning on line 111 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L111

Added line #L111 was not covered by tests
// the invocation of getContent indicates that mounting has failed
mountFailed = true;

Check warning on line 113 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L113

Added line #L113 was not covered by tests

if (i < sourceRepositories.Length - 1)
{

Check warning on line 116 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L116

Added line #L116 was not covered by tests
// If this is not the last one, skip this source and try next one
// We want to return an error that we will test for from mounter.Mount()
throw new SkipSourceException();

Check warning on line 119 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L119

Added line #L119 was not covered by tests
}

// this is the last iteration so we need to actually get the content and do the copy
// but first call the PreCopy function
copyOptions?.PreCopy(node);
return await src.FetchAsync(node, token).ConfigureAwait(false);
}

Check warning on line 126 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L125-L126

Added lines #L125 - L126 were not covered by tests

try
{
await mounter.MountAsync(node, sourceRepository, GetContents, cancellationToken).ConfigureAwait(false);
}
catch (SkipSourceException)
{
}

Check warning on line 134 in src/OrasProject.Oras/Extensions.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Extensions.cs#L132-L134

Added lines #L132 - L134 were not covered by tests

if (!mountFailed)
{
copyOptions?.Mounted(node, sourceRepository);
return;
}
}
}
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);
else
{
// alternatively we just copy it
copyOptions?.PreCopy(node);
var dataStream = await src.FetchAsync(node, cancellationToken).ConfigureAwait(false);
await dst.PushAsync(node, dataStream, cancellationToken).ConfigureAwait(false);
}

// we copied it
copyOptions?.PostCopy(node);
}

private class SkipSourceException : Exception {}
}

37 changes: 37 additions & 0 deletions src/OrasProject.Oras/Registry/IMounter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The ORAS Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using OrasProject.Oras.Oci;

namespace OrasProject.Oras.Registry;

/// <summary>
/// Mounter allows cross-repository blob mounts.
/// </summary>
public interface IMounter
{
/// <summary>
/// Mount makes the blob with the given descriptor in fromRepo
/// available in the repository signified by the receiver.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task MountAsync(Descriptor descriptor, string contentReference, Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken);
}
2 changes: 1 addition & 1 deletion src/OrasProject.Oras/Registry/IRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace OrasProject.Oras.Registry;
/// Furthermore, this interface also provides the ability to enforce the
/// separation of the blob and the manifests CASs.
/// </summary>
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable
public interface IRepository : ITarget, IReferenceFetchable, IReferencePushable, IDeletable, ITagListable, IMounter
{
/// <summary>
/// Blobs provides access to the blob CAS only, which contains config blobs,layers, and other generic blobs.
Expand Down
116 changes: 96 additions & 20 deletions src/OrasProject.Oras/Registry/Remote/BlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

namespace OrasProject.Oras.Registry.Remote;

public class BlobStore(Repository repository) : IBlobStore
public class BlobStore(Repository repository) : IBlobStore, IMounter
{
public Repository Repository { get; init; } = repository;

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / Analyze (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / Analyze (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

Check warning on line 30 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

Parameter 'Repository repository' is captured into the state of the enclosing type and its value is also used to initialize a field, property, or event.

public async Task<Stream> FetchAsync(Descriptor target, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -148,25 +148,7 @@
url = location.IsAbsoluteUri ? location : new Uri(url, location);
}

// monolithic upload
// add digest key to query string with expected digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(expected.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = expected.Size;

// the expected media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using (var response = await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false))
{
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}
await InternalPushAsync(url, expected, content, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -198,4 +180,98 @@
/// <returns></returns>
public async Task DeleteAsync(Descriptor target, CancellationToken cancellationToken = default)
=> await Repository.DeleteAsync(target, false, cancellationToken).ConfigureAwait(false);

/// <summary>
/// Mounts the given descriptor from contentReference into the blob store.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <exception cref="HttpRequestException"></exception>
/// <exception cref="Exception"></exception>
public async Task MountAsync(Descriptor descriptor, string contentReference,
Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken)
{
var url = new UriFactory(Repository.Options).BuildRepositoryBlobUpload();
var mountReq = new HttpRequestMessage(HttpMethod.Post, new UriBuilder(url)
{
Query =
$"{url.Query}&mount={HttpUtility.UrlEncode(descriptor.Digest)}&from={HttpUtility.UrlEncode(contentReference)}"
}.Uri);

using (var response = await Repository.Options.HttpClient.SendAsync(mountReq, cancellationToken)
.ConfigureAwait(false))
{
switch (response.StatusCode)
{
case HttpStatusCode.Created:
// 201, layer has been mounted
return;
case HttpStatusCode.Accepted:
{
// 202, mounting failed. upload session has begun
var location = response.Headers.Location ??
throw new HttpRequestException("missing location header");
url = location.IsAbsoluteUri ? location : new Uri(url, location);
break;
}
default:
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);
}
}

// From the [spec]:
//
// "If a registry does not support cross-repository mounting
// or is unable to mount the requested blob,
// it SHOULD return a 202.
// This indicates that the upload session has begun
// and that the client MAY proceed with the upload."
//
// So we need to get the content from somewhere in order to
// push it. If the caller has provided a getContent function, we
// can use that, otherwise pull the content from the source repository.
//
// [spec]: https://github.com/opencontainers/distribution-spec/blob/v1.1.0/spec.md#mounting-a-blob-from-another-repository

Stream contents;
if (getContents != null)
{
contents = await getContents(cancellationToken).ConfigureAwait(false);
}
else
{
var referenceOptions = repository.Options with
{
Reference = Reference.Parse(contentReference),
};
contents = await new Repository(referenceOptions).FetchAsync(descriptor, cancellationToken);
}

await InternalPushAsync(url, descriptor, contents, cancellationToken).ConfigureAwait(false);
}

private async Task InternalPushAsync(Uri url, Descriptor descriptor, Stream content,
CancellationToken cancellationToken)
{
// monolithic upload
// add digest key to query string with descriptor digest value
var req = new HttpRequestMessage(HttpMethod.Put, new UriBuilder(url)
{
Query = $"{url.Query}&digest={HttpUtility.UrlEncode(descriptor.Digest)}"
}.Uri);
req.Content = new StreamContent(content);
req.Content.Headers.ContentLength = descriptor.Size;

// the descriptor media type is ignored as in the API doc.
req.Content.Headers.ContentType = new MediaTypeHeaderValue(MediaTypeNames.Application.Octet);

using var response =
await Repository.Options.HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
if (response.StatusCode != HttpStatusCode.Created)
{
throw await response.ParseErrorResponseAsync(cancellationToken).ConfigureAwait(false);

Check warning on line 274 in src/OrasProject.Oras/Registry/Remote/BlobStore.cs

View check run for this annotation

Codecov / codecov/patch

src/OrasProject.Oras/Registry/Remote/BlobStore.cs#L273-L274

Added lines #L273 - L274 were not covered by tests
}
}
}
18 changes: 18 additions & 0 deletions src/OrasProject.Oras/Registry/Remote/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,22 @@ internal Reference ParseReferenceFromContentReference(string reference)
/// <param name="desc"></param>
/// <returns></returns>
private IBlobStore BlobStore(Descriptor desc) => IsManifest(desc) ? Manifests : Blobs;

/// <summary>
/// Mount makes the blob with the given digest in fromRepo
/// available in the repository signified by the receiver.
///
/// This avoids the need to pull content down from fromRepo only to push it to r.
///
/// If the registry does not implement mounting, getContent will be used to get the
/// content to push. If getContent is null, the content will be pulled from the source
/// repository.
/// </summary>
/// <param name="descriptor"></param>
/// <param name="contentReference"></param>
/// <param name="getContents"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task MountAsync(Descriptor descriptor, string contentReference, Func<CancellationToken, Task<Stream>>? getContents, CancellationToken cancellationToken)
=> ((IMounter)Blobs).MountAsync(descriptor,contentReference, getContents, cancellationToken);
}
Loading
Loading