Skip to content

Commit 3e32528

Browse files
authored
Merge pull request #7 from 0xsequence/timeflusher
time interval based file roll policy + refactor
2 parents 24387ae + b9c5aab commit 3e32528

File tree

4 files changed

+134
-76
lines changed

4 files changed

+134
-76
lines changed

writer.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const defaultBufferSize = 8 * datasize.MB
1919
type Writer[T any] interface {
2020
Write(b Block[T]) error
2121
BlockNum() uint64
22+
RollFile() error
2223
Close() error
2324
}
2425

@@ -79,6 +80,7 @@ func NewWriter[T any](opt Options) (Writer[T], error) {
7980
lastBlockNum = walFiles[len(walFiles)-1].LastBlockNum
8081
}
8182

83+
// create new writer
8284
return &writer[T]{
8385
options: opt,
8486
path: walPath,
@@ -109,13 +111,16 @@ func (w *writer[T]) Write(b Block[T]) error {
109111
}
110112

111113
w.lastBlockNum = b.Number
112-
if p, ok := w.options.FileRollPolicy.(LastBlockNumberRollPolicy); ok {
113-
p.LastBlockNum(w.lastBlockNum)
114-
}
115-
114+
w.options.FileRollPolicy.onBlockProcessed(w.lastBlockNum)
116115
return nil
117116
}
118117

