Skip to content

Improvements #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@
<version>1.18.16</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.7</version>
</dependency>
</dependencies>

<properties>
Expand Down
2 changes: 1 addition & 1 deletion problem-statement.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ We have to design a message queue supporting publisher-subscriber model. It shou

* It should support multiple topics where messages can be published.
* Publisher should be able to publish a message to a particular topic.
* Subscribers should be able to subscribe to a topic.
* Subscribers should be able to subscribe to a topic (Note: subscriber can subscribe to at most 1 topic).
* Whenever a message is published to a topic, all the subscribers, who are subscribed to that topic, should receive the message.
* Subscribers should be able to run in parallel

36 changes: 36 additions & 0 deletions src/main/java/com/uditagarwal/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.uditagarwal;

import com.uditagarwal.pub_sub_queue.factories.QueueFactory;
import com.uditagarwal.pub_sub_queue.interfaces.Queue;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Application {

public static void main(String[] args) throws InterruptedException {
log.info("Started application");
final Queue queue = QueueFactory.getInMemoryQueue();
final String topicName1 = "t1", topicName2 = "t2";

queue.createTopic(topicName1);
queue.createTopic(topicName2);

queue.addSubscriber("sub1", topicName1, 10000);
queue.addSubscriber("sub2", topicName1, 10000);
queue.addSubscriber("sub3", topicName2, 5000);

queue.publishMessage(topicName1, "m1");
queue.publishMessage(topicName1, "m2");
queue.publishMessage(topicName2, "m3");

log.info("Sleeping for 15 seconds ...");
Thread.sleep(15000);
log.info("Woke up after sleeping for 15 seconds ...");

queue.publishMessage(topicName2, "m4");
queue.publishMessage(topicName1, "m5");

queue.resetOffset(topicName1, "sub1", 0);
log.info("Stopped application");
}
}
31 changes: 0 additions & 31 deletions src/main/java/com/uditagarwal/Main.java

This file was deleted.

26 changes: 0 additions & 26 deletions src/main/java/com/uditagarwal/SleepingSubscriber.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.uditagarwal.pub_sub_queue.factories;

import com.uditagarwal.pub_sub_queue.models.Message;

