Skip to content

Commit fd6f920

Browse files
authored
chunk websocket frame when buff > 15M (#981)
* ws is now chunked * remove chunk in example * fix build
1 parent ca4e350 commit fd6f920

File tree

5 files changed

+209
-20
lines changed

5 files changed

+209
-20
lines changed

examples/cp/Cp.cs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,7 @@ private static void ValidatePathParameters(string sourcePath, string destination
7373

7474
memoryStream.Position = 0;
7575

76-
const int bufferSize = 31 * 1024 * 1024; // must be lower than 32 * 1024 * 1024
77-
byte[] localBuffer = new byte[bufferSize];
78-
while (true)
79-
{
80-
int numRead = await memoryStream.ReadAsync(localBuffer, 0, localBuffer.Length);
81-
if (numRead <= 0)
82-
{
83-
break;
84-
}
85-
await stdIn.WriteAsync(localBuffer, 0, numRead);
86-
}
76+
await memoryStream.CopyToAsync(stdIn);
8777
await stdIn.FlushAsync();
8878
}
8979

kubernetes-client.sln

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "webApiDependencyInjection",
7171
EndProject
7272
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "workerServiceDependencyInjection", "examples\workerServiceDependencyInjection\workerServiceDependencyInjection.csproj", "{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}"
7373
EndProject
74+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "cp", "examples\cp\cp.csproj", "{CC41E248-2139-427E-8DD4-B047A8924FD2}"
75+
EndProject
7476
Global
7577
GlobalSection(SolutionConfigurationPlatforms) = preSolution
7678
Debug|Any CPU = Debug|Any CPU
@@ -453,6 +455,18 @@ Global
453455
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x64.Build.0 = Release|Any CPU
454456
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.ActiveCfg = Release|Any CPU
455457
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.Build.0 = Release|Any CPU
458+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
459+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
460+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.ActiveCfg = Debug|Any CPU
461+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.Build.0 = Debug|Any CPU
462+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.ActiveCfg = Debug|Any CPU
463+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.Build.0 = Debug|Any CPU
464+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
465+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.Build.0 = Release|Any CPU
466+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.ActiveCfg = Release|Any CPU
467+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.Build.0 = Release|Any CPU
468+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.ActiveCfg = Release|Any CPU
469+
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.Build.0 = Release|Any CPU
456470
EndGlobalSection
457471
GlobalSection(SolutionProperties) = preSolution
458472
HideSolutionNode = FALSE
@@ -489,6 +503,7 @@ Global
489503
{8E266190-AE6E-44A8-948D-BD974AA82428} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
490504
{C0759F88-A010-4DEF-BD3B-E183D3328FFC} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
491505
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
506+
{CC41E248-2139-427E-8DD4-B047A8924FD2} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
492507
EndGlobalSection
493508
GlobalSection(ExtensibilityGlobals) = postSolution
494509
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}

