Skip to content

Commit c579cf3

Browse files
Don't use interface
1 parent 222b01a commit c579cf3

File tree

2 files changed

+82
-92
lines changed

2 files changed

+82
-92
lines changed

workerpool.go

+53-57
Original file line numberDiff line numberDiff line change
@@ -6,111 +6,107 @@ import (
66
"time"
77
)
88

9-
type Client interface {
10-
Write(data []byte) error
11-
}
12-
13-
type pingConfig struct {
9+
type wpConfig struct {
1410
interval time.Duration
15-
dataFunc func() []byte
11+
callback func() error
1612
}
1713

18-
type PingManager struct {
19-
clients map[int64]map[Client]pingConfig
20-
clientPingMap map[Client]int64
21-
mutex sync.Mutex
14+
type WorkerPool struct {
15+
workers map[int64]map[int64]wpConfig
16+
workersByIntervals map[int64]int64
17+
mutex sync.Mutex
2218
}
2319

24-
// NewPingManager creates a new PingManager instance.
25-
func NewPingManager() *PingManager {
26-
return &PingManager{
27-
clients: make(map[int64]map[Client]pingConfig),
28-
clientPingMap: make(map[Client]int64),
20+
// NewWorkerPool creates a new WorkerPool instance.
21+
func NewWorkerPool() *WorkerPool {
22+
return &WorkerPool{
23+
workers: make(map[int64]map[int64]wpConfig),
24+
workersByIntervals: make(map[int64]int64),
2925
}
3026
}
3127

32-
// Add registers a client to be pinged.
33-
func (m *PingManager) Add(client Client, pingInterval time.Duration, dataFunc func() []byte) {
34-
// Calculate the next ping time in seconds
35-
timeToPing := time.Now().Add(pingInterval).Unix()
28+
// Add registers a new worker.
29+
func (m *WorkerPool) Add(id int64, interval time.Duration, callback func() error) {
30+
// Calculate the next worker time in seconds
31+
timeToWork := time.Now().Add(interval).Unix()
3632

3733
m.mutex.Lock()
3834
defer m.mutex.Unlock()
39-
if m.clients[timeToPing] == nil {
40-
m.clients[timeToPing] = make(map[Client]pingConfig)
35+
if m.workers[timeToWork] == nil {
36+
m.workers[timeToWork] = make(map[int64]wpConfig)
4137
}
4238

43-
m.clients[timeToPing][client] = pingConfig{
44-
interval: pingInterval,
45-
dataFunc: dataFunc,
39+
m.workers[timeToWork][id] = wpConfig{
40+
interval: interval,
41+
callback: callback,
4642
}
47-
m.clientPingMap[client] = timeToPing
43+
m.workersByIntervals[id] = timeToWork
4844
}
4945

50-
// Remove unregisters a client.
51-
func (m *PingManager) Remove(client Client) {
46+
// Remove unregisters a worker.
47+
func (m *WorkerPool) Remove(id int64) {
5248
m.mutex.Lock()
5349
defer m.mutex.Unlock()
5450

55-
nextPingTime, ok := m.clientPingMap[client]
51+
nexTime, ok := m.workersByIntervals[id]
5652
if ok {
57-
delete(m.clients[nextPingTime], client)
58-
if len(m.clients[nextPingTime]) == 0 {
59-
delete(m.clients, nextPingTime)
53+
delete(m.workers[nexTime], id)
54+
if len(m.workers[nexTime]) == 0 {
55+
delete(m.workers, nexTime)
6056
}
61-
delete(m.clientPingMap, client)
57+
delete(m.workersByIntervals, id)
6258
}
6359
}
6460

65-
// Reset reschedules the ping for a specific client.
66-
func (m *PingManager) Reset(client Client) {
61+
// Reset reschedules the worker for a specific id.
62+
func (m *WorkerPool) Reset(id int64) {
6763
m.mutex.Lock()
6864
defer m.mutex.Unlock()
6965

70-
nextPingTime, ok := m.clientPingMap[client]
66+
nextTime, ok := m.workersByIntervals[id]
7167
if ok {
72-
pingInfo := m.clients[nextPingTime][client]
73-
delete(m.clients[nextPingTime], client)
74-
if len(m.clients[nextPingTime]) == 0 {
75-
delete(m.clients, nextPingTime)
68+
wpCfg := m.workers[nextTime][id]
69+
delete(m.workers[nextTime], id)
70+
if len(m.workers[nextTime]) == 0 {
71+
delete(m.workers, nextTime)
7672
}
7773

78-
newPingTime := time.Now().Add(pingInfo.interval).Unix()
79-
if m.clients[newPingTime] == nil {
80-
m.clients[newPingTime] = make(map[Client]pingConfig)
74+
newTime := time.Now().Add(wpCfg.interval).Unix()
75+
if m.workers[newTime] == nil {
76+
m.workers[newTime] = make(map[int64]wpConfig)
8177
}
82-
m.clients[newPingTime][client] = pingInfo
83-
m.clientPingMap[client] = newPingTime
78+
m.workers[newTime][id] = wpCfg
79+
m.workersByIntervals[id] = newTime
8480
}
8581
}
8682

87-
// Start begins the pinging process.
88-
func (m *PingManager) Start() {
83+
// Start begins the worker process.
84+
func (m *WorkerPool) Start() {
8985
go func() {
9086
for {
9187
nowSeconds := time.Now().Unix()
9288

9389
m.mutex.Lock()
94-
clients, ok := m.clients[nowSeconds]
90+
workers, ok := m.workers[nowSeconds]
9591
if ok {
96-
for client, pingCfg := range clients {
97-
err := client.Write(pingCfg.dataFunc())
92+
for id, wpCfg := range workers {
93+
err := wpCfg.callback()
9894
if err != nil {
9995
log.Printf("error pinging client: %v", err)
100-
m.Remove(client)
96+
m.Remove(id)
10197
continue
10298
}
10399

104-
// Reschedule the next ping for the client
105-
timeToPing := time.Now().Add(pingCfg.interval).Unix()
106-
if m.clients[timeToPing] == nil {
107-
m.clients[timeToPing] = make(map[Client]pingConfig)
100+
// Reschedule the next worker
101+
timeToWork := time.Now().Add(wpCfg.interval).Unix()
102+
if m.workers[timeToWork] == nil {
103+
m.workers[timeToWork] = make(map[int64]wpConfig)
108104
}
109-
m.clients[timeToPing][client] = pingCfg
110-
m.clientPingMap[client] = timeToPing
105+
m.workers[timeToWork][id] = wpCfg
106+
m.workersByIntervals[id] = timeToWork
111107

112108
}
113-
delete(m.clients, nowSeconds)
109+
delete(m.workers, nowSeconds)
114110
}
115111
m.mutex.Unlock()
116112

workerpool_test.go

+29-35
Original file line numberDiff line numberDiff line change
@@ -9,67 +9,61 @@ import (
99
"github.com/xconnio/workerpool"
1010
)
1111

12-
type mockClient struct {
13-
messages []string
14-
}
15-
16-
func (c *mockClient) Write(data []byte) error {
17-
c.messages = append(c.messages, string(data))
18-
return nil
19-
}
20-
2112
func TestManager(t *testing.T) {
22-
manager := workerpool.NewPingManager()
23-
24-
client1 := &mockClient{}
25-
client2 := &mockClient{}
13+
manager := workerpool.NewWorkerPool()
2614

27-
manager.Add(client1, 2*time.Second, func() []byte {
28-
return []byte("ping")
15+
messages1 := 0
16+
manager.Add(1, 2*time.Second, func() error {
17+
messages1++
18+
return nil
2919
})
30-
manager.Add(client2, 1*time.Second, func() []byte {
31-
return []byte("ping")
20+
21+
messages2 := 0
22+
manager.Add(2, 1*time.Second, func() error {
23+
messages2++
24+
return nil
3225
})
3326

3427
go manager.Start()
3528

36-
// Wait for ping
29+
// Wait for worker
3730
time.Sleep(3 * time.Second)
3831

39-
require.NotEmpty(t, client1.messages)
40-
require.NotEmpty(t, client2.messages)
32+
require.NotZero(t, messages1)
33+
require.NotZero(t, messages2)
4134

42-
// Remove client1 and check
43-
manager.Remove(client1)
35+
// Remove worker1 and check
36+
manager.Remove(1)
4437
time.Sleep(3 * time.Second)
4538

46-
require.Len(t, client1.messages, 1)
39+
require.Equal(t, messages1, 1)
4740

48-
require.Greater(t, len(client2.messages), 2)
41+
require.Greater(t, messages2, 2)
4942
}
5043

5144
func TestManagerReset(t *testing.T) {
52-
manager := workerpool.NewPingManager()
45+
manager := workerpool.NewWorkerPool()
5346

54-
client1 := &mockClient{}
55-
manager.Add(client1, 2*time.Second, func() []byte {
56-
return []byte("ping")
47+
messages := 0
48+
manager.Add(1, 2*time.Second, func() error {
49+
messages++
50+
return nil
5751
})
5852

5953
go manager.Start()
6054

61-
// Wait for first ping
55+
// Wait for first work
6256
time.Sleep(3 * time.Second)
63-
require.Len(t, client1.messages, 1)
57+
require.Equal(t, 1, messages)
6458

6559
time.Sleep(1 * time.Second)
66-
// Reset client1 and check pings
67-
manager.Reset(client1)
60+
// Reset worker1 and check work
61+
manager.Reset(1)
6862

6963
time.Sleep(1 * time.Second)
70-
require.Len(t, client1.messages, 1)
64+
require.Equal(t, 1, messages)
7165

72-
// Wait for another ping after reset
66+
// Wait for another work after reset
7367
time.Sleep(2 * time.Second)
74-
require.Len(t, client1.messages, 2)
68+
require.Equal(t, 2, messages)
7569
}

0 commit comments

Comments
 (0)