Skip to content

Commit bfb416f

Browse files
committed
Implemented buffer pool feature.
1 parent e7f4c5e commit bfb416f

19 files changed

+231
-376
lines changed

FSharp.FlashCap/CaptureDeviceDescriptorExtension.fs

Lines changed: 12 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ module public CaptureDeviceDescriptorExtension =
3939
?ct: CancellationToken) : Async<CaptureDevice> =
4040
self.InternalOpenWithFrameProcessorAsync(
4141
characteristics, TranscodeFormats.Auto,
42-
new DelegatedQueuingProcessor(pixelBufferArrived, 1),
42+
new DelegatedQueuingProcessor(pixelBufferArrived, 1, new DefaultBufferPool()),
4343
asCT ct) |> Async.AwaitTask
4444

4545
member self.openDevice(
@@ -50,7 +50,7 @@ module public CaptureDeviceDescriptorExtension =
5050
self.InternalOpenWithFrameProcessorAsync(
5151
characteristics,
5252
transcodeFormat,
53-
new DelegatedQueuingProcessor(pixelBufferArrived, 1),
53+
new DelegatedQueuingProcessor(pixelBufferArrived, 1, new DefaultBufferPool()),
5454
asCT ct) |> Async.AwaitTask
5555

5656
member self.openDevice(
@@ -64,8 +64,8 @@ module public CaptureDeviceDescriptorExtension =
6464
characteristics,
6565
transcodeFormat,
6666
(match isScattering with
67-
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
68-
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)),
67+
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)
68+
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)),
6969
asCT ct) |> Async.AwaitTask
7070

7171
//////////////////////////////////////////////////////////////////////////////////
@@ -77,7 +77,7 @@ module public CaptureDeviceDescriptorExtension =
7777
self.InternalOpenWithFrameProcessorAsync(
7878
characteristics,
7979
TranscodeFormats.Auto,
80-
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1),
80+
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1, new DefaultBufferPool()),
8181
asCT ct) |> Async.AwaitTask
8282

8383
member self.openDevice(
@@ -88,7 +88,7 @@ module public CaptureDeviceDescriptorExtension =
8888
self.InternalOpenWithFrameProcessorAsync(
8989
characteristics,
9090
transcodeFormat,
91-
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1),
91+
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1, new DefaultBufferPool()),
9292
asCT ct) |> Async.AwaitTask
9393

9494
member self.openDevice(
@@ -102,8 +102,8 @@ module public CaptureDeviceDescriptorExtension =
102102
characteristics,
103103
transcodeFormat,
104104
(match isScattering with
105-
| true -> (new DelegatedScatteringTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
106-
| false -> (new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)),
105+
| true -> (new DelegatedScatteringTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)
106+
| false -> (new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)),
107107
asCT ct) |> Async.AwaitTask
108108

109109
//////////////////////////////////////////////////////////////////////////////////
@@ -116,7 +116,7 @@ module public CaptureDeviceDescriptorExtension =
116116
characteristics,
117117
TranscodeFormats.Auto,
118118
(new DelegatedQueuingProcessor(
119-
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1)), asCT ct) |> Async.AwaitTask
119+
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1, new DefaultBufferPool())), asCT ct) |> Async.AwaitTask
120120
return new ObservableCaptureDevice(captureDevice, observerProxy)
121121
}
122122

@@ -129,7 +129,7 @@ module public CaptureDeviceDescriptorExtension =
129129
characteristics,
130130
transcodeFormat,
131131
(new DelegatedQueuingProcessor(
132-
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1)), asCT ct) |> Async.AwaitTask
132+
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1, new DefaultBufferPool())), asCT ct) |> Async.AwaitTask
133133
return new ObservableCaptureDevice(captureDevice, observerProxy)
134134
}
135135

@@ -145,8 +145,8 @@ module public CaptureDeviceDescriptorExtension =
145145
characteristics,
146146
transcodeFormat,
147147
(match isScattering with
148-
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
149-
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)), asCT ct) |> Async.AwaitTask
148+
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)
149+
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames, new DefaultBufferPool()) :> FrameProcessor)), asCT ct) |> Async.AwaitTask
150150
return new ObservableCaptureDevice(captureDevice, observerProxy)
151151
}
152152

