From 0790103b2fcfc45f8937577ae6f9e8b982c6df3c Mon Sep 17 00:00:00 2001 From: alex855k Date: Fri, 23 Nov 2018 10:09:49 +0100 Subject: [PATCH 01/19] (#12) Add repository class IngestionService that can load configurations from a jsonfile --- Src/Ingestion/IngestionService.cs | 58 +++++++++++++++++++++++++++++++ Src/Ingestion/IngestionSource.cs | 13 +++++++ 2 files changed, 71 insertions(+) create mode 100644 Src/Ingestion/IngestionService.cs create mode 100644 Src/Ingestion/IngestionSource.cs diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs new file mode 100644 index 0000000..e6697a1 --- /dev/null +++ b/Src/Ingestion/IngestionService.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using Newtonsoft.Json; + +namespace Ingestion +{ + public class IngestionService + { + + private IList _sourcesList; + private const string _sourcesConfigRelativePath = "config/ingestion_sources.json"; + + public IngestionService(IList sources) + { + _sourcesList = sources; + } + + public IngestionService() + { + _sourcesList = LoadSourcesFromJson(); + + } + + // Due to sercurity concerns sources are either injected + //or loaded from config files + private IList LoadSourcesFromJson() + { + string json = ""; + + // deserialize JSON directly from a file + using (StreamReader file = File.OpenText(_sourcesConfigRelativePath)) + { + + } + + List videogames = JsonConvert.DeserializeObject>(json); + + Console.WriteLine(string.Join(", ", videogames.ToArray())); + return null; + } + + public void Ingest(int sourceID) { + foreach (IngestionSource source in _sourcesList) { + + } + + PutInMessageQueue(); + + } + + private void PutInMessageQueue() + { + throw new NotImplementedException(); + } + } +} diff --git a/Src/Ingestion/IngestionSource.cs b/Src/Ingestion/IngestionSource.cs new file mode 100644 index 0000000..78a0a07 --- /dev/null +++ b/Src/Ingestion/IngestionSource.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Ingestion +{ + public class IngestionSource + { + public string ApiUrl { get; set; } + public string Credential { get; set; } + public string ForwardMessageQueue { get; set; } + } +} From 9b0facf06f3ffb27a88496ad6926e4b798c0f4b2 Mon Sep 17 00:00:00 2001 From: alex855k Date: Sat, 24 Nov 2018 20:03:45 +0100 Subject: [PATCH 02/19] (#12) Add method load configurations for sources, from json files --- Src/Ingestion/IngestionService.cs | 104 +++++++++++++++---- Src/Ingestion/IngestionSource.cs | 1 + Src/Ingestion/Program.cs | 25 +---- Tests/Ingestion.Tests/Ingestion.Tests.csproj | 4 + 4 files changed, 90 insertions(+), 44 deletions(-) diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs index e6697a1..a3a6ed1 100644 --- a/Src/Ingestion/IngestionService.cs +++ b/Src/Ingestion/IngestionService.cs @@ -1,58 +1,118 @@ using System; using System.Collections.Generic; using System.IO; -using System.Text; +using System.Net.Http; +using System.Threading.Tasks; using Newtonsoft.Json; +using RabbitMQ.Client; +using Serilog; +using Shared; namespace Ingestion { + //Read about Rabbitmq - https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html + // for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well public class IngestionService { - - private IList _sourcesList; + private IDictionary _sourcesList; private const string _sourcesConfigRelativePath = "config/ingestion_sources.json"; + private ILogger _logger; + private static string rabbitmqHostname = "rabbit.docker"; - public IngestionService(IList sources) + public IngestionService(IDictionary sources) { + + _logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); _sourcesList = sources; } + private async Task GetDataAsync(string baseUrl) + { + string data = ""; + + HttpClient client = new HttpClient(); + + try + { + data = await client.GetStringAsync(baseUrl); + + Console.WriteLine(data); + } + catch (HttpRequestException e) + { + Console.WriteLine("\nException Caught!"); + Console.WriteLine("Message :{0} ", e.Message); + } + client.Dispose(); + return data; + } public IngestionService() { + _logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); _sourcesList = LoadSourcesFromJson(); - } // Due to sercurity concerns sources are either injected //or loaded from config files - private IList LoadSourcesFromJson() - { - string json = ""; - + private IDictionary LoadSourcesFromJson() + { + IDictionary sources = null; // deserialize JSON directly from a file - using (StreamReader file = File.OpenText(_sourcesConfigRelativePath)) - { - + if (File.Exists(_sourcesConfigRelativePath)) { + string JSONText = File.ReadAllText(_sourcesConfigRelativePath); + sources = JsonConvert.DeserializeObject>(JSONText); } + else { + throw new Exception("Cant find file"); + } + return sources; + } + + public void Ingest(int sourceID) { - List videogames = JsonConvert.DeserializeObject>(json); + if (!_sourcesList.ContainsKey(sourceID)) throw new Exception("Key was not found, source is not defined"); + // Fetch data + Task data = GetDataAsync(_sourcesList[sourceID].ApiUrl); + string dataString = data.Result.ToString(); - Console.WriteLine(string.Join(", ", videogames.ToArray())); - return null; + _logger.Information("Starting Ingestor"); + ForwardMessageToRabbitMQ(dataString); + _logger.Information("Stopping Ingestor"); } - public void Ingest(int sourceID) { - foreach (IngestionSource source in _sourcesList) { - - } + private void ForwardMessageToRabbitMQ(string message) + { + var factory = new ConnectionFactory() { HostName = rabbitmqHostname }; + using (IConnection conn = factory.CreateConnection()) + { + using (IModel channel = conn.CreateModel()) + { + var exchange = "grabid_exchange"; + var routingKey = "mono.data.received"; - PutInMessageQueue(); + channel.ExchangeDeclare(exchange: exchange, type: "topic"); + var envelope = new Envelope(Guid.NewGuid(), message); + var envelopedMessage = JsonConvert.SerializeObject(envelope); + byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); + channel.BasicPublish(exchange, routingKey, null, messageBodyBytes); + Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); + } + } } - private void PutInMessageQueue() + public string SerializeObjects(object obj) { - throw new NotImplementedException(); + return JsonConvert.SerializeObject(obj); + } + + public void StoreObject(string obj) { + try { + File.WriteAllText(_sourcesConfigRelativePath, obj); + } catch(Exception exception) + { + Console.WriteLine("Couldnt write to file: " + exception.Message + "Message"); + } } } } diff --git a/Src/Ingestion/IngestionSource.cs b/Src/Ingestion/IngestionSource.cs index 78a0a07..ee7d157 100644 --- a/Src/Ingestion/IngestionSource.cs +++ b/Src/Ingestion/IngestionSource.cs @@ -6,6 +6,7 @@ namespace Ingestion { public class IngestionSource { + public int id { get; set; } public string ApiUrl { get; set; } public string Credential { get; set; } public string ForwardMessageQueue { get; set; } diff --git a/Src/Ingestion/Program.cs b/Src/Ingestion/Program.cs index 8374801..8a5694b 100644 --- a/Src/Ingestion/Program.cs +++ b/Src/Ingestion/Program.cs @@ -9,30 +9,11 @@ class Program { static void Main(string[] args) { - //Read about Rabbitmq - https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html - // for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well - Console.WriteLine($"Starting Ingestor"); - var factory = new ConnectionFactory() { HostName = "rabbit.docker" }; + // - using (IConnection conn = factory.CreateConnection()) - { - using (IModel channel = conn.CreateModel()) - { - var exchange = "grabid_exchange"; - //This would properly be a http call to get new data from Mono - var message = "{'message':'Hello ','users':'Alexander, Bogdan, Elitsa, David'}"; - var routingKey = "mono.data.received"; - channel.ExchangeDeclare(exchange: exchange, type: "topic"); - var envelope = new Envelope(Guid.NewGuid(), message); - var envelopedMessage = JsonConvert.SerializeObject(envelope); - byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); - - channel.BasicPublish(exchange,routingKey , null, messageBodyBytes); - Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); - } - } - Console.WriteLine($"Stopping Ingestor"); + IngestionService ingestService = new IngestionService(); } + } } diff --git a/Tests/Ingestion.Tests/Ingestion.Tests.csproj b/Tests/Ingestion.Tests/Ingestion.Tests.csproj index 9f5c4f4..1acf710 100644 --- a/Tests/Ingestion.Tests/Ingestion.Tests.csproj +++ b/Tests/Ingestion.Tests/Ingestion.Tests.csproj @@ -4,4 +4,8 @@ netstandard2.0 + + + + From 2b626f14b2e7b9c9d093d0eea07b439a8e116079 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:44:24 +0100 Subject: [PATCH 03/19] (#12) Add capability to input argument for the particular type of ingestion source the cronjob should go to --- Src/Ingestion/Program.cs | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/Src/Ingestion/Program.cs b/Src/Ingestion/Program.cs index 8a5694b..512d1ee 100644 --- a/Src/Ingestion/Program.cs +++ b/Src/Ingestion/Program.cs @@ -1,7 +1,9 @@ using Newtonsoft.Json; using RabbitMQ.Client; +using Serilog; using Shared; using System; +using System.Threading; namespace Ingestion { @@ -9,10 +11,29 @@ class Program { static void Main(string[] args) { - // + ILogger logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); + string sourceID = ""; + /* + try { + sourceID = args[0]; + } catch (Exception e) { + logger.Error("Incorrect argument for ingestion dll crontab: " + e.Message); + } + */ + sourceID = "ResellerData"; + if (sourceID != ""){ + IngestionService ingestService = new IngestionService(logger); - - IngestionService ingestService = new IngestionService(); + try { + while (true) { + ingestService.Ingest(sourceID); + Thread.Sleep(3000); + } + } catch(Exception e) + { + logger.Error("Error: " + e.Message + "\n" + "Exception trace: " + e.StackTrace); + } + } } } From 13d4bf905c898f8c7522af9fa4d108fcb5d4e8e0 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:47:13 +0100 Subject: [PATCH 04/19] (#12) Add tests for changes --- Src/Logger/Program.cs | 2 -- Tests/Ingestion.Tests/Class1.cs | 8 -------- Tests/Ingestion.Tests/TestIngestionService.cs | 17 +++++++++++++++++ 3 files changed, 17 insertions(+), 10 deletions(-) delete mode 100644 Tests/Ingestion.Tests/Class1.cs create mode 100644 Tests/Ingestion.Tests/TestIngestionService.cs diff --git a/Src/Logger/Program.cs b/Src/Logger/Program.cs index bf6807e..26a89ab 100644 --- a/Src/Logger/Program.cs +++ b/Src/Logger/Program.cs @@ -75,8 +75,6 @@ private static void Exit() Log.Information("Exiting..."); channel.Close(); conn.Close(); - - } } diff --git a/Tests/Ingestion.Tests/Class1.cs b/Tests/Ingestion.Tests/Class1.cs deleted file mode 100644 index ad7f63c..0000000 --- a/Tests/Ingestion.Tests/Class1.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Ingestion.Tests -{ - public class Class1 - { - } -} diff --git a/Tests/Ingestion.Tests/TestIngestionService.cs b/Tests/Ingestion.Tests/TestIngestionService.cs new file mode 100644 index 0000000..0df5eb8 --- /dev/null +++ b/Tests/Ingestion.Tests/TestIngestionService.cs @@ -0,0 +1,17 @@ +using System; +using NUnit; +using NUnit.Framework; +using Ingestion; + +namespace Ingestion.Tests +{ + [TestFixture] + public class TestIngestionService + { + [Test] + public void LoadJsonFromFiles() + { + IngestionService service = new IngestionService(); + } + } +} From 6b76f2b00b59ce806f8cb0e7bb41f58abdbaad4f Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:49:53 +0100 Subject: [PATCH 05/19] (#11) Add mongodb setup to docker-compose --- docker-compose.yml | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8bd38a5..b1a5ae7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,5 @@ version: "3" services: - rabbit.docker: image: rabbitmq:3.7.8-management-alpine restart: always @@ -30,13 +29,31 @@ services: - rabbit.docker restart: always - - - - - - - - - - + mongodb-primary: + image: 'bitnami/mongodb:latest' + environment: + - MONGODB_REPLICA_SET_MODE=primary + volumes: + - 'mongodb_master_data:/bitnami' + + mongodb-secondary: + image: 'bitnami/mongodb:latest' + depends_on: + - mongodb-primary + environment: + - MONGODB_REPLICA_SET_MODE=secondary + - MONGODB_PRIMARY_HOST=mongodb-primary + - MONGODB_PRIMARY_PORT_NUMBER=27017 + + mongodb-arbiter: + image: 'bitnami/mongodb:latest' + depends_on: + - mongodb-primary + environment: + - MONGODB_REPLICA_SET_MODE=arbiter + - MONGODB_PRIMARY_HOST=mongodb-primary + - MONGODB_PRIMARY_PORT_NUMBER=27017 + +volumes: + mongodb_master_data: + driver: local \ No newline at end of file From a8d62c3fb5833f0c6467675535f47d769bd34bde Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:54:18 +0100 Subject: [PATCH 06/19] (#12) Fix method to load IngestionSources from file --- Src/Ingestion/IngestionService.cs | 58 +++++++++++++++++-------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs index a3a6ed1..ec8bfa4 100644 --- a/Src/Ingestion/IngestionService.cs +++ b/Src/Ingestion/IngestionService.cs @@ -14,23 +14,27 @@ namespace Ingestion // for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well public class IngestionService { - private IDictionary _sourcesList; - private const string _sourcesConfigRelativePath = "config/ingestion_sources.json"; + private IDictionary _sourcesList; + private const string _sourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json"; private ILogger _logger; - private static string rabbitmqHostname = "rabbit.docker"; + private static string _rabbitmqHostname = "rabbit.docker"; - public IngestionService(IDictionary sources) + public IngestionService(IDictionary sources, ILogger logger) { - - _logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); + _logger = logger; _sourcesList = sources; } + + public IngestionService(ILogger logger) + { + _logger = logger; + _sourcesList = LoadSourcesFromJson(_sourcesConfigRelativePath); + } + private async Task GetDataAsync(string baseUrl) { string data = ""; - HttpClient client = new HttpClient(); - try { data = await client.GetStringAsync(baseUrl); @@ -46,43 +50,43 @@ private async Task GetDataAsync(string baseUrl) return data; } - public IngestionService() - { - _logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); - _sourcesList = LoadSourcesFromJson(); - } - // Due to sercurity concerns sources are either injected //or loaded from config files - private IDictionary LoadSourcesFromJson() + + public IDictionary LoadSourcesFromJson(string path) { - IDictionary sources = null; + IDictionary sources = new Dictionary(); // deserialize JSON directly from a file - if (File.Exists(_sourcesConfigRelativePath)) { - string JSONText = File.ReadAllText(_sourcesConfigRelativePath); - sources = JsonConvert.DeserializeObject>(JSONText); + if (File.Exists(path)) + { + string JSONText = File.ReadAllText(path); + IList listsources = JsonConvert.DeserializeObject>(JSONText); + foreach (IngestionSource s in listsources) { + sources.Add(s.name, s); + } } - else { + else + { throw new Exception("Cant find file"); - } + } return sources; } - public void Ingest(int sourceID) { + public void Ingest(string sourceID) { if (!_sourcesList.ContainsKey(sourceID)) throw new Exception("Key was not found, source is not defined"); // Fetch data - Task data = GetDataAsync(_sourcesList[sourceID].ApiUrl); - string dataString = data.Result.ToString(); - + //Task data = GetDataAsync(_sourcesList[sourceID].ApiUrl); + //string dataString = data.Result.ToString(); + string dataString = "message test"; _logger.Information("Starting Ingestor"); ForwardMessageToRabbitMQ(dataString); _logger.Information("Stopping Ingestor"); } - private void ForwardMessageToRabbitMQ(string message) + public void ForwardMessageToRabbitMQ(string message) { - var factory = new ConnectionFactory() { HostName = rabbitmqHostname }; + var factory = new ConnectionFactory() { HostName = _rabbitmqHostname }; using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) From ccae22c1bd57d9e43a8ceb06b6aeed9c686f32b2 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:55:35 +0100 Subject: [PATCH 07/19] (#12) Changed id as an int to name as a unique id --- Src/Ingestion/IngestionSource.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Src/Ingestion/IngestionSource.cs b/Src/Ingestion/IngestionSource.cs index ee7d157..78d1cbf 100644 --- a/Src/Ingestion/IngestionSource.cs +++ b/Src/Ingestion/IngestionSource.cs @@ -6,7 +6,7 @@ namespace Ingestion { public class IngestionSource { - public int id { get; set; } + public string name { get; set; } public string ApiUrl { get; set; } public string Credential { get; set; } public string ForwardMessageQueue { get; set; } From f502da0188c00d297f15a94b70039e83665b4346 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:56:31 +0100 Subject: [PATCH 08/19] (#12) Add id argument to crontab job --- Src/Ingestion/crontab/root | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Src/Ingestion/crontab/root b/Src/Ingestion/crontab/root index f3cc3b4..7066796 100644 --- a/Src/Ingestion/crontab/root +++ b/Src/Ingestion/crontab/root @@ -1,2 +1 @@ -*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll >> /activity.log 2>&1 - +*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll ResellerData >> /activity.log 2>&1 \ No newline at end of file From e07ec7944e0f12353dcfeeb775a416b0b067bddb Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 14:59:18 +0100 Subject: [PATCH 09/19] (#12) Changes mainly about tests. --- GraBID.sln | 18 +++++--------- Test/IngestionServiceTest.cs | 42 ++++++++++++++++++++++++++++++++ Test/RefinerServiceTest.cs | 27 ++++++++++++++++++++ Test/Tests.csproj | 19 +++++++++++++++ config/IngestionSources.json | 8 ++++++ config/IngestionSourcesTest.json | 14 +++++++++++ mongodb/docker-compose.yml | 31 +++++++++++++++++++++++ 7 files changed, 147 insertions(+), 12 deletions(-) create mode 100644 Test/IngestionServiceTest.cs create mode 100644 Test/RefinerServiceTest.cs create mode 100644 Test/Tests.csproj create mode 100644 config/IngestionSources.json create mode 100644 config/IngestionSourcesTest.json create mode 100644 mongodb/docker-compose.yml diff --git a/GraBID.sln b/GraBID.sln index f6bbddb..25401c7 100644 --- a/GraBID.sln +++ b/GraBID.sln @@ -5,8 +5,6 @@ VisualStudioVersion = 15.0.28306.52 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Src", "Src", "{47FA0BFF-7C06-4F41-A91E-3119CE207B70}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{6FE2E050-C1DF-4E58-8689-8D90C45931C3}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{D695FB5C-0D3D-468A-95F0-43ADC918BE06}" ProjectSection(SolutionItems) = preProject docker-compose.yml = docker-compose.yml @@ -14,11 +12,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{D695FB EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion", "Src\Ingestion\Ingestion.csproj", "{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion.Tests", "Tests\Ingestion.Tests\Ingestion.Tests.csproj", "{A8569975-D120-4FD5-841B-2B0568740500}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Refiner", "src\Refiner\Refiner.csproj", "{13410153-13DB-41D4-A08A-CE507B963559}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "Test\Tests.csproj", "{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -30,25 +28,21 @@ Global {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.Build.0 = Release|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.Build.0 = Release|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.Build.0 = Debug|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.ActiveCfg = Release|Any CPU - {13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.Build.0 = Release|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.Build.0 = Debug|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.Build.0 = Release|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} - {A8569975-D120-4FD5-841B-2B0568740500} = {6FE2E050-C1DF-4E58-8689-8D90C45931C3} {13410153-13DB-41D4-A08A-CE507B963559} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} {1C67535A-5DB2-4B2E-B1F6-56B864229760} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} EndGlobalSection diff --git a/Test/IngestionServiceTest.cs b/Test/IngestionServiceTest.cs new file mode 100644 index 0000000..fa6b37a --- /dev/null +++ b/Test/IngestionServiceTest.cs @@ -0,0 +1,42 @@ +using NUnit.Framework; +using Ingestion; +using System.Collections.Generic; +using Serilog; + +namespace Tests +{ + public class IngestionServiceTest + { + private IngestionService service; + + [SetUp] + public void Setup() + { + Dictionary sources = new Dictionary(); + sources.Add( + "Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", name = "Test"} + ); + sources.Add( + "Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", name = "Test2" } + ); + + service = new IngestionService(sources, new LoggerConfiguration().WriteTo.Console().CreateLogger()); + } + + [Test] + public void TestCanLoadSourcesFromJSONFile() + { + IDictionary sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); + Assert.AreEqual(sources["ResellerData"].name, "ResellerData"); + Assert.AreEqual(sources["ResellerData2"].name, "ResellerData2"); + } + + [Test] + public void TestCanForwardMessageToQueue() + { + service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); + service.ForwardMessageToRabbitMQ(""); + } + + } +} \ No newline at end of file diff --git a/Test/RefinerServiceTest.cs b/Test/RefinerServiceTest.cs new file mode 100644 index 0000000..8017b62 --- /dev/null +++ b/Test/RefinerServiceTest.cs @@ -0,0 +1,27 @@ +using NUnit.Framework; +using Ingestion; +using System.Collections.Generic; +using Serilog; + +namespace Tests +{ + public class RefinerServiceTest + { + + [SetUp] + public void Setup() + { + } + + [Test] + public void TestCanLoadSourcesFromJSONFile() + { + } + + [Test] + public void TestCanForwardMessageToQueue() + { + } + + } +} \ No newline at end of file diff --git a/Test/Tests.csproj b/Test/Tests.csproj new file mode 100644 index 0000000..05c5e12 --- /dev/null +++ b/Test/Tests.csproj @@ -0,0 +1,19 @@ + + + + netcoreapp2.1 + + false + + + + + + + + + + + + + \ No newline at end of file diff --git a/config/IngestionSources.json b/config/IngestionSources.json new file mode 100644 index 0000000..7cc218c --- /dev/null +++ b/config/IngestionSources.json @@ -0,0 +1,8 @@ +[ + { + "name": "ResellerData", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + } +] diff --git a/config/IngestionSourcesTest.json b/config/IngestionSourcesTest.json new file mode 100644 index 0000000..64ee1a4 --- /dev/null +++ b/config/IngestionSourcesTest.json @@ -0,0 +1,14 @@ +[ + { + "name": "ResellerData", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + }, + { + "name": "ResellerData2", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + } +] diff --git a/mongodb/docker-compose.yml b/mongodb/docker-compose.yml new file mode 100644 index 0000000..943f8bf --- /dev/null +++ b/mongodb/docker-compose.yml @@ -0,0 +1,31 @@ +version: '2' + +services: + mongodb-primary: + image: 'bitnami/mongodb:latest' + environment: + - MONGODB_REPLICA_SET_MODE=primary + volumes: + - 'mongodb_master_data:/bitnami' + + mongodb-secondary: + image: 'bitnami/mongodb:latest' + depends_on: + - mongodb-primary + environment: + - MONGODB_REPLICA_SET_MODE=secondary + - MONGODB_PRIMARY_HOST=mongodb-primary + - MONGODB_PRIMARY_PORT_NUMBER=27017 + + mongodb-arbiter: + image: 'bitnami/mongodb:latest' + depends_on: + - mongodb-primary + environment: + - MONGODB_REPLICA_SET_MODE=arbiter + - MONGODB_PRIMARY_HOST=mongodb-primary + - MONGODB_PRIMARY_PORT_NUMBER=27017 + +volumes: + mongodb_master_data: + driver: local \ No newline at end of file From 65e87f063eac15db42f62f059a9f2ad1c941d3c6 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 17:13:34 +0100 Subject: [PATCH 10/19] (Ingestion-service-CSProjectsEAL/GraBID#12) Changed name --- Test/RefinerServiceTest.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Test/RefinerServiceTest.cs b/Test/RefinerServiceTest.cs index 8017b62..2c959ea 100644 --- a/Test/RefinerServiceTest.cs +++ b/Test/RefinerServiceTest.cs @@ -7,21 +7,24 @@ namespace Tests { public class RefinerServiceTest { - [SetUp] public void Setup() { } [Test] - public void TestCanLoadSourcesFromJSONFile() + public void TestCanPullMessagesFromQueue() { } [Test] - public void TestCanForwardMessageToQueue() + public void TestCanPullMessagesFromDifferentQueues() { } + [Test] + public void TestCanStoreMessagePayloadInMongoDB() + { + } } } \ No newline at end of file From 919aaf40d00f0926e0301eb569cf18964ba39cf3 Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 17:16:17 +0100 Subject: [PATCH 11/19] (Ingestion-service-CSProjectsEAL/GraBID#12) Testing git credentials :P --- Src/Refiner/Program.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index dd35d4f..682e3a5 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -73,6 +73,7 @@ static void Main(string[] args) }; + channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); From 67da1a29c4f8bd1e484dda80ca71fe6bf53c63ee Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 17:40:55 +0100 Subject: [PATCH 12/19] (Ingestion-service-CSProjectsEAL/GraBID#12) Testing credentials again :( --- Src/Refiner/Program.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index 682e3a5..9fb85de 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -83,6 +83,7 @@ static void Main(string[] args) WaitHandle.WaitOne(); } + private static void Exit() { Log.Information("Exiting..."); From b8c64fc4f1ee196b20cfcc90b023c8e4623fd0df Mon Sep 17 00:00:00 2001 From: alex855k Date: Tue, 27 Nov 2018 17:49:33 +0100 Subject: [PATCH 13/19] (Ingestion-service-CSProjectsEAL/GraBID#12) Testing credentials --- Src/Refiner/Program.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index 9fb85de..dce36b8 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -72,8 +72,6 @@ static void Main(string[] args) Console.WriteLine($" [x] Sent 'mono.data.refined':'{newMessage}'"); }; - - channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); @@ -89,11 +87,6 @@ private static void Exit() Log.Information("Exiting..."); channel.Close(); conn.Close(); - - } - - } - } From 4f305a65ec701c52c572f3c8537d54a0e3a26413 Mon Sep 17 00:00:00 2001 From: Bogdan Date: Thu, 6 Dec 2018 14:00:42 +0100 Subject: [PATCH 14/19] wip ingestion and refiner --- Src/Ingestion/IngestionService.cs | 39 +++++++------ Src/Ingestion/IngestionSource.cs | 2 +- Src/Refiner/DataProcessor.cs | 62 ++++++++++++++++++++ Src/Refiner/Program.cs | 94 ++++++++++++++++--------------- Src/Refiner/Refiner.csproj | 7 +-- Test/IngestionServiceTest.cs | 34 +++++++---- Test/Tests.csproj | 9 +-- config/IngestionSourcesTest.json | 6 ++ docker-compose.yml | 40 ++++--------- mongodb/docker-compose.yml | 14 ++++- 10 files changed, 191 insertions(+), 116 deletions(-) create mode 100644 Src/Refiner/DataProcessor.cs diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs index ec8bfa4..49a594d 100644 --- a/Src/Ingestion/IngestionService.cs +++ b/Src/Ingestion/IngestionService.cs @@ -33,7 +33,7 @@ public IngestionService(ILogger logger) private async Task GetDataAsync(string baseUrl) { - string data = ""; + var data = ""; HttpClient client = new HttpClient(); try { @@ -62,7 +62,7 @@ public IDictionary LoadSourcesFromJson(string path) string JSONText = File.ReadAllText(path); IList listsources = JsonConvert.DeserializeObject>(JSONText); foreach (IngestionSource s in listsources) { - sources.Add(s.name, s); + sources.Add(s.Name, s); } } else @@ -72,35 +72,40 @@ public IDictionary LoadSourcesFromJson(string path) return sources; } - public void Ingest(string sourceID) { + public void Ingest(string sourceId) { - if (!_sourcesList.ContainsKey(sourceID)) throw new Exception("Key was not found, source is not defined"); - // Fetch data - //Task data = GetDataAsync(_sourcesList[sourceID].ApiUrl); - //string dataString = data.Result.ToString(); - string dataString = "message test"; + if (!_sourcesList.ContainsKey(sourceId)) throw new Exception("Key was not found, source is not defined"); _logger.Information("Starting Ingestor"); - ForwardMessageToRabbitMQ(dataString); + + var data = GetDataAsync(_sourcesList[sourceId].ApiUrl).ToString(); + + ForwardMessageToRabbitMQ(data, _sourcesList[sourceId].ForwardMessageQueue); + _logger.Information("Stopping Ingestor"); } - public void ForwardMessageToRabbitMQ(string message) + public void ForwardMessageToRabbitMQ(string message, string queue) { - var factory = new ConnectionFactory() { HostName = _rabbitmqHostname }; + var factory = new ConnectionFactory(); using (IConnection conn = factory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { - var exchange = "grabid_exchange"; - var routingKey = "mono.data.received"; - - channel.ExchangeDeclare(exchange: exchange, type: "topic"); + var exchange = "mono.data.received"; + + channel.QueueDeclare(queue, true, false, false, null); + + channel.ExchangeDeclare(exchange: exchange, type: "fanout"); + + channel.QueueBind(queue, exchange, ""); + var envelope = new Envelope(Guid.NewGuid(), message); var envelopedMessage = JsonConvert.SerializeObject(envelope); + byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); - channel.BasicPublish(exchange, routingKey, null, messageBodyBytes); - Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); + channel.BasicPublish(exchange, null, null, messageBodyBytes); + Console.WriteLine(" [x] Sent '{0}':'{1}'", exchange, message); } } } diff --git a/Src/Ingestion/IngestionSource.cs b/Src/Ingestion/IngestionSource.cs index 78d1cbf..247c9c9 100644 --- a/Src/Ingestion/IngestionSource.cs +++ b/Src/Ingestion/IngestionSource.cs @@ -6,7 +6,7 @@ namespace Ingestion { public class IngestionSource { - public string name { get; set; } + public string Name { get; set; } public string ApiUrl { get; set; } public string Credential { get; set; } public string ForwardMessageQueue { get; set; } diff --git a/Src/Refiner/DataProcessor.cs b/Src/Refiner/DataProcessor.cs new file mode 100644 index 0000000..1d7017c --- /dev/null +++ b/Src/Refiner/DataProcessor.cs @@ -0,0 +1,62 @@ +using Newtonsoft.Json; +using System; +using Newtonsoft.Json.Linq; +using Shared; + +namespace Refiner +{ + public class DataProcessor + { + private readonly Envelope _data; + + public DataProcessor(Envelope data) + { + _data = data; + } + + public JArray ParseEnvelope () + { + dynamic cleanData = new JArray(); + dynamic envelopeData = JToken.Parse(_data.Payload); + + foreach (var obj in envelopeData) + { + dynamic entry = new JObject(); + + entry.Reseller = new JArray(); + entry.User = new JArray(); + entry.Subscription = new JArray(); + entry.Bill = new JArray(); + + entry.Reseller.Id = obj.resellerId; + entry.Reseller.Name = obj.resellerName; + entry.Reseller.Discount = obj.discount; + + entry.User.AccountId = obj.accountId; + + entry.Subscription.Name = obj.subscriptionName; + entry.Subscription.Type = obj.type; + entry.Subscription.State = obj.state; + entry.Subscription.Price = obj.price; + entry.Subscription.OnExpiry = obj.onExpiry; + entry.Subscription.StartDate = obj.startDate; + entry.Subscription.ExpiryDate = obj.expiryDate; + + entry.Bill.BillingDate = obj.billingDate; + entry.DiscountedPrice = obj.discountedPrice; + entry.CurrencyName = obj.currencyName; + entry.CurrencyISO = obj.code; + + cleanData.Add(entry); + } + + return cleanData; + } + + public void StoreCleanData(JArray data) + { + //add to mongo + } + + } +} \ No newline at end of file diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index dce36b8..1a31236 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -11,6 +11,7 @@ using System.Runtime.Loader; using System.Text; using System.Threading; +using System.Threading.Tasks; namespace Refiner { @@ -32,56 +33,61 @@ static void Main(string[] args) .CreateLogger(); Log.Information("Starting..."); - - var factory = new ConnectionFactory() { HostName = "rabbit.docker" }; - conn = factory.CreateConnection(); - channel = conn.CreateModel(); - - var exchange = "grabid_exchange"; - var routingKey = "mono.data.received"; - - channel.ExchangeDeclare(exchange: exchange, type: "topic"); - - var queueName = channel.QueueDeclare().QueueName; - - channel.QueueBind(queue: queueName, - exchange: exchange, - routingKey: routingKey); - - - var consumer = new EventingBasicConsumer(channel); - - consumer.Received += (model, ea) => - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body); - var key = ea.RoutingKey; - Console.WriteLine($" [x] Received '{key}':'{message}'"); - var envelope = JsonConvert.DeserializeObject>(message); - - dynamic json = JObject.Parse(envelope.Payload); - string messageString = json.message; - string userString = json.users; - string[] userArray = userString.Split(","); - string[] messages = userArray.Select(user => { return $"{messageString} {user}"; }).ToArray(); - var returnEnvelope = new Envelope(envelope.Id, messages); - string newMessage = JsonConvert.SerializeObject(returnEnvelope); - byte[] messageBodyBytes = Encoding.UTF8.GetBytes(newMessage); - - channel.BasicPublish(exchange, "mono.data.refined", null, messageBodyBytes); - Console.WriteLine($" [x] Sent 'mono.data.refined':'{newMessage}'"); - - }; - channel.BasicConsume(queue: queueName, - autoAck: true, - consumer: consumer); - + + RetrieveMessageFromQueue("mono.data.received"); + Log.Information("Started"); WaitHandle.WaitOne(); } + public static void RetrieveMessageFromQueue (string queue) + { + var factory = new ConnectionFactory(); + using (IConnection conn = factory.CreateConnection()) + { + using (IModel channel = conn.CreateModel()) + { + var exchange = "mono.data.received"; + + channel.QueueDeclare(queue, true, false, false, null); + + channel.ExchangeDeclare(exchange: exchange, type: "fanout"); + + channel.QueueBind(queue, exchange, ""); + + var consumer = new EventingBasicConsumer(channel); + + consumer.Received += (model, ea) => + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body); + var key = ea.RoutingKey; + Console.WriteLine($" [x] Received '{key}':'{message}'"); + var envelope = JsonConvert.DeserializeObject>(message); + + var processor = new DataProcessor(envelope); + var cleanData = processor.ParseEnvelope(); + + ForwardMessageToQueue(channel, "mono.data.refined", cleanData); + + Console.WriteLine($" [x] Sent 'mono.data.refined':'{cleanData}'"); + }; + + channel.BasicConsume(queue: queue, + autoAck: true, + consumer: consumer); + } + } + } + public static void ForwardMessageToQueue (IModel channel, string queue, JArray cleanData) + { + var message = JsonConvert.SerializeObject(cleanData); + byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(message); + channel.BasicPublish(queue, null, null, messageBodyBytes); + } + private static void Exit() { Log.Information("Exiting..."); diff --git a/Src/Refiner/Refiner.csproj b/Src/Refiner/Refiner.csproj index ff423f2..9655860 100644 --- a/Src/Refiner/Refiner.csproj +++ b/Src/Refiner/Refiner.csproj @@ -1,23 +1,18 @@  - Exe netcoreapp2.1 - - - - - + \ No newline at end of file diff --git a/Test/IngestionServiceTest.cs b/Test/IngestionServiceTest.cs index fa6b37a..4953011 100644 --- a/Test/IngestionServiceTest.cs +++ b/Test/IngestionServiceTest.cs @@ -1,6 +1,8 @@ using NUnit.Framework; using Ingestion; using System.Collections.Generic; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; using Serilog; namespace Tests @@ -14,29 +16,39 @@ public void Setup() { Dictionary sources = new Dictionary(); sources.Add( - "Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", name = "Test"} + "Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", Name = "Test"} ); sources.Add( - "Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", name = "Test2" } + "Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", Name = "Test2" } + ); + sources.Add( + "FakeData", new IngestionSource() { ApiUrl = "https://jsonplaceholder.typicode.com/posts", Credential = "", ForwardMessageQueue = "mono.data.received", Name = "FakeData" } ); service = new IngestionService(sources, new LoggerConfiguration().WriteTo.Console().CreateLogger()); } [Test] - public void TestCanLoadSourcesFromJSONFile() + public void CanIngestFromEndpoint() { - IDictionary sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); - Assert.AreEqual(sources["ResellerData"].name, "ResellerData"); - Assert.AreEqual(sources["ResellerData2"].name, "ResellerData2"); + service.Ingest("FakeData"); + using (IConnection conn = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}.CreateConnection()) + { + using (IModel channel = conn.CreateModel()) + { + BasicGetResult result = channel.BasicGet("mono.data.received", false); + Assert.IsNotNull(result); + } + } } [Test] - public void TestCanForwardMessageToQueue() + public void TestCanLoadSourcesFromJSONFile() { - service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); - service.ForwardMessageToRabbitMQ(""); + IDictionary sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); + Assert.AreEqual(sources["ResellerData"].Name, "ResellerData"); + Assert.AreEqual(sources["ResellerData2"].Name, "ResellerData2"); + Assert.AreEqual(sources["FakeData"].Name, "FakeData"); } - } -} \ No newline at end of file +} diff --git a/Test/Tests.csproj b/Test/Tests.csproj index 05c5e12..be36974 100644 --- a/Test/Tests.csproj +++ b/Test/Tests.csproj @@ -1,19 +1,14 @@ - - + netcoreapp2.1 - false - - + - - \ No newline at end of file diff --git a/config/IngestionSourcesTest.json b/config/IngestionSourcesTest.json index 64ee1a4..dc44bc8 100644 --- a/config/IngestionSourcesTest.json +++ b/config/IngestionSourcesTest.json @@ -10,5 +10,11 @@ "ApiUrl": "https://hal.mono.net/api/v1/admin", "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", "ForwardMessageQueue": "mono.queue.resellers" + }, + { + "name": "FakeData", + "ApiUrl": "https://jsonplaceholder.typicode.com/posts", + "Credential": "", + "ForwardMessageQueue": "mono.queue.test" } ] diff --git a/docker-compose.yml b/docker-compose.yml index b1a5ae7..2383c50 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,11 @@ version: "3" + +networks: + mongodb_application-network-db: + external: true + services: + rabbit.docker: image: rabbitmq:3.7.8-management-alpine restart: always @@ -7,6 +13,7 @@ services: ports: - "15672:15672" - "5672:5672" + refiner: image: grabid/refiner:local build: @@ -14,6 +21,9 @@ services: depends_on: - rabbit.docker restart: always + networks: + - mongodb_application-network-db + logger: image: grabid/logger:local build: @@ -21,6 +31,7 @@ services: depends_on: - rabbit.docker restart: always + ingestion: image: grabid/ingestion:local build: @@ -28,32 +39,3 @@ services: depends_on: - rabbit.docker restart: always - - mongodb-primary: - image: 'bitnami/mongodb:latest' - environment: - - MONGODB_REPLICA_SET_MODE=primary - volumes: - - 'mongodb_master_data:/bitnami' - - mongodb-secondary: - image: 'bitnami/mongodb:latest' - depends_on: - - mongodb-primary - environment: - - MONGODB_REPLICA_SET_MODE=secondary - - MONGODB_PRIMARY_HOST=mongodb-primary - - MONGODB_PRIMARY_PORT_NUMBER=27017 - - mongodb-arbiter: - image: 'bitnami/mongodb:latest' - depends_on: - - mongodb-primary - environment: - - MONGODB_REPLICA_SET_MODE=arbiter - - MONGODB_PRIMARY_HOST=mongodb-primary - - MONGODB_PRIMARY_PORT_NUMBER=27017 - -volumes: - mongodb_master_data: - driver: local \ No newline at end of file diff --git a/mongodb/docker-compose.yml b/mongodb/docker-compose.yml index 943f8bf..306e025 100644 --- a/mongodb/docker-compose.yml +++ b/mongodb/docker-compose.yml @@ -1,15 +1,25 @@ -version: '2' +version: "3" +networks: + application-network-db: + driver: bridge + services: mongodb-primary: image: 'bitnami/mongodb:latest' + networks: + - application-network-db environment: - MONGODB_REPLICA_SET_MODE=primary volumes: - 'mongodb_master_data:/bitnami' + ports: + - "27017:27017" mongodb-secondary: image: 'bitnami/mongodb:latest' + networks: + - application-network-db depends_on: - mongodb-primary environment: @@ -19,6 +29,8 @@ services: mongodb-arbiter: image: 'bitnami/mongodb:latest' + networks: + - application-network-db depends_on: - mongodb-primary environment: From c2c60571c1ed184ad33ff4c86502d48abcc3c98c Mon Sep 17 00:00:00 2001 From: Bogdan Date: Sun, 9 Dec 2018 14:20:49 +0100 Subject: [PATCH 15/19] WIP-09/12 --- Src/Ingestion/Dockerfile | 13 +- Src/Ingestion/Ingestion.csproj | 7 +- Src/Ingestion/IngestionService.cs | 60 +++--- Src/Ingestion/wait-for-it.sh | 314 +++++++++++++++++++++++------- Src/Logger/Dockerfile | 8 +- Src/Logger/wait-for-it.sh | 314 +++++++++++++++++++++++------- Src/Refiner/Dockerfile | 8 +- Src/Refiner/Program.cs | 64 +----- Src/Refiner/RefinerService.cs | 85 ++++++++ Src/Refiner/wait-for-it.sh | 177 ----------------- Test/IngestionServiceTest.cs | 2 +- Test/RefinerServiceTest.cs | 15 +- Test/Tests.csproj | 1 + 13 files changed, 635 insertions(+), 433 deletions(-) create mode 100644 Src/Refiner/RefinerService.cs delete mode 100644 Src/Refiner/wait-for-it.sh diff --git a/Src/Ingestion/Dockerfile b/Src/Ingestion/Dockerfile index 134c13f..84c6eac 100644 --- a/Src/Ingestion/Dockerfile +++ b/Src/Ingestion/Dockerfile @@ -1,18 +1,13 @@ FROM microsoft/dotnet:2.1-sdk AS build COPY . ./build WORKDIR /build/ -RUN dotnet restore +RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app - FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app - FROM microsoft/dotnet:2.1-runtime-alpine - COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -COPY ./crontab /etc/crontabs -RUN chmod -R 644 /etc/crontabs -COPY wait-for-it.sh /var/app -RUN apk add bash -CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] +#COPY wait-for-it.sh /var/app +#RUN apk add bash +#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file diff --git a/Src/Ingestion/Ingestion.csproj b/Src/Ingestion/Ingestion.csproj index ff423f2..9655860 100644 --- a/Src/Ingestion/Ingestion.csproj +++ b/Src/Ingestion/Ingestion.csproj @@ -1,23 +1,18 @@  - Exe netcoreapp2.1 - - - - - + \ No newline at end of file diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs index 49a594d..62943c8 100644 --- a/Src/Ingestion/IngestionService.cs +++ b/Src/Ingestion/IngestionService.cs @@ -10,49 +10,49 @@ namespace Ingestion { - //Read about Rabbitmq - https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html - // for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well public class IngestionService { private IDictionary _sourcesList; - private const string _sourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json"; + private const string SourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json"; private ILogger _logger; - private static string _rabbitmqHostname = "rabbit.docker"; + private static IConnection _conn; + private static IModel _channel; public IngestionService(IDictionary sources, ILogger logger) { _logger = logger; + IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; + _conn = factory.CreateConnection(); + _channel = _conn.CreateModel(); _sourcesList = sources; } public IngestionService(ILogger logger) { _logger = logger; - _sourcesList = LoadSourcesFromJson(_sourcesConfigRelativePath); + _sourcesList = LoadSourcesFromJson(SourcesConfigRelativePath); + IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; + _conn = factory.CreateConnection(); + _channel = _conn.CreateModel(); } - private async Task GetDataAsync(string baseUrl) + private async Task GetData (string url) { var data = ""; - HttpClient client = new HttpClient(); - try + using (var client = new HttpClient()) { - data = await client.GetStringAsync(baseUrl); - - Console.WriteLine(data); - } - catch (HttpRequestException e) - { - Console.WriteLine("\nException Caught!"); - Console.WriteLine("Message :{0} ", e.Message); + using (var res = await client.GetAsync(url)) + { + using (var content = res.Content) + { + data = await content.ReadAsStringAsync(); + } + } } - client.Dispose(); + return data; } - // Due to sercurity concerns sources are either injected - //or loaded from config files - public IDictionary LoadSourcesFromJson(string path) { IDictionary sources = new Dictionary(); @@ -75,9 +75,10 @@ public IDictionary LoadSourcesFromJson(string path) public void Ingest(string sourceId) { if (!_sourcesList.ContainsKey(sourceId)) throw new Exception("Key was not found, source is not defined"); - _logger.Information("Starting Ingestor"); - var data = GetDataAsync(_sourcesList[sourceId].ApiUrl).ToString(); + _logger.Information("Starting Ingestor"); + + var data = GetData(_sourcesList[sourceId].ApiUrl).Result; ForwardMessageToRabbitMQ(data, _sourcesList[sourceId].ForwardMessageQueue); @@ -86,25 +87,24 @@ public void Ingest(string sourceId) { public void ForwardMessageToRabbitMQ(string message, string queue) { - var factory = new ConnectionFactory(); - using (IConnection conn = factory.CreateConnection()) + using (_conn) { - using (IModel channel = conn.CreateModel()) + using (_channel) { var exchange = "mono.data.received"; - channel.QueueDeclare(queue, true, false, false, null); + _channel.QueueDeclare(queue, true, false, false, null); - channel.ExchangeDeclare(exchange: exchange, type: "fanout"); + _channel.ExchangeDeclare(exchange, "fanout"); - channel.QueueBind(queue, exchange, ""); + _channel.QueueBind(queue, exchange, ""); var envelope = new Envelope(Guid.NewGuid(), message); var envelopedMessage = JsonConvert.SerializeObject(envelope); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); - channel.BasicPublish(exchange, null, null, messageBodyBytes); + _channel.BasicPublish(exchange, "", _channel.CreateBasicProperties(), messageBodyBytes); Console.WriteLine(" [x] Sent '{0}':'{1}'", exchange, message); } } @@ -117,7 +117,7 @@ public string SerializeObjects(object obj) public void StoreObject(string obj) { try { - File.WriteAllText(_sourcesConfigRelativePath, obj); + File.WriteAllText(SourcesConfigRelativePath, obj); } catch(Exception exception) { Console.WriteLine("Couldnt write to file: " + exception.Message + "Message"); diff --git a/Src/Ingestion/wait-for-it.sh b/Src/Ingestion/wait-for-it.sh index bbe4043..b64739f 100644 --- a/Src/Ingestion/wait-for-it.sh +++ b/Src/Ingestion/wait-for-it.sh @@ -1,15 +1,190 @@ #!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available +##!/usr/bin/env bash +## Use this script to test if a given TCP host/port are available +# +#cmdname=$(basename $0) +# +#echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +# +##usage() +##{ +## cat << USAGE >&2 +##Usage: +## $cmdname host:port [-s] [-t timeout] [-- command args] +## -h HOST | --host=HOST Host or IP under test +## -p PORT | --port=PORT TCP port under test +## Alternatively, you specify the host and port as host:port +## -s | --strict Only execute subcommand if the test succeeds +## -q | --quiet Don't output any status messages +## -t TIMEOUT | --timeout=TIMEOUT +## Timeout in seconds, zero for no timeout +## -- COMMAND ARGS Execute command with args after the test finishes +##USAGE +## exit 1 +##} +# +#wait_for() +#{ +# if [[ $TIMEOUT -gt 0 ]]; then +# echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" +# else +# echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" +# fi +# start_ts=$(date +%s) +# while : +# do +# if [[ $ISBUSY -eq 1 ]]; then +# nc -z $HOST $PORT +# result=$? +# else +# (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 +# result=$? +# fi +# if [[ $result -eq 0 ]]; then +# end_ts=$(date +%s) +# echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" +# break +# fi +# sleep 1 +# done +# return $result +#} +# +#wait_for_wrapper() +#{ +# # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 +# if [[ $QUIET -eq 1 ]]; then +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# else +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# fi +# PID=$! +# trap "kill -INT -$PID" INT +# wait $PID +# RESULT=$? +# if [[ $RESULT -ne 0 ]]; then +# echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" +# fi +# return $RESULT +#} +# +## process arguments +#while [[ $# -gt 0 ]] +#do +# case "$1" in +# *:* ) +# hostport=(${1//:/ }) +# HOST=${hostport[0]} +# PORT=${hostport[1]} +# shift 1 +# ;; +# --child) +# CHILD=1 +# shift 1 +# ;; +# -q | --quiet) +# QUIET=1 +# shift 1 +# ;; +# -s | --strict) +# STRICT=1 +# shift 1 +# ;; +# -h) +# HOST="$2" +# if [[ $HOST == "" ]]; then break; fi +# shift 2 +# ;; +# --host=*) +# HOST="${1#*=}" +# shift 1 +# ;; +# -p) +# PORT="$2" +# if [[ $PORT == "" ]]; then break; fi +# shift 2 +# ;; +# --port=*) +# PORT="${1#*=}" +# shift 1 +# ;; +# -t) +# TIMEOUT="$2" +# if [[ $TIMEOUT == "" ]]; then break; fi +# shift 2 +# ;; +# --timeout=*) +# TIMEOUT="${1#*=}" +# shift 1 +# ;; +# --) +# shift +# CLI=("$@") +# break +# ;; +# --help) +# usage +# ;; +# *) +# echoerr "Unknown argument: $1" +# usage +# ;; +# esac +#done +# +#if [[ "$HOST" == "" || "$PORT" == "" ]]; then +# echoerr "Error: you need to provide a host and port to test." +# usage +#fi +# +#TIMEOUT=${TIMEOUT:-15} +#STRICT=${STRICT:-0} +#CHILD=${CHILD:-0} +#QUIET=${QUIET:-0} +# +## check to see if timeout is from busybox? +## check to see if timeout is from busybox? +#TIMEOUT_PATH=$(realpath $(which timeout)) +#if [[ $TIMEOUT_PATH =~ "busybox" ]]; then +# ISBUSY=1 +# BUSYTIMEFLAG="-t" +#else +# ISBUSY=0 +# BUSYTIMEFLAG="" +#fi +# +#if [[ $CHILD -gt 0 ]]; then +# wait_for +# RESULT=$? +# exit $RESULT +#else +# if [[ $TIMEOUT -gt 0 ]]; then +# wait_for_wrapper +# RESULT=$? +# else +# wait_for +# RESULT=$? +# fi +#fi +# +#if [[ $CLI != "" ]]; then +# if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then +# echoerr "$cmdname: strict mode, refusing to execute subprocess" +# exit $RESULT +# fi +# exec "${CLI[@]}" +#else +# exit $RESULT +#fi +WAITFORIT_cmdname=${0##*/} -cmdname=$(basename $0) - -echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } usage() { cat << USAGE >&2 Usage: - $cmdname host:port [-s] [-t timeout] [-- command args] + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] -h HOST | --host=HOST Host or IP under test -p PORT | --port=PORT TCP port under test Alternatively, you specify the host and port as host:port @@ -24,47 +199,47 @@ USAGE wait_for() { - if [[ $TIMEOUT -gt 0 ]]; then - echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" else - echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" fi - start_ts=$(date +%s) + WAITFORIT_start_ts=$(date +%s) while : do - if [[ $ISBUSY -eq 1 ]]; then - nc -z $HOST $PORT - result=$? + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? else - (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 - result=$? + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? fi - if [[ $result -eq 0 ]]; then - end_ts=$(date +%s) - echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" break fi sleep 1 done - return $result + return $WAITFORIT_result } wait_for_wrapper() { # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $QUIET -eq 1 ]]; then - timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & else - timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & fi - PID=$! - trap "kill -INT -$PID" INT - wait $PID - RESULT=$? - if [[ $RESULT -ne 0 ]]; then - echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" fi - return $RESULT + return $WAITFORIT_RESULT } # process arguments @@ -72,53 +247,53 @@ while [[ $# -gt 0 ]] do case "$1" in *:* ) - hostport=(${1//:/ }) - HOST=${hostport[0]} - PORT=${hostport[1]} + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} shift 1 ;; --child) - CHILD=1 + WAITFORIT_CHILD=1 shift 1 ;; -q | --quiet) - QUIET=1 + WAITFORIT_QUIET=1 shift 1 ;; -s | --strict) - STRICT=1 + WAITFORIT_STRICT=1 shift 1 ;; -h) - HOST="$2" - if [[ $HOST == "" ]]; then break; fi + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi shift 2 ;; --host=*) - HOST="${1#*=}" + WAITFORIT_HOST="${1#*=}" shift 1 ;; -p) - PORT="$2" - if [[ $PORT == "" ]]; then break; fi + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi shift 2 ;; --port=*) - PORT="${1#*=}" + WAITFORIT_PORT="${1#*=}" shift 1 ;; -t) - TIMEOUT="$2" - if [[ $TIMEOUT == "" ]]; then break; fi + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi shift 2 ;; --timeout=*) - TIMEOUT="${1#*=}" + WAITFORIT_TIMEOUT="${1#*=}" shift 1 ;; --) shift - CLI=("$@") + WAITFORIT_CLI=("$@") break ;; --help) @@ -131,47 +306,48 @@ do esac done -if [[ "$HOST" == "" || "$PORT" == "" ]]; then +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then echoerr "Error: you need to provide a host and port to test." usage fi -TIMEOUT=${TIMEOUT:-15} -STRICT=${STRICT:-0} -CHILD=${CHILD:-0} -QUIET=${QUIET:-0} +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} # check to see if timeout is from busybox? -# check to see if timeout is from busybox? -TIMEOUT_PATH=$(realpath $(which timeout)) -if [[ $TIMEOUT_PATH =~ "busybox" ]]; then - ISBUSY=1 - BUSYTIMEFLAG="-t" +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + else - ISBUSY=0 - BUSYTIMEFLAG="" + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" fi -if [[ $CHILD -gt 0 ]]; then +if [[ $WAITFORIT_CHILD -gt 0 ]]; then wait_for - RESULT=$? - exit $RESULT + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT else - if [[ $TIMEOUT -gt 0 ]]; then + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then wait_for_wrapper - RESULT=$? + WAITFORIT_RESULT=$? else wait_for - RESULT=$? + WAITFORIT_RESULT=$? fi fi -if [[ $CLI != "" ]]; then - if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then - echoerr "$cmdname: strict mode, refusing to execute subprocess" - exit $RESULT +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT fi - exec "${CLI[@]}" + exec "${WAITFORIT_CLI[@]}" else - exit $RESULT -fi + exit $WAITFORIT_RESULT +fi \ No newline at end of file diff --git a/Src/Logger/Dockerfile b/Src/Logger/Dockerfile index 8b2bcff..84c6eac 100644 --- a/Src/Logger/Dockerfile +++ b/Src/Logger/Dockerfile @@ -3,13 +3,11 @@ COPY . ./build WORKDIR /build/ RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app - FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app - FROM microsoft/dotnet:2.1-runtime-alpine COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -COPY wait-for-it.sh /var/app -RUN apk add bash -CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] +#COPY wait-for-it.sh /var/app +#RUN apk add bash +#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file diff --git a/Src/Logger/wait-for-it.sh b/Src/Logger/wait-for-it.sh index bbe4043..b64739f 100644 --- a/Src/Logger/wait-for-it.sh +++ b/Src/Logger/wait-for-it.sh @@ -1,15 +1,190 @@ #!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available +##!/usr/bin/env bash +## Use this script to test if a given TCP host/port are available +# +#cmdname=$(basename $0) +# +#echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +# +##usage() +##{ +## cat << USAGE >&2 +##Usage: +## $cmdname host:port [-s] [-t timeout] [-- command args] +## -h HOST | --host=HOST Host or IP under test +## -p PORT | --port=PORT TCP port under test +## Alternatively, you specify the host and port as host:port +## -s | --strict Only execute subcommand if the test succeeds +## -q | --quiet Don't output any status messages +## -t TIMEOUT | --timeout=TIMEOUT +## Timeout in seconds, zero for no timeout +## -- COMMAND ARGS Execute command with args after the test finishes +##USAGE +## exit 1 +##} +# +#wait_for() +#{ +# if [[ $TIMEOUT -gt 0 ]]; then +# echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" +# else +# echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" +# fi +# start_ts=$(date +%s) +# while : +# do +# if [[ $ISBUSY -eq 1 ]]; then +# nc -z $HOST $PORT +# result=$? +# else +# (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 +# result=$? +# fi +# if [[ $result -eq 0 ]]; then +# end_ts=$(date +%s) +# echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" +# break +# fi +# sleep 1 +# done +# return $result +#} +# +#wait_for_wrapper() +#{ +# # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 +# if [[ $QUIET -eq 1 ]]; then +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# else +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# fi +# PID=$! +# trap "kill -INT -$PID" INT +# wait $PID +# RESULT=$? +# if [[ $RESULT -ne 0 ]]; then +# echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" +# fi +# return $RESULT +#} +# +## process arguments +#while [[ $# -gt 0 ]] +#do +# case "$1" in +# *:* ) +# hostport=(${1//:/ }) +# HOST=${hostport[0]} +# PORT=${hostport[1]} +# shift 1 +# ;; +# --child) +# CHILD=1 +# shift 1 +# ;; +# -q | --quiet) +# QUIET=1 +# shift 1 +# ;; +# -s | --strict) +# STRICT=1 +# shift 1 +# ;; +# -h) +# HOST="$2" +# if [[ $HOST == "" ]]; then break; fi +# shift 2 +# ;; +# --host=*) +# HOST="${1#*=}" +# shift 1 +# ;; +# -p) +# PORT="$2" +# if [[ $PORT == "" ]]; then break; fi +# shift 2 +# ;; +# --port=*) +# PORT="${1#*=}" +# shift 1 +# ;; +# -t) +# TIMEOUT="$2" +# if [[ $TIMEOUT == "" ]]; then break; fi +# shift 2 +# ;; +# --timeout=*) +# TIMEOUT="${1#*=}" +# shift 1 +# ;; +# --) +# shift +# CLI=("$@") +# break +# ;; +# --help) +# usage +# ;; +# *) +# echoerr "Unknown argument: $1" +# usage +# ;; +# esac +#done +# +#if [[ "$HOST" == "" || "$PORT" == "" ]]; then +# echoerr "Error: you need to provide a host and port to test." +# usage +#fi +# +#TIMEOUT=${TIMEOUT:-15} +#STRICT=${STRICT:-0} +#CHILD=${CHILD:-0} +#QUIET=${QUIET:-0} +# +## check to see if timeout is from busybox? +## check to see if timeout is from busybox? +#TIMEOUT_PATH=$(realpath $(which timeout)) +#if [[ $TIMEOUT_PATH =~ "busybox" ]]; then +# ISBUSY=1 +# BUSYTIMEFLAG="-t" +#else +# ISBUSY=0 +# BUSYTIMEFLAG="" +#fi +# +#if [[ $CHILD -gt 0 ]]; then +# wait_for +# RESULT=$? +# exit $RESULT +#else +# if [[ $TIMEOUT -gt 0 ]]; then +# wait_for_wrapper +# RESULT=$? +# else +# wait_for +# RESULT=$? +# fi +#fi +# +#if [[ $CLI != "" ]]; then +# if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then +# echoerr "$cmdname: strict mode, refusing to execute subprocess" +# exit $RESULT +# fi +# exec "${CLI[@]}" +#else +# exit $RESULT +#fi +WAITFORIT_cmdname=${0##*/} -cmdname=$(basename $0) - -echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } usage() { cat << USAGE >&2 Usage: - $cmdname host:port [-s] [-t timeout] [-- command args] + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] -h HOST | --host=HOST Host or IP under test -p PORT | --port=PORT TCP port under test Alternatively, you specify the host and port as host:port @@ -24,47 +199,47 @@ USAGE wait_for() { - if [[ $TIMEOUT -gt 0 ]]; then - echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" else - echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" fi - start_ts=$(date +%s) + WAITFORIT_start_ts=$(date +%s) while : do - if [[ $ISBUSY -eq 1 ]]; then - nc -z $HOST $PORT - result=$? + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? else - (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 - result=$? + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? fi - if [[ $result -eq 0 ]]; then - end_ts=$(date +%s) - echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" break fi sleep 1 done - return $result + return $WAITFORIT_result } wait_for_wrapper() { # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $QUIET -eq 1 ]]; then - timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & else - timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & fi - PID=$! - trap "kill -INT -$PID" INT - wait $PID - RESULT=$? - if [[ $RESULT -ne 0 ]]; then - echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" fi - return $RESULT + return $WAITFORIT_RESULT } # process arguments @@ -72,53 +247,53 @@ while [[ $# -gt 0 ]] do case "$1" in *:* ) - hostport=(${1//:/ }) - HOST=${hostport[0]} - PORT=${hostport[1]} + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} shift 1 ;; --child) - CHILD=1 + WAITFORIT_CHILD=1 shift 1 ;; -q | --quiet) - QUIET=1 + WAITFORIT_QUIET=1 shift 1 ;; -s | --strict) - STRICT=1 + WAITFORIT_STRICT=1 shift 1 ;; -h) - HOST="$2" - if [[ $HOST == "" ]]; then break; fi + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi shift 2 ;; --host=*) - HOST="${1#*=}" + WAITFORIT_HOST="${1#*=}" shift 1 ;; -p) - PORT="$2" - if [[ $PORT == "" ]]; then break; fi + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi shift 2 ;; --port=*) - PORT="${1#*=}" + WAITFORIT_PORT="${1#*=}" shift 1 ;; -t) - TIMEOUT="$2" - if [[ $TIMEOUT == "" ]]; then break; fi + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi shift 2 ;; --timeout=*) - TIMEOUT="${1#*=}" + WAITFORIT_TIMEOUT="${1#*=}" shift 1 ;; --) shift - CLI=("$@") + WAITFORIT_CLI=("$@") break ;; --help) @@ -131,47 +306,48 @@ do esac done -if [[ "$HOST" == "" || "$PORT" == "" ]]; then +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then echoerr "Error: you need to provide a host and port to test." usage fi -TIMEOUT=${TIMEOUT:-15} -STRICT=${STRICT:-0} -CHILD=${CHILD:-0} -QUIET=${QUIET:-0} +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} # check to see if timeout is from busybox? -# check to see if timeout is from busybox? -TIMEOUT_PATH=$(realpath $(which timeout)) -if [[ $TIMEOUT_PATH =~ "busybox" ]]; then - ISBUSY=1 - BUSYTIMEFLAG="-t" +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + else - ISBUSY=0 - BUSYTIMEFLAG="" + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" fi -if [[ $CHILD -gt 0 ]]; then +if [[ $WAITFORIT_CHILD -gt 0 ]]; then wait_for - RESULT=$? - exit $RESULT + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT else - if [[ $TIMEOUT -gt 0 ]]; then + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then wait_for_wrapper - RESULT=$? + WAITFORIT_RESULT=$? else wait_for - RESULT=$? + WAITFORIT_RESULT=$? fi fi -if [[ $CLI != "" ]]; then - if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then - echoerr "$cmdname: strict mode, refusing to execute subprocess" - exit $RESULT +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT fi - exec "${CLI[@]}" + exec "${WAITFORIT_CLI[@]}" else - exit $RESULT -fi + exit $WAITFORIT_RESULT +fi \ No newline at end of file diff --git a/Src/Refiner/Dockerfile b/Src/Refiner/Dockerfile index 8b2bcff..84c6eac 100644 --- a/Src/Refiner/Dockerfile +++ b/Src/Refiner/Dockerfile @@ -3,13 +3,11 @@ COPY . ./build WORKDIR /build/ RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app - FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app - FROM microsoft/dotnet:2.1-runtime-alpine COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -COPY wait-for-it.sh /var/app -RUN apk add bash -CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] +#COPY wait-for-it.sh /var/app +#RUN apk add bash +#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index 1a31236..521e446 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -17,16 +17,10 @@ namespace Refiner { class Program { - private static IConnection conn; - - private static IModel channel; private static readonly AutoResetEvent WaitHandle = new AutoResetEvent(false); static void Main(string[] args) { - AssemblyLoadContext.Default.Unloading += _ => Exit(); - Console.CancelKeyPress += (_, __) => Exit(); - Log.Logger = new LoggerConfiguration() .Enrich.WithProperty("name", typeof(Program).Assembly.GetName().Name) .WriteTo.Console() @@ -34,65 +28,13 @@ static void Main(string[] args) Log.Information("Starting..."); - RetrieveMessageFromQueue("mono.data.received"); + RefinerService refiner = new RefinerService(Log.Logger); + + refiner.RetrieveMessageFromQueue("mono.data.received"); Log.Information("Started"); WaitHandle.WaitOne(); } - - public static void RetrieveMessageFromQueue (string queue) - { - var factory = new ConnectionFactory(); - using (IConnection conn = factory.CreateConnection()) - { - using (IModel channel = conn.CreateModel()) - { - var exchange = "mono.data.received"; - - channel.QueueDeclare(queue, true, false, false, null); - - channel.ExchangeDeclare(exchange: exchange, type: "fanout"); - - channel.QueueBind(queue, exchange, ""); - - var consumer = new EventingBasicConsumer(channel); - - consumer.Received += (model, ea) => - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body); - var key = ea.RoutingKey; - Console.WriteLine($" [x] Received '{key}':'{message}'"); - var envelope = JsonConvert.DeserializeObject>(message); - - var processor = new DataProcessor(envelope); - var cleanData = processor.ParseEnvelope(); - - ForwardMessageToQueue(channel, "mono.data.refined", cleanData); - - Console.WriteLine($" [x] Sent 'mono.data.refined':'{cleanData}'"); - }; - - channel.BasicConsume(queue: queue, - autoAck: true, - consumer: consumer); - } - } - } - - public static void ForwardMessageToQueue (IModel channel, string queue, JArray cleanData) - { - var message = JsonConvert.SerializeObject(cleanData); - byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(message); - channel.BasicPublish(queue, null, null, messageBodyBytes); - } - - private static void Exit() - { - Log.Information("Exiting..."); - channel.Close(); - conn.Close(); - } } } diff --git a/Src/Refiner/RefinerService.cs b/Src/Refiner/RefinerService.cs new file mode 100644 index 0000000..c268591 --- /dev/null +++ b/Src/Refiner/RefinerService.cs @@ -0,0 +1,85 @@ +using System; +using System.Text; +using EasyNetQ.MultipleExchange; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Serilog; +using Shared; + +namespace Refiner +{ + public class RefinerService + { + private ILogger _logger; + private static IConnection _conn; + private static IModel _channel; + private const string Exchange = "mono.data.received "; + + public RefinerService(ILogger logger) + { + _logger = logger; + IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; + _conn = factory.CreateConnection(); + _channel = _conn.CreateModel(); + } + + public void RetrieveMessageFromQueue (string queue) + { + using (_conn) + { + using (_channel) + { + _channel.QueueDeclare(queue, true, false, false, null); + + _channel.ExchangeDeclare(Exchange, "fanout"); + + _channel.QueueBind(queue, Exchange, ""); + + var consumer = new EventingBasicConsumer(_channel); + + consumer.Received += (model, ea) => + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body); + var key = ea.RoutingKey; + Console.WriteLine($" [x] Received '{key}':'{message}'"); + var envelope = JsonConvert.DeserializeObject>(message); + + var processor = new DataProcessor(envelope); + var cleanData = processor.ParseEnvelope(); + + ForwardMessageToQueue("mono.data.refined", cleanData); + + Console.WriteLine($" [x] Sent 'mono.data.refined':'{cleanData}'"); + }; + + _channel.BasicConsume(queue, true, consumer); + } + } + } + + public void ForwardMessageToQueue (string queue, JToken cleanData) + { + var message = JsonConvert.SerializeObject(cleanData); + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); + + using (_conn) + { + using (_channel) + { + _channel.QueueDeclare(queue, true, false, false, null); + + _channel.ExchangeDeclare(Exchange, "fanout"); + + _channel.QueueBind(queue, Exchange, ""); + + _channel.BasicPublish(queue, "", _channel.CreateBasicProperties(), messageBodyBytes); + Console.WriteLine(" [x] Sent '{0}':'{1}'", Exchange, message); + } + } + + } + } +} \ No newline at end of file diff --git a/Src/Refiner/wait-for-it.sh b/Src/Refiner/wait-for-it.sh deleted file mode 100644 index bbe4043..0000000 --- a/Src/Refiner/wait-for-it.sh +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available - -cmdname=$(basename $0) - -echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } - -usage() -{ - cat << USAGE >&2 -Usage: - $cmdname host:port [-s] [-t timeout] [-- command args] - -h HOST | --host=HOST Host or IP under test - -p PORT | --port=PORT TCP port under test - Alternatively, you specify the host and port as host:port - -s | --strict Only execute subcommand if the test succeeds - -q | --quiet Don't output any status messages - -t TIMEOUT | --timeout=TIMEOUT - Timeout in seconds, zero for no timeout - -- COMMAND ARGS Execute command with args after the test finishes -USAGE - exit 1 -} - -wait_for() -{ - if [[ $TIMEOUT -gt 0 ]]; then - echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" - else - echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" - fi - start_ts=$(date +%s) - while : - do - if [[ $ISBUSY -eq 1 ]]; then - nc -z $HOST $PORT - result=$? - else - (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 - result=$? - fi - if [[ $result -eq 0 ]]; then - end_ts=$(date +%s) - echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" - break - fi - sleep 1 - done - return $result -} - -wait_for_wrapper() -{ - # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $QUIET -eq 1 ]]; then - timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & - else - timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & - fi - PID=$! - trap "kill -INT -$PID" INT - wait $PID - RESULT=$? - if [[ $RESULT -ne 0 ]]; then - echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" - fi - return $RESULT -} - -# process arguments -while [[ $# -gt 0 ]] -do - case "$1" in - *:* ) - hostport=(${1//:/ }) - HOST=${hostport[0]} - PORT=${hostport[1]} - shift 1 - ;; - --child) - CHILD=1 - shift 1 - ;; - -q | --quiet) - QUIET=1 - shift 1 - ;; - -s | --strict) - STRICT=1 - shift 1 - ;; - -h) - HOST="$2" - if [[ $HOST == "" ]]; then break; fi - shift 2 - ;; - --host=*) - HOST="${1#*=}" - shift 1 - ;; - -p) - PORT="$2" - if [[ $PORT == "" ]]; then break; fi - shift 2 - ;; - --port=*) - PORT="${1#*=}" - shift 1 - ;; - -t) - TIMEOUT="$2" - if [[ $TIMEOUT == "" ]]; then break; fi - shift 2 - ;; - --timeout=*) - TIMEOUT="${1#*=}" - shift 1 - ;; - --) - shift - CLI=("$@") - break - ;; - --help) - usage - ;; - *) - echoerr "Unknown argument: $1" - usage - ;; - esac -done - -if [[ "$HOST" == "" || "$PORT" == "" ]]; then - echoerr "Error: you need to provide a host and port to test." - usage -fi - -TIMEOUT=${TIMEOUT:-15} -STRICT=${STRICT:-0} -CHILD=${CHILD:-0} -QUIET=${QUIET:-0} - -# check to see if timeout is from busybox? -# check to see if timeout is from busybox? -TIMEOUT_PATH=$(realpath $(which timeout)) -if [[ $TIMEOUT_PATH =~ "busybox" ]]; then - ISBUSY=1 - BUSYTIMEFLAG="-t" -else - ISBUSY=0 - BUSYTIMEFLAG="" -fi - -if [[ $CHILD -gt 0 ]]; then - wait_for - RESULT=$? - exit $RESULT -else - if [[ $TIMEOUT -gt 0 ]]; then - wait_for_wrapper - RESULT=$? - else - wait_for - RESULT=$? - fi -fi - -if [[ $CLI != "" ]]; then - if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then - echoerr "$cmdname: strict mode, refusing to execute subprocess" - exit $RESULT - fi - exec "${CLI[@]}" -else - exit $RESULT -fi diff --git a/Test/IngestionServiceTest.cs b/Test/IngestionServiceTest.cs index 4953011..fba3cf0 100644 --- a/Test/IngestionServiceTest.cs +++ b/Test/IngestionServiceTest.cs @@ -37,7 +37,7 @@ public void CanIngestFromEndpoint() using (IModel channel = conn.CreateModel()) { BasicGetResult result = channel.BasicGet("mono.data.received", false); - Assert.IsNotNull(result); + Assert.IsNotNull(result.Body); } } } diff --git a/Test/RefinerServiceTest.cs b/Test/RefinerServiceTest.cs index 2c959ea..a3c402c 100644 --- a/Test/RefinerServiceTest.cs +++ b/Test/RefinerServiceTest.cs @@ -1,20 +1,33 @@ using NUnit.Framework; -using Ingestion; +using Refiner; using System.Collections.Generic; +using RabbitMQ.Client; using Serilog; namespace Tests { public class RefinerServiceTest { + private RefinerService refiner; + [SetUp] public void Setup() { + refiner = new RefinerService(new LoggerConfiguration().WriteTo.Console().CreateLogger()); } [Test] public void TestCanPullMessagesFromQueue() { + refiner.RetrieveMessageFromQueue("mono.data.received"); + using (IConnection conn = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}.CreateConnection()) + { + using (IModel channel = conn.CreateModel()) + { + BasicGetResult result = channel.BasicGet("mono.data.refined", false); + Assert.IsNotNull(result.Body); + } + } } [Test] diff --git a/Test/Tests.csproj b/Test/Tests.csproj index be36974..d9a666e 100644 --- a/Test/Tests.csproj +++ b/Test/Tests.csproj @@ -10,5 +10,6 @@ + \ No newline at end of file From dff49e87a8aa45ae2dd3af83db2c171c2d0836ea Mon Sep 17 00:00:00 2001 From: alex855k Date: Thu, 13 Dec 2018 15:57:06 +0100 Subject: [PATCH 16/19] (Ingestion-service/GraBID#12) Refactor refiner changes out of this branch. --- Src/Refiner/DataProcessor.cs | 62 ------------ Src/Refiner/Dockerfile | 8 +- Src/Refiner/Program.cs | 69 +++++++++++-- Src/Refiner/Refiner.csproj | 7 +- Src/Refiner/RefinerService.cs | 85 ---------------- Src/Refiner/wait-for-it.sh | 177 ++++++++++++++++++++++++++++++++++ 6 files changed, 251 insertions(+), 157 deletions(-) delete mode 100644 Src/Refiner/DataProcessor.cs delete mode 100644 Src/Refiner/RefinerService.cs create mode 100644 Src/Refiner/wait-for-it.sh diff --git a/Src/Refiner/DataProcessor.cs b/Src/Refiner/DataProcessor.cs deleted file mode 100644 index 1d7017c..0000000 --- a/Src/Refiner/DataProcessor.cs +++ /dev/null @@ -1,62 +0,0 @@ -using Newtonsoft.Json; -using System; -using Newtonsoft.Json.Linq; -using Shared; - -namespace Refiner -{ - public class DataProcessor - { - private readonly Envelope _data; - - public DataProcessor(Envelope data) - { - _data = data; - } - - public JArray ParseEnvelope () - { - dynamic cleanData = new JArray(); - dynamic envelopeData = JToken.Parse(_data.Payload); - - foreach (var obj in envelopeData) - { - dynamic entry = new JObject(); - - entry.Reseller = new JArray(); - entry.User = new JArray(); - entry.Subscription = new JArray(); - entry.Bill = new JArray(); - - entry.Reseller.Id = obj.resellerId; - entry.Reseller.Name = obj.resellerName; - entry.Reseller.Discount = obj.discount; - - entry.User.AccountId = obj.accountId; - - entry.Subscription.Name = obj.subscriptionName; - entry.Subscription.Type = obj.type; - entry.Subscription.State = obj.state; - entry.Subscription.Price = obj.price; - entry.Subscription.OnExpiry = obj.onExpiry; - entry.Subscription.StartDate = obj.startDate; - entry.Subscription.ExpiryDate = obj.expiryDate; - - entry.Bill.BillingDate = obj.billingDate; - entry.DiscountedPrice = obj.discountedPrice; - entry.CurrencyName = obj.currencyName; - entry.CurrencyISO = obj.code; - - cleanData.Add(entry); - } - - return cleanData; - } - - public void StoreCleanData(JArray data) - { - //add to mongo - } - - } -} \ No newline at end of file diff --git a/Src/Refiner/Dockerfile b/Src/Refiner/Dockerfile index 84c6eac..8b2bcff 100644 --- a/Src/Refiner/Dockerfile +++ b/Src/Refiner/Dockerfile @@ -3,11 +3,13 @@ COPY . ./build WORKDIR /build/ RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app + FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app + FROM microsoft/dotnet:2.1-runtime-alpine COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -#COPY wait-for-it.sh /var/app -#RUN apk add bash -#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file +COPY wait-for-it.sh /var/app +RUN apk add bash +CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index 521e446..dd35d4f 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -11,30 +11,87 @@ using System.Runtime.Loader; using System.Text; using System.Threading; -using System.Threading.Tasks; namespace Refiner { class Program { + private static IConnection conn; + + private static IModel channel; private static readonly AutoResetEvent WaitHandle = new AutoResetEvent(false); static void Main(string[] args) { + AssemblyLoadContext.Default.Unloading += _ => Exit(); + Console.CancelKeyPress += (_, __) => Exit(); + Log.Logger = new LoggerConfiguration() .Enrich.WithProperty("name", typeof(Program).Assembly.GetName().Name) .WriteTo.Console() .CreateLogger(); Log.Information("Starting..."); - - RefinerService refiner = new RefinerService(Log.Logger); - - refiner.RetrieveMessageFromQueue("mono.data.received"); - + + var factory = new ConnectionFactory() { HostName = "rabbit.docker" }; + conn = factory.CreateConnection(); + channel = conn.CreateModel(); + + var exchange = "grabid_exchange"; + var routingKey = "mono.data.received"; + + channel.ExchangeDeclare(exchange: exchange, type: "topic"); + + var queueName = channel.QueueDeclare().QueueName; + + channel.QueueBind(queue: queueName, + exchange: exchange, + routingKey: routingKey); + + + var consumer = new EventingBasicConsumer(channel); + + consumer.Received += (model, ea) => + { + var body = ea.Body; + var message = Encoding.UTF8.GetString(body); + var key = ea.RoutingKey; + Console.WriteLine($" [x] Received '{key}':'{message}'"); + var envelope = JsonConvert.DeserializeObject>(message); + + dynamic json = JObject.Parse(envelope.Payload); + string messageString = json.message; + string userString = json.users; + string[] userArray = userString.Split(","); + string[] messages = userArray.Select(user => { return $"{messageString} {user}"; }).ToArray(); + var returnEnvelope = new Envelope(envelope.Id, messages); + string newMessage = JsonConvert.SerializeObject(returnEnvelope); + byte[] messageBodyBytes = Encoding.UTF8.GetBytes(newMessage); + + channel.BasicPublish(exchange, "mono.data.refined", null, messageBodyBytes); + Console.WriteLine($" [x] Sent 'mono.data.refined':'{newMessage}'"); + + }; + + channel.BasicConsume(queue: queueName, + autoAck: true, + consumer: consumer); + Log.Information("Started"); WaitHandle.WaitOne(); } + + private static void Exit() + { + Log.Information("Exiting..."); + channel.Close(); + conn.Close(); + + + } + + } + } diff --git a/Src/Refiner/Refiner.csproj b/Src/Refiner/Refiner.csproj index 9655860..ff423f2 100644 --- a/Src/Refiner/Refiner.csproj +++ b/Src/Refiner/Refiner.csproj @@ -1,18 +1,23 @@  + Exe netcoreapp2.1 + + + - \ No newline at end of file + + diff --git a/Src/Refiner/RefinerService.cs b/Src/Refiner/RefinerService.cs deleted file mode 100644 index c268591..0000000 --- a/Src/Refiner/RefinerService.cs +++ /dev/null @@ -1,85 +0,0 @@ -using System; -using System.Text; -using EasyNetQ.MultipleExchange; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Serilog; -using Shared; - -namespace Refiner -{ - public class RefinerService - { - private ILogger _logger; - private static IConnection _conn; - private static IModel _channel; - private const string Exchange = "mono.data.received "; - - public RefinerService(ILogger logger) - { - _logger = logger; - IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; - _conn = factory.CreateConnection(); - _channel = _conn.CreateModel(); - } - - public void RetrieveMessageFromQueue (string queue) - { - using (_conn) - { - using (_channel) - { - _channel.QueueDeclare(queue, true, false, false, null); - - _channel.ExchangeDeclare(Exchange, "fanout"); - - _channel.QueueBind(queue, Exchange, ""); - - var consumer = new EventingBasicConsumer(_channel); - - consumer.Received += (model, ea) => - { - var body = ea.Body; - var message = Encoding.UTF8.GetString(body); - var key = ea.RoutingKey; - Console.WriteLine($" [x] Received '{key}':'{message}'"); - var envelope = JsonConvert.DeserializeObject>(message); - - var processor = new DataProcessor(envelope); - var cleanData = processor.ParseEnvelope(); - - ForwardMessageToQueue("mono.data.refined", cleanData); - - Console.WriteLine($" [x] Sent 'mono.data.refined':'{cleanData}'"); - }; - - _channel.BasicConsume(queue, true, consumer); - } - } - } - - public void ForwardMessageToQueue (string queue, JToken cleanData) - { - var message = JsonConvert.SerializeObject(cleanData); - byte[] messageBodyBytes = Encoding.UTF8.GetBytes(message); - - using (_conn) - { - using (_channel) - { - _channel.QueueDeclare(queue, true, false, false, null); - - _channel.ExchangeDeclare(Exchange, "fanout"); - - _channel.QueueBind(queue, Exchange, ""); - - _channel.BasicPublish(queue, "", _channel.CreateBasicProperties(), messageBodyBytes); - Console.WriteLine(" [x] Sent '{0}':'{1}'", Exchange, message); - } - } - - } - } -} \ No newline at end of file diff --git a/Src/Refiner/wait-for-it.sh b/Src/Refiner/wait-for-it.sh new file mode 100644 index 0000000..bbe4043 --- /dev/null +++ b/Src/Refiner/wait-for-it.sh @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# Use this script to test if a given TCP host/port are available + +cmdname=$(basename $0) + +echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } + +usage() +{ + cat << USAGE >&2 +Usage: + $cmdname host:port [-s] [-t timeout] [-- command args] + -h HOST | --host=HOST Host or IP under test + -p PORT | --port=PORT TCP port under test + Alternatively, you specify the host and port as host:port + -s | --strict Only execute subcommand if the test succeeds + -q | --quiet Don't output any status messages + -t TIMEOUT | --timeout=TIMEOUT + Timeout in seconds, zero for no timeout + -- COMMAND ARGS Execute command with args after the test finishes +USAGE + exit 1 +} + +wait_for() +{ + if [[ $TIMEOUT -gt 0 ]]; then + echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" + else + echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" + fi + start_ts=$(date +%s) + while : + do + if [[ $ISBUSY -eq 1 ]]; then + nc -z $HOST $PORT + result=$? + else + (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 + result=$? + fi + if [[ $result -eq 0 ]]; then + end_ts=$(date +%s) + echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" + break + fi + sleep 1 + done + return $result +} + +wait_for_wrapper() +{ + # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 + if [[ $QUIET -eq 1 ]]; then + timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + else + timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + fi + PID=$! + trap "kill -INT -$PID" INT + wait $PID + RESULT=$? + if [[ $RESULT -ne 0 ]]; then + echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" + fi + return $RESULT +} + +# process arguments +while [[ $# -gt 0 ]] +do + case "$1" in + *:* ) + hostport=(${1//:/ }) + HOST=${hostport[0]} + PORT=${hostport[1]} + shift 1 + ;; + --child) + CHILD=1 + shift 1 + ;; + -q | --quiet) + QUIET=1 + shift 1 + ;; + -s | --strict) + STRICT=1 + shift 1 + ;; + -h) + HOST="$2" + if [[ $HOST == "" ]]; then break; fi + shift 2 + ;; + --host=*) + HOST="${1#*=}" + shift 1 + ;; + -p) + PORT="$2" + if [[ $PORT == "" ]]; then break; fi + shift 2 + ;; + --port=*) + PORT="${1#*=}" + shift 1 + ;; + -t) + TIMEOUT="$2" + if [[ $TIMEOUT == "" ]]; then break; fi + shift 2 + ;; + --timeout=*) + TIMEOUT="${1#*=}" + shift 1 + ;; + --) + shift + CLI=("$@") + break + ;; + --help) + usage + ;; + *) + echoerr "Unknown argument: $1" + usage + ;; + esac +done + +if [[ "$HOST" == "" || "$PORT" == "" ]]; then + echoerr "Error: you need to provide a host and port to test." + usage +fi + +TIMEOUT=${TIMEOUT:-15} +STRICT=${STRICT:-0} +CHILD=${CHILD:-0} +QUIET=${QUIET:-0} + +# check to see if timeout is from busybox? +# check to see if timeout is from busybox? +TIMEOUT_PATH=$(realpath $(which timeout)) +if [[ $TIMEOUT_PATH =~ "busybox" ]]; then + ISBUSY=1 + BUSYTIMEFLAG="-t" +else + ISBUSY=0 + BUSYTIMEFLAG="" +fi + +if [[ $CHILD -gt 0 ]]; then + wait_for + RESULT=$? + exit $RESULT +else + if [[ $TIMEOUT -gt 0 ]]; then + wait_for_wrapper + RESULT=$? + else + wait_for + RESULT=$? + fi +fi + +if [[ $CLI != "" ]]; then + if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then + echoerr "$cmdname: strict mode, refusing to execute subprocess" + exit $RESULT + fi + exec "${CLI[@]}" +else + exit $RESULT +fi From 9cb96a659c383e3decabd51dd0de2bd96d8ac00a Mon Sep 17 00:00:00 2001 From: alex855k Date: Thu, 13 Dec 2018 18:28:18 +0100 Subject: [PATCH 17/19] (Ingestion-service/GraBID#12) Remove unnessecary using statement. --- Src/Refiner/Program.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index dd35d4f..0547580 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -6,7 +6,6 @@ using Serilog; using Shared; using System; -using System.Collections.Generic; using System.Linq; using System.Runtime.Loader; using System.Text; From b320412108db96441bb27a4cadd28328c3980d7d Mon Sep 17 00:00:00 2001 From: alex855k Date: Thu, 13 Dec 2018 18:33:20 +0100 Subject: [PATCH 18/19] (Ingestion-service/GraBID#12) Refactor out all unnessecary tests. --- Test/RefinerServiceTest.cs | 43 ------------------- Tests/Ingestion.Tests/Ingestion.Tests.csproj | 11 ----- Tests/Ingestion.Tests/TestIngestionService.cs | 17 -------- 3 files changed, 71 deletions(-) delete mode 100644 Test/RefinerServiceTest.cs delete mode 100644 Tests/Ingestion.Tests/Ingestion.Tests.csproj delete mode 100644 Tests/Ingestion.Tests/TestIngestionService.cs diff --git a/Test/RefinerServiceTest.cs b/Test/RefinerServiceTest.cs deleted file mode 100644 index a3c402c..0000000 --- a/Test/RefinerServiceTest.cs +++ /dev/null @@ -1,43 +0,0 @@ -using NUnit.Framework; -using Refiner; -using System.Collections.Generic; -using RabbitMQ.Client; -using Serilog; - -namespace Tests -{ - public class RefinerServiceTest - { - private RefinerService refiner; - - [SetUp] - public void Setup() - { - refiner = new RefinerService(new LoggerConfiguration().WriteTo.Console().CreateLogger()); - } - - [Test] - public void TestCanPullMessagesFromQueue() - { - refiner.RetrieveMessageFromQueue("mono.data.received"); - using (IConnection conn = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}.CreateConnection()) - { - using (IModel channel = conn.CreateModel()) - { - BasicGetResult result = channel.BasicGet("mono.data.refined", false); - Assert.IsNotNull(result.Body); - } - } - } - - [Test] - public void TestCanPullMessagesFromDifferentQueues() - { - } - - [Test] - public void TestCanStoreMessagePayloadInMongoDB() - { - } - } -} \ No newline at end of file diff --git a/Tests/Ingestion.Tests/Ingestion.Tests.csproj b/Tests/Ingestion.Tests/Ingestion.Tests.csproj deleted file mode 100644 index 1acf710..0000000 --- a/Tests/Ingestion.Tests/Ingestion.Tests.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - - netstandard2.0 - - - - - - - diff --git a/Tests/Ingestion.Tests/TestIngestionService.cs b/Tests/Ingestion.Tests/TestIngestionService.cs deleted file mode 100644 index 0df5eb8..0000000 --- a/Tests/Ingestion.Tests/TestIngestionService.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using NUnit; -using NUnit.Framework; -using Ingestion; - -namespace Ingestion.Tests -{ - [TestFixture] - public class TestIngestionService - { - [Test] - public void LoadJsonFromFiles() - { - IngestionService service = new IngestionService(); - } - } -} From 95729f88ea5209c5ed72b1e6c60e71b07dc8042e Mon Sep 17 00:00:00 2001 From: alex855k Date: Sat, 15 Dec 2018 14:43:18 +0100 Subject: [PATCH 19/19] (Ingestion-service-CSProjectsEAL/GraBID#12) Move MongoDB docker-compose.yml to appropiate branch. --- mongodb/docker-compose.yml | 43 -------------------------------------- 1 file changed, 43 deletions(-) delete mode 100644 mongodb/docker-compose.yml diff --git a/mongodb/docker-compose.yml b/mongodb/docker-compose.yml deleted file mode 100644 index 306e025..0000000 --- a/mongodb/docker-compose.yml +++ /dev/null @@ -1,43 +0,0 @@ -version: "3" - -networks: - application-network-db: - driver: bridge - -services: - mongodb-primary: - image: 'bitnami/mongodb:latest' - networks: - - application-network-db - environment: - - MONGODB_REPLICA_SET_MODE=primary - volumes: - - 'mongodb_master_data:/bitnami' - ports: - - "27017:27017" - - mongodb-secondary: - image: 'bitnami/mongodb:latest' - networks: - - application-network-db - depends_on: - - mongodb-primary - environment: - - MONGODB_REPLICA_SET_MODE=secondary - - MONGODB_PRIMARY_HOST=mongodb-primary - - MONGODB_PRIMARY_PORT_NUMBER=27017 - - mongodb-arbiter: - image: 'bitnami/mongodb:latest' - networks: - - application-network-db - depends_on: - - mongodb-primary - environment: - - MONGODB_REPLICA_SET_MODE=arbiter - - MONGODB_PRIMARY_HOST=mongodb-primary - - MONGODB_PRIMARY_PORT_NUMBER=27017 - -volumes: - mongodb_master_data: - driver: local \ No newline at end of file