Skip to content

Commit c306fb2

Browse files
Merge pull request #1 from muzzammilshahid/ping-manager
Implement ping manager
2 parents 3a4021a + 30e2fe1 commit c306fb2

File tree

4 files changed

+197
-2
lines changed

4 files changed

+197
-2
lines changed

go.mod

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
11
module github.com/xconnio/workerpool
22

33
go 1.19
4+
5+
require github.com/stretchr/testify v1.9.0
6+
7+
require (
8+
github.com/davecgh/go-spew v1.1.1 // indirect
9+
github.com/pmezard/go-difflib v1.0.0 // indirect
10+
gopkg.in/yaml.v3 v3.0.1 // indirect
11+
)

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
6+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
7+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
8+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
9+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
10+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

workerpool.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,116 @@
11
package workerpool
2+
3+
import (
4+
"log"
5+
"sync"
6+
"time"
7+
)
8+
9+
type wpConfig struct {
10+
interval time.Duration
11+
callback func() error
12+
}
13+
14+
type WorkerPool struct {
15+
workers map[int64]map[int64]wpConfig
16+
workersByIntervals map[int64]int64
17+
mutex sync.Mutex
18+
}
19+
20+
// NewWorkerPool creates a new WorkerPool instance.
21+
func NewWorkerPool() *WorkerPool {
22+
return &WorkerPool{
23+
workers: make(map[int64]map[int64]wpConfig),
24+
workersByIntervals: make(map[int64]int64),
25+
}
26+
}
27+
28+
// Add registers a new worker.
29+
func (m *WorkerPool) Add(id int64, interval time.Duration, callback func() error) {
30+
// Calculate the next worker time in seconds
31+
timeToWork := time.Now().Add(interval).Unix()
32+
33+
m.mutex.Lock()
34+
defer m.mutex.Unlock()
35+
if m.workers[timeToWork] == nil {
36+
m.workers[timeToWork] = make(map[int64]wpConfig)
37+
}
38+
39+
m.workers[timeToWork][id] = wpConfig{
40+
interval: interval,
41+
callback: callback,
42+
}
43+
m.workersByIntervals[id] = timeToWork
44+
}
45+
46+
// Remove unregisters a worker.
47+
func (m *WorkerPool) Remove(id int64) {
48+
m.mutex.Lock()
49+
defer m.mutex.Unlock()
50+
51+
nexTime, ok := m.workersByIntervals[id]
52+
if ok {
53+
delete(m.workers[nexTime], id)
54+
if len(m.workers[nexTime]) == 0 {
55+
delete(m.workers, nexTime)
56+
}
57+
delete(m.workersByIntervals, id)
58+
}
59+
}
60+
61+
// Reset reschedules the worker for a specific id.
62+
func (m *WorkerPool) Reset(id int64) {
63+
m.mutex.Lock()
64+
defer m.mutex.Unlock()
65+
66+
nextTime, ok := m.workersByIntervals[id]
67+
if ok {
68+
wpCfg := m.workers[nextTime][id]
69+
delete(m.workers[nextTime], id)
70+
if len(m.workers[nextTime]) == 0 {
71+
delete(m.workers, nextTime)
72+
}
73+
74+
newTime := time.Now().Add(wpCfg.interval).Unix()
75+
if m.workers[newTime] == nil {
76+
m.workers[newTime] = make(map[int64]wpConfig)
77+
}
78+
m.workers[newTime][id] = wpCfg
79+
m.workersByIntervals[id] = newTime
80+
}
81+
}
82+
83+
// Start begins the worker process.
84+
func (m *WorkerPool) Start() {
85+
go func() {
86+
for {
87+
nowSeconds := time.Now().Unix()
88+
89+
m.mutex.Lock()
90+
workers, ok := m.workers[nowSeconds]
91+
if ok {
92+
for id, wpCfg := range workers {
93+
err := wpCfg.callback()
94+
if err != nil {
95+
log.Printf("error pinging client: %v", err)
96+
m.Remove(id)
97+
continue
98+
}
99+
100+
// Reschedule the next worker
101+
timeToWork := time.Now().Add(wpCfg.interval).Unix()
102+
if m.workers[timeToWork] == nil {
103+
m.workers[timeToWork] = make(map[int64]wpConfig)
104+
}
105+
m.workers[timeToWork][id] = wpCfg
106+
m.workersByIntervals[id] = timeToWork
107+
108+
}
109+
delete(m.workers, nowSeconds)
110+
}
111+
m.mutex.Unlock()
112+
113+
time.Sleep(1 * time.Second)
114+
}
115+
}()
116+
}

workerpool_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,69 @@
11
package workerpool_test
22

3-
import "testing"
3+
import (
4+
"testing"
5+
"time"
46

5-
func TestHello(t *testing.T) {
7+
"github.com/stretchr/testify/require"
68

9+
"github.com/xconnio/workerpool"
10+
)
11+
12+
func TestManager(t *testing.T) {
13+
manager := workerpool.NewWorkerPool()
14+
15+
messages1 := 0
16+
manager.Add(1, 2*time.Second, func() error {
17+
messages1++
18+
return nil
19+
})
20+
21+
messages2 := 0
22+
manager.Add(2, 1*time.Second, func() error {
23+
messages2++
24+
return nil
25+
})
26+
27+
go manager.Start()
28+
29+
// Wait for worker
30+
time.Sleep(3 * time.Second)
31+
32+
require.NotZero(t, messages1)
33+
require.NotZero(t, messages2)
34+
35+
// Remove worker1 and check
36+
manager.Remove(1)
37+
time.Sleep(3 * time.Second)
38+
39+
require.Equal(t, messages1, 1)
40+
41+
require.Greater(t, messages2, 2)
42+
}
43+
44+
func TestManagerReset(t *testing.T) {
45+
manager := workerpool.NewWorkerPool()
46+
47+
messages := 0
48+
manager.Add(1, 2*time.Second, func() error {
49+
messages++
50+
return nil
51+
})
52+
53+
go manager.Start()
54+
55+
// Wait for first work
56+
time.Sleep(3 * time.Second)
57+
require.Equal(t, 1, messages)
58+
59+
time.Sleep(900 * time.Millisecond)
60+
// Reset worker1 and check work
61+
manager.Reset(1)
62+
63+
time.Sleep(1 * time.Second)
64+
require.Equal(t, 1, messages)
65+
66+
// Wait for another work after reset
67+
time.Sleep(2 * time.Second)
68+
require.Equal(t, 2, messages)
769
}

0 commit comments

Comments
 (0)