src/KubernetesClient/StreamDemuxer.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace k8s
2121
/// </summary>
2222
public class StreamDemuxer : IStreamDemuxer
2323
{
24+
private const int MAXFRAMESIZE = 15 * 1024 * 1024; // 15MB
2425
private readonly WebSocket webSocket;
2526
private readonly Dictionary<byte, ByteBuffer> buffers = new Dictionary<byte, ByteBuffer>();
2627
private readonly CancellationTokenSource cts = new CancellationTokenSource();
@@ -156,15 +157,19 @@ public Task Write(ChannelIndex index, byte[] buffer, int offset, int count,
156157
public async Task Write(byte index, byte[] buffer, int offset, int count,
157158
CancellationToken cancellationToken = default)
158159
{
159-
var writeBuffer = ArrayPool<byte>.Shared.Rent(count + 1);
160+
var writeBuffer = ArrayPool<byte>.Shared.Rent(Math.Min(count, MAXFRAMESIZE) + 1);
160161

161162
try
162163
{
163-
writeBuffer[0] = (byte)index;
164-
Array.Copy(buffer, offset, writeBuffer, 1, count);
165-
var segment = new ArraySegment<byte>(writeBuffer, 0, count + 1);
166-
await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken)
167-
.ConfigureAwait(false);
164+
writeBuffer[0] = index;
165+
for (var i = 0; i < count; i += MAXFRAMESIZE)
166+
{
167+
var c = Math.Min(count - i, MAXFRAMESIZE);
168+
Buffer.BlockCopy(buffer, offset + i, writeBuffer, 1, c);
169+
var segment = new ArraySegment<byte>(writeBuffer, 0, c + 1);
170+
await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken)
171+
.ConfigureAwait(false);
172+
}
168173
}
169174
finally
170175
{

tests/E2E.Tests/E2E.Tests.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<IsPackable>false</IsPackable>
44
<SignAssembly>true</SignAssembly>
@@ -9,6 +9,7 @@
99
<ItemGroup>
1010

1111
<PackageReference Include="JsonPatch.Net" Version="2.0.3" />
12+
<PackageReference Include="SharpZipLib" Version="1.3.3" />
1213

1314
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.0" />
1415

tests/E2E.Tests/MinikubeTests.cs

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Collections.ObjectModel;
44
using System.Diagnostics;
@@ -14,6 +14,9 @@
1414
using k8s.Autorest;
1515
using Nito.AsyncEx;
1616
using Xunit;
17+
using ICSharpCode.SharpZipLib.Tar;
18+
using System.Text;
19+
using System.Security.Cryptography;
1720

1821
namespace k8s.E2E
1922
{
@@ -398,7 +401,7 @@ void Cleanup()
398401
async Task<V1Pod> Pod()
399402
{
400403
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
401-
var pod = pods.Items.First();
404+
var pod = pods.Items.First(p => p.Metadata.Name == podName);
402405
while (pod.Status.Phase != "Running")
403406
{
404407
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
@@ -548,6 +551,181 @@ await genericPods.CreateNamespacedAsync(
548551
}
549552

550553

554+
[MinikubeFact]
555+
public async Task CopyToPodTestAsync()
556+
{
557+
var namespaceParameter = "default";
558+
var podName = "k8scsharp-e2e-cp-pod";
559+
560+
var client = CreateClient();
561+
562+
async Task<int> CopyFileToPodAsync(string name, string @namespace, string container, Stream inputFileStream, string destinationFilePath, CancellationToken cancellationToken = default(CancellationToken))
563+
{
564+
// The callback which processes the standard input, standard output and standard error of exec method
565+
var handler = new ExecAsyncCallback(async (stdIn, stdOut, stdError) =>
566+
{
567+
var fileInfo = new FileInfo(destinationFilePath);
568+
try
569+
{
570+
using (var memoryStream = new MemoryStream())
571+
{
572+
using (var tarOutputStream = new TarOutputStream(memoryStream, Encoding.Default))
573+
{
574+
tarOutputStream.IsStreamOwner = false;
575+
576+
var fileSize = inputFileStream.Length;
577+
var entry = TarEntry.CreateTarEntry(fileInfo.Name);
578+
579+
entry.Size = fileSize;
580+
581+
tarOutputStream.PutNextEntry(entry);
582+
await inputFileStream.CopyToAsync(tarOutputStream).ConfigureAwait(false);
583+
tarOutputStream.CloseEntry();
584+
}
585+
586+
memoryStream.Position = 0;
587+
588+
await memoryStream.CopyToAsync(stdIn).ConfigureAwait(false);
589+
await memoryStream.FlushAsync().ConfigureAwait(false);
590+
stdIn.Close();
591+
}
592+
}
593+
catch (Exception ex)
594+
{
595+
throw new IOException($"Copy command failed: {ex.Message}");
596+
}
597+
598+
using StreamReader streamReader = new StreamReader(stdError);
599+
while (streamReader.EndOfStream == false)
600+
{
601+
string error = await streamReader.ReadToEndAsync().ConfigureAwait(false);
602+
throw new IOException($"Copy command failed: {error}");
603+
}
604+
});
605+
606+
string destinationFolder = Path.GetDirectoryName(destinationFilePath).Replace("\\", "/");
607+
608+
return await client.NamespacedPodExecAsync(
609+
name,
610+
@namespace,
611+
container,
612+
new string[] { "tar", "-xmf", "-", "-C", destinationFolder },
613+
false,
614+
handler,
615+
cancellationToken).ConfigureAwait(false);
616+
}
617+
618+
619+
void Cleanup()
620+
{
621+
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
622+
while (pods.Items.Any(p => p.Metadata.Name == podName))
623+
{
624+
try
625+
{
626+
client.CoreV1.DeleteNamespacedPod(podName, namespaceParameter);
627+
}
628+
catch (HttpOperationException e)
629+
{
630+
if (e.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
631+
{
632+
return;
633+
}
634+
}
635+
}
636+
}
637+
638+
try
639+
{
640+
Cleanup();
641+
642+
client.CoreV1.CreateNamespacedPod(
643+
new V1Pod()
644+
{
645+
Metadata = new V1ObjectMeta { Name = podName, },
646+
Spec = new V1PodSpec
647+
{
648+
Containers = new[]
649+
{
650+
new V1Container()
651+
{
652+
Name = "container",
653+
Image = "ubuntu",
654+
// Image = "busybox", // TODO not work with busybox
655+
Command = new[] { "sleep" },
656+
Args = new[] { "infinity" },
657+
},
658+
},
659+
},
660+
},
661+
namespaceParameter);
662+
663+
var lines = new List<string>();
664+
var started = new ManualResetEvent(false);
665+
666+
async Task<V1Pod> Pod()
667+
{
668+
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
669+
var pod = pods.Items.First(p => p.Metadata.Name == podName);
670+
while (pod.Status.Phase != "Running")
671+
{
672+
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
673+
return await Pod().ConfigureAwait(false);
674+
}
675+
676+
return pod;
677+
}
678+
679+
var pod = await Pod().ConfigureAwait(false);
680+
681+
682+
async Task AssertMd5sumAsync(string file, byte[] orig)
683+
{
684+
var ws = await client.WebSocketNamespacedPodExecAsync(
685+
pod.Metadata.Name,
686+
pod.Metadata.NamespaceProperty,
687+
new string[] { "md5sum", file },
688+
"container").ConfigureAwait(false);
689+
690+
var demux = new StreamDemuxer(ws);
691+
demux.Start();
692+
693+
var buff = new byte[4096];
694+
var stream = demux.GetStream(1, 1);
695+
var read = stream.Read(buff, 0, 4096);
696+
var remotemd5 = Encoding.Default.GetString(buff);
697+
remotemd5 = remotemd5.Substring(0, 32);
698+
699+
var md5 = MD5.Create().ComputeHash(orig);
700+
var localmd5 = BitConverter.ToString(md5).Replace("-", string.Empty).ToLower();
701+
702+
Assert.Equal(localmd5, remotemd5);
703+
}
704+
705+
706+
//
707+
{
708+
// small
709+
var content = new byte[1 * 1024 * 1024];
710+
new Random().NextBytes(content);
711+
await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false);
712+
await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false);
713+
}
714+
715+
{
716+
// big
717+
var content = new byte[40 * 1024 * 1024];
718+
new Random().NextBytes(content);
719+
await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false);
720+
await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false);
721+
}
722+
}
723+
finally
724+
{
725+
Cleanup();
726+
}
727+
}
728+
551729
public static IKubernetes CreateClient()
552730
{
553731
return new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());

0 commit comments

Comments
 (0)