Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit #4

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ func main() {
log.Printf("started server at %s", *addr)
// Create a channel do mirror commands with capacity of *buffer to receive the mirrored commands
mirrorDoQueue = make(chan rdbDo, *buffer)
// Starting full sync
go fullsync()
// Starting a separate goroutine to process the mirror commands
go mirrorDo()
err := redcon.ListenAndServe(*addr,
Expand Down
2 changes: 1 addition & 1 deletion params.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var buffer *int

func init() {
mainRedis = flag.String("main", ":6379", "connection string for the main redis")
mirrorRedis = flag.String("mirror", ":6381", "connection string for the main redis")
mirrorRedis = flag.String("mirror", ":6381", "connection string for the mirror redis")
addr = flag.String("addr", ":6380", "connection string for the mirroring service")
buffer = flag.Int("buff", 500, "Buffer size for mirror queue. If it is full Do commands will be ignored")
flag.Parse()
Expand Down
29 changes: 29 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,31 @@ type rdbDo struct {
cmdArgs []interface{}
}

func scanAndUpdate(mainRedis, mirrorRedis redis.Conn) {
cursor := 0
for {
keys, err := redis.Values(mainRedis.Do("SCAN", cursor, "MATCH", "*"))
if err != nil {
log.Println("Scanning failed from the elasticache:", (err))
}
// Iterate over the keys and update them in the destination
for _, key := range keys {
value, err := redis.String(mainRedis.Do("GET", key))
if err != nil {
log.Println("Unable get the key/value from elasticache:", (err))
}
// Update keys in the destination redis
_, err = mirrorRedis.Do("SET", key, value)
if err != nil {
log.Println("Intial replication got failed on the EC2 Redis:", (err))
}
}
if cursor == 0 {
break
}
}
}

// This function will process the incoming messages from client. It will act as a multiplexer.
// To get more information refer to:
// https://redis.io/docs/reference/protocol-spec
Expand Down Expand Up @@ -100,6 +125,10 @@ func redisClose(conn redcon.Conn, err error) {
log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
}

func fullsync() {
scanAndUpdate(mainRedis, mirrorRedis)
}

// This function starts as a goroutine. A concurrent process.
// mirrorDo loops forever and run the queued commands against the mirror redis
func mirrorDo() {
Expand Down