Skip to content

Commit 4f305a6

Browse files
author
Bogdan
committed
wip ingestion and refiner
1 parent b8c64fc commit 4f305a6

File tree

10 files changed

+191
-116
lines changed

10 files changed

+191
-116
lines changed

Src/Ingestion/IngestionService.cs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public IngestionService(ILogger logger)
3333

3434
private async Task<string> GetDataAsync(string baseUrl)
3535
{
36-
string data = "";
36+
var data = "";
3737
HttpClient client = new HttpClient();
3838
try
3939
{
@@ -62,7 +62,7 @@ public IDictionary<string, IngestionSource> LoadSourcesFromJson(string path)
6262
string JSONText = File.ReadAllText(path);
6363
IList<IngestionSource> listsources = JsonConvert.DeserializeObject<IList<IngestionSource>>(JSONText);
6464
foreach (IngestionSource s in listsources) {
65-
sources.Add(s.name, s);
65+
sources.Add(s.Name, s);
6666
}
6767
}
6868
else
@@ -72,35 +72,40 @@ public IDictionary<string, IngestionSource> LoadSourcesFromJson(string path)
7272
return sources;
7373
}
7474

75-
public void Ingest(string sourceID) {
75+
public void Ingest(string sourceId) {
7676

77-
if (!_sourcesList.ContainsKey(sourceID)) throw new Exception("Key was not found, source is not defined");
78-
// Fetch data
79-
//Task<string> data = GetDataAsync(_sourcesList[sourceID].ApiUrl);
80-
//string dataString = data.Result.ToString();
81-
string dataString = "message test";
77+
if (!_sourcesList.ContainsKey(sourceId)) throw new Exception("Key was not found, source is not defined");
8278
_logger.Information("Starting Ingestor");
83-
ForwardMessageToRabbitMQ(dataString);
79+
80+
var data = GetDataAsync(_sourcesList[sourceId].ApiUrl).ToString();
81+
82+
ForwardMessageToRabbitMQ(data, _sourcesList[sourceId].ForwardMessageQueue);
83+
8484
_logger.Information("Stopping Ingestor");
8585
}
8686

87-
public void ForwardMessageToRabbitMQ(string message)
87+
public void ForwardMessageToRabbitMQ(string message, string queue)
8888
{
89-
var factory = new ConnectionFactory() { HostName = _rabbitmqHostname };
89+
var factory = new ConnectionFactory();
9090
using (IConnection conn = factory.CreateConnection())
9191
{
9292
using (IModel channel = conn.CreateModel())
9393
{
94-
var exchange = "grabid_exchange";
95-
var routingKey = "mono.data.received";
96-
97-
channel.ExchangeDeclare(exchange: exchange, type: "topic");
94+
var exchange = "mono.data.received";
95+
96+
channel.QueueDeclare(queue, true, false, false, null);
97+
98+
channel.ExchangeDeclare(exchange: exchange, type: "fanout");
99+
100+
channel.QueueBind(queue, exchange, "");
101+
98102
var envelope = new Envelope<string>(Guid.NewGuid(), message);
99103
var envelopedMessage = JsonConvert.SerializeObject(envelope);
104+
100105
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage);
101106

102-
channel.BasicPublish(exchange, routingKey, null, messageBodyBytes);
103-
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
107+
channel.BasicPublish(exchange, null, null, messageBodyBytes);
108+
Console.WriteLine(" [x] Sent '{0}':'{1}'", exchange, message);
104109
}
105110
}
106111
}