@@ -168,163 +168,3 @@ module public CaptureDeviceDescriptorExtension =
168168
characteristics,
169169
transcodeFormat,
170170
asCT ct) |> Async.AwaitTask
171-
172-
//////////////////////////////////////////////////////////////////////////////////
173-
174-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
175-
[<EditorBrowsable(EditorBrowsableState.Never)>]
176-
member self.openAsync(
177-
characteristics: VideoCharacteristics,
178-
pixelBufferArrived: PixelBufferScope -> unit,
179-
?ct: CancellationToken) : Async<CaptureDevice> =
180-
self.InternalOpenWithFrameProcessorAsync(
181-
characteristics, TranscodeFormats.Auto,
182-
new DelegatedQueuingProcessor(pixelBufferArrived, 1),
183-
asCT ct) |> Async.AwaitTask
184-
185-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
186-
[<EditorBrowsable(EditorBrowsableState.Never)>]
187-
member self.openAsync(
188-
characteristics: VideoCharacteristics,
189-
transcodeIfYUV: bool,
190-
pixelBufferArrived: PixelBufferScope -> unit,
191-
?ct: CancellationToken) : Async<CaptureDevice> =
192-
self.InternalOpenWithFrameProcessorAsync(
193-
characteristics,
194-
toFormat transcodeIfYUV,
195-
new DelegatedQueuingProcessor(pixelBufferArrived, 1),
196-
asCT ct) |> Async.AwaitTask
197-
198-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
199-
[<EditorBrowsable(EditorBrowsableState.Never)>]
200-
member self.openAsync(
201-
characteristics: VideoCharacteristics,
202-
transcodeIfYUV: bool,
203-
isScattering: bool,
204-
maxQueuingFrames: int,
205-
pixelBufferArrived: PixelBufferScope -> unit,
206-
?ct: CancellationToken) : Async<CaptureDevice> =
207-
self.InternalOpenWithFrameProcessorAsync(
208-
characteristics,
209-
toFormat transcodeIfYUV,
210-
(match isScattering with
211-
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
212-
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)),
213-
asCT ct) |> Async.AwaitTask
214-
215-
//////////////////////////////////////////////////////////////////////////////////
216-
217-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
218-
[<EditorBrowsable(EditorBrowsableState.Never)>]
219-
member self.openAsync(
220-
characteristics: VideoCharacteristics,
221-
pixelBufferArrived: PixelBufferScope -> Async<unit>,
222-
?ct: CancellationToken) : Async<CaptureDevice> =
223-
self.InternalOpenWithFrameProcessorAsync(
224-
characteristics,
225-
TranscodeFormats.Auto,
226-
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1),
227-
asCT ct) |> Async.AwaitTask
228-
229-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
230-
[<EditorBrowsable(EditorBrowsableState.Never)>]
231-
member self.openAsync(
232-
characteristics: VideoCharacteristics,
233-
transcodeIfYUV: bool,
234-
pixelBufferArrived: PixelBufferScope -> Async<unit>,
235-
?ct: CancellationToken) : Async<CaptureDevice> =
236-
self.InternalOpenWithFrameProcessorAsync(
237-
characteristics,
238-
toFormat transcodeIfYUV,
239-
new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, 1),
240-
asCT ct) |> Async.AwaitTask
241-
242-
[<Obsolete("This function is obsoleted, please use `openDevice` instead.")>]
243-
[<EditorBrowsable(EditorBrowsableState.Never)>]
244-
member self.openAsync(
245-
characteristics: VideoCharacteristics,
246-
transcodeIfYUV: bool,
247-
isScattering: bool,
248-
maxQueuingFrames: int,
249-
pixelBufferArrived: PixelBufferScope -> Async<unit>,
250-
?ct: CancellationToken) : Async<CaptureDevice> =
251-
self.InternalOpenWithFrameProcessorAsync(
252-
characteristics,
253-
toFormat transcodeIfYUV,
254-
(match isScattering with
255-
| true -> (new DelegatedScatteringTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
256-
| false -> (new DelegatedQueuingTaskProcessor(asTask pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)),
257-
asCT ct) |> Async.AwaitTask
258-
259-
//////////////////////////////////////////////////////////////////////////////////
260-
261-
[<Obsolete("This function is obsoleted, please use `asObservable` instead.")>]
262-
[<EditorBrowsable(EditorBrowsableState.Never)>]
263-
member self.asObservableAsync(
264-
characteristics: VideoCharacteristics,
265-
?ct: CancellationToken) : Async<ObservableCaptureDevice> = async {
266-
let observerProxy = new ObservableCaptureDevice.ObserverProxy()
267-
let! captureDevice = self.InternalOpenWithFrameProcessorAsync(
268-
characteristics,
269-
TranscodeFormats.Auto,
270-
(new DelegatedQueuingProcessor(
271-
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1)), asCT ct) |> Async.AwaitTask
272-
return new ObservableCaptureDevice(captureDevice, observerProxy)
273-
}
274-
275-
[<Obsolete("This function is obsoleted, please use `asObservable` instead.")>]
276-
[<EditorBrowsable(EditorBrowsableState.Never)>]
277-
member self.asObservableAsync(
278-
characteristics: VideoCharacteristics,
279-
transcodeIfYUV: bool,
280-
?ct: CancellationToken) : Async<ObservableCaptureDevice> = async {
281-
let observerProxy = new ObservableCaptureDevice.ObserverProxy()
282-
let! captureDevice = self.InternalOpenWithFrameProcessorAsync(
283-
characteristics,
284-
toFormat transcodeIfYUV,
285-
(new DelegatedQueuingProcessor(
286-
new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived), 1)), asCT ct) |> Async.AwaitTask
287-
return new ObservableCaptureDevice(captureDevice, observerProxy)
288-
}
289-
290-
[<Obsolete("This function is obsoleted, please use `asObservable` instead.")>]
291-
[<EditorBrowsable(EditorBrowsableState.Never)>]
292-
member self.asObservableAsync(
293-
characteristics: VideoCharacteristics,
294-
transcodeIfYUV: bool,
295-
isScattering: bool,
296-
maxQueuingFrames: int,
297-
?ct: CancellationToken) : Async<ObservableCaptureDevice> = async {
298-
let observerProxy = new ObservableCaptureDevice.ObserverProxy()
299-
let pixelBufferArrived = new PixelBufferArrivedDelegate(observerProxy.OnPixelBufferArrived)
300-
let! captureDevice = self.InternalOpenWithFrameProcessorAsync(
301-
characteristics,
302-
toFormat transcodeIfYUV,
303-
(match isScattering with
304-
| true -> (new DelegatedScatteringProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)
305-
| false -> (new DelegatedQueuingProcessor(pixelBufferArrived, maxQueuingFrames) :> FrameProcessor)), asCT ct) |> Async.AwaitTask
306-
return new ObservableCaptureDevice(captureDevice, observerProxy)
307-
}
308-
309-
//////////////////////////////////////////////////////////////////////////////////
310-
311-
[<Obsolete("This function is obsoleted, please use `takeOneShot` instead.")>]
312-
[<EditorBrowsable(EditorBrowsableState.Never)>]
313-
member self.takeOneShotAsync(
314-
characteristics: VideoCharacteristics,
315-
?ct: CancellationToken) : Async<byte[]> =
316-
self.InternalTakeOneShotAsync(
317-
characteristics,
318-
TranscodeFormats.Auto,
319-
asCT ct) |> Async.AwaitTask
320-
321-
[<Obsolete("This function is obsoleted, please use `takeOneShot` instead.")>]
322-
[<EditorBrowsable(EditorBrowsableState.Never)>]
323-
member self.takeOneShotAsync(
324-
characteristics: VideoCharacteristics,
325-
transcodeIfYUV: bool,
326-
?ct: CancellationToken) : Async<byte[]> =
327-
self.InternalTakeOneShotAsync(
328-
characteristics,
329-
toFormat transcodeIfYUV,
330-
asCT ct) |> Async.AwaitTask

FSharp.FlashCap/CaptureDeviceExtension.fs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,3 @@ module public CaptureDeviceExtension =
2525

2626
member self.showPropertyPage(parentWindow: nativeint, ?ct: CancellationToken) =
2727
self.InternalShowPropertyPageAsync(parentWindow, asCT ct) |> Async.AwaitTask
28-
29-
[<Obsolete("This function is obsoleted, please use `start` instead.")>]
30-
member self.startAsync(?ct: CancellationToken) =
31-
self.InternalStartAsync(asCT ct) |> Async.AwaitTask
32-
33-
[<Obsolete("This function is obsoleted, please use `stop` instead.")>]
34-
member self.stopAsync(?ct: CancellationToken) =
35-
self.InternalStopAsync(asCT ct) |> Async.AwaitTask

FSharp.FlashCap/ObservableCaptureDeviceExtension.fs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,5 @@ module public ObservableCaptureDeviceExtension =
2323
member self.stop(?ct: CancellationToken) =
2424
self.InternalStopAsync(asCT ct) |> Async.AwaitTask
2525

26-
[<Obsolete("This function is obsoleted, please use `start` instead.")>]
27-
member self.startAsync(?ct: CancellationToken) =
28-
self.InternalStartAsync(asCT ct) |> Async.AwaitTask
29-
30-
[<Obsolete("This function is obsoleted, please use `stop` instead.")>]
31-
member self.stopAsync(?ct: CancellationToken) =
32-
self.InternalStopAsync(asCT ct) |> Async.AwaitTask
33-
3426
member self.subscribe(observer: IObserver<PixelBufferScope>) =
3527
self.InternalSubscribe(observer)

FlashCap.Core/BufferPool.cs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
////////////////////////////////////////////////////////////////////////////
2+
//
3+
// FlashCap - Independent camera capture library.
4+
// Copyright (c) Kouji Matsui (@kozy_kekyo, @[email protected])
5+
//
6+
// Licensed under Apache-v2: https://opensource.org/licenses/Apache-2.0
7+
//
8+
////////////////////////////////////////////////////////////////////////////
9+
10+
using System.Diagnostics;
11+
using System.Linq;
12+
using System.Threading;
13+
14+
namespace FlashCap;
15+
16+
public abstract class BufferPool
17+
{
18+
protected BufferPool()
19+
{
20+
}
21+
22+
public abstract byte[] Rent(int size, bool exactSize);
23+
public abstract void Return(byte[] buffer);
24+
}
25+
26+
public sealed class DefaultBufferPool :
27+
BufferPool
28+
{
29+
// Imported from:
30+
// https://github.com/kekyo/GitReader/blob/main/GitReader.Core/Internal/BufferPool.cs
31+
32+
// Tried and tested, but simple strategies were the fastest.
33+
// Probably because each buffer table and lookup fragments on the CPU cache.
34+
35+
private const int BufferHolderLength = 13;
36+
37+
[DebuggerStepThrough]
38+
private sealed class BufferHolder
39+
{
40+
private readonly byte[]?[] buffers;
41+
42+
public BufferHolder(int maxReservedBufferElements) =>
43+
this.buffers = new byte[maxReservedBufferElements][];
44+
45+
public byte[] Rent(int size, bool exactSize)
46+
{
47+
for (var index = 0; index < this.buffers.Length; index++)
48+
{
49+
var buffer = this.buffers[index];
50+
if (buffer != null && (exactSize ? (buffer.Length == size) : (buffer.Length <= size)))
51+
{
52+
if (Interlocked.CompareExchange(ref this.buffers[index], null, buffer) == buffer)
53+
{
54+
return buffer!;
55+
}
56+
}
57+
}
58+
59+
return new byte[size];
60+
}
61+
62+
public void Return(byte[] buffer)
63+
{
64+
for (var index = 0; index < this.buffers.Length; index++)
65+
{
66+
if (this.buffers[index] == null)
67+
{
68+
if (Interlocked.CompareExchange(ref this.buffers[index], buffer, null) == null)
69+
{
70+
break;
71+
}
72+
}
73+
}
74+
75+
// It was better to simply discard a buffer instance than the cost of extending the table.
76+
}
77+
}
78+
79+
private readonly BufferHolder[] bufferHolders;
80+
81+
public DefaultBufferPool() : this(32)
82+
{
83+
}
84+
85+
public DefaultBufferPool(int maxReservedBufferElements) =>
86+
this.bufferHolders = Enumerable.Range(0, BufferHolderLength).
87+
Select(_ => new BufferHolder(maxReservedBufferElements)).
88+
ToArray();
89+
90+
public override byte[] Rent(int size, bool exactSize)
91+
{
92+
// NOTE: Size is determined on a "less than" basis,
93+
// which may result in placement in different buckets and missed opportunities for reuse.
94+
// This implementation is ignored it.
95+
var bufferHolder = this.bufferHolders[size % this.bufferHolders.Length];
96+
return bufferHolder.Rent(size, exactSize);
97+
}
98+
99+
public override void Return(byte[] buffer)
100+
{
101+
if (Interlocked.Exchange(ref buffer, null!) is { } b)
102+
{
103+
var bufferHolder = this.bufferHolders[b.Length % this.bufferHolders.Length];
104+
bufferHolder.Return(b);
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)