-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIngestionService.cs
127 lines (106 loc) · 4.43 KB
/
IngestionService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RabbitMQ.Client;
using Serilog;
using Shared;
namespace Ingestion
{
/// <summary>
/// Fetches raw data from configured HTTP sources and forwards it, wrapped in an
/// <c>Envelope&lt;string&gt;</c>, to the RabbitMQ fanout exchange "mono.data.received".
/// </summary>
/// <remarks>
/// The RabbitMQ connection is opened in the constructor and released after every
/// publish. A later publish transparently reconnects, so one instance can ingest
/// any number of times. Call <see cref="Dispose"/> to release the connection when
/// nothing was published. The class is not synchronized for concurrent use.
/// </remarks>
public class IngestionService : IDisposable
{
    private const string SourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json";
    private const string ExchangeName = "mono.data.received";

    // A single HttpClient is shared by all requests: creating one per call can
    // exhaust sockets under load because closed connections linger in TIME_WAIT.
    private static readonly HttpClient SharedHttpClient = new HttpClient();

    private readonly IDictionary<string, IngestionSource> _sourcesList;
    private readonly ILogger _logger;

    // Instance state (previously static): every service owns its own connection,
    // so creating a second instance no longer overwrites and leaks the first one's.
    private IConnection _conn;
    private IModel _channel;

    /// <summary>
    /// Creates a service over an already-built source table and connects to RabbitMQ.
    /// </summary>
    /// <param name="sources">Ingestion sources keyed by the id later passed to <see cref="Ingest"/>.</param>
    /// <param name="logger">Logger used for progress and error messages.</param>
    public IngestionService(IDictionary<string, IngestionSource> sources, ILogger logger)
    {
        _logger = logger;
        EnsureConnected();
        _sourcesList = sources;
    }

    /// <summary>
    /// Creates a service whose sources are read from the built-in JSON config path,
    /// then connects to RabbitMQ.
    /// </summary>
    /// <param name="logger">Logger used for progress and error messages.</param>
    /// <exception cref="FileNotFoundException">The sources config file does not exist.</exception>
    public IngestionService(ILogger logger)
    {
        _logger = logger;
        // Load first so a missing config file fails before any broker connection is opened.
        _sourcesList = LoadSourcesFromJson(SourcesConfigRelativePath);
        EnsureConnected();
    }

