Skip to content

Commit fbcea5e

Browse files
author
avalkov
committed
Initial commit
0 parents  commit fbcea5e

26 files changed

+1343
-0
lines changed

Diff for: .env

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
AMQP_HOST="rabbitmq"
2+
AMQP_PORT=5672
3+
AMQP_USER="user"
4+
AMQP_PASS="password"
5+
AMQP_QUEUE_NAME="commands_queue"
6+
PROCESSING_WORKERS_COUNT=3

Diff for: .gitignore

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# If you prefer the allow list template instead of the deny list, see community template:
2+
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
3+
#
4+
# Binaries for programs and plugins
5+
*.exe
6+
*.exe~
7+
*.dll
8+
*.so
9+
*.dylib
10+
11+
# Test binary, built with `go test -c`
12+
*.test
13+
14+
# Output of the go coverage tool, specifically when used with LiteIDE
15+
*.out
16+
17+
# Dependency directories (remove the comment below to include it)
18+
# vendor/
19+
20+
# Go workspace file
21+
go.work
22+
23+
.netrc

Diff for: README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# SCS - Simple Client-Server via RabbitMQ
2+
3+
You can run it with docker-compose and the default .env:
4+
```
5+
docker-compose up -d
6+
```
7+
8+
Uses CSP (Common Sequantial Processes) pattern to distribute the work between logically independant routines.
9+
After messages are received from AMQP, they are being sent to router that will use consistient hashing to distribute them
10+
between set of pre-configured workers so all operations for same ```key``` are serialized to execute in same worker to avoid potential race conditions.
11+
12+
The ```commands``` directory contains multiple files with commands. Which ones to be loaded by the client during test is specified in ```client/Dockerfile```. Each file will be loaded and executed concurrently to simulate multiple clients.
13+
14+
After client finishes, it won't exit so we can attach to the running container and inspect the saved files.