public class MessageFactory {
public static Message getNewMessage(String message) {
return new Message(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.uditagarwal.pub_sub_queue.factories;

import com.uditagarwal.pub_sub_queue.impl.InMemoryQueue;
import com.uditagarwal.pub_sub_queue.interfaces.Queue;

public class QueueFactory {
public static Queue getInMemoryQueue() {
return new InMemoryQueue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.uditagarwal.pub_sub_queue.factories;

import com.uditagarwal.pub_sub_queue.impl.DummySubscriber;
import com.uditagarwal.pub_sub_queue.interfaces.Subscriber;

public class SubscriberFactory {

public static Subscriber getDummySubscriber(String subscriberId, int sleepTimeInMs) {
return new DummySubscriber(subscriberId, sleepTimeInMs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.uditagarwal.pub_sub_queue.factories;

import com.uditagarwal.pub_sub_queue.handler.TopicProcessor;
import com.uditagarwal.pub_sub_queue.impl.InMemoryTopic;
import com.uditagarwal.pub_sub_queue.interfaces.Topic;

public class TopicFactory {
public static Topic getNewTopic(String topicName) {
return new InMemoryTopic(topicName);
}
public static TopicProcessor getNewTopicProcessor(final Topic topic) {
return new TopicProcessor(topic);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.uditagarwal.pub_sub_queue.handler;

import com.uditagarwal.pub_sub_queue.model.Message;
import com.uditagarwal.pub_sub_queue.model.Topic;
import com.uditagarwal.pub_sub_queue.model.TopicSubscriber;
import com.uditagarwal.pub_sub_queue.interfaces.Subscriber;
import com.uditagarwal.pub_sub_queue.models.Message;
import com.uditagarwal.pub_sub_queue.interfaces.Topic;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
Expand All @@ -11,35 +11,35 @@
public class SubscriberWorker implements Runnable {

private final Topic topic;
private final TopicSubscriber topicSubscriber;
private final Subscriber subscriber;

public SubscriberWorker(@NonNull final Topic topic, @NonNull final TopicSubscriber topicSubscriber) {
public SubscriberWorker(@NonNull final Topic topic, @NonNull final Subscriber subscriber) {
this.topic = topic;
this.topicSubscriber = topicSubscriber;
this.subscriber = subscriber;
}

@SneakyThrows
@Override
public void run() {
synchronized (topicSubscriber) {
synchronized (subscriber) {
do {
int curOffset = topicSubscriber.getOffset().get();
int curOffset = subscriber.getOffset().get();
while (curOffset >= topic.getMessages().size()) {
topicSubscriber.wait();
subscriber.wait();
}
Message message = topic.getMessages().get(curOffset);
topicSubscriber.getSubscriber().consume(message);
subscriber.consume(message);

// We cannot just increment here since subscriber offset can be reset while it is consuming. So, after
// consuming we need to increase only if it was previous one.
topicSubscriber.getOffset().compareAndSet(curOffset, curOffset + 1);
subscriber.getOffset().compareAndSet(curOffset, curOffset + 1);
} while (true);
}
}

synchronized public void wakeUpIfNeeded() {
synchronized (topicSubscriber) {
topicSubscriber.notify();
synchronized (subscriber) {
subscriber.notify();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
package com.uditagarwal.pub_sub_queue.handler;

import com.uditagarwal.pub_sub_queue.model.Topic;
import com.uditagarwal.pub_sub_queue.model.TopicSubscriber;
import com.uditagarwal.pub_sub_queue.interfaces.Subscriber;
import com.uditagarwal.pub_sub_queue.interfaces.Topic;
import lombok.Getter;
import lombok.NonNull;

import java.util.HashMap;
import java.util.Map;

public class TopicHandler {
@Getter
public class TopicProcessor {
private final Topic topic;
private final Map<String, SubscriberWorker> subscriberWorkers;

public TopicHandler(@NonNull final Topic topic) {
public TopicProcessor(@NonNull final Topic topic) {
this.topic = topic;
subscriberWorkers = new HashMap<>();
}

public void publish() {
for (TopicSubscriber topicSubscriber:topic.getSubscribers()) {
startSubsriberWorker(topicSubscriber);
for (Subscriber subscriber : topic.getSubscribers()) {
startSubscriberWorker(subscriber);
}
}

public void startSubsriberWorker(@NonNull final TopicSubscriber topicSubscriber) {
final String subscriberId = topicSubscriber.getSubscriber().getId();
public void startSubscriberWorker(@NonNull final Subscriber subscriber) {
final String subscriberId = subscriber.getId();
if (!subscriberWorkers.containsKey(subscriberId)) {
final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, topicSubscriber);
final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, subscriber);
subscriberWorkers.put(subscriberId, subscriberWorker);
new Thread(subscriberWorker).start();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.uditagarwal.pub_sub_queue.impl;

import com.uditagarwal.pub_sub_queue.interfaces.Subscriber;
import com.uditagarwal.pub_sub_queue.models.Message;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Getter
public class DummySubscriber implements Subscriber {
private final String id;
private final int sleepTimeInMillis;
private final AtomicInteger offset;

public DummySubscriber(String id, int sleepTimeInMillis) {
this.id = id;
this.sleepTimeInMillis = sleepTimeInMillis;
this.offset = new AtomicInteger(0);
}

@Override
public void consume(Message message) throws InterruptedException {
log.info("Subscriber: " + id + " started consuming: " + message.getMsg());
Thread.sleep(sleepTimeInMillis);
log.info("Subscriber: " + id + " done consuming: " + message.getMsg());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.uditagarwal.pub_sub_queue.impl;

import com.uditagarwal.pub_sub_queue.factories.MessageFactory;
import com.uditagarwal.pub_sub_queue.factories.SubscriberFactory;
import com.uditagarwal.pub_sub_queue.factories.TopicFactory;
import com.uditagarwal.pub_sub_queue.handler.TopicProcessor;
import com.uditagarwal.pub_sub_queue.interfaces.Queue;
import com.uditagarwal.pub_sub_queue.interfaces.Subscriber;
import com.uditagarwal.pub_sub_queue.interfaces.Topic;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class InMemoryQueue implements Queue {

private final Map<String, TopicProcessor> topicNameToProcessorMap;

public InMemoryQueue() {
this.topicNameToProcessorMap = new HashMap<>();
}

public void createTopic(@NonNull final String topicName) {
final Topic topic = TopicFactory.getNewTopic(topicName);
final TopicProcessor topicProcessor = TopicFactory.getNewTopicProcessor(topic);
topicNameToProcessorMap.put(topicName, topicProcessor);
log.info("created topic with name: {}", topicName);
}

public void addSubscriber(@NonNull final String subscriberId,
@NonNull final String topicName,
final int sleepTimeInMs) {
Subscriber subscriber = SubscriberFactory.getDummySubscriber(subscriberId, sleepTimeInMs);
getTopic(topicName).addSubscriber(subscriber);
log.info(subscriber.getId() + " subscribed to topic: " + topicName);
}

private Topic getTopic(String topicName) {
return topicNameToProcessorMap.get(topicName).getTopic();
}

public void publishMessage(@NonNull final String topicName, @NonNull final String message) {
getTopic(topicName).addMessage(MessageFactory.getNewMessage(message));
log.info(message + " published to topic: " + topicName);
new Thread(() -> topicNameToProcessorMap.get(topicName).publish()).start();
}

public void resetOffset(@NonNull final String topicName, @NonNull final String subscriberId, @NonNull final Integer newOffset) {
List<Subscriber> subscriberList = getTopic(topicName).getSubscribers();
for (Subscriber subscriber : subscriberList) {
if (subscriber.getId().equals(subscriberId)) {
subscriber.getOffset().set(newOffset);
log.info(subscriber.getId() + " offset reset to: " + newOffset);
new Thread(() -> topicNameToProcessorMap.get(topicName).startSubscriberWorker(subscriber)).start();
break;
}
}
}
}
Loading