-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathoptions.go
164 lines (130 loc) · 4.32 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package circular
import (
"fmt"
"math/rand/v2"
"time"
"go.uber.org/zap"
)
// Options defines settings for Buffer.
type Options struct {
Compressor Compressor
Logger *zap.Logger
PersistenceOptions PersistenceOptions
InitialCapacity int
MaxCapacity int
SafetyGap int
NumCompressedChunks int
}
// PersistenceOptions defines settings for Buffer persistence.
type PersistenceOptions struct {
// ChunkPath is the base path to the store chunk files.
//
// Example: /var/log/machine/my-machine.log, chunks will be stored
// by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3.
//
// If ChunkPath is empty, persistence is disabled.
ChunkPath string
// FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes).
FlushInterval time.Duration
// FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval).
FlushJitter float64
}
// NextInterval calculates next flush interval with jitter.
func (p PersistenceOptions) NextInterval() time.Duration {
return time.Duration(((rand.Float64()*2-1)*p.FlushJitter + 1.0) * float64(p.FlushInterval))
}
// Compressor implements an optional interface for chunk compression.
//
// Compress and Decompress append to the dest slice and return the result.
//
// Compressor should be safe for concurrent use by multiple goroutines.
// Compressor should verify checksums of the compressed data.
type Compressor interface {
Compress(src, dest []byte) ([]byte, error)
Decompress(src, dest []byte) ([]byte, error)
DecompressedSize(src []byte) (int64, error)
}
// defaultOptions returns default initial values.
func defaultOptions() Options {
return Options{
InitialCapacity: 16384,
MaxCapacity: 1048576,
SafetyGap: 1024,
Logger: zap.NewNop(),
}
}
// OptionFunc allows setting Buffer options.
type OptionFunc func(*Options) error
// WithInitialCapacity sets initial buffer capacity.
func WithInitialCapacity(capacity int) OptionFunc {
return func(opt *Options) error {
if capacity <= 0 {
return fmt.Errorf("initial capacity should be positive: %d", capacity)
}
opt.InitialCapacity = capacity
return nil
}
}
// WithMaxCapacity sets maximum buffer capacity.
func WithMaxCapacity(capacity int) OptionFunc {
return func(opt *Options) error {
if capacity <= 0 {
return fmt.Errorf("max capacity should be positive: %d", capacity)
}
opt.MaxCapacity = capacity
return nil
}
}
// WithSafetyGap sets safety gap between readers and writers to avoid buffer overrun for the reader.
//
// Reader initial position is set to be as far as possible in the buffer history, but next concurrent write
// might overwrite read position, and safety gap helps to prevent it. With safety gap, maximum available
// bytes to read are: MaxCapacity-SafetyGap.
func WithSafetyGap(gap int) OptionFunc {
return func(opt *Options) error {
if gap <= 0 {
return fmt.Errorf("safety gap should be positive: %d", gap)
}
opt.SafetyGap = gap
return nil
}
}
// WithNumCompressedChunks sets number of compressed chunks to keep in the buffer.
//
// Default is to keep no compressed chunks, only uncompressed circular buffer is used.
func WithNumCompressedChunks(num int, c Compressor) OptionFunc {
return func(opt *Options) error {
if num < 0 {
return fmt.Errorf("number of compressed chunks should be non-negative: %d", num)
}
opt.NumCompressedChunks = num
opt.Compressor = c
return nil
}
}
// WithPersistence enables buffer persistence to disk.
func WithPersistence(options PersistenceOptions) OptionFunc {
return func(opt *Options) error {
if options.ChunkPath == "" {
return fmt.Errorf("chunk path should be set")
}
if options.FlushJitter < 0 || options.FlushJitter > 1 {
return fmt.Errorf("flush jitter should be in range [0, 1]: %f", options.FlushJitter)
}
if opt.Compressor == nil {
return fmt.Errorf("compressor should be set for persistence")
}
opt.PersistenceOptions = options
return nil
}
}
// WithLogger sets logger for Buffer.
func WithLogger(logger *zap.Logger) OptionFunc {
return func(opt *Options) error {
opt.Logger = logger
return nil
}
}