diff --git a/GraBID.sln b/GraBID.sln index f6bbddb..25401c7 100644 --- a/GraBID.sln +++ b/GraBID.sln @@ -5,8 +5,6 @@ VisualStudioVersion = 15.0.28306.52 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Src", "Src", "{47FA0BFF-7C06-4F41-A91E-3119CE207B70}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{6FE2E050-C1DF-4E58-8689-8D90C45931C3}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{D695FB5C-0D3D-468A-95F0-43ADC918BE06}" ProjectSection(SolutionItems) = preProject docker-compose.yml = docker-compose.yml @@ -14,11 +12,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docker", "Docker", "{D695FB EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion", "Src\Ingestion\Ingestion.csproj", "{2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ingestion.Tests", "Tests\Ingestion.Tests\Ingestion.Tests.csproj", "{A8569975-D120-4FD5-841B-2B0568740500}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Refiner", "src\Refiner\Refiner.csproj", "{13410153-13DB-41D4-A08A-CE507B963559}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Logger", "src\Logger\Logger.csproj", "{1C67535A-5DB2-4B2E-B1F6-56B864229760}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "Test\Tests.csproj", "{2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -30,25 +28,21 @@ Global {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA}.Release|Any CPU.Build.0 = Release|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.ActiveCfg = Release|Any CPU - {A8569975-D120-4FD5-841B-2B0568740500}.Release|Any CPU.Build.0 = Release|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Debug|Any CPU.Build.0 = Debug|Any CPU {13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.ActiveCfg = Release|Any CPU - {13410153-13DB-41D4-A08A-CE507B963559}.Release|Any CPU.Build.0 = Release|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Debug|Any CPU.Build.0 = Debug|Any CPU {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1C67535A-5DB2-4B2E-B1F6-56B864229760}.Release|Any CPU.Build.0 = Release|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2CC3D7F9-F025-4F2B-A74A-B40CE45019DE}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {2E78E0E5-DD54-457C-92F3-DB6D19EFA6DA} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} - {A8569975-D120-4FD5-841B-2B0568740500} = {6FE2E050-C1DF-4E58-8689-8D90C45931C3} {13410153-13DB-41D4-A08A-CE507B963559} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} {1C67535A-5DB2-4B2E-B1F6-56B864229760} = {47FA0BFF-7C06-4F41-A91E-3119CE207B70} EndGlobalSection diff --git a/Src/Ingestion/Dockerfile b/Src/Ingestion/Dockerfile index 134c13f..84c6eac 100644 --- a/Src/Ingestion/Dockerfile +++ b/Src/Ingestion/Dockerfile @@ -1,18 +1,13 @@ FROM microsoft/dotnet:2.1-sdk AS build COPY . ./build WORKDIR /build/ -RUN dotnet restore +RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app - FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app - FROM microsoft/dotnet:2.1-runtime-alpine - COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -COPY ./crontab /etc/crontabs -RUN chmod -R 644 /etc/crontabs -COPY wait-for-it.sh /var/app -RUN apk add bash -CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] +#COPY wait-for-it.sh /var/app +#RUN apk add bash +#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file diff --git a/Src/Ingestion/Ingestion.csproj b/Src/Ingestion/Ingestion.csproj index ff423f2..9655860 100644 --- a/Src/Ingestion/Ingestion.csproj +++ b/Src/Ingestion/Ingestion.csproj @@ -1,23 +1,18 @@  - Exe netcoreapp2.1 - - - - - + \ No newline at end of file diff --git a/Src/Ingestion/IngestionService.cs b/Src/Ingestion/IngestionService.cs new file mode 100644 index 0000000..62943c8 --- /dev/null +++ b/Src/Ingestion/IngestionService.cs @@ -0,0 +1,127 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Http; +using System.Threading.Tasks; +using Newtonsoft.Json; +using RabbitMQ.Client; +using Serilog; +using Shared; + +namespace Ingestion +{ + public class IngestionService + { + private IDictionary _sourcesList; + private const string SourcesConfigRelativePath = "../../../../../config/IngestionSourcesTest.json"; + private ILogger _logger; + private static IConnection _conn; + private static IModel _channel; + + public IngestionService(IDictionary sources, ILogger logger) + { + _logger = logger; + IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; + _conn = factory.CreateConnection(); + _channel = _conn.CreateModel(); + _sourcesList = sources; + } + + public IngestionService(ILogger logger) + { + _logger = logger; + _sourcesList = LoadSourcesFromJson(SourcesConfigRelativePath); + IConnectionFactory factory = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}; + _conn = factory.CreateConnection(); + _channel = _conn.CreateModel(); + } + + private async Task GetData (string url) + { + var data = ""; + using (var client = new HttpClient()) + { + using (var res = await client.GetAsync(url)) + { + using (var content = res.Content) + { + data = await content.ReadAsStringAsync(); + } + } + } + + return data; + } + + public IDictionary LoadSourcesFromJson(string path) + { + IDictionary sources = new Dictionary(); + // deserialize JSON directly from a file + if (File.Exists(path)) + { + string JSONText = File.ReadAllText(path); + IList listsources = JsonConvert.DeserializeObject>(JSONText); + foreach (IngestionSource s in listsources) { + sources.Add(s.Name, s); + } + } + else + { + throw new Exception("Cant find file"); + } + return sources; + } + + public void Ingest(string sourceId) { + + if (!_sourcesList.ContainsKey(sourceId)) throw new Exception("Key was not found, source is not defined"); + + _logger.Information("Starting Ingestor"); + + var data = GetData(_sourcesList[sourceId].ApiUrl).Result; + + ForwardMessageToRabbitMQ(data, _sourcesList[sourceId].ForwardMessageQueue); + + _logger.Information("Stopping Ingestor"); + } + + public void ForwardMessageToRabbitMQ(string message, string queue) + { + using (_conn) + { + using (_channel) + { + var exchange = "mono.data.received"; + + _channel.QueueDeclare(queue, true, false, false, null); + + _channel.ExchangeDeclare(exchange, "fanout"); + + _channel.QueueBind(queue, exchange, ""); + + var envelope = new Envelope(Guid.NewGuid(), message); + var envelopedMessage = JsonConvert.SerializeObject(envelope); + + byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); + + _channel.BasicPublish(exchange, "", _channel.CreateBasicProperties(), messageBodyBytes); + Console.WriteLine(" [x] Sent '{0}':'{1}'", exchange, message); + } + } + } + + public string SerializeObjects(object obj) + { + return JsonConvert.SerializeObject(obj); + } + + public void StoreObject(string obj) { + try { + File.WriteAllText(SourcesConfigRelativePath, obj); + } catch(Exception exception) + { + Console.WriteLine("Couldnt write to file: " + exception.Message + "Message"); + } + } + } +} diff --git a/Src/Ingestion/IngestionSource.cs b/Src/Ingestion/IngestionSource.cs new file mode 100644 index 0000000..247c9c9 --- /dev/null +++ b/Src/Ingestion/IngestionSource.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Ingestion +{ + public class IngestionSource + { + public string Name { get; set; } + public string ApiUrl { get; set; } + public string Credential { get; set; } + public string ForwardMessageQueue { get; set; } + } +} diff --git a/Src/Ingestion/Program.cs b/Src/Ingestion/Program.cs index 8374801..512d1ee 100644 --- a/Src/Ingestion/Program.cs +++ b/Src/Ingestion/Program.cs @@ -1,7 +1,9 @@ using Newtonsoft.Json; using RabbitMQ.Client; +using Serilog; using Shared; using System; +using System.Threading; namespace Ingestion { @@ -9,30 +11,30 @@ class Program { static void Main(string[] args) { - //Read about Rabbitmq - https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html - // for more advanced Message Bus setup - http://masstransit-project.com/MassTransit/ which integrates with RabbitMQ as well - Console.WriteLine($"Starting Ingestor"); - var factory = new ConnectionFactory() { HostName = "rabbit.docker" }; + ILogger logger = new LoggerConfiguration().WriteTo.Console().CreateLogger(); + string sourceID = ""; + /* + try { + sourceID = args[0]; + } catch (Exception e) { + logger.Error("Incorrect argument for ingestion dll crontab: " + e.Message); + } + */ + sourceID = "ResellerData"; + if (sourceID != ""){ + IngestionService ingestService = new IngestionService(logger); - using (IConnection conn = factory.CreateConnection()) - { - using (IModel channel = conn.CreateModel()) + try { + while (true) { + ingestService.Ingest(sourceID); + Thread.Sleep(3000); + } + } catch(Exception e) { - var exchange = "grabid_exchange"; - //This would properly be a http call to get new data from Mono - var message = "{'message':'Hello ','users':'Alexander, Bogdan, Elitsa, David'}"; - var routingKey = "mono.data.received"; - - channel.ExchangeDeclare(exchange: exchange, type: "topic"); - var envelope = new Envelope(Guid.NewGuid(), message); - var envelopedMessage = JsonConvert.SerializeObject(envelope); - byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(envelopedMessage); - - channel.BasicPublish(exchange,routingKey , null, messageBodyBytes); - Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); + logger.Error("Error: " + e.Message + "\n" + "Exception trace: " + e.StackTrace); } } - Console.WriteLine($"Stopping Ingestor"); } + } } diff --git a/Src/Ingestion/crontab/root b/Src/Ingestion/crontab/root index f3cc3b4..7066796 100644 --- a/Src/Ingestion/crontab/root +++ b/Src/Ingestion/crontab/root @@ -1,2 +1 @@ -*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll >> /activity.log 2>&1 - +*/5 * * * * /usr/bin/dotnet /var/app/Ingestion.dll ResellerData >> /activity.log 2>&1 \ No newline at end of file diff --git a/Src/Ingestion/wait-for-it.sh b/Src/Ingestion/wait-for-it.sh index bbe4043..b64739f 100644 --- a/Src/Ingestion/wait-for-it.sh +++ b/Src/Ingestion/wait-for-it.sh @@ -1,15 +1,190 @@ #!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available +##!/usr/bin/env bash +## Use this script to test if a given TCP host/port are available +# +#cmdname=$(basename $0) +# +#echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +# +##usage() +##{ +## cat << USAGE >&2 +##Usage: +## $cmdname host:port [-s] [-t timeout] [-- command args] +## -h HOST | --host=HOST Host or IP under test +## -p PORT | --port=PORT TCP port under test +## Alternatively, you specify the host and port as host:port +## -s | --strict Only execute subcommand if the test succeeds +## -q | --quiet Don't output any status messages +## -t TIMEOUT | --timeout=TIMEOUT +## Timeout in seconds, zero for no timeout +## -- COMMAND ARGS Execute command with args after the test finishes +##USAGE +## exit 1 +##} +# +#wait_for() +#{ +# if [[ $TIMEOUT -gt 0 ]]; then +# echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" +# else +# echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" +# fi +# start_ts=$(date +%s) +# while : +# do +# if [[ $ISBUSY -eq 1 ]]; then +# nc -z $HOST $PORT +# result=$? +# else +# (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 +# result=$? +# fi +# if [[ $result -eq 0 ]]; then +# end_ts=$(date +%s) +# echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" +# break +# fi +# sleep 1 +# done +# return $result +#} +# +#wait_for_wrapper() +#{ +# # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 +# if [[ $QUIET -eq 1 ]]; then +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# else +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# fi +# PID=$! +# trap "kill -INT -$PID" INT +# wait $PID +# RESULT=$? +# if [[ $RESULT -ne 0 ]]; then +# echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" +# fi +# return $RESULT +#} +# +## process arguments +#while [[ $# -gt 0 ]] +#do +# case "$1" in +# *:* ) +# hostport=(${1//:/ }) +# HOST=${hostport[0]} +# PORT=${hostport[1]} +# shift 1 +# ;; +# --child) +# CHILD=1 +# shift 1 +# ;; +# -q | --quiet) +# QUIET=1 +# shift 1 +# ;; +# -s | --strict) +# STRICT=1 +# shift 1 +# ;; +# -h) +# HOST="$2" +# if [[ $HOST == "" ]]; then break; fi +# shift 2 +# ;; +# --host=*) +# HOST="${1#*=}" +# shift 1 +# ;; +# -p) +# PORT="$2" +# if [[ $PORT == "" ]]; then break; fi +# shift 2 +# ;; +# --port=*) +# PORT="${1#*=}" +# shift 1 +# ;; +# -t) +# TIMEOUT="$2" +# if [[ $TIMEOUT == "" ]]; then break; fi +# shift 2 +# ;; +# --timeout=*) +# TIMEOUT="${1#*=}" +# shift 1 +# ;; +# --) +# shift +# CLI=("$@") +# break +# ;; +# --help) +# usage +# ;; +# *) +# echoerr "Unknown argument: $1" +# usage +# ;; +# esac +#done +# +#if [[ "$HOST" == "" || "$PORT" == "" ]]; then +# echoerr "Error: you need to provide a host and port to test." +# usage +#fi +# +#TIMEOUT=${TIMEOUT:-15} +#STRICT=${STRICT:-0} +#CHILD=${CHILD:-0} +#QUIET=${QUIET:-0} +# +## check to see if timeout is from busybox? +## check to see if timeout is from busybox? +#TIMEOUT_PATH=$(realpath $(which timeout)) +#if [[ $TIMEOUT_PATH =~ "busybox" ]]; then +# ISBUSY=1 +# BUSYTIMEFLAG="-t" +#else +# ISBUSY=0 +# BUSYTIMEFLAG="" +#fi +# +#if [[ $CHILD -gt 0 ]]; then +# wait_for +# RESULT=$? +# exit $RESULT +#else +# if [[ $TIMEOUT -gt 0 ]]; then +# wait_for_wrapper +# RESULT=$? +# else +# wait_for +# RESULT=$? +# fi +#fi +# +#if [[ $CLI != "" ]]; then +# if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then +# echoerr "$cmdname: strict mode, refusing to execute subprocess" +# exit $RESULT +# fi +# exec "${CLI[@]}" +#else +# exit $RESULT +#fi +WAITFORIT_cmdname=${0##*/} -cmdname=$(basename $0) - -echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } usage() { cat << USAGE >&2 Usage: - $cmdname host:port [-s] [-t timeout] [-- command args] + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] -h HOST | --host=HOST Host or IP under test -p PORT | --port=PORT TCP port under test Alternatively, you specify the host and port as host:port @@ -24,47 +199,47 @@ USAGE wait_for() { - if [[ $TIMEOUT -gt 0 ]]; then - echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" else - echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" fi - start_ts=$(date +%s) + WAITFORIT_start_ts=$(date +%s) while : do - if [[ $ISBUSY -eq 1 ]]; then - nc -z $HOST $PORT - result=$? + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? else - (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 - result=$? + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? fi - if [[ $result -eq 0 ]]; then - end_ts=$(date +%s) - echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" break fi sleep 1 done - return $result + return $WAITFORIT_result } wait_for_wrapper() { # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $QUIET -eq 1 ]]; then - timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & else - timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & fi - PID=$! - trap "kill -INT -$PID" INT - wait $PID - RESULT=$? - if [[ $RESULT -ne 0 ]]; then - echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" fi - return $RESULT + return $WAITFORIT_RESULT } # process arguments @@ -72,53 +247,53 @@ while [[ $# -gt 0 ]] do case "$1" in *:* ) - hostport=(${1//:/ }) - HOST=${hostport[0]} - PORT=${hostport[1]} + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} shift 1 ;; --child) - CHILD=1 + WAITFORIT_CHILD=1 shift 1 ;; -q | --quiet) - QUIET=1 + WAITFORIT_QUIET=1 shift 1 ;; -s | --strict) - STRICT=1 + WAITFORIT_STRICT=1 shift 1 ;; -h) - HOST="$2" - if [[ $HOST == "" ]]; then break; fi + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi shift 2 ;; --host=*) - HOST="${1#*=}" + WAITFORIT_HOST="${1#*=}" shift 1 ;; -p) - PORT="$2" - if [[ $PORT == "" ]]; then break; fi + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi shift 2 ;; --port=*) - PORT="${1#*=}" + WAITFORIT_PORT="${1#*=}" shift 1 ;; -t) - TIMEOUT="$2" - if [[ $TIMEOUT == "" ]]; then break; fi + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi shift 2 ;; --timeout=*) - TIMEOUT="${1#*=}" + WAITFORIT_TIMEOUT="${1#*=}" shift 1 ;; --) shift - CLI=("$@") + WAITFORIT_CLI=("$@") break ;; --help) @@ -131,47 +306,48 @@ do esac done -if [[ "$HOST" == "" || "$PORT" == "" ]]; then +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then echoerr "Error: you need to provide a host and port to test." usage fi -TIMEOUT=${TIMEOUT:-15} -STRICT=${STRICT:-0} -CHILD=${CHILD:-0} -QUIET=${QUIET:-0} +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} # check to see if timeout is from busybox? -# check to see if timeout is from busybox? -TIMEOUT_PATH=$(realpath $(which timeout)) -if [[ $TIMEOUT_PATH =~ "busybox" ]]; then - ISBUSY=1 - BUSYTIMEFLAG="-t" +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + else - ISBUSY=0 - BUSYTIMEFLAG="" + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" fi -if [[ $CHILD -gt 0 ]]; then +if [[ $WAITFORIT_CHILD -gt 0 ]]; then wait_for - RESULT=$? - exit $RESULT + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT else - if [[ $TIMEOUT -gt 0 ]]; then + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then wait_for_wrapper - RESULT=$? + WAITFORIT_RESULT=$? else wait_for - RESULT=$? + WAITFORIT_RESULT=$? fi fi -if [[ $CLI != "" ]]; then - if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then - echoerr "$cmdname: strict mode, refusing to execute subprocess" - exit $RESULT +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT fi - exec "${CLI[@]}" + exec "${WAITFORIT_CLI[@]}" else - exit $RESULT -fi + exit $WAITFORIT_RESULT +fi \ No newline at end of file diff --git a/Src/Logger/Dockerfile b/Src/Logger/Dockerfile index 8b2bcff..84c6eac 100644 --- a/Src/Logger/Dockerfile +++ b/Src/Logger/Dockerfile @@ -3,13 +3,11 @@ COPY . ./build WORKDIR /build/ RUN dotnet restore RUN dotnet build --no-restore -c Release -o /app - FROM build AS publish RUN dotnet publish --no-restore -c Release -o /app - FROM microsoft/dotnet:2.1-runtime-alpine COPY --from=publish /app /var/app COPY entrypoint.sh /var/app -COPY wait-for-it.sh /var/app -RUN apk add bash -CMD ["bash", "/var/app/wait-for-it.sh", "rabbit.docker:5672", "--timeout=30","--", "/var/app/entrypoint.sh"] +#COPY wait-for-it.sh /var/app +#RUN apk add bash +#CMD ["bash","/var/app/wait-for-it.sh","localhost:5672","--timeout=30","--","/var/app/entrypoint.sh"] \ No newline at end of file diff --git a/Src/Logger/Program.cs b/Src/Logger/Program.cs index bf6807e..26a89ab 100644 --- a/Src/Logger/Program.cs +++ b/Src/Logger/Program.cs @@ -75,8 +75,6 @@ private static void Exit() Log.Information("Exiting..."); channel.Close(); conn.Close(); - - } } diff --git a/Src/Logger/wait-for-it.sh b/Src/Logger/wait-for-it.sh index bbe4043..b64739f 100644 --- a/Src/Logger/wait-for-it.sh +++ b/Src/Logger/wait-for-it.sh @@ -1,15 +1,190 @@ #!/usr/bin/env bash -# Use this script to test if a given TCP host/port are available +##!/usr/bin/env bash +## Use this script to test if a given TCP host/port are available +# +#cmdname=$(basename $0) +# +#echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +# +##usage() +##{ +## cat << USAGE >&2 +##Usage: +## $cmdname host:port [-s] [-t timeout] [-- command args] +## -h HOST | --host=HOST Host or IP under test +## -p PORT | --port=PORT TCP port under test +## Alternatively, you specify the host and port as host:port +## -s | --strict Only execute subcommand if the test succeeds +## -q | --quiet Don't output any status messages +## -t TIMEOUT | --timeout=TIMEOUT +## Timeout in seconds, zero for no timeout +## -- COMMAND ARGS Execute command with args after the test finishes +##USAGE +## exit 1 +##} +# +#wait_for() +#{ +# if [[ $TIMEOUT -gt 0 ]]; then +# echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" +# else +# echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" +# fi +# start_ts=$(date +%s) +# while : +# do +# if [[ $ISBUSY -eq 1 ]]; then +# nc -z $HOST $PORT +# result=$? +# else +# (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 +# result=$? +# fi +# if [[ $result -eq 0 ]]; then +# end_ts=$(date +%s) +# echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" +# break +# fi +# sleep 1 +# done +# return $result +#} +# +#wait_for_wrapper() +#{ +# # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 +# if [[ $QUIET -eq 1 ]]; then +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# else +# timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & +# fi +# PID=$! +# trap "kill -INT -$PID" INT +# wait $PID +# RESULT=$? +# if [[ $RESULT -ne 0 ]]; then +# echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" +# fi +# return $RESULT +#} +# +## process arguments +#while [[ $# -gt 0 ]] +#do +# case "$1" in +# *:* ) +# hostport=(${1//:/ }) +# HOST=${hostport[0]} +# PORT=${hostport[1]} +# shift 1 +# ;; +# --child) +# CHILD=1 +# shift 1 +# ;; +# -q | --quiet) +# QUIET=1 +# shift 1 +# ;; +# -s | --strict) +# STRICT=1 +# shift 1 +# ;; +# -h) +# HOST="$2" +# if [[ $HOST == "" ]]; then break; fi +# shift 2 +# ;; +# --host=*) +# HOST="${1#*=}" +# shift 1 +# ;; +# -p) +# PORT="$2" +# if [[ $PORT == "" ]]; then break; fi +# shift 2 +# ;; +# --port=*) +# PORT="${1#*=}" +# shift 1 +# ;; +# -t) +# TIMEOUT="$2" +# if [[ $TIMEOUT == "" ]]; then break; fi +# shift 2 +# ;; +# --timeout=*) +# TIMEOUT="${1#*=}" +# shift 1 +# ;; +# --) +# shift +# CLI=("$@") +# break +# ;; +# --help) +# usage +# ;; +# *) +# echoerr "Unknown argument: $1" +# usage +# ;; +# esac +#done +# +#if [[ "$HOST" == "" || "$PORT" == "" ]]; then +# echoerr "Error: you need to provide a host and port to test." +# usage +#fi +# +#TIMEOUT=${TIMEOUT:-15} +#STRICT=${STRICT:-0} +#CHILD=${CHILD:-0} +#QUIET=${QUIET:-0} +# +## check to see if timeout is from busybox? +## check to see if timeout is from busybox? +#TIMEOUT_PATH=$(realpath $(which timeout)) +#if [[ $TIMEOUT_PATH =~ "busybox" ]]; then +# ISBUSY=1 +# BUSYTIMEFLAG="-t" +#else +# ISBUSY=0 +# BUSYTIMEFLAG="" +#fi +# +#if [[ $CHILD -gt 0 ]]; then +# wait_for +# RESULT=$? +# exit $RESULT +#else +# if [[ $TIMEOUT -gt 0 ]]; then +# wait_for_wrapper +# RESULT=$? +# else +# wait_for +# RESULT=$? +# fi +#fi +# +#if [[ $CLI != "" ]]; then +# if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then +# echoerr "$cmdname: strict mode, refusing to execute subprocess" +# exit $RESULT +# fi +# exec "${CLI[@]}" +#else +# exit $RESULT +#fi +WAITFORIT_cmdname=${0##*/} -cmdname=$(basename $0) - -echoerr() { if [[ $QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } +echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi } usage() { cat << USAGE >&2 Usage: - $cmdname host:port [-s] [-t timeout] [-- command args] + $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args] -h HOST | --host=HOST Host or IP under test -p PORT | --port=PORT TCP port under test Alternatively, you specify the host and port as host:port @@ -24,47 +199,47 @@ USAGE wait_for() { - if [[ $TIMEOUT -gt 0 ]]; then - echoerr "$cmdname: waiting $TIMEOUT seconds for $HOST:$PORT" + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then + echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" else - echoerr "$cmdname: waiting for $HOST:$PORT without a timeout" + echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout" fi - start_ts=$(date +%s) + WAITFORIT_start_ts=$(date +%s) while : do - if [[ $ISBUSY -eq 1 ]]; then - nc -z $HOST $PORT - result=$? + if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then + nc -z $WAITFORIT_HOST $WAITFORIT_PORT + WAITFORIT_result=$? else - (echo > /dev/tcp/$HOST/$PORT) >/dev/null 2>&1 - result=$? + (echo > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1 + WAITFORIT_result=$? fi - if [[ $result -eq 0 ]]; then - end_ts=$(date +%s) - echoerr "$cmdname: $HOST:$PORT is available after $((end_ts - start_ts)) seconds" + if [[ $WAITFORIT_result -eq 0 ]]; then + WAITFORIT_end_ts=$(date +%s) + echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds" break fi sleep 1 done - return $result + return $WAITFORIT_result } wait_for_wrapper() { # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692 - if [[ $QUIET -eq 1 ]]; then - timeout $BUSYTIMEFLAG $TIMEOUT $0 --quiet --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + if [[ $WAITFORIT_QUIET -eq 1 ]]; then + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & else - timeout $BUSYTIMEFLAG $TIMEOUT $0 --child --host=$HOST --port=$PORT --timeout=$TIMEOUT & + timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT & fi - PID=$! - trap "kill -INT -$PID" INT - wait $PID - RESULT=$? - if [[ $RESULT -ne 0 ]]; then - echoerr "$cmdname: timeout occurred after waiting $TIMEOUT seconds for $HOST:$PORT" + WAITFORIT_PID=$! + trap "kill -INT -$WAITFORIT_PID" INT + wait $WAITFORIT_PID + WAITFORIT_RESULT=$? + if [[ $WAITFORIT_RESULT -ne 0 ]]; then + echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT" fi - return $RESULT + return $WAITFORIT_RESULT } # process arguments @@ -72,53 +247,53 @@ while [[ $# -gt 0 ]] do case "$1" in *:* ) - hostport=(${1//:/ }) - HOST=${hostport[0]} - PORT=${hostport[1]} + WAITFORIT_hostport=(${1//:/ }) + WAITFORIT_HOST=${WAITFORIT_hostport[0]} + WAITFORIT_PORT=${WAITFORIT_hostport[1]} shift 1 ;; --child) - CHILD=1 + WAITFORIT_CHILD=1 shift 1 ;; -q | --quiet) - QUIET=1 + WAITFORIT_QUIET=1 shift 1 ;; -s | --strict) - STRICT=1 + WAITFORIT_STRICT=1 shift 1 ;; -h) - HOST="$2" - if [[ $HOST == "" ]]; then break; fi + WAITFORIT_HOST="$2" + if [[ $WAITFORIT_HOST == "" ]]; then break; fi shift 2 ;; --host=*) - HOST="${1#*=}" + WAITFORIT_HOST="${1#*=}" shift 1 ;; -p) - PORT="$2" - if [[ $PORT == "" ]]; then break; fi + WAITFORIT_PORT="$2" + if [[ $WAITFORIT_PORT == "" ]]; then break; fi shift 2 ;; --port=*) - PORT="${1#*=}" + WAITFORIT_PORT="${1#*=}" shift 1 ;; -t) - TIMEOUT="$2" - if [[ $TIMEOUT == "" ]]; then break; fi + WAITFORIT_TIMEOUT="$2" + if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi shift 2 ;; --timeout=*) - TIMEOUT="${1#*=}" + WAITFORIT_TIMEOUT="${1#*=}" shift 1 ;; --) shift - CLI=("$@") + WAITFORIT_CLI=("$@") break ;; --help) @@ -131,47 +306,48 @@ do esac done -if [[ "$HOST" == "" || "$PORT" == "" ]]; then +if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then echoerr "Error: you need to provide a host and port to test." usage fi -TIMEOUT=${TIMEOUT:-15} -STRICT=${STRICT:-0} -CHILD=${CHILD:-0} -QUIET=${QUIET:-0} +WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15} +WAITFORIT_STRICT=${WAITFORIT_STRICT:-0} +WAITFORIT_CHILD=${WAITFORIT_CHILD:-0} +WAITFORIT_QUIET=${WAITFORIT_QUIET:-0} # check to see if timeout is from busybox? -# check to see if timeout is from busybox? -TIMEOUT_PATH=$(realpath $(which timeout)) -if [[ $TIMEOUT_PATH =~ "busybox" ]]; then - ISBUSY=1 - BUSYTIMEFLAG="-t" +WAITFORIT_TIMEOUT_PATH=$(type -p timeout) +WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH) +if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then + WAITFORIT_ISBUSY=1 + WAITFORIT_BUSYTIMEFLAG="-t" + else - ISBUSY=0 - BUSYTIMEFLAG="" + WAITFORIT_ISBUSY=0 + WAITFORIT_BUSYTIMEFLAG="" fi -if [[ $CHILD -gt 0 ]]; then +if [[ $WAITFORIT_CHILD -gt 0 ]]; then wait_for - RESULT=$? - exit $RESULT + WAITFORIT_RESULT=$? + exit $WAITFORIT_RESULT else - if [[ $TIMEOUT -gt 0 ]]; then + if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then wait_for_wrapper - RESULT=$? + WAITFORIT_RESULT=$? else wait_for - RESULT=$? + WAITFORIT_RESULT=$? fi fi -if [[ $CLI != "" ]]; then - if [[ $RESULT -ne 0 && $STRICT -eq 1 ]]; then - echoerr "$cmdname: strict mode, refusing to execute subprocess" - exit $RESULT +if [[ $WAITFORIT_CLI != "" ]]; then + if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then + echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess" + exit $WAITFORIT_RESULT fi - exec "${CLI[@]}" + exec "${WAITFORIT_CLI[@]}" else - exit $RESULT -fi + exit $WAITFORIT_RESULT +fi \ No newline at end of file diff --git a/Src/Refiner/Program.cs b/Src/Refiner/Program.cs index dd35d4f..0547580 100644 --- a/Src/Refiner/Program.cs +++ b/Src/Refiner/Program.cs @@ -6,7 +6,6 @@ using Serilog; using Shared; using System; -using System.Collections.Generic; using System.Linq; using System.Runtime.Loader; using System.Text; diff --git a/Test/IngestionServiceTest.cs b/Test/IngestionServiceTest.cs new file mode 100644 index 0000000..fba3cf0 --- /dev/null +++ b/Test/IngestionServiceTest.cs @@ -0,0 +1,54 @@ +using NUnit.Framework; +using Ingestion; +using System.Collections.Generic; +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using Serilog; + +namespace Tests +{ + public class IngestionServiceTest + { + private IngestionService service; + + [SetUp] + public void Setup() + { + Dictionary sources = new Dictionary(); + sources.Add( + "Test",new IngestionSource() { ApiUrl = "Example", Credential = "blabla", ForwardMessageQueue = "monoqueue", Name = "Test"} + ); + sources.Add( + "Test2", new IngestionSource() { ApiUrl = "Example2", Credential = "blabla2", ForwardMessageQueue = "monoqueue2", Name = "Test2" } + ); + sources.Add( + "FakeData", new IngestionSource() { ApiUrl = "https://jsonplaceholder.typicode.com/posts", Credential = "", ForwardMessageQueue = "mono.data.received", Name = "FakeData" } + ); + + service = new IngestionService(sources, new LoggerConfiguration().WriteTo.Console().CreateLogger()); + } + + [Test] + public void CanIngestFromEndpoint() + { + service.Ingest("FakeData"); + using (IConnection conn = new ConnectionFactory() {HostName = "localhost", UserName = "guest", Password = "guest"}.CreateConnection()) + { + using (IModel channel = conn.CreateModel()) + { + BasicGetResult result = channel.BasicGet("mono.data.received", false); + Assert.IsNotNull(result.Body); + } + } + } + + [Test] + public void TestCanLoadSourcesFromJSONFile() + { + IDictionary sources = service.LoadSourcesFromJson("../../../../config/IngestionSourcesTest.json"); + Assert.AreEqual(sources["ResellerData"].Name, "ResellerData"); + Assert.AreEqual(sources["ResellerData2"].Name, "ResellerData2"); + Assert.AreEqual(sources["FakeData"].Name, "FakeData"); + } + } +} diff --git a/Test/Tests.csproj b/Test/Tests.csproj new file mode 100644 index 0000000..d9a666e --- /dev/null +++ b/Test/Tests.csproj @@ -0,0 +1,15 @@ + + + netcoreapp2.1 + false + + + + + + + + + + + \ No newline at end of file diff --git a/Tests/Ingestion.Tests/Class1.cs b/Tests/Ingestion.Tests/Class1.cs deleted file mode 100644 index ad7f63c..0000000 --- a/Tests/Ingestion.Tests/Class1.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Ingestion.Tests -{ - public class Class1 - { - } -} diff --git a/Tests/Ingestion.Tests/Ingestion.Tests.csproj b/Tests/Ingestion.Tests/Ingestion.Tests.csproj deleted file mode 100644 index 9f5c4f4..0000000 --- a/Tests/Ingestion.Tests/Ingestion.Tests.csproj +++ /dev/null @@ -1,7 +0,0 @@ - - - - netstandard2.0 - - - diff --git a/config/IngestionSources.json b/config/IngestionSources.json new file mode 100644 index 0000000..7cc218c --- /dev/null +++ b/config/IngestionSources.json @@ -0,0 +1,8 @@ +[ + { + "name": "ResellerData", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + } +] diff --git a/config/IngestionSourcesTest.json b/config/IngestionSourcesTest.json new file mode 100644 index 0000000..dc44bc8 --- /dev/null +++ b/config/IngestionSourcesTest.json @@ -0,0 +1,20 @@ +[ + { + "name": "ResellerData", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + }, + { + "name": "ResellerData2", + "ApiUrl": "https://hal.mono.net/api/v1/admin", + "Credential": "b68064b8cf25842a1ea86cd61a7ee5fa", + "ForwardMessageQueue": "mono.queue.resellers" + }, + { + "name": "FakeData", + "ApiUrl": "https://jsonplaceholder.typicode.com/posts", + "Credential": "", + "ForwardMessageQueue": "mono.queue.test" + } +] diff --git a/docker-compose.yml b/docker-compose.yml index 8bd38a5..2383c50 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,9 @@ version: "3" + +networks: + mongodb_application-network-db: + external: true + services: rabbit.docker: @@ -8,6 +13,7 @@ services: ports: - "15672:15672" - "5672:5672" + refiner: image: grabid/refiner:local build: @@ -15,6 +21,9 @@ services: depends_on: - rabbit.docker restart: always + networks: + - mongodb_application-network-db + logger: image: grabid/logger:local build: @@ -22,6 +31,7 @@ services: depends_on: - rabbit.docker restart: always + ingestion: image: grabid/ingestion:local build: @@ -29,14 +39,3 @@ services: depends_on: - rabbit.docker restart: always - - - - - - - - - - -