Skip to content

Commit ddbe5b1

Browse files
committed
Merge branch 'feature/block-retransmit'
2 parents 2f39327 + a0abea4 commit ddbe5b1

File tree

14 files changed

+335
-38
lines changed

14 files changed

+335
-38
lines changed

Framework/OverwatchTranscript/EventBucketReader.cs

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace OverwatchTranscript
77
public interface IFinalizedBucket
88
{
99
bool IsEmpty { get; }
10+
void Update();
1011
DateTime? SeeTopUtc();
1112
BucketTop? TakeTop();
1213
}
@@ -28,7 +29,8 @@ public class EventBucketReader : IFinalizedBucket
2829
private readonly string bucketFile;
2930
private readonly ConcurrentQueue<BucketTop> topQueue = new ConcurrentQueue<BucketTop>();
3031
private readonly AutoResetEvent itemDequeued = new AutoResetEvent(false);
31-
private bool stopping;
32+
private readonly AutoResetEvent itemEnqueued = new AutoResetEvent(false);
33+
private bool sourceIsEmpty;
3234

3335
public EventBucketReader(ILog log, string bucketFile)
3436
{
@@ -42,34 +44,38 @@ public EventBucketReader(ILog log, string bucketFile)
4244

4345
public bool IsEmpty { get; private set; }
4446

47+
public void Update()
48+
{
49+
if (IsEmpty) return;
50+
while (topQueue.Count == 0)
51+
{
52+
UpdateIsEmpty();
53+
if (IsEmpty) return;
54+
55+
itemDequeued.Set();
56+
itemEnqueued.WaitOne(200);
57+
}
58+
}
59+
4560
public DateTime? SeeTopUtc()
4661
{
4762
if (IsEmpty) return null;
48-
while (true)
63+
if (topQueue.TryPeek(out BucketTop? top))
4964
{
50-
UpdateIsEmpty();
51-
if (IsEmpty) return null;
52-
if (topQueue.TryPeek(out BucketTop? top))
53-
{
54-
return top.Utc;
55-
}
65+
return top.Utc;
5666
}
67+
return null;
5768
}
5869

5970
public BucketTop? TakeTop()
6071
{
6172
if (IsEmpty) return null;
62-
63-
while (true)
73+
if (topQueue.TryDequeue(out BucketTop? top))
6474
{
65-
UpdateIsEmpty();
66-
if (IsEmpty) return null;
67-
if (topQueue.TryDequeue(out BucketTop? top))
68-
{
69-
itemDequeued.Set();
70-
return top;
71-
}
75+
itemDequeued.Set();
76+
return top;
7277
}
78+
return null;
7379
}
7480

7581
private void ReadBucket()
@@ -85,23 +91,25 @@ private void ReadBucket()
8591
if (top != null)
8692
{
8793
topQueue.Enqueue(top);
94+
itemEnqueued.Set();
8895
}
8996
else
9097
{
91-
stopping = true;
98+
sourceIsEmpty = true;
99+
UpdateIsEmpty();
92100
return;
93101
}
94102
}
95103

96104
itemDequeued.Reset();
97-
itemDequeued.WaitOne();
105+
itemDequeued.WaitOne(5000);
98106
}
99107
}
100108