Diff for: apps/client/main.go

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"log"
8+
"os"
9+
"regexp"
10+
"time"
11+
12+
"github.com/avalkov/SCS/internal/configuration"
13+
"github.com/joho/godotenv"
14+
"github.com/sethvargo/go-envconfig"
15+
"github.com/streadway/amqp"
16+
)
17+
18+
type Message struct {
19+
Body string
20+
ReplyTo string
21+
CorrelationId string
22+
}
23+
24+
const (
25+
replyQueue = "reply_queue"
26+
)
27+
28+
var jsonArrayRegex = regexp.MustCompile(`^\s*\[\s*.*\s*\]\s*$`)
29+
30+
func failOnError(err error, msg string) {
31+
if err != nil {
32+
log.Fatalf("%s: %s", msg, err)
33+
}
34+
}
35+
36+
func connectToRabbitMQ(user, password, host string, port int) (*amqp.Connection, *amqp.Channel) {
37+
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port))
38+
failOnError(err, "Failed to connect to RabbitMQ")
39+
40+
ch, err := conn.Channel()
41+
failOnError(err, "Failed to open a channel")
42+
43+
return conn, ch
44+
}
45+
46+
func declareQueue(ch *amqp.Channel, name string) amqp.Queue {
47+
q, err := ch.QueueDeclare(
48+
name, // name
49+
false, // durable
50+
false, // delete when unused
51+
false, // exclusive
52+
false, // no-wait
53+
nil, // arguments
54+
)
55+
failOnError(err, fmt.Sprintf("Failed to declare queue: %s", name))
56+
return q
57+
}
58+
59+
func loadCommandsFromFile(filename string) ([]string, error) {
60+
file, err := os.Open(filename)
61+
if err != nil {
62+
return nil, err
63+
}
64+
defer file.Close()
65+
66+
var commands []string
67+
scanner := bufio.NewScanner(file)
68+
for scanner.Scan() {
69+
commands = append(commands, scanner.Text())
70+
}
71+
72+
if err := scanner.Err(); err != nil {
73+
return nil, err
74+
}
75+
76+
return commands, nil
77+
}
78+
79+
func sendCommand(ch *amqp.Channel, command string, requestQueue, replyQueue string) {
80+
corrId := fmt.Sprintf("%d", time.Now().UnixNano())
81+
82+
err := ch.Publish(
83+
"", // exchange
84+
requestQueue, // routing key
85+
false, // mandatory
86+
false, // immediate
87+
amqp.Publishing{
88+
ContentType: "text/plain",
89+
CorrelationId: corrId,
90+
ReplyTo: replyQueue,
91+
Body: []byte(command),
92+
})
93+
failOnError(err, "Failed to publish a message")
94+
}
95+
96+
func receiveReplies(ch *amqp.Channel, replyQueue string, clientID int) {
97+
msgs, err := ch.Consume(
98+
replyQueue, // queue
99+
"", // consumer
100+
true, // auto-ack
101+
false, // exclusive
102+
false, // no-local
103+
false, // no-wait
104+
nil, // args
105+
)
106+
failOnError(err, "Failed to register a consumer")
107+
108+
for d := range msgs {
109+
fmt.Printf("Client %d received reply: %s\n", clientID, d.Body)
110+
if jsonArrayRegex.Match(d.Body) {
111+
saveToFile(fmt.Sprintf("getAllItemsResponse_client_%d.json", clientID), d.Body)
112+
}
113+
}
114+
}
115+
116+
func saveToFile(filename string, data []byte) {
117+
file, err := os.Create(filename)
118+
failOnError(err, fmt.Sprintf("Failed to create file: %s", filename))
119+
defer file.Close()
120+
121+
_, err = file.Write(data)
122+
failOnError(err, fmt.Sprintf("Failed to write to file: %s", filename))
123+
}
124+
125+
func main() {
126+
if len(os.Args) < 2 {
127+
log.Fatalf("Usage: %s <commands_file_1> <commands_file_2> ... <commands_file_n>", os.Args[0])
128+
}
129+
130+
godotenv.Load(".env")
131+
132+
var config configuration.Config
133+
if err := envconfig.Process(context.Background(), &config); err != nil {
134+
log.Fatalf("Failed to process env var: %v", err)
135+
}
136+
137+
conn, ch := connectToRabbitMQ(config.AMQP.User, config.AMQP.Pass, config.AMQP.Host, config.AMQP.Port)
138+
defer conn.Close()
139+
140+
replyQueue := declareQueue(ch, replyQueue)
141+
142+
for i, filename := range os.Args[1:] {
143+
log.Printf("Processing commands from file: %s", filename)
144+
commands, err := loadCommandsFromFile(filename)
145+
failOnError(err, fmt.Sprintf("Failed to load commands from file %s", filename))
146+
147+
go func(clientID int, commands []string) {
148+
go receiveReplies(ch, replyQueue.Name, clientID)
149+
for _, command := range commands {
150+
sendCommand(ch, command, config.AMQP.QueueName, replyQueue.Name)
151+
}
152+
}(i, commands)
153+
}
154+
155+
ctx := context.Background()
156+
<-ctx.Done()
157+
}