118+
func (w *writer[T]) RollFile() error {
119+
w.mu.Lock()
120+
defer w.mu.Unlock()
121+
return w.rollFile()
122+
}
123+
119124
func (w *writer[T]) BlockNum() uint64 {
120125
w.mu.Lock()
121126
defer w.mu.Unlock()
@@ -209,9 +214,7 @@ func (w *writer[T]) newFile() error {
209214

210215
// create new buffer writer
211216
bufferWriter := io.Writer(w.buffer)
212-
if policy, ok := w.options.FileRollPolicy.(FileSizeRollPolicy); ok {
213-
bufferWriter = policy.WrapWriter(bufferWriter)
214-
}
217+
bufferWriter = &writerWrapper{Writer: bufferWriter, fsrp: w.options.FileRollPolicy}
215218

216219
// create new buffer closer
217220
w.bufferCloser = &funcCloser{

writer_file_roll_policy.go

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,53 @@
11
package ethwal
22

3-
import "io"
3+
import (
4+
"context"
5+
"io"
6+
"sync"
7+
"time"
8+
)
49

510
type FileRollPolicy interface {
611
ShouldRoll() bool
712
Reset()
8-
}
9-
10-
type FileSizeRollPolicy interface {
11-
FileRollPolicy
12-
13-
WrapWriter(w io.Writer) io.Writer
14-
}
1513

16-
type LastBlockNumberRollPolicy interface {
17-
FileRollPolicy
18-
19-
LastBlockNum(blockNum uint64)
14+
onWrite(data []byte)
15+
onBlockProcessed(blockNum uint64)
2016
}
2117

2218
type fileSizeRollPolicy struct {
23-
maxSize uint64
24-
stats *fileStats
19+
maxSize uint64
20+
bytesWritten uint64
2521
}
2622

27-
func NewFileSizeRollPolicy(maxSize uint64) FileSizeRollPolicy {
23+
func NewFileSizeRollPolicy(maxSize uint64) FileRollPolicy {
2824
return &fileSizeRollPolicy{maxSize: maxSize}
2925
}
3026

31-
func (p *fileSizeRollPolicy) WrapWriter(w io.Writer) io.Writer {
32-
p.stats = &fileStats{Writer: w}
33-
return p.stats
34-
}
35-
3627
func (p *fileSizeRollPolicy) ShouldRoll() bool {
37-
return p.stats.BytesWritten >= p.maxSize
28+
return p.bytesWritten >= p.maxSize
3829
}
3930

4031
func (p *fileSizeRollPolicy) Reset() {
41-
p.stats = &fileStats{}
32+
p.bytesWritten = 0
33+
}
34+
35+
func (p *fileSizeRollPolicy) onWrite(data []byte) {
36+
p.bytesWritten += uint64(len(data))
4237
}
4338

39+
func (p *fileSizeRollPolicy) onBlockProcessed(blockNum uint64) {}
40+
4441
// fileStats is a writer that keeps track of the number of bytes written to it.
45-
type fileStats struct {
42+
type writerWrapper struct {
4643
io.Writer
47-
BytesWritten uint64
44+
45+
fsrp FileRollPolicy
4846
}
4947

50-
func (w *fileStats) Write(p []byte) (n int, err error) {
51-
n, err = w.Writer.Write(p)
52-
w.BytesWritten += uint64(n)
53-
return
48+
func (w *writerWrapper) Write(p []byte) (n int, err error) {
49+
defer w.fsrp.onWrite(p)
50+
return w.Writer.Write(p)
5451
}
5552

5653
type lastBlockNumberRollPolicy struct {
@@ -59,7 +56,9 @@ type lastBlockNumberRollPolicy struct {
5956
lastBlockNum uint64
6057
}
6158

62-
func NewLastBlockNumberRollPolicy(rollInterval uint64) LastBlockNumberRollPolicy {
59+
func (l *lastBlockNumberRollPolicy) onWrite(data []byte) {}
60+
61+
func NewLastBlockNumberRollPolicy(rollInterval uint64) FileRollPolicy {
6362
return &lastBlockNumberRollPolicy{rollInterval: rollInterval}
6463
}
6564

@@ -71,27 +70,70 @@ func (l *lastBlockNumberRollPolicy) Reset() {
7170
// noop
7271
}
7372

74-
func (l *lastBlockNumberRollPolicy) LastBlockNum(blockNum uint64) {
73+
func (l *lastBlockNumberRollPolicy) onBlockProcessed(blockNum uint64) {
7574
l.lastBlockNum = blockNum
7675
}
7776

78-
type fileSizeOrLastBlockNumberRollPolicy struct {
79-
FileSizeRollPolicy
80-
LastBlockNumberRollPolicy
77+
type timeBasedRollPolicy struct {
78+
rollInterval time.Duration
79+
onError func(err error)
80+
81+
rollFunc func() error
82+
83+
bgCtx context.Context
84+
bgCancel context.CancelFunc
85+
86+
lastTimeRolled time.Time
87+
88+
mu sync.Mutex
89+
}
90+
91+
func NewTimeBasedRollPolicy(rollInterval time.Duration, onError func(err error)) FileRollPolicy {
92+
return &timeBasedRollPolicy{rollInterval: rollInterval, lastTimeRolled: time.Now(), onError: onError}
93+
}
94+
95+
func (t *timeBasedRollPolicy) ShouldRoll() bool {
96+
if time.Since(t.lastTimeRolled) >= t.rollInterval {
97+
return true
98+
}
99+
return false
100+
}
101+
102+
func (t *timeBasedRollPolicy) Reset() {
103+
t.lastTimeRolled = time.Now()
104+
}
105+
106+
func (t *timeBasedRollPolicy) onWrite(data []byte) {}
107+
108+
func (t *timeBasedRollPolicy) onBlockProcessed(blockNum uint64) {}
109+
110+
type FileRollPolicies []FileRollPolicy
111+
112+
func (policies FileRollPolicies) ShouldRoll() bool {
113+
for _, p := range policies {
114+
if p.ShouldRoll() {
115+
return true
116+
}
117+
}
118+
return false
81119
}
82120

83-
func NewFileSizeOrLastBlockNumberRollPolicy(maxSize, rollInterval uint64) FileRollPolicy {
84-
return &fileSizeOrLastBlockNumberRollPolicy{
85-
FileSizeRollPolicy: NewFileSizeRollPolicy(maxSize),
86-
LastBlockNumberRollPolicy: NewLastBlockNumberRollPolicy(rollInterval),
121+
func (policies FileRollPolicies) Reset() {
122+
for _, p := range policies {
123+
p.Reset()
87124
}
88125
}
89126

90-
func (f *fileSizeOrLastBlockNumberRollPolicy) ShouldRoll() bool {
91-
return f.FileSizeRollPolicy.ShouldRoll() || f.LastBlockNumberRollPolicy.ShouldRoll()
127+
func (policies FileRollPolicies) onWrite(data []byte) {
128+
for _, p := range policies {
129+
p.onWrite(data)
130+
}
92131
}
93132

94-
func (f *fileSizeOrLastBlockNumberRollPolicy) Reset() {
95-
f.FileSizeRollPolicy.Reset()
96-
f.LastBlockNumberRollPolicy.Reset()
133+
func (policies FileRollPolicies) onBlockProcessed(blockNum uint64) {
134+
for _, p := range policies {
135+
p.onBlockProcessed(blockNum)
136+
}
97137
}
138+
139+
var _ FileRollPolicy = &fileSizeRollPolicy{}

writer_file_roll_policy_test.go

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@ package ethwal
33
import (
44
"bytes"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/require"
910
)
1011

1112
func TestFileSizeRollPolicy(t *testing.T) {
12-
var p FileSizeRollPolicy
1313
var buff = bytes.NewBuffer(nil)
1414

15-
p = NewFileSizeRollPolicy(10)
16-
w := p.WrapWriter(buff)
15+
p := NewFileSizeRollPolicy(10)
16+
w := writerWrapper{buff, p}
1717

1818
_, err := w.Write([]byte("hello"))
1919
require.NoError(t, err)
@@ -26,7 +26,7 @@ func TestFileSizeRollPolicy(t *testing.T) {
2626
assert.True(t, p.ShouldRoll())
2727

2828
p.Reset()
29-
w = p.WrapWriter(buff)
29+
w = writerWrapper{buff, p}
3030
assert.False(t, p.ShouldRoll())
3131

3232
_, err = w.Write([]byte("hello world"))
@@ -38,50 +38,59 @@ func TestFileSizeRollPolicy(t *testing.T) {
3838
}
3939

4040
func TestLastBlockNumberRollPolicy(t *testing.T) {
41-
var p LastBlockNumberRollPolicy
41+
p := NewLastBlockNumberRollPolicy(10)
42+
assert.False(t, p.ShouldRoll())
43+
44+
p.onBlockProcessed(5)
45+
assert.False(t, p.ShouldRoll())
4246

43-
p = NewLastBlockNumberRollPolicy(10)
47+
p.onBlockProcessed(10)
48+
assert.True(t, p.ShouldRoll())
49+
50+
p.onBlockProcessed(11)
4451
assert.False(t, p.ShouldRoll())
52+
}
4553

46-
p.LastBlockNum(5)
54+
func TestTimeBasedRollPolicy(t *testing.T) {
55+
p := NewTimeBasedRollPolicy(1500*time.Millisecond, nil)
4756
assert.False(t, p.ShouldRoll())
4857

49-
p.LastBlockNum(10)
58+
time.Sleep(1500 * time.Millisecond)
5059
assert.True(t, p.ShouldRoll())
5160

52-
p.LastBlockNum(11)
61+
p.Reset()
5362
assert.False(t, p.ShouldRoll())
63+
64+
time.Sleep(1500 * time.Millisecond)
65+
assert.True(t, p.ShouldRoll())
5466
}
5567

5668
func TestNewFileSizeOrLastBlockNumberRollPolicy(t *testing.T) {
5769
var buff = bytes.NewBuffer(nil)
5870

59-
fol := NewFileSizeOrLastBlockNumberRollPolicy(10, 10)
60-
61-
fs := fol.(FileSizeRollPolicy)
62-
lb := fol.(LastBlockNumberRollPolicy)
63-
64-
require.NotNil(t, fs)
65-
require.NotNil(t, lb)
71+
fol := FileRollPolicies{
72+
NewFileSizeRollPolicy(10),
73+
NewLastBlockNumberRollPolicy(10),
74+
}
6675

67-
w := fs.WrapWriter(buff)
76+
w := writerWrapper{buff, fol}
6877

69-
assert.False(t, fs.ShouldRoll())
78+
assert.False(t, fol.ShouldRoll())
7079

71-
lb.LastBlockNum(10)
72-
assert.True(t, lb.ShouldRoll())
80+
fol.onBlockProcessed(10)
81+
assert.True(t, fol.ShouldRoll())
7382

74-
lb.LastBlockNum(11)
75-
assert.False(t, lb.ShouldRoll())
83+
fol.onBlockProcessed(11)
84+
assert.False(t, fol.ShouldRoll())
7685

7786
_, err := w.Write([]byte("hello world"))
7887
require.NoError(t, err)
7988

80-
assert.True(t, fs.ShouldRoll())
89+
assert.True(t, fol.ShouldRoll())
8190

82-
fs.Reset()
83-
assert.False(t, fs.ShouldRoll())
91+
fol.Reset()
92+
assert.False(t, fol.ShouldRoll())
8493

85-
lb.LastBlockNum(20)
86-
assert.True(t, lb.ShouldRoll())
94+
fol.onBlockProcessed(20)
95+
assert.True(t, fol.ShouldRoll())
8796
}

writer_no_gap.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ func (n *noGapWriter[T]) Write(b Block[T]) error {
3333
return n.w.Write(b)
3434
}
3535

36+
func (n *noGapWriter[T]) RollFile() error {
37+
return n.w.RollFile()
38+
}
39+
3640
func (n *noGapWriter[T]) BlockNum() uint64 {
3741
return n.w.BlockNum()
3842
}

0 commit comments

Comments
 (0)