-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathoutput.d
105 lines (83 loc) · 2.3 KB
/
output.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
module jin.go.output;
import jin.go.channel;
import jin.go.input;
import jin.go.await;
import std.range;
/// Round robin output channel.
/// Implements OutputRange.
struct Output(Message)
{
alias Pair = Input;
mixin Channel!Message;
/// Count of messages that can be privided now.
/// Negative value - new messages will never provided.
ptrdiff_t available()
{
ptrdiff_t available = -1;
if (this.queues.length == 0)
return available;
foreach (i; this.queues.length.iota)
{
const queue = this.queues[this.current];
const available2 = queue.available;
if (available2 > 0)
return available2;
// skip full queue
if (available2 < 0)
{
this.currentUnlink();
continue;
}
available = 0;
this.current = (this.current + 1) % this.queues.length;
}
return available;
}
/// True when no more messages will be consumed.
bool ignore()
{
return this.available == -1;
}
/// Send all items from Input Range
void feed(Values)(Values input) if (isInputRange!Values)
{
size_t available = 0;
foreach (item; input)
{
if (!available)
{
available = this.available.await;
if (available == -1)
return;
}
const current = this.current;
this.queues[current].put(item);
available -= 1;
}
}
/// Put message to current non full Queue and switch Queue
/// `available` must be checked before.
void put(Value)(Value value)
{
const available = this.available.await;
if (available == -1)
return;
const current = this.current;
this.queues[current].put(value);
this.current = (current + 1) % this.queues.length;
}
/// Create and put message.
/// `available` must be checked before.
void put(Value, Args...)(Args args)
{
this.put(Value(args));
}
/// Finalizes all cursors on destroy.
~this()
{
if (this.immortal)
return;
foreach (queue; this.queues)
queue.producer.finalize();
}
}