diff --git a/pom.xml b/pom.xml index 6d25477..70c3ca1 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,21 @@ 1.18.16 provided + + org.apache.logging.log4j + log4j-api + 2.7 + + + org.apache.logging.log4j + log4j-core + 2.7 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.7 + diff --git a/problem-statement.md b/problem-statement.md index ef0e9ce..1d48670 100644 --- a/problem-statement.md +++ b/problem-statement.md @@ -3,7 +3,7 @@ We have to design a message queue supporting publisher-subscriber model. It shou * It should support multiple topics where messages can be published. * Publisher should be able to publish a message to a particular topic. -* Subscribers should be able to subscribe to a topic. +* Subscribers should be able to subscribe to a topic (Note: subscriber can subscribe to at most 1 topic). * Whenever a message is published to a topic, all the subscribers, who are subscribed to that topic, should receive the message. * Subscribers should be able to run in parallel diff --git a/src/main/java/com/uditagarwal/Application.java b/src/main/java/com/uditagarwal/Application.java new file mode 100644 index 0000000..bfbe559 --- /dev/null +++ b/src/main/java/com/uditagarwal/Application.java @@ -0,0 +1,36 @@ +package com.uditagarwal; + +import com.uditagarwal.pub_sub_queue.factories.QueueFactory; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Application { + + public static void main(String[] args) throws InterruptedException { + log.info("Started application"); + final Queue queue = QueueFactory.getInMemoryQueue(); + final String topicName1 = "t1", topicName2 = "t2"; + + queue.createTopic(topicName1); + queue.createTopic(topicName2); + + queue.addSubscriber("sub1", topicName1, 10000); + queue.addSubscriber("sub2", topicName1, 10000); + queue.addSubscriber("sub3", topicName2, 5000); + + queue.publishMessage(topicName1, "m1"); + queue.publishMessage(topicName1, "m2"); + queue.publishMessage(topicName2, "m3"); + + log.info("Sleeping for 15 seconds ..."); + Thread.sleep(15000); + log.info("Woke up after sleeping for 15 seconds ..."); + + queue.publishMessage(topicName2, "m4"); + queue.publishMessage(topicName1, "m5"); + + queue.resetOffset(topicName1, "sub1", 0); + log.info("Stopped application"); + } +} diff --git a/src/main/java/com/uditagarwal/Main.java b/src/main/java/com/uditagarwal/Main.java deleted file mode 100644 index b528d04..0000000 --- a/src/main/java/com/uditagarwal/Main.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.uditagarwal; - -import com.uditagarwal.pub_sub_queue.public_interface.Queue; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; - -public class Main { - public static void main(String[] args) throws InterruptedException { - final Queue queue = new Queue(); - final Topic topic1 = queue.createTopic("t1"); - final Topic topic2 = queue.createTopic("t2"); - final SleepingSubscriber sub1 = new SleepingSubscriber("sub1", 10000); - final SleepingSubscriber sub2 = new SleepingSubscriber("sub2", 10000); - queue.subscribe(sub1, topic1); - queue.subscribe(sub2, topic1); - - final SleepingSubscriber sub3 = new SleepingSubscriber("sub3", 5000); - queue.subscribe(sub3, topic2); - - queue.publish(topic1, new Message("m1")); - queue.publish(topic1, new Message("m2")); - - queue.publish(topic2, new Message("m3")); - - Thread.sleep(15000); - queue.publish(topic2, new Message("m4")); - queue.publish(topic1, new Message("m5")); - - queue.resetOffset(topic1, sub1, 0); - } -} diff --git a/src/main/java/com/uditagarwal/SleepingSubscriber.java b/src/main/java/com/uditagarwal/SleepingSubscriber.java deleted file mode 100644 index 6ec6fc0..0000000 --- a/src/main/java/com/uditagarwal/SleepingSubscriber.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.uditagarwal; - -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; -import com.uditagarwal.pub_sub_queue.model.Message; - -public class SleepingSubscriber implements ISubscriber { - private final String id; - private final int sleepTimeInMillis; - - public SleepingSubscriber(String id, int sleepTimeInMillis) { - this.id = id; - this.sleepTimeInMillis = sleepTimeInMillis; - } - - @Override - public String getId() { - return id; - } - - @Override - public void consume(Message message) throws InterruptedException { - System.out.println("Subscriber: " + id + " started consuming: " + message.getMsg()); - Thread.sleep(sleepTimeInMillis); - System.out.println("Subscriber: " + id + " done consuming: " + message.getMsg()); - } -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java new file mode 100644 index 0000000..62ab979 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java @@ -0,0 +1,9 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.models.Message; + +public class MessageFactory { + public static Message getNewMessage(String message) { + return new Message(message); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java new file mode 100644 index 0000000..75c72fc --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java @@ -0,0 +1,10 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.impl.InMemoryQueue; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; + +public class QueueFactory { + public static Queue getInMemoryQueue() { + return new InMemoryQueue(); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java new file mode 100644 index 0000000..f093c53 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java @@ -0,0 +1,11 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.impl.DummySubscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; + +public class SubscriberFactory { + + public static Subscriber getDummySubscriber(String subscriberId, int sleepTimeInMs) { + return new DummySubscriber(subscriberId, sleepTimeInMs); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java new file mode 100644 index 0000000..7286048 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java @@ -0,0 +1,14 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; +import com.uditagarwal.pub_sub_queue.impl.InMemoryTopic; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; + +public class TopicFactory { + public static Topic getNewTopic(String topicName) { + return new InMemoryTopic(topicName); + } + public static TopicProcessor getNewTopicProcessor(final Topic topic) { + return new TopicProcessor(topic); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java index d43ceee..d016b5a 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java @@ -1,8 +1,8 @@ package com.uditagarwal.pub_sub_queue.handler; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.models.Message; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; @@ -11,35 +11,35 @@ public class SubscriberWorker implements Runnable { private final Topic topic; - private final TopicSubscriber topicSubscriber; + private final Subscriber subscriber; - public SubscriberWorker(@NonNull final Topic topic, @NonNull final TopicSubscriber topicSubscriber) { + public SubscriberWorker(@NonNull final Topic topic, @NonNull final Subscriber subscriber) { this.topic = topic; - this.topicSubscriber = topicSubscriber; + this.subscriber = subscriber; } @SneakyThrows @Override public void run() { - synchronized (topicSubscriber) { + synchronized (subscriber) { do { - int curOffset = topicSubscriber.getOffset().get(); + int curOffset = subscriber.getOffset().get(); while (curOffset >= topic.getMessages().size()) { - topicSubscriber.wait(); + subscriber.wait(); } Message message = topic.getMessages().get(curOffset); - topicSubscriber.getSubscriber().consume(message); + subscriber.consume(message); // We cannot just increment here since subscriber offset can be reset while it is consuming. So, after // consuming we need to increase only if it was previous one. - topicSubscriber.getOffset().compareAndSet(curOffset, curOffset + 1); + subscriber.getOffset().compareAndSet(curOffset, curOffset + 1); } while (true); } } synchronized public void wakeUpIfNeeded() { - synchronized (topicSubscriber) { - topicSubscriber.notify(); + synchronized (subscriber) { + subscriber.notify(); } } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java similarity index 58% rename from src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java rename to src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java index 66bd337..3eee8f2 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java @@ -1,31 +1,33 @@ package com.uditagarwal.pub_sub_queue.handler; -import com.uditagarwal.pub_sub_queue.model.Topic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; +import lombok.Getter; import lombok.NonNull; import java.util.HashMap; import java.util.Map; -public class TopicHandler { +@Getter +public class TopicProcessor { private final Topic topic; private final Map subscriberWorkers; - public TopicHandler(@NonNull final Topic topic) { + public TopicProcessor(@NonNull final Topic topic) { this.topic = topic; subscriberWorkers = new HashMap<>(); } public void publish() { - for (TopicSubscriber topicSubscriber:topic.getSubscribers()) { - startSubsriberWorker(topicSubscriber); + for (Subscriber subscriber : topic.getSubscribers()) { + startSubscriberWorker(subscriber); } } - public void startSubsriberWorker(@NonNull final TopicSubscriber topicSubscriber) { - final String subscriberId = topicSubscriber.getSubscriber().getId(); + public void startSubscriberWorker(@NonNull final Subscriber subscriber) { + final String subscriberId = subscriber.getId(); if (!subscriberWorkers.containsKey(subscriberId)) { - final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, topicSubscriber); + final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, subscriber); subscriberWorkers.put(subscriberId, subscriberWorker); new Thread(subscriberWorker).start(); } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java new file mode 100644 index 0000000..6eff80b --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java @@ -0,0 +1,29 @@ +package com.uditagarwal.pub_sub_queue.impl; + +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.models.Message; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Getter +public class DummySubscriber implements Subscriber { + private final String id; + private final int sleepTimeInMillis; + private final AtomicInteger offset; + + public DummySubscriber(String id, int sleepTimeInMillis) { + this.id = id; + this.sleepTimeInMillis = sleepTimeInMillis; + this.offset = new AtomicInteger(0); + } + + @Override + public void consume(Message message) throws InterruptedException { + log.info("Subscriber: " + id + " started consuming: " + message.getMsg()); + Thread.sleep(sleepTimeInMillis); + log.info("Subscriber: " + id + " done consuming: " + message.getMsg()); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java new file mode 100644 index 0000000..3757ce6 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java @@ -0,0 +1,62 @@ +package com.uditagarwal.pub_sub_queue.impl; + +import com.uditagarwal.pub_sub_queue.factories.MessageFactory; +import com.uditagarwal.pub_sub_queue.factories.SubscriberFactory; +import com.uditagarwal.pub_sub_queue.factories.TopicFactory; +import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +public class InMemoryQueue implements Queue { + + private final Map topicNameToProcessorMap; + + public InMemoryQueue() { + this.topicNameToProcessorMap = new HashMap<>(); + } + + public void createTopic(@NonNull final String topicName) { + final Topic topic = TopicFactory.getNewTopic(topicName); + final TopicProcessor topicProcessor = TopicFactory.getNewTopicProcessor(topic); + topicNameToProcessorMap.put(topicName, topicProcessor); + log.info("created topic with name: {}", topicName); + } + + public void addSubscriber(@NonNull final String subscriberId, + @NonNull final String topicName, + final int sleepTimeInMs) { + Subscriber subscriber = SubscriberFactory.getDummySubscriber(subscriberId, sleepTimeInMs); + getTopic(topicName).addSubscriber(subscriber); + log.info(subscriber.getId() + " subscribed to topic: " + topicName); + } + + private Topic getTopic(String topicName) { + return topicNameToProcessorMap.get(topicName).getTopic(); + } + + public void publishMessage(@NonNull final String topicName, @NonNull final String message) { + getTopic(topicName).addMessage(MessageFactory.getNewMessage(message)); + log.info(message + " published to topic: " + topicName); + new Thread(() -> topicNameToProcessorMap.get(topicName).publish()).start(); + } + + public void resetOffset(@NonNull final String topicName, @NonNull final String subscriberId, @NonNull final Integer newOffset) { + List subscriberList = getTopic(topicName).getSubscribers(); + for (Subscriber subscriber : subscriberList) { + if (subscriber.getId().equals(subscriberId)) { + subscriber.getOffset().set(newOffset); + log.info(subscriber.getId() + " offset reset to: " + newOffset); + new Thread(() -> topicNameToProcessorMap.get(topicName).startSubscriberWorker(subscriber)).start(); + break; + } + } + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java new file mode 100644 index 0000000..d390fa6 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java @@ -0,0 +1,31 @@ +package com.uditagarwal.pub_sub_queue.impl; + +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; +import com.uditagarwal.pub_sub_queue.models.Message; +import lombok.Getter; +import lombok.NonNull; + +import java.util.ArrayList; +import java.util.List; + +@Getter +public class InMemoryTopic implements Topic { + private final String topicName; // treating topic name as identifier + private final List messages; + private final List subscribers; + + public InMemoryTopic(@NonNull final String topicName) { + this.topicName = topicName; + this.messages = new ArrayList<>(); + this.subscribers = new ArrayList<>(); + } + + public synchronized void addMessage(@NonNull final Message message) { + messages.add(message); + } + + public void addSubscriber(@NonNull final Subscriber subscriber) { + subscribers.add(subscriber); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java new file mode 100644 index 0000000..8f30cf5 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java @@ -0,0 +1,8 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +public interface Queue { + void createTopic(String topicName); + void addSubscriber(String subscriberId, String topicName, int sleepTimeInMs); + void publishMessage(String topicName, String message); + void resetOffset(String topicName, String subscriberId, Integer newOffset); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java new file mode 100644 index 0000000..9f15cf8 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java @@ -0,0 +1,12 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +import com.uditagarwal.pub_sub_queue.models.Message; + +import java.util.concurrent.atomic.AtomicInteger; + +public interface Subscriber { + String getId(); + int getSleepTimeInMillis(); + AtomicInteger getOffset(); + void consume(Message message) throws InterruptedException; +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java new file mode 100644 index 0000000..6b4dff5 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java @@ -0,0 +1,12 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +import com.uditagarwal.pub_sub_queue.models.Message; + +import java.util.List; + +public interface Topic { + void addMessage(Message message); + void addSubscriber(Subscriber subscriber); + List getSubscribers(); + List getMessages(); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java deleted file mode 100644 index 94a8cb5..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.uditagarwal.pub_sub_queue.model; - -import lombok.Getter; -import lombok.NonNull; - -import java.util.ArrayList; -import java.util.List; - -@Getter -public class Topic { - private final String topicName; - private final String topicId; - private final List messages; // TODO: Change getter this to send only immutable list outside. - private final List subscribers; // TODO: Change getter this to send only immutable list outside. - - public Topic(@NonNull final String topicName, @NonNull final String topicId) { - this.topicName = topicName; - this.topicId = topicId; - this.messages = new ArrayList<>(); - this.subscribers = new ArrayList<>(); - } - - public synchronized void addMessage(@NonNull final Message message) { - messages.add(message); - } - - public void addSubscriber(@NonNull final TopicSubscriber subscriber) { - subscribers.add(subscriber); - } -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java deleted file mode 100644 index 64d5e71..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.uditagarwal.pub_sub_queue.model; - -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NonNull; - -import java.util.concurrent.atomic.AtomicInteger; - -@Getter -@AllArgsConstructor -public class TopicSubscriber { - private final AtomicInteger offset; - private final ISubscriber subscriber; - - public TopicSubscriber(@NonNull final ISubscriber subscriber) { - this.subscriber = subscriber; - this.offset = new AtomicInteger(0); - } -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java b/src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java similarity index 75% rename from src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java rename to src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java index 946e339..2f0cb88 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java @@ -1,4 +1,4 @@ -package com.uditagarwal.pub_sub_queue.model; +package com.uditagarwal.pub_sub_queue.models; import lombok.AllArgsConstructor; import lombok.Getter; @@ -7,5 +7,4 @@ @Getter public class Message { private final String msg; - } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java deleted file mode 100644 index 8e02fa7..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.model.Message; - -public interface ISubscriber { - - String getId(); - void consume(Message message) throws InterruptedException; -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java deleted file mode 100644 index 535138a..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.handler.TopicHandler; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import lombok.NonNull; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -public class Queue { - private final Map topicProcessors; - - public Queue() { - this.topicProcessors = new HashMap<>(); - } - - public Topic createTopic(@NonNull final String topicName) { - final Topic topic = new Topic(topicName, UUID.randomUUID().toString()); - TopicHandler topicHandler = new TopicHandler(topic); - topicProcessors.put(topic.getTopicId(), topicHandler); - System.out.println("Created topic: " + topic.getTopicName()); - return topic; - } - - public void subscribe(@NonNull final ISubscriber subscriber, @NonNull final Topic topic) { - topic.addSubscriber(new TopicSubscriber(subscriber)); - System.out.println(subscriber.getId() + " subscribed to topic: " + topic.getTopicName()); - } - - public void publish(@NonNull final Topic topic, @NonNull final Message message) { - topic.addMessage(message); - System.out.println(message.getMsg() + " published to topic: " + topic.getTopicName()); - new Thread(() -> topicProcessors.get(topic.getTopicId()).publish()).start(); - } - - public void resetOffset(@NonNull final Topic topic, @NonNull final ISubscriber subscriber, @NonNull final Integer newOffset) { - for (TopicSubscriber topicSubscriber : topic.getSubscribers()) { - if (topicSubscriber.getSubscriber().equals(subscriber)) { - topicSubscriber.getOffset().set(newOffset); - System.out.println(topicSubscriber.getSubscriber().getId() + " offset reset to: " + newOffset); - new Thread(() -> topicProcessors.get(topic.getTopicId()).startSubsriberWorker(topicSubscriber)).start(); - break; - } - } - } -} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..56126c3 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file