Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split big batches in ES output plugin #739

Merged
merged 10 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/file_elasticsearch/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pipelines:
- http://localhost:9200
username: SOME_USERNAME
password: SOME_PASSWORD
index_format: SOME_INDEX
index_format: SOME_INDEX
3 changes: 2 additions & 1 deletion e2e/file_elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# https://github.com/elastic/start-local/tree/main
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
image: elasticsearch:8.17.0
container_name: es-local-test
ports:
- "19200:9200"
Expand All @@ -10,6 +10,7 @@ services:
- ELASTIC_PASSWORD=elastic
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=false
mem_limit: 1073741824
healthcheck:
test:
[
Expand Down
19 changes: 19 additions & 0 deletions e2e/file_es_split/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD=password

# Version of Elastic products
STACK_VERSION=8.16.1

# Set the cluster name
CLUSTER_NAME=docker-cluster

# Set to 'basic' or 'trial' to automatically start the 30-day trial
LICENSE=basic
#LICENSE=trial

# Port to expose Elasticsearch HTTP API to the host
ES_PORT=9200
#ES_PORT=127.0.0.1:9200

# Increase or decrease based on the available host memory (in bytes)
MEM_LIMIT=1073741824
20 changes: 20 additions & 0 deletions e2e/file_es_split/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
pipelines:
file_es:
input:
type: file
output:
type: elasticsearch
batch_flush_timeout: 200ms
batch_size: 500 * 1
connection_timeout: 30s
endpoints:
- http://localhost:9200
fatal_on_failed_insert: true
split_batch: true
strict: false
index_format: index_name
retry: 1
retention: 1s
workers_count: 1
username: elastic
password: password
37 changes: 37 additions & 0 deletions e2e/file_es_split/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: "3"

# Single-node Elasticsearch used by the file_es_split e2e case.
# Variables (${...}) come from the .env file next to this compose file.
services:
  es01:
    image: elasticsearch:${STACK_VERSION}
    volumes:
      - esdata01:/usr/share/elasticsearch/data
    ports:
      - ${ES_PORT}:9200
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=false
      - xpack.license.self_generated.type=${LICENSE}
      - xpack.ml.use_auto_machine_memory_percent=true
      # Deliberately tiny request-body limit: the e2e case relies on it to
      # make oversized bulk requests fail.
      - http.max_content_length=128b
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      # Security is disabled above, so the node never answers with
      # "missing authentication credentials"; treat any successful HTTP
      # response from the root endpoint as healthy instead.
      test:
        [
          "CMD-SHELL",
          "curl -sf http://localhost:9200 > /dev/null",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

volumes:
  esdata01:
    driver: local
157 changes: 157 additions & 0 deletions e2e/file_es_split/file_es.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package file_es_split

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

// Config holds the per-run state of the file_es_split e2e case.
type Config struct {
	// ctx and cancel are assigned together by Configure from a timeout
	// context.
	ctx    context.Context
	cancel context.CancelFunc

	// inputDir is the temporary directory the file input watches; Send
	// creates input.log inside it.
	inputDir string
}

// Configure prepares the pipeline called pipelineName inside conf for this
// case: it creates a two-minute timeout context, allocates temporary
// directories for the input file and the offsets file, and points the
// pipeline's "input" section at them.
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2)
	// Release the context's timer as soon as the test finishes instead of
	// leaving it armed until the two-minute deadline fires.
	t.Cleanup(c.cancel)

	c.inputDir = t.TempDir()
	offsetsDir := t.TempDir()

	input := conf.Pipelines[pipelineName].Raw.Get("input")
	input.Set("watching_dir", c.inputDir)
	input.Set("filename_pattern", "input.log")
	input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
}

// n is the base event count: Send writes n valid events, one oversized
// event, then 2*n more valid events; Validate expects n indexed documents.
const n = 10

// successEvent is a small JSON event used for every valid line of the input.
var successEvent = `{"field_a":"AAA","field_b":"BBB"}`

// failEvent is a JSON event whose payload alone is 128 '#' characters.
// See ES config: http.max_content_length=128b.
var failEvent = fmt.Sprintf(`{"s":"%s"}`, strings.Repeat("#", 128))

// Send creates input.log in the watched directory and fills it with n valid
// events, one oversized event, and then 2*n more valid events, one per line.
// Any create or write error fails the test immediately.
func (c *Config) Send(t *testing.T) {
	logPath := path.Join(c.inputDir, "input.log")

	file, err := os.Create(logPath)
	require.NoError(t, err)
	defer func() {
		_ = file.Close()
	}()

	// writeValid appends count copies of successEvent to the file.
	writeValid := func(count int) {
		for written := 0; written < count; written++ {
			require.NoError(t, addEvent(file, successEvent))
		}
	}

	writeValid(n)
	require.NoError(t, addEvent(file, failEvent))
	writeValid(2 * n)

	// Best-effort flush; the result is intentionally ignored.
	_ = file.Sync()
}

