@@ -7,6 +7,7 @@ namespace OverwatchTranscript
7
7
public interface IFinalizedBucket
8
8
{
9
9
bool IsEmpty { get ; }
10
+ void Update ( ) ;
10
11
DateTime ? SeeTopUtc ( ) ;
11
12
BucketTop ? TakeTop ( ) ;
12
13
}
@@ -28,7 +29,8 @@ public class EventBucketReader : IFinalizedBucket
28
29
private readonly string bucketFile ;
29
30
private readonly ConcurrentQueue < BucketTop > topQueue = new ConcurrentQueue < BucketTop > ( ) ;
30
31
private readonly AutoResetEvent itemDequeued = new AutoResetEvent ( false ) ;
31
- private bool stopping ;
32
+ private readonly AutoResetEvent itemEnqueued = new AutoResetEvent ( false ) ;
33
+ private bool sourceIsEmpty ;
32
34
33
35
public EventBucketReader ( ILog log , string bucketFile )
34
36
{
@@ -42,34 +44,38 @@ public EventBucketReader(ILog log, string bucketFile)
42
44
43
45
public bool IsEmpty { get ; private set ; }
44
46
47
+ public void Update ( )
48
+ {
49
+ if ( IsEmpty ) return ;
50
+ while ( topQueue . Count == 0 )
51
+ {
52
+ UpdateIsEmpty ( ) ;
53
+ if ( IsEmpty ) return ;
54
+
55
+ itemDequeued . Set ( ) ;
56
+ itemEnqueued . WaitOne ( 200 ) ;
57
+ }
58
+ }
59
+
45
60
public DateTime ? SeeTopUtc ( )
46
61
{
47
62
if ( IsEmpty ) return null ;
48
- while ( true )
63
+ if ( topQueue . TryPeek ( out BucketTop ? top ) )
49
64
{
50
- UpdateIsEmpty ( ) ;
51
- if ( IsEmpty ) return null ;
52
- if ( topQueue . TryPeek ( out BucketTop ? top ) )
53
- {
54
- return top . Utc ;
55
- }
65
+ return top . Utc ;
56
66
}
67
+ return null ;
57
68
}
58
69
59
70
public BucketTop ? TakeTop ( )
60
71
{
61
72
if ( IsEmpty ) return null ;
62
-
63
- while ( true )
73
+ if ( topQueue . TryDequeue ( out BucketTop ? top ) )
64
74
{
65
- UpdateIsEmpty ( ) ;
66
- if ( IsEmpty ) return null ;
67
- if ( topQueue . TryDequeue ( out BucketTop ? top ) )
68
- {
69
- itemDequeued . Set ( ) ;
70
- return top ;
71
- }
75
+ itemDequeued . Set ( ) ;
76
+ return top ;
72
77
}
78
+ return null ;
73
79
}
74
80
75
81
private void ReadBucket ( )
@@ -85,23 +91,25 @@ private void ReadBucket()
85
91
if ( top != null )
86
92
{
87
93
topQueue . Enqueue ( top ) ;
94
+ itemEnqueued . Set ( ) ;
88
95
}
89
96
else
90
97
{
91
- stopping = true ;
98
+ sourceIsEmpty = true ;
99
+ UpdateIsEmpty ( ) ;
92
100
return ;
93
101
}
94
102
}
95
103
96
104
itemDequeued . Reset ( ) ;
97
- itemDequeued . WaitOne ( ) ;
105
+ itemDequeued . WaitOne ( 5000 ) ;
98
106
}
99
107
}
100
108
101
109
private void UpdateIsEmpty ( )
102
110
{
103
- var empty = stopping && topQueue . IsEmpty ;
104
- if ( ! IsEmpty && empty )
111
+ var allEmpty = sourceIsEmpty && topQueue . IsEmpty ;
112
+ if ( ! IsEmpty && allEmpty )
105
113
{
106
114
File . Delete ( bucketFile ) ;
107
115
IsEmpty = true ;
0 commit comments