Skip to content

Commit c57dc4d

Browse files
committed
concurrent purchases
1 parent 4c75ceb commit c57dc4d

File tree

3 files changed

+51
-66
lines changed

3 files changed

+51
-66
lines changed

Tools/AutoClient/Configuration.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ public class Configuration
1313
[Uniform("datapath", "dp", "DATAPATH", false, "Root path where all data files will be saved.")]
1414
public string DataPath { get; set; } = "datapath";
1515

16+
[Uniform("purchases", "np", "PURCHASES", false, "Number of concurrent purchases.")]
17+
public int NumConcurrentPurchases { get; set; } = 1;
18+
1619
[Uniform("contract-duration", "cd", "CONTRACTDURATION", false, "contract duration in minutes. (default 30)")]
1720
public int ContractDurationMinutes { get; set; } = 30;
1821

Tools/AutoClient/Program.cs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ public static class Program
88
{
99
public static async Task Main(string[] args)
1010
{
11-
1211
var cts = new CancellationTokenSource();
1312
var cancellationToken = cts.Token;
1413
Console.CancelKeyPress += (sender, args) => cts.Cancel();
1514

1615
var uniformArgs = new ArgsUniform<Configuration>(PrintHelp, args);
1716
var config = uniformArgs.Parse(true);
1817

18+
if (config.NumConcurrentPurchases < 1)
19+
{
20+
throw new Exception("Number of concurrent purchases must be > 0");
21+
}
22+
1923
var log = new LogSplitter(
2024
new FileLog(Path.Combine(config.LogPath, "autoclient")),
2125
new ConsoleLog()
@@ -36,8 +40,20 @@ public static async Task Main(string[] args)
3640

3741
await CheckCodex(codex, log);
3842

39-
var runner = new Runner(log, client, address, codex, cancellationToken, config, imgGenerator);
40-
await runner.Run();
43+
var purchasers = new List<Purchaser>();
44+
for (var i = 0; i < config.NumConcurrentPurchases; i++)
45+
{
46+
purchasers.Add(
47+
new Purchaser(new LogPrefixer(log, $"({i}) "), client, address, codex, cancellationToken, config, imgGenerator)
48+
);
49+
}
50+
51+
var delayPerPurchaser = TimeSpan.FromMinutes(config.ContractDurationMinutes) / config.NumConcurrentPurchases;
52+
foreach (var purchaser in purchasers)
53+
{
54+
purchaser.Start();
55+
await Task.Delay(delayPerPurchaser);
56+
}
4157

4258
log.Log("Done.");
4359
}
@@ -61,16 +77,4 @@ private static void PrintHelp()
6177
{
6278
Console.WriteLine("Generates fake data and creates Codex storage contracts for it.");
6379
}
64-
65-
private static IPluginTools CreateTools(ILog log, Configuration config)
66-
{
67-
var configuration = new KubernetesWorkflow.Configuration(
68-
null,
69-
operationTimeout: TimeSpan.FromMinutes(10),
70-
retryDelay: TimeSpan.FromSeconds(10),
71-
kubernetesNamespace: "notUsed!#");
72-
73-
var result = new EntryPoint(log, configuration, config.DataPath, new DefaultTimeSet());
74-
return result.Tools;
75-
}
7680
}

Tools/AutoClient/Runner.cs renamed to Tools/AutoClient/Purchaser.cs

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace AutoClient
88
{
9-
public class Runner
9+
public class Purchaser
1010
{
1111
private readonly ILog log;
1212
private readonly HttpClient client;
@@ -16,7 +16,7 @@ public class Runner
1616
private readonly Configuration config;
1717
private readonly ImageGenerator generator;
1818

19-
public Runner(ILog log, HttpClient client, Address address, CodexApi codex, CancellationToken ct, Configuration config, ImageGenerator generator)
19+
public Purchaser(ILog log, HttpClient client, Address address, CodexApi codex, CancellationToken ct, Configuration config, ImageGenerator generator)
2020
{
2121
this.log = log;
2222
this.client = client;
@@ -27,33 +27,25 @@ public Runner(ILog log, HttpClient client, Address address, CodexApi codex, Canc
2727
this.generator = generator;
2828
}
2929

30-
public async Task Run()
30+
public void Start()
31+
{
32+
Task.Run(Worker);
33+
}
34+
35+
private async Task Worker()
3136
{
3237
while (!ct.IsCancellationRequested)
3338
{
34-
log.Log("New run!");
35-
36-
try
37-
{
38-
await DoRun();
39-
40-
log.Log("Run succcessful.");
41-
}
42-
catch (Exception ex)
43-
{
44-
log.Error("Exception during run: " + ex);
45-
}
46-
47-
await FixedShortDelay();
39+
var pid = await StartNewPurchase();
40+
await WaitTillFinished(pid);
4841
}
4942
}
5043

51-
private async Task DoRun()
44+
private async Task<string> StartNewPurchase()
5245
{
5346
var file = await CreateFile();
5447
var cid = await UploadFile(file);
55-
var pid = await RequestStorage(cid);
56-
await WaitUntilStarted(pid);
48+
return await RequestStorage(cid);
5749
}
5850

5951
private async Task<string> CreateFile()
@@ -66,8 +58,7 @@ private async Task<ContentId> UploadFile(string filename)
6658
// Copied from CodexNode :/
6759
using var fileStream = File.OpenRead(filename);
6860

69-
var logMessage = $"Uploading file {filename}...";
70-
log.Log(logMessage);
61+
log.Log($"Uploading file {filename}...");
7162
var response = await codex.UploadAsync(fileStream, ct);
7263

7364
if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
@@ -91,7 +82,7 @@ private async Task<string> RequestStorage(ContentId cid)
9182
Tolerance = config.HostTolerance
9283
}, ct);
9384

94-
log.Log("Response: " + result);
85+
log.Log("Purchase ID: " + result);
9586

9687
return result;
9788
}
@@ -108,59 +99,46 @@ private async Task<string> RequestStorage(ContentId cid)
10899
if (!string.IsNullOrEmpty(sp.Error)) log.Log($"Purchase {pid} error is {sp.Error}");
109100
return sp.State;
110101
}
111-
catch (Exception ex)
102+
catch
112103
{
113104
return null;
114105
}
115106
}
116107

117-
private async Task WaitUntilStarted(string pid)
108+
private async Task WaitTillFinished(string pid)
118109
{
119-
log.Log("Waiting till contract is started, or expired...");
110+
log.Log("Waiting...");
120111
try
121112
{
122113
var emptyResponseTolerance = 10;
123114
while (true)
124115
{
125-
await FixedShortDelay();
126-
var status = await GetPurchaseState(pid);
116+
var status = (await GetPurchaseState(pid))?.ToLowerInvariant();
127117
if (string.IsNullOrEmpty(status))
128118
{
129119
emptyResponseTolerance--;
130120
if (emptyResponseTolerance == 0)
131121
{
132-
log.Log("Received 10 empty responses. Applying expiry delay, then carrying on.");
122+
log.Log("Received 10 empty responses. Stop tracking this purchase.");
133123
await ExpiryTimeDelay();
134124
return;
135125
}
136-
await FixedShortDelay();
137126
}
138127
else
139128
{
140-
if (status.Contains("pending") || status.Contains("submitted"))
129+
if (status.Contains("cancel") ||
130+
status.Contains("error") ||
131+
status.Contains("finished"))
141132
{
142-
await FixedShortDelay();
143-
}
144-
else if (status.Contains("started"))
145-
{
146-
log.Log("Started.");
147-
await FixedDurationDelay();
148-
}
149-
else if (status.Contains("finished"))
150-
{
151-
log.Log("Purchase finished.");
152133
return;
153134
}
154-
else if (status.Contains("error"))
135+
if (status.Contains("started"))
155136
{
156-
await FixedShortDelay();
157-
return;
158-
}
159-
else
160-
{
161-
await FixedShortDelay();
137+
await FixedDurationDelay();
162138
}
163139
}
140+
141+
await FixedShortDelay();
164142
}
165143
}
166144
catch (Exception ex)
@@ -172,17 +150,17 @@ private async Task WaitUntilStarted(string pid)
172150

173151
private async Task FixedDurationDelay()
174152
{
175-
await Task.Delay(config.ContractDurationMinutes * 60 * 1000);
153+
await Task.Delay(config.ContractDurationMinutes * 60 * 1000, ct);
176154
}
177155

178156
private async Task ExpiryTimeDelay()
179157
{
180-
await Task.Delay(config.ContractExpiryMinutes * 60 * 1000);
158+
await Task.Delay(config.ContractExpiryMinutes * 60 * 1000, ct);
181159
}
182160

183161
private async Task FixedShortDelay()
184162
{
185-
await Task.Delay(15 * 1000);
163+
await Task.Delay(15 * 1000, ct);
186164
}
187165
}
188166
}

0 commit comments

Comments
 (0)