// addEvent appends s followed by a newline to f and reports any write error.
func addEvent(f *os.File, s string) error {
	if _, err := f.WriteString(s + "\n"); err != nil {
		return err
	}
	return nil
}

// Validate waits a fixed five seconds, then asserts that exactly n documents
// are present in the index and finally deletes the index.
func (c *Config) Validate(t *testing.T) {
	const settleDelay = 5 * time.Second
	time.Sleep(settleDelay)

	// NOTE(review): n matches the number of valid events Send writes before
	// the oversized one; presumably the events after it are not indexed.
	// Confirm against the output plugin's split logic.
	indexed, countErr := c.getEventsCount()
	require.NoError(t, countErr)
	require.Equal(t, n, indexed)

	require.NoError(t, c.deleteAll())
}

// deleteAll removes the index_name index from the local Elasticsearch
// instance with an HTTP DELETE. It returns an error if the request cannot be
// built or sent, the body cannot be read, or the status is not 200 OK.
func (c *Config) deleteAll() error {
	client := &http.Client{Timeout: 3 * time.Second}

	req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:9200/index_name", http.NoBody)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	// Send well-formed HTTP Basic credentials; a raw "user:password" string
	// is not a valid Authorization header value.
	req.SetBasicAuth("elastic", "password")

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("do request: %w", err)
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	// Read the body before checking the status so it can be included in the
	// error message.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read all: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("wrong status code; status = %d; body = %s", resp.StatusCode, respBody)
	}

	return nil
}

// searchResp is the subset of the `_search` response body that
// getEventsCount decodes: only the total hit information.
type searchResp struct {
	Hits struct {
		Total struct {
			// Value is the hit count returned to the caller.
			Value int `json:"value"`
			// Relation is decoded but not used by this package.
			// NOTE(review): presumably says whether Value is exact or a
			// lower bound; confirm against the Elasticsearch search API docs.
			Relation string `json:"relation"`
		} `json:"total"`
	} `json:"hits"`
}

// getEventsCount queries the `_search` endpoint of the index_name index on
// the local Elasticsearch instance and returns hits.total.value from the
// response. It returns an error if the request cannot be built or sent, the
// body cannot be read or decoded, or the status is not 200 OK.
func (c *Config) getEventsCount() (int, error) {
	client := &http.Client{Timeout: 3 * time.Second}

	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9200/index_name/_search", http.NoBody)
	if err != nil {
		return 0, fmt.Errorf("create request: %w", err)
	}
	// Send well-formed HTTP Basic credentials; a raw "user:password" string
	// is not a valid Authorization header value.
	req.SetBasicAuth("elastic", "password")

	resp, err := client.Do(req)
	if err != nil {
		return 0, fmt.Errorf("do request: %w", err)
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	// Read the body before checking the status so it can be included in the
	// error message.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("read all: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("wrong status code; status = %d; body = %s", resp.StatusCode, respBody)
	}

	var respData searchResp
	if err := json.Unmarshal(respBody, &respData); err != nil {
		return 0, fmt.Errorf("unmarshal response: %w", err)
	}

	return respData.Hits.Total.Value, nil
}
1 change: 1 addition & 0 deletions e2e/http_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pipelines:
http_file:
input:
type: http
address: ":9201"
meta:
remote_addr: "{{ .remote_addr }}"
user_agent: '{{ index (index .request.Header "User-Agent") 0}}'
Expand Down
2 changes: 1 addition & 1 deletion e2e/http_file/http_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *Config) Send(t *testing.T) {
cl := http.Client{}
for j := 0; j < c.Lines; j++ {
rd := bytes.NewReader(samples[j%len(samples)])
req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/?login=e2e-test", rd)
req, err := http.NewRequest(http.MethodPost, "http://localhost:9201/?login=e2e-test", rd)
require.NoError(t, err)
_, err = cl.Do(req)
require.NoError(t, err)
Expand Down
15 changes: 11 additions & 4 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ package e2e_test

import (
"context"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_elasticsearch"
"github.com/ozontech/file.d/e2e/file_es_split"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_auth"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
Expand Down Expand Up @@ -155,6 +157,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./file_elasticsearch/config.yml",
},
{
name: "file_es",
e2eTest: &file_es_split.Config{},
cfgPath: "./file_es_split/config.yml",
},
}

for num, test := range testsList {
Expand Down
6 changes: 6 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ After an insert error, fall with a non-zero exit code or not

<br>

**`split_batch`** *`bool`* *`default=false`*

Enable splitting of big batches.

<br>

**`retention`** *`cfg.Duration`* *`default=1s`*

Retention milliseconds for retry to DB.
Expand Down
Loading