Src/Ingestion/IngestionSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Ingestion
66
{
77
public class IngestionSource
88
{
9-
public string name { get; set; }
9+
public string Name { get; set; }
1010
public string ApiUrl { get; set; }
1111
public string Credential { get; set; }
1212
public string ForwardMessageQueue { get; set; }

Src/Refiner/DataProcessor.cs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using Newtonsoft.Json;
2+
using System;
3+
using Newtonsoft.Json.Linq;
4+
using Shared;
5+
6+
namespace Refiner
7+
{
8+
public class DataProcessor
9+
{
10+
private readonly Envelope<string> _data;
11+
12+
public DataProcessor(Envelope<string> data)
13+
{
14+
_data = data;
15+
}
16+
17+
public JArray ParseEnvelope ()
18+
{
19+
dynamic cleanData = new JArray();
20+
dynamic envelopeData = JToken.Parse(_data.Payload);
21+
22+
foreach (var obj in envelopeData)
23+
{
24+
dynamic entry = new JObject();
25+
26+
entry.Reseller = new JArray();
27+
entry.User = new JArray();
28+
entry.Subscription = new JArray();
29+
entry.Bill = new JArray();
30+
31+
entry.Reseller.Id = obj.resellerId;
32+
entry.Reseller.Name = obj.resellerName;
33+
entry.Reseller.Discount = obj.discount;
34+
35+
entry.User.AccountId = obj.accountId;
36+
37+
entry.Subscription.Name = obj.subscriptionName;
38+
entry.Subscription.Type = obj.type;
39+
entry.Subscription.State = obj.state;
40+
entry.Subscription.Price = obj.price;
41+
entry.Subscription.OnExpiry = obj.onExpiry;
42+
entry.Subscription.StartDate = obj.startDate;
43+
entry.Subscription.ExpiryDate = obj.expiryDate;
44+
45+
entry.Bill.BillingDate = obj.billingDate;
46+
entry.DiscountedPrice = obj.discountedPrice;
47+
entry.CurrencyName = obj.currencyName;
48+
entry.CurrencyISO = obj.code;
49+
50+
cleanData.Add(entry);
51+
}
52+
53+
return cleanData;
54+
}
55+
56+
public void StoreCleanData(JArray data)
57+
{
58+
//add to mongo
59+
}
60+
61+
}
62+
}

Src/Refiner/Program.cs

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Runtime.Loader;
1212
using System.Text;
1313
using System.Threading;
14+
using System.Threading.Tasks;
1415

1516
namespace Refiner
1617
{
@@ -32,56 +33,61 @@ static void Main(string[] args)
3233
.CreateLogger();
3334

3435
Log.Information("Starting...");
35-
36-
var factory = new ConnectionFactory() { HostName = "rabbit.docker" };
37-
conn = factory.CreateConnection();
38-
channel = conn.CreateModel();
39-
40-
var exchange = "grabid_exchange";
41-
var routingKey = "mono.data.received";
42-
43-
channel.ExchangeDeclare(exchange: exchange, type: "topic");
44-
45-
var queueName = channel.QueueDeclare().QueueName;
46-
47-
channel.QueueBind(queue: queueName,
48-
exchange: exchange,
49-
routingKey: routingKey);
50-
51-
52-
var consumer = new EventingBasicConsumer(channel);
53-
54-
consumer.Received += (model, ea) =>
55-
{
56-
var body = ea.Body;
57-
var message = Encoding.UTF8.GetString(body);
58-
var key = ea.RoutingKey;
59-
Console.WriteLine($" [x] Received '{key}':'{message}'");
60-
var envelope = JsonConvert.DeserializeObject<Envelope<string>>(message);
61-
62-
dynamic json = JObject.Parse(envelope.Payload);
63-
string messageString = json.message;
64-
string userString = json.users;
65-
string[] userArray = userString.Split(",");
66-
string[] messages = userArray.Select(user => { return $"{messageString} {user}"; }).ToArray();
67-
var returnEnvelope = new Envelope<string[]>(envelope.Id, messages);
68-
string newMessage = JsonConvert.SerializeObject(returnEnvelope);
69-
byte[] messageBodyBytes = Encoding.UTF8.GetBytes(newMessage);
70-
71-
channel.BasicPublish(exchange, "mono.data.refined", null, messageBodyBytes);
72-
Console.WriteLine($" [x] Sent 'mono.data.refined':'{newMessage}'");
73-
74-
};
75-
channel.BasicConsume(queue: queueName,
76-
autoAck: true,
77-
consumer: consumer);
78-
36+
37+
RetrieveMessageFromQueue("mono.data.received");
38+
7939
Log.Information("Started");
8040

8141
WaitHandle.WaitOne();
8242
}
8343

44+
public static void RetrieveMessageFromQueue (string queue)
45+
{
46+
var factory = new ConnectionFactory();
47+
using (IConnection conn = factory.CreateConnection())
48+
{
49+
using (IModel channel = conn.CreateModel())
50+
{
51+
var exchange = "mono.data.received";
52+
53+
channel.QueueDeclare(queue, true, false, false, null);
54+
55+
channel.ExchangeDeclare(exchange: exchange, type: "fanout");
56+
57+
channel.QueueBind(queue, exchange, "");
58+
59+
var consumer = new EventingBasicConsumer(channel);
60+
61+
consumer.Received += (model, ea) =>
62+
{
63+
var body = ea.Body;
64+
var message = Encoding.UTF8.GetString(body);
65+
var key = ea.RoutingKey;
66+
Console.WriteLine($" [x] Received '{key}':'{message}'");
67+
var envelope = JsonConvert.DeserializeObject<Envelope<string>>(message);
68+
69+
var processor = new DataProcessor(envelope);
70+
var cleanData = processor.ParseEnvelope();
71+
72+
ForwardMessageToQueue(channel, "mono.data.refined", cleanData);
73+
74+
Console.WriteLine($" [x] Sent 'mono.data.refined':'{cleanData}'");
75+
};
76+
77+
channel.BasicConsume(queue: queue,
78+
autoAck: true,
79+
consumer: consumer);
80+
}
81+
}
82+
}
8483

84+
public static void ForwardMessageToQueue (IModel channel, string queue, JArray cleanData)
85+
{
86+
var message = JsonConvert.SerializeObject(cleanData);
87+
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(message);
88+
channel.BasicPublish(queue, null, null, messageBodyBytes);
89+
}
90+
8591
private static void Exit()
8692
{
8793
Log.Information("Exiting...");

Src/Refiner/Refiner.csproj

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,18 @@
11
<Project Sdk="Microsoft.NET.Sdk">
2-
32
<PropertyGroup>
43
<OutputType>Exe</OutputType>
54
<TargetFramework>netcoreapp2.1</TargetFramework>
65
</PropertyGroup>
7-
86
<ItemGroup>
97
<None Remove="Dockerfile" />
108
</ItemGroup>
11-
129
<ItemGroup>
1310
<Content Include="Dockerfile" />
1411
</ItemGroup>
15-
1612
<ItemGroup>
1713
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
1814
<PackageReference Include="RabbitMQ.Client" Version="5.1.0" />
1915
<PackageReference Include="Serilog" Version="2.7.1" />
2016
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
2117
</ItemGroup>
22-
23-
</Project>
18+
</Project>

Test/IngestionServiceTest.cs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using NUnit.Framework;
22
using Ingestion;
33
using System.Collections.Generic;
4+
using RabbitMQ.Client;
5+
using RabbitMQ.Client.Exceptions;
46
using Serilog;
57

68
namespace Tests
@@ -14,29 +16,39 @@ public void Setup()
1416
{
1517
Dictionary<string, IngestionSource> sources = new Dictionary<string, IngestionSource>();
1618
sources.Add(
17-
"Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", name = "Test"}
19+
"Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", Name = "Test"}
1820
);
1921
sources.Add(
20-
"Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", name = "Test2" }
22+
"Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", Name = "Test2" }
23+
);
24+
sources.Add(
25+
"FakeData", new IngestionSource() { ApiUrl = "https://jsonplaceholder.typicode.com/posts", Credential = "", ForwardMessageQueue = "mono.data.received", Name = "FakeData" }
2126
);
2227