Diff for: apps/server/main.go

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
9+
"github.com/avalkov/SCS/internal/configuration"
10+
ds "github.com/avalkov/SCS/internal/datastructures"
11+
commandsParser "github.com/avalkov/SCS/internal/domain/commands_parser"
12+
commandsProcessor "github.com/avalkov/SCS/internal/domain/commands_processor"
13+
"github.com/avalkov/SCS/internal/multiworker"
14+
"github.com/avalkov/SCS/internal/queueservice"
15+
"github.com/avalkov/SCS/internal/queueservice/amqp"
16+
"github.com/joho/godotenv"
17+
"github.com/sethvargo/go-envconfig"
18+
)
19+
20+
func main() {
21+
if err := run(); err != nil {
22+
fmt.Fprintf(os.Stderr, "%v\n", err)
23+
os.Exit(1)
24+
}
25+
}
26+
27+
func run() error {
28+
ctx, cancelFunc := context.WithCancel(context.Background())
29+
defer cancelFunc()
30+
31+
godotenv.Load(".env")
32+
33+
var config configuration.Config
34+
if err := envconfig.Process(ctx, &config); err != nil {
35+
return err
36+
}
37+
38+
amqpWorker := amqp.NewAmqpWorker(amqp.AmqpConfig{
39+
Host: config.AMQP.Host,
40+
Port: config.AMQP.Port,
41+
User: config.AMQP.User,
42+
Pass: config.AMQP.Pass,
43+
QueueName: config.AMQP.QueueName,
44+
})
45+
if err := amqpWorker.Initialize(); err != nil {
46+
return fmt.Errorf("failed to initialize AMQP worker: %v", err)
47+
}
48+
49+
defer amqpWorker.Close()
50+
51+
requestsChans := make(chan queueservice.Message)
52+
repliesChans := make(chan queueservice.Message)
53+
54+
defer close(requestsChans)
55+
defer close(repliesChans)
56+
57+
if err := amqpWorker.Run(ctx, requestsChans, repliesChans); err != nil {
58+
return fmt.Errorf("failed to run AMQP worker: %v", err)
59+
}
60+
61+
parser := commandsParser.NewCommandsParser()
62+
processor := commandsProcessor.NewCommandsProcessor(parser, ds.NewOrderedMap())
63+
64+
multiWorker := multiworker.NewMultiWorker(
65+
config.PROCESSING_WORKERS_COUNT,
66+
parser,
67+
processor,
68+
)
69+
70+
go multiWorker.Run(ctx, requestsChans, repliesChans)
71+
72+
log.Printf("Server started")
73+
74+
<-ctx.Done()
75+
76+
return nil
77+
}

Diff for: cleanup.sh

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/bin/bash
2+
3+
# Function to delete a Docker container
4+
delete_container() {
5+
container_name=$1
6+
echo "Deleting container: $container_name"
7+
docker rm -f $container_name
8+
}
9+
10+
# Function to delete a Docker image
11+
delete_image() {
12+
image_name=$1
13+
echo "Deleting image: $image_name"
14+
docker rmi $image_name
15+
}
16+
17+
# Delete containers
18+
delete_container "scs_server"
19+
delete_container "scs_client"
20+
delete_container "rabbitmq"
21+
22+
# Delete images
23+
delete_image "scs_server"
24+
delete_image "scs_client"
25+
26+
# Delete volumes
27+
#docker volume rm scs_rabbitmq_data
28+
29+
echo "Cleanup complete."

Diff for: client/Dockerfile

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM amd64/golang:1.22.3-bullseye AS client_builder
2+
3+
WORKDIR /app
4+
5+
COPY . .
6+
7+
RUN go mod download
8+
9+
RUN go build -o /app/scs_client /app/apps/client/main.go
10+
11+
FROM debian:bullseye-slim
12+
13+
WORKDIR /root/
14+
15+
COPY --from=client_builder /app/scs_client /root/scs_client
16+
COPY ./commands /root/commands
17+
18+
RUN chmod +x /root/scs_client
19+
20+
ENTRYPOINT ["/root/scs_client", "/root/commands/commands1.txt", "/root/commands/commands2.txt", "/root/commands/commands3.txt"]

Diff for: commands/commands1.txt

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
addItem('key1', 'val1')
2+
addItem('key2', 'val2')
3+
addItem('key3', 'val3')
4+
getItem('key1')
5+
deleteItem('key2')
6+
addItem('key4', 'val4')
7+
getAllItems()

Diff for: commands/commands2.txt

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
deleteItem('key10')
2+
addItem('key10', 'val10')
3+
addItem('key20', 'val20')
4+
addItem('key30', 'val30')
5+
getItem('key10')
6+
addItem('key40', 'val40')
7+
getAllItems()

Diff for: commands/commands3.txt

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
addItem('key33', 'val33')
2+
getAllItems()

0 commit comments

Comments
 (0)