Skip to content

Ingestion service that fetches data from endpoint at regular intervals CSProjectsEAL/GraBID#12 #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0790103
(#12) Add repository class IngestionService that can load configurati…
alex855k Nov 23, 2018
9b0facf
(#12) Add method load configurations for sources, from json files
alex855k Nov 24, 2018
2b626f1
(#12) Add capability to input argument for the particular type of ing…
alex855k Nov 27, 2018
13d4bf9
(#12) Add tests for changes
alex855k Nov 27, 2018
6b76f2b
(#11) Add mongodb setup to docker-compose
alex855k Nov 27, 2018
a8d62c3
(#12) Fix method to load IngestionSources from file
alex855k Nov 27, 2018
ccae22c
(#12) Changed id as an int to name as a unique id
alex855k Nov 27, 2018
f502da0
(#12) Add id argument to crontab job
alex855k Nov 27, 2018
e07ec79
(#12) Changes mainly about tests.
alex855k Nov 27, 2018
65e87f0
(Ingestion-service-CSProjectsEAL/GraBID#12) Changed name
alex855k Nov 27, 2018
919aaf4
(Ingestion-service-CSProjectsEAL/GraBID#12) Testing git credentials :P
alex855k Nov 27, 2018
67da1a2
(Ingestion-service-CSProjectsEAL/GraBID#12) Testing credentials again :(
alex855k Nov 27, 2018
b8c64fc
(Ingestion-service-CSProjectsEAL/GraBID#12) Testing credentials
alex855k Nov 27, 2018
4f305a6
wip ingestion and refiner
Dec 6, 2018
c2c6057
WIP-09/12
Dec 9, 2018
dff49e8
(Ingestion-service/GraBID#12) Refactor refiner changes out of this br…
alex855k Dec 13, 2018
9cb96a6
(Ingestion-service/GraBID#12) Remove unnessecary using statement.
alex855k Dec 13, 2018
b320412
(Ingestion-service/GraBID#12) Refactor out all unnessecary tests.
alex855k Dec 13, 2018
95729f8
(Ingestion-service-CSProjectsEAL/GraBID#12) Move MongoDB docker-compo…
alex855k Dec 15, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions GraBID.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ VisualStudioVersion = 15.0.28306.52
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Src", "Src", "{47FA0BFF-7C06-4F41-A91E-3119CE207B70}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{6FE2E050-C1DF-4E58-8689-8D90C45931C3}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{D695FB5C-0D3D-468A-95F0-43ADC918BE06}"
ProjectSection(SolutionItems) = preProject
docker-compose.yml = docker-compose.yml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion", "Src\Ingestion\Ingestion.csproj", "{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion.Tests", "Tests\Ingestion.Tests\Ingestion.Tests.csproj", "{A8569975-D120-4FD5-841B-2B0568740500}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Refiner", "src\Refiner\Refiner.csproj", "{13410153-13DB-41D4-A08A-CE507B963559}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "Test\Tests.csproj", "{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand All @@ -30,25 +28,21 @@ Global
{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.Build.0 = Release|Any CPU
{A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.Build.0 = Release|Any CPU
{13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.Build.0 = Debug|Any CPU
{13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.ActiveCfg = Release|Any CPU
{13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.Build.0 = Release|Any CPU
{1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.Build.0 = Release|Any CPU
{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70}
{A8569975-D120-4FD5-841B-2B0568740500} = {6FE2E050-C1DF-4E58-8689-8D90C45931C3}
{13410153-13DB-41D4-A08A-CE507B963559} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70}
{1C67535A-5DB2-4B2E-B1F6-56B864229760} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70}
EndGlobalSection
Expand Down
13 changes: 4 additions & 9 deletions Src/Ingestion/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
FROM microsoft/dotnet:2.1-sdk AS build
COPY . ./build
WORKDIR /build/
RUN dotnet restore
RUN dotnet restore
RUN dotnet build --no-restore -c Release -o /app

FROM build AS publish
RUN dotnet publish --no-restore -c Release -o /app

FROM microsoft/dotnet:2.1-runtime-alpine

COPY --from=publish /app /var/app
COPY entrypoint.sh /var/app
COPY ./crontab /etc/crontabs
RUN chmod -R 644 /etc/crontabs
COPY wait-for-it.sh /var/app
RUN apk add bash
CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"]
#COPY wait-for-it.sh /var/app
#RUN apk add bash
#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"]
7 changes: 1 addition & 6 deletions Src/Ingestion/Ingestion.csproj
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<None Remove="Dockerfile" />
</ItemGroup>

<ItemGroup>
<Content Include="Dockerfile" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.0" />
<PackageReference Include="Serilog" Version="2.7.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>

</Project>
</Project>
127 changes: 127 additions & 0 deletions Src/Ingestion/IngestionService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RabbitMQ.Client;
using Serilog;
using Shared;

namespace Ingestion
{
public class IngestionService
{
private IDictionary<string,IngestionSource> _sourcesList;
private const string SourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json";
private ILogger _logger;
private static IConnection _conn;
private static IModel _channel;

/// <summary>
/// Creates a service that works on an already-built set of sources and
/// opens the RabbitMQ connection and channel used for publishing.
/// </summary>
/// <param name="sources">Ingestion sources keyed by source name.</param>
/// <param name="logger">Logger used for progress messages.</param>
public IngestionService(IDictionary<string, IngestionSource> sources, ILogger logger)
{
    _sourcesList = sources;
    _logger = logger;

    // NOTE(review): broker host and credentials are hard-coded here — confirm
    // whether they should come from configuration.
    IConnectionFactory connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    // _conn and _channel are static, so every new instance replaces them.
    _conn = connectionFactory.CreateConnection();
    _channel = _conn.CreateModel();
}

/// <summary>
/// Creates a service whose sources are read from the JSON file at
/// <c>SourcesConfigRelativePath</c>, then opens the RabbitMQ connection
/// and channel used for publishing.
/// </summary>
/// <param name="logger">Logger used for progress messages.</param>
public IngestionService(ILogger logger)
{
    _logger = logger;

    // Load the configuration first so a missing file fails before any
    // broker connection is attempted.
    _sourcesList = LoadSourcesFromJson(SourcesConfigRelativePath);

    // NOTE(review): broker host and credentials are hard-coded here — confirm
    // whether they should come from configuration.
    IConnectionFactory connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    // _conn and _channel are static, so every new instance replaces them.
    _conn = connectionFactory.CreateConnection();
    _channel = _conn.CreateModel();
}

// One HttpClient for the whole process: creating and disposing a client per
// request leaves sockets lingering and can exhaust them when polling often.
private static readonly HttpClient _httpClient = new HttpClient();

/// <summary>
/// Issues an HTTP GET against <paramref name="url"/> and returns the
/// response body as a string.
/// </summary>
/// <param name="url">Address of the endpoint to fetch.</param>
/// <returns>The response body text.</returns>
private async Task<string> GetData (string url)
{
    // NOTE(review): the status code is not inspected, so the body of a
    // non-success response is returned like any other — confirm this is intended.
    using (var response = await _httpClient.GetAsync(url))
    {
        // Disposing the response also releases its content.
        return await response.Content.ReadAsStringAsync();
    }
}

/// <summary>
/// Reads a JSON array of <see cref="IngestionSource"/> entries from a file
/// and indexes them by <see cref="IngestionSource.Name"/>.
/// </summary>
/// <param name="path">Path of the JSON file to read.</param>
/// <returns>
/// The sources keyed by name; empty when the file holds no array
/// (for example an empty file or a JSON <c>null</c>).
/// </returns>
/// <exception cref="FileNotFoundException">The file does not exist.</exception>
/// <exception cref="ArgumentException">Two entries share the same name.</exception>
public IDictionary<string, IngestionSource> LoadSourcesFromJson(string path)
{
    if (!File.Exists(path))
    {
        // Specific exception type (still an Exception for existing catch
        // blocks); the message is unchanged and the path travels in FileName.
        throw new FileNotFoundException("Cant find file", path);
    }

    string jsonText = File.ReadAllText(path);
    IList<IngestionSource> loadedSources = JsonConvert.DeserializeObject<IList<IngestionSource>>(jsonText);

    IDictionary<string, IngestionSource> sources = new Dictionary<string, IngestionSource>();

    // DeserializeObject yields null for empty or "null" content; treat that
    // as "no sources" instead of failing with a NullReferenceException.
    if (loadedSources == null)
    {
        return sources;
    }

    foreach (IngestionSource source in loadedSources)
    {
        sources.Add(source.Name, source);
    }

    return sources;
}

/// <summary>
/// Fetches the data of one configured source and forwards it to that
/// source's RabbitMQ queue.
/// </summary>
/// <param name="sourceId">Name of the source, as used for the key of the sources dictionary.</param>
/// <exception cref="KeyNotFoundException">No source with that name is configured.</exception>
public void Ingest(string sourceId)
{
    // Single lookup instead of ContainsKey followed by two indexer reads.
    IngestionSource source;
    if (!_sourcesList.TryGetValue(sourceId, out source))
    {
        throw new KeyNotFoundException("Key was not found, source is not defined");
    }

    _logger.Information("Starting Ingestor");

    // The method must stay synchronous for its callers. GetAwaiter().GetResult()
    // rethrows the original failure rather than the AggregateException wrapper
    // that .Result produces, so callers logging e.Message see the real cause.
    var data = GetData(source.ApiUrl).GetAwaiter().GetResult();

    ForwardMessageToRabbitMQ(data, source.ForwardMessageQueue);

    _logger.Information("Stopping Ingestor");
}

/// <summary>
/// Wraps <paramref name="message"/> in an envelope with a new id and
/// publishes it to the "mono.data.received" fanout exchange, after making
/// sure <paramref name="queue"/> exists and is bound to that exchange.
/// </summary>
/// <param name="message">Payload to forward.</param>
/// <param name="queue">Name of the durable queue to declare and bind.</param>
public void ForwardMessageToRabbitMQ(string message, string queue)
{
    const string exchange = "mono.data.received";

    // _conn and _channel are created once in the constructor and reused by
    // every call. They must not be wrapped in using-blocks here: that disposed
    // them after the first publish, leaving any later call (Program invokes
    // Ingest in a loop) with a disposed connection.
    // NOTE(review): nothing in this class closes them now; consider making
    // the service IDisposable if it should release the connection explicitly.
    _channel.QueueDeclare(queue, true, false, false, null);
    _channel.ExchangeDeclare(exchange, "fanout");
    _channel.QueueBind(queue, exchange, "");

    var envelope = new Envelope<string>(Guid.NewGuid(), message);
    var envelopedMessage = JsonConvert.SerializeObject(envelope);
    byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage);

    _channel.BasicPublish(exchange, "", _channel.CreateBasicProperties(), messageBodyBytes);

    // Report through the injected logger (as the rest of the class does)
    // instead of writing straight to the console.
    _logger.Information(" [x] Sent '{Exchange}':'{Message}'", exchange, message);
}

/// <summary>
/// Converts <paramref name="obj"/> to its JSON text using Json.NET.
/// </summary>
/// <param name="obj">Object to serialize.</param>
/// <returns>The JSON representation of <paramref name="obj"/>.</returns>
public string SerializeObjects(object obj) => JsonConvert.SerializeObject(obj);

/// <summary>
/// Writes <paramref name="obj"/> to the file at
/// <c>SourcesConfigRelativePath</c>, replacing its content. Failures are
/// logged and not rethrown.
/// </summary>
/// <param name="obj">Text to write to the file.</param>
public void StoreObject(string obj)
{
    try
    {
        File.WriteAllText(SourcesConfigRelativePath, obj);
    }
    catch (Exception exception)
    {
        // Still best-effort, but the failure now goes through the injected
        // logger with the exception attached; the old console message also
        // ended in a stray "Message" suffix.
        _logger.Error(exception, "Couldnt write to file: {Path}", SourcesConfigRelativePath);
    }
}
}
}
14 changes: 14 additions & 0 deletions Src/Ingestion/IngestionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Ingestion
{
/// <summary>
/// Describes one ingestion source: where its data is fetched from and
/// which queue the fetched data is forwarded to.
/// </summary>
public class IngestionSource
{
/// <summary>Name of the source; IngestionService uses it as the dictionary key when loading sources.</summary>
public string Name { get; set; }
/// <summary>URL that IngestionService requests with an HTTP GET to obtain the data.</summary>
public string ApiUrl { get; set; }
/// <summary>
/// Not read by IngestionService; presumably the credential for
/// <see cref="ApiUrl"/> — TODO confirm.
/// </summary>
public string Credential { get; set; }
/// <summary>Name of the RabbitMQ queue the fetched data is forwarded to.</summary>
public string ForwardMessageQueue { get; set; }
}
}
42 changes: 22 additions & 20 deletions Src/Ingestion/Program.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
using Newtonsoft.Json;
using RabbitMQ.Client;
using Serilog;
using Shared;
using System;
using System.Threading;

namespace Ingestion
{
class Program
{
static void Main(string[] args)
{
//Read about Rabbitmq - https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
// for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well
Console.WriteLine($"Starting Ingestor");
var factory = new ConnectionFactory() { HostName = "rabbit.docker" };
ILogger logger = new LoggerConfiguration().WriteTo.Console().CreateLogger();
string sourceID = "";
/*
try {
sourceID = args[0];
} catch (Exception e) {
logger.Error("Incorrect argument for ingestion dll crontab: " + e.Message);
}
*/
sourceID = "ResellerData";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be defined in line 15 as well

if (sourceID != ""){
IngestionService ingestService = new IngestionService(logger);

using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
try {
while (true) {
ingestService.Ingest(sourceID);
Thread.Sleep(3000);
}
} catch(Exception e)
{
var exchange = "grabid_exchange";
//This would properly be a http call to get new data from Mono
var message = "{'message':'Hello ','users':'Alexander, Bogdan, Elitsa, David'}";
var routingKey = "mono.data.received";

channel.ExchangeDeclare(exchange: exchange, type: "topic");
var envelope = new Envelope<string>(Guid.NewGuid(), message);
var envelopedMessage = JsonConvert.SerializeObject(envelope);
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage);

channel.BasicPublish(exchange,routingKey , null, messageBodyBytes);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
logger.Error("Error: " + e.Message + "\n" + "Exception trace: " + e.StackTrace);
}
}
Console.WriteLine($"Stopping Ingestor");
}

}
}
3 changes: 1 addition & 2 deletions Src/Ingestion/crontab/root
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll >> /activity.log 2>&1

*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll ResellerData >> /activity.log 2>&1
Loading