    /// <summary>
    /// Downloads the body of <paramref name="url"/> as a string. The HTTP status
    /// code is not inspected; the body is returned whatever the status.
    /// </summary>
    private async Task<string> GetData(string url)
    {
        // ConfigureAwait(false): Ingest blocks on this task, so the continuations
        // must not need the caller's synchronization context (deadlock otherwise).
        using (var response = await SharedHttpClient.GetAsync(url).ConfigureAwait(false))
        {
            return await response.Content.ReadAsStringAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Reads a JSON array of <c>IngestionSource</c> objects from <paramref name="path"/>
    /// and indexes them by their <c>Name</c>.
    /// </summary>
    /// <param name="path">Path of the JSON file to read.</param>
    /// <returns>The sources keyed by name; empty when the file holds no array.</returns>
    /// <exception cref="FileNotFoundException"><paramref name="path"/> does not exist.</exception>
    /// <exception cref="ArgumentException">Two sources share the same name.</exception>
    public IDictionary<string, IngestionSource> LoadSourcesFromJson(string path)
    {
        if (!File.Exists(path))
        {
            throw new FileNotFoundException("Cant find file", path);
        }

        IDictionary<string, IngestionSource> sources = new Dictionary<string, IngestionSource>();
        string jsonText = File.ReadAllText(path);
        IList<IngestionSource> parsed = JsonConvert.DeserializeObject<IList<IngestionSource>>(jsonText);

        // An empty file or a literal "null" deserializes to null; treat it as "no sources".
        if (parsed == null)
        {
            return sources;
        }

        foreach (IngestionSource source in parsed)
        {
            sources.Add(source.Name, source);
        }

        return sources;
    }

    /// <summary>
    /// Fetches the data of the source registered under <paramref name="sourceId"/>
    /// and forwards it to that source's queue.
    /// </summary>
    /// <param name="sourceId">Key of the source in the configured source table.</param>
    /// <exception cref="KeyNotFoundException">No source is registered under <paramref name="sourceId"/>.</exception>
    /// <exception cref="AggregateException">The HTTP request failed.</exception>
    public void Ingest(string sourceId)
    {
        IngestionSource source;
        if (!_sourcesList.TryGetValue(sourceId, out source))
        {
            throw new KeyNotFoundException("Key was not found, source is not defined");
        }

        _logger.Information("Starting Ingestor");
        // Blocking wait kept on purpose: the method is synchronous by contract and
        // callers observe HTTP failures wrapped in AggregateException.
        var data = GetData(source.ApiUrl).Result;
        ForwardMessageToRabbitMQ(data, source.ForwardMessageQueue);
        _logger.Information("Stopping Ingestor");
    }

    /// <summary>
    /// Wraps <paramref name="message"/> in an envelope with a fresh id and publishes
    /// it to the fanout exchange, after declaring <paramref name="queue"/> (durable,
    /// non-exclusive, not auto-deleted) and binding it to that exchange.
    /// </summary>
    /// <param name="message">Payload to forward.</param>
    /// <param name="queue">Name of the queue to declare and bind.</param>
    public void ForwardMessageToRabbitMQ(string message, string queue)
    {
        // The connection is closed after every publish (see finally), so reopen it
        // when an earlier call already released it.
        EnsureConnected();
        try
        {
            _channel.QueueDeclare(queue, true, false, false, null);
            _channel.ExchangeDeclare(ExchangeName, "fanout");
            _channel.QueueBind(queue, ExchangeName, "");

            var envelope = new Envelope<string>(Guid.NewGuid(), message);
            var envelopedMessage = JsonConvert.SerializeObject(envelope);
            byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage);

            _channel.BasicPublish(ExchangeName, "", _channel.CreateBasicProperties(), messageBodyBytes);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", ExchangeName, message);
        }
        finally
        {
            CloseConnection();
        }
    }

    /// <summary>Serializes <paramref name="obj"/> to its JSON representation.</summary>
    public string SerializeObjects(object obj)
    {
        return JsonConvert.SerializeObject(obj);
    }

    /// <summary>
    /// Writes <paramref name="obj"/> to the sources config file, replacing its content.
    /// Best effort: a failure is reported but never thrown to the caller.
    /// </summary>
    /// <param name="obj">Text to write.</param>
    public void StoreObject(string obj)
    {
        try
        {
            File.WriteAllText(SourcesConfigRelativePath, obj);
        }
        catch (Exception exception)
        {
            Console.WriteLine("Couldnt write to file: " + exception.Message);
            if (_logger != null)
            {
                _logger.Error(exception, "Couldnt write to file {Path}", SourcesConfigRelativePath);
            }
        }
    }

    /// <summary>Releases the RabbitMQ channel and connection if they are still held.</summary>
    public void Dispose()
    {
        CloseConnection();
    }

    /// <summary>Opens the connection and channel unless both are already open.</summary>
    private void EnsureConnected()
    {
        if (_conn != null && _conn.IsOpen && _channel != null && _channel.IsOpen)
        {
            return;
        }

        // Drop any half-closed leftovers before replacing them.
        CloseConnection();

        IConnectionFactory factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" };
        _conn = factory.CreateConnection();
        _channel = _conn.CreateModel();
    }

    /// <summary>Disposes the channel, then the connection, and forgets both.</summary>
    private void CloseConnection()
    {
        IModel channel = _channel;
        IConnection connection = _conn;
        _channel = null;
        _conn = null;

        try
        {
            if (channel != null)
            {
                channel.Dispose();
            }
        }
        finally
        {
            // Runs even when closing the channel throws, so the connection never leaks.
            if (connection != null)
            {
                connection.Dispose();
            }
        }
    }
}
}