Skip to content

Commit 804d91f

Browse files
author
george pogosyan
committed
Add feature flag
1 parent 20ac129 commit 804d91f

File tree

4 files changed

+44
-12
lines changed

4 files changed

+44
-12
lines changed

Diff for: e2e/file_es/config.yml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pipelines:
1010
endpoints:
1111
- http://localhost:9200
1212
fatal_on_failed_insert: true
13+
split_enabled: true
1314
strict: false
1415
index_format: index_name
1516
retry: 1

Diff for: e2e/file_es/file_es.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"path"
1111
"path/filepath"
12+
"strings"
1213
"testing"
1314
"time"
1415

@@ -35,10 +36,12 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
3536
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
3637
}
3738

38-
const (
39-
n = 10
40-
successEvent = `{"field_a":"AAAA","field_b":"BBBB"}`
41-
failEvent = `{"field_a":"AAAA","field_b":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}`
39+
const n = 10
40+
41+
var (
42+
successEvent = `{"field_a":"AAA","field_b":"BBB"}`
43+
// see ES config: http.max_content_length=128b
44+
failEvent = fmt.Sprintf(`{"s":"%s"}`, strings.Repeat("#", 128))
4245
)
4346

4447
func (c *Config) Send(t *testing.T) {
@@ -56,7 +59,7 @@ func (c *Config) Send(t *testing.T) {
5659
err = addEvent(file, failEvent)
5760
require.NoError(t, err)
5861

59-
for i := 0; i < 2*n-1; i++ {
62+
for i := 0; i < 2*n; i++ {
6063
err = addEvent(file, successEvent)
6164
require.NoError(t, err)
6265
}

Diff for: plugin/output/elasticsearch/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ After an insert error, fall with a non-zero exit code or not
133133

134134
<br>
135135

136+
**`split_enabled`** *`bool`* *`default=false`*
137+
138+
Enable split big batches
139+
140+
<br>
141+
136142
**`retention`** *`cfg.Duration`* *`default=1s`*
137143

138144
Retention milliseconds for retry to DB.

Diff for: plugin/output/elasticsearch/elasticsearch.go

+29-7
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ type Plugin struct {
4242
batcher *pipeline.RetriableBatcher
4343
avgEventSize int
4444

45-
begin []int
46-
4745
time string
4846
headerPrefix string
4947
cancel context.CancelFunc
@@ -173,6 +171,11 @@ type Config struct {
173171
// > **Experimental feature**
174172
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
175173

174+
// > @3@4@5@6
175+
// >
176+
// > Enable split big batches
177+
SplitEnabled bool `json:"split_enabled" default:"false"` // *
178+
176179
// > @3@4@5@6
177180
// >
178181
// > Retention milliseconds for retry to DB.
@@ -202,6 +205,7 @@ type KeepAliveConfig struct {
202205

203206
type data struct {
204207
outBuf []byte
208+
begin []int
205209
}
206210

207211
func init() {
@@ -223,7 +227,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
223227
p.registerMetrics(params.MetricCtl)
224228
p.mu = &sync.Mutex{}
225229
p.headerPrefix = `{"` + p.config.BatchOpType + `":{"_index":"`
226-
p.begin = make([]int, 0, p.config.BatchSize_+1)
227230

228231
if len(p.config.IndexValues) == 0 {
229232
p.config.IndexValues = append(p.config.IndexValues, "@time")
@@ -341,6 +344,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
341344
if *workerData == nil {
342345
*workerData = &data{
343346
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
347+
begin: make([]int, 0, p.config.BatchSize_+1),
344348
}
345349
}
346350

@@ -351,16 +355,24 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
351355
}
352356

353357
eventsCount := 0
354-
p.begin = p.begin[:0]
358+
data.begin = data.begin[:0]
355359
data.outBuf = data.outBuf[:0]
356360
batch.ForEach(func(event *pipeline.Event) {
357361
eventsCount++
358-
p.begin = append(p.begin, len(data.outBuf))
362+
data.begin = append(data.begin, len(data.outBuf))
359363
data.outBuf = p.appendEvent(data.outBuf, event)
360364
})
361-
p.begin = append(p.begin, len(data.outBuf))
365+
data.begin = append(data.begin, len(data.outBuf))
366+
367+
var statusCode int
368+
var err error
369+
370+
if p.config.SplitEnabled {
371+
statusCode, err = p.saveOrSplit(0, eventsCount, data.begin, data.outBuf)
372+
} else {
373+
statusCode, err = p.save(data.outBuf)
374+
}
362375

363-
statusCode, err := p.saveOrSplit(0, eventsCount, p.begin, data.outBuf)
364376
if err != nil {
365377
p.sendErrorMetric.WithLabelValues(strconv.Itoa(statusCode)).Inc()
366378
switch statusCode {
@@ -381,6 +393,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
381393
return nil
382394
}
383395

396+
func (p *Plugin) save(data []byte) (int, error) {
397+
return p.client.DoTimeout(
398+
http.MethodPost,
399+
NDJSONContentType,
400+
data,
401+
p.config.ConnectionTimeout_,
402+
p.reportESErrors,
403+
)
404+
}
405+
384406
func (p *Plugin) saveOrSplit(left int, right int, begin []int, data []byte) (int, error) {
385407
if left == right {
386408
return http.StatusOK, nil

0 commit comments

Comments
 (0)