101109
private void UpdateIsEmpty()
102110
{
103-
var empty = stopping && topQueue.IsEmpty;
104-
if (!IsEmpty && empty)
111+
var allEmpty = sourceIsEmpty && topQueue.IsEmpty;
112+
if (!IsEmpty && allEmpty)
105113
{
106114
File.Delete(bucketFile);
107115
IsEmpty = true;

Framework/OverwatchTranscript/MomentReferenceBuilder.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public OverwatchMomentReference[] Build(IFinalizedBucket[] finalizedBuckets)
2424
log.Debug($"Building references for {buckets.Count} buckets.");
2525
while (buckets.Any())
2626
{
27+
foreach (var b in buckets) b.Update();
28+
2729
buckets.RemoveAll(b => b.IsEmpty);
2830
if (!buckets.Any()) break;
2931

Tests/CodexTests/DownloadConnectivityTests/SwarmTests.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ namespace CodexTests.DownloadConnectivityTests
77
public class SwarmTests : AutoBootstrapDistTest
88
{
99
[Test]
10+
[Combinatorial]
1011
[CreateTranscript("swarm_retransmit")]
11-
public void DetectBlockRetransmits()
12+
public void DetectBlockRetransmits(
13+
[Values(1, 5, 10, 20)] int fileSize,
14+
[Values(3, 5, 10, 20)] int numNodes
15+
)
1216
{
13-
var nodes = StartCodex(10);
14-
var file = GenerateTestFile(10.MB());
17+
var nodes = StartCodex(numNodes);
18+
var file = GenerateTestFile(fileSize.MB());
1519
var cid = nodes[0].UploadFile(file);
1620

1721
var tasks = nodes.Select(n => Task.Run(() => n.DownloadContent(cid))).ToArray();

Tests/FrameworkTests/OverwatchTranscriptTests/TranscriptTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ private void AssertEvent(List<ActivateEvent<MyEvent>> events, DateTime utc, Time
117117
var e = events.SingleOrDefault(e => e.Moment.Utc == utc && e.Payload.EventData == data);
118118
if (e == null) Assert.Fail("Event not found");
119119

120-
Assert.That(e.Moment.Utc, Is.EqualTo(utc));
121-
Assert.That(e.Moment.Duration, Is.EqualTo(duration));
122-
Assert.That(e.Moment.Index, Is.EqualTo(index));
123-
Assert.That(e.Payload.EventData, Is.EqualTo(data));
120+
Assert.That(e!.Moment.Utc, Is.EqualTo(utc));
121+
Assert.That(e!.Moment.Duration, Is.EqualTo(duration));
122+
Assert.That(e!.Moment.Index, Is.EqualTo(index));
123+
Assert.That(e!.Payload.EventData, Is.EqualTo(data));
124124
}
125125

126126
private void AssertFileContent()

Tools/CsvCombiner/CsvCombiner.csproj

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\..\Framework\Logging\Logging.csproj" />
12+
</ItemGroup>
13+
14+
</Project>

Tools/CsvCombiner/Program.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using Logging;
2+
3+
public class Program
4+
{
5+
public static void Main(string[] args)
6+
{
7+
args = ["d:\\CodexTestLogs\\BlockExchange\\experiment2-fetchbatched"];
8+
var p = new Program(args[0]);
9+
p.Run();
10+
}
11+
12+
private static readonly ILog log = new ConsoleLog();
13+
private string path;
14+
15+
private readonly Dictionary<string, List<string>> combine = new Dictionary<string, List<string>>();
16+
17+
public Program(string path)
18+
{
19+
this.path = path;
20+
}
21+
22+
private void Run()
23+
{
24+
Log("Starting in " + path);
25+
26+
var files = Directory.GetFiles(path)
27+
.Where(f => f.ToLowerInvariant().EndsWith(".csv")).ToArray();
28+
29+
foreach (var file in files)
30+
{
31+
AddToMap(file);
32+
}
33+
34+
var i = 0;
35+
foreach (var pair in combine)
36+
{
37+
var list = pair.Value;
38+
list.Insert(0, pair.Key);
39+
40+
File.WriteAllLines(Path.Combine(path, "combine_" + i + ".csv"), list.ToArray());
41+
i++;
42+
}
43+
44+
Log("done");
45+
}
46+
47+
private void AddToMap(string file)
48+
{
49+
var lines = File.ReadAllLines(file);
50+
if (lines.Length > 1)
51+
{
52+
var header = lines[0];
53+
var list = GetList(header);
54+
list.AddRange(lines.Skip(1));
55+
}
56+
}
57+
58+
private List<string> GetList(string header)
59+
{
60+
if (!combine.ContainsKey(header))
61+
{
62+
combine.Add(header, new List<string>());
63+
}
64+
return combine[header];
65+
}
66+
67+
private void Log(string msg)
68+
{
69+
log.Log(msg);
70+
}
71+
}

Tools/TranscriptAnalysis/CsvWriter.cs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
using Logging;
2+
3+
namespace TranscriptAnalysis
4+
{
5+
public class CsvWriter
6+
{
7+
private readonly ILog log;
8+
9+
public CsvWriter(ILog log)
10+
{
11+
this.log = log;
12+
}
13+
14+
public ICsv CreateNew()
15+
{
16+
return new Csv();
17+
}
18+
19+
public void Write(ICsv csv, string filename)
20+
{
21+
var c = (Csv)csv;
22+
23+
using var file = File.OpenWrite(filename);
24+
using var writer = new StreamWriter(file);
25+
c.CreateLines(writer.WriteLine);
26+
27+
log.Log($"CSV written to: '{filename}'");
28+
}
29+
}
30+
31+
public interface ICsv
32+
{
33+
ICsvColumn GetColumn(string title, float defaultValue);
34+
ICsvColumn GetColumn(string title, string defaultValue);
35+
void AddRow(params CsvCell[] cells);
36+
}
37+
38+
public class Csv : ICsv
39+
{
40+
private readonly string Sep = ",";
41+
private readonly List<CsvColumn> columns = new List<CsvColumn>();
42+
private readonly List<CsvRow> rows = new List<CsvRow>();
43+
44+
public ICsvColumn GetColumn(string title, float defaultValue)
45+
{
46+
return GetColumn(title, defaultValue.ToString());
47+
}
48+
49+
public ICsvColumn GetColumn(string title, string defaultValue)
50+
{
51+
var column = columns.SingleOrDefault(c => c.Title == title);
52+
if (column == null)
53+
{
54+
column = new CsvColumn(title, defaultValue);
55+
columns.Add(column);
56+
}
57+
return column;
58+
}
59+
60+
public void AddRow(params CsvCell[] cells)
61+
{
62+
rows.Add(new CsvRow(cells));
63+
}
64+
65+
public void CreateLines(Action<string> onLine)
66+
{
67+
CreateHeaderLine(onLine);
68+
foreach (var row in rows)
69+
{
70+
CreateRowLine(row, onLine);
71+
}
72+
}
73+
74+
private void CreateHeaderLine(Action<string> onLine)
75+
{
76+
onLine(string.Join(Sep, columns.Select(c => c.Title).ToArray()));
77+
}
78+
79+
private void CreateRowLine(CsvRow row, Action<string> onLine)
80+
{
81+
onLine(string.Join(Sep, columns.Select(c => GetRowCellValue(row, c)).ToArray()));
82+
}
83+
84+
private string GetRowCellValue(CsvRow row, CsvColumn column)
85+
{
86+
var cell = row.Cells.SingleOrDefault(c => c.Column == column);
87+
if (cell == null) return column.DefaultValue;
88+
return cell.Value;
89+
}
90+
}
91+
92+
public class CsvCell
93+
{
94+
public CsvCell(ICsvColumn column, float value)
95+
: this(column, value.ToString())
96+
{
97+
}
98+
99+
public CsvCell(ICsvColumn column, string value)
100+
{
101+
Column = column;
102+
Value = value;
103+
}
104+
105+
public ICsvColumn Column { get; }
106+
public string Value { get; }
107+
}
108+
109+
public interface ICsvColumn
110+
{
111+
string Title { get; }
112+
string DefaultValue { get; }
113+
}
114+
115+
public class CsvColumn : ICsvColumn
116+
{
117+
public CsvColumn(string title, string defaultValue)
118+
{
119+
Title = title;
120+
DefaultValue = defaultValue;
121+
}
122+
123+
public string Title { get; }
124+
public string DefaultValue { get; }
125+
}
126+
127+
public class CsvRow
128+
{
129+
public CsvRow(CsvCell[] cells)
130+
{
131+
Cells = cells;
132+
}
133+
134+
public CsvCell[] Cells { get; }
135+
}
136+
}

Tools/TranscriptAnalysis/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public static void Main(string[] args)
3131
};
3232

3333
var header = reader.GetHeader<OverwatchCodexHeader>("cdx_h");
34-
var receivers = new ReceiverSet(log, reader, header);
34+
var receivers = new ReceiverSet(args[0], log, reader, header);
3535
receivers.InitAll();
3636

3737
var processor = new Processor(log, reader);

0 commit comments

Comments
 (0)