2328
service = new IngestionService(sources, new LoggerConfiguration().WriteTo.Console().CreateLogger());
2429
}
2530

2631
[Test]
27-
public void TestCanLoadSourcesFromJSONFile()
32+
public void CanIngestFromEndpoint()
2833
{
29-
IDictionary<string, IngestionSource> sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json");
30-
Assert.AreEqual(sources["ResellerData"].name, "ResellerData");
31-
Assert.AreEqual(sources["ResellerData2"].name, "ResellerData2");
34+
service.Ingest("FakeData");
35+
using (IConnection conn = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}.CreateConnection())
36+
{
37+
using (IModel channel = conn.CreateModel())
38+
{
39+
BasicGetResult result = channel.BasicGet("mono.data.received", false);
40+
Assert.IsNotNull(result);
41+
}
42+
}
3243
}
3344

3445
[Test]
35-
public void TestCanForwardMessageToQueue()
46+
public void TestCanLoadSourcesFromJSONFile()
3647
{
37-
service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json");
38-
service.ForwardMessageToRabbitMQ("");
48+
IDictionary<string, IngestionSource> sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json");
49+
Assert.AreEqual(sources["ResellerData"].Name, "ResellerData");
50+
Assert.AreEqual(sources["ResellerData2"].Name, "ResellerData2");
51+
Assert.AreEqual(sources["FakeData"].Name, "FakeData");
3952
}
40-
4153
}
42-
}
54+
}

Test/Tests.csproj

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
2-
1+
<Project Sdk="Microsoft.NET.Sdk">
32
<PropertyGroup>
43
<TargetFramework>netcoreapp2.1</TargetFramework>
5-
64
<IsPackable>false</IsPackable>
75
</PropertyGroup>
8-
96
<ItemGroup>
10-
<PackageReference Include="nunit" Version="3.10.1" />
7+
<PackageReference Include="NUnit" Version="3.11.0" />
118
<PackageReference Include="NUnit3TestAdapter" Version="3.10.0" />
129
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
1310
</ItemGroup>
14-
1511
<ItemGroup>
1612
<ProjectReference Include="..\Src\Ingestion\Ingestion.csproj" />
1713
</ItemGroup>
18-
1914
</Project>

config/IngestionSourcesTest.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,11 @@
1010
"ApiUrl": "https://hal.mono.net/api/v1/admin",
1111
"Credential": "b68064b8cf25842a1ea86cd61a7ee5fa",
1212
"ForwardMessageQueue": "mono.queue.resellers"
13+
},
14+
{
15+
"name": "FakeData",
16+
"ApiUrl": "https://jsonplaceholder.typicode.com/posts",
17+
"Credential": "",
18+
"ForwardMessageQueue": "mono.queue.test"
1319
}
1420
]

0 commit comments

Comments
 (0)