Skip to content

Commit 93ceb6a

Browse files
aa1exaaleksandrovHeadHunter483
authored
Support for specifying ingest pipelines in Elasticsearch (#744)
* Support for specifying ingest pipelines in Elasticsearch (#735) Signed-off-by: Aleksandrov Aleksandr <[email protected]> * Update e2e/file_elasticsearch/helpers.go Co-authored-by: Oleg Don <[email protected]> --------- Signed-off-by: Aleksandrov Aleksandr <[email protected]> Co-authored-by: aaleksandrov <[email protected]> Co-authored-by: Oleg Don <[email protected]>
1 parent b6b166f commit 93ceb6a

File tree

8 files changed

+350
-25
lines changed

8 files changed

+350
-25
lines changed

e2e/file_elasticsearch/config.yml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
pipelines:
2+
file_elasticsearch:
3+
input:
4+
type: file
5+
persistence_mode: async
6+
watching_dir: SOME_DIR
7+
offsets_file: SOME_FILE
8+
offsets_op: reset
9+
output:
10+
type: elasticsearch
11+
endpoints:
12+
- http://localhost:9200
13+
username: SOME_USERNAME
14+
password: SOME_PASSWORD
15+
index_format: SOME_INDEX
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# https://github.com/elastic/start-local/tree/main
2+
services:
3+
elasticsearch:
4+
image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
5+
container_name: es-local-test
6+
ports:
7+
- "19200:9200"
8+
environment:
9+
- discovery.type=single-node
10+
- ELASTIC_PASSWORD=elastic
11+
- xpack.security.enabled=true
12+
- xpack.security.http.ssl.enabled=false
13+
healthcheck:
14+
test:
15+
[
16+
"CMD-SHELL",
17+
"curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch:19200",
18+
]
19+
interval: 10s
20+
timeout: 10s
21+
retries: 10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package file_elasticsearch
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"os"
7+
"path"
8+
"path/filepath"
9+
"testing"
10+
"time"
11+
12+
"github.com/ozontech/file.d/cfg"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// This test verifies that messages sent to Elasticsearch are correctly processed by the ingest pipeline
17+
// and that each message is assigned a 'processed_at' field containing a timestamp.
18+
19+
// Config for file-elasticsearch plugin e2e test
20+
type Config struct {
21+
Count int
22+
Endpoint string
23+
Pipeline string
24+
Username string
25+
Password string
26+
dir string
27+
index string
28+
}
29+
30+
// Configure sets additional fields for input and output plugins
31+
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
32+
c.dir = t.TempDir()
33+
offsetsDir := t.TempDir()
34+
35+
input := conf.Pipelines[pipelineName].Raw.Get("input")
36+
input.Set("watching_dir", c.dir)
37+
input.Set("filename_pattern", "messages.log")
38+
input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
39+
40+
output := conf.Pipelines[pipelineName].Raw.Get("output")
41+
c.index = fmt.Sprintf("my-index-%d", rand.Intn(1000))
42+
output.Set("index_format", c.index)
43+
output.Set("ingest_pipeline", c.Pipeline)
44+
output.Set("username", c.Username)
45+
output.Set("password", c.Password)
46+
output.Set("endpoints", []string{c.Endpoint})
47+
48+
err := createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password)
49+
require.NoError(t, err)
50+
}
51+
52+
// Send creates file and writes messages
53+
func (c *Config) Send(t *testing.T) {
54+
file, err := os.Create(path.Join(c.dir, "messages.log"))
55+
require.NoError(t, err)
56+
defer func() { _ = file.Close() }()
57+
58+
for i := 0; i < c.Count; i++ {
59+
_, err = file.WriteString("{\"message\":\"test\"}\n")
60+
require.NoError(t, err)
61+
}
62+
}
63+
64+
// Validate waits for the message processing to complete
65+
func (c *Config) Validate(t *testing.T) {
66+
err := waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, 10, 250*time.Millisecond)
67+
require.NoError(t, err)
68+
docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
69+
require.NoError(t, err)
70+
require.Len(t, docs, c.Count)
71+
for _, doc := range docs {
72+
if _, ok := doc["processed_at"]; !ok {
73+
t.Errorf("doc %v doesn't have processed_at field", doc)
74+
}
75+
}
76+
}

e2e/file_elasticsearch/helpers.go

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package file_elasticsearch
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strings"
9+
"time"
10+
)
11+
12+
func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
13+
url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)
14+
15+
pipelineBody := `{"description":"test ingest pipeline","processors":[{"set":{"field":"processed_at","value":"{{_ingest.timestamp}}"}}]}`
16+
17+
req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(pipelineBody))
18+
if err != nil {
19+
return fmt.Errorf("failed to create request: %w", err)
20+
}
21+
22+
req.Header.Set("Content-Type", "application/json")
23+
if username != "" && password != "" {
24+
req.SetBasicAuth(username, password)
25+
}
26+
27+
client := &http.Client{Timeout: time.Second}
28+
resp, err := client.Do(req)
29+
if err != nil {
30+
return fmt.Errorf("failed to make HTTP request: %w", err)
31+
}
32+
defer func() { _ = resp.Body.Close() }()
33+
34+
respBody, err := io.ReadAll(resp.Body)
35+
if err != nil {
36+
return fmt.Errorf("failed to read body response: %w", err)
37+
}
38+
39+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
40+
return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
41+
}
42+
43+
return nil
44+
}
45+
46+
func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
47+
url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)
48+
49+
body := `{"query":{"match_all":{}}}`
50+
51+
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
54+
}
55+
56+
req.Header.Set("Content-Type", "application/json")
57+
if username != "" && password != "" {
58+
req.SetBasicAuth(username, password)
59+
}
60+
61+
client := &http.Client{Timeout: time.Second}
62+
resp, err := client.Do(req)
63+
if err != nil {
64+
return nil, fmt.Errorf("failed to make HTTP request: %w", err)
65+
}
66+
defer func() { _ = resp.Body.Close() }()
67+
68+
respBody, err := io.ReadAll(resp.Body)
69+
if err != nil {
70+
return nil, fmt.Errorf("error reading response: %w", err)
71+
}
72+
73+
if resp.StatusCode != http.StatusOK {
74+
return nil, fmt.Errorf("unexpected status: %d, response: %s", resp.StatusCode, string(respBody))
75+
}
76+
77+
type searchResponse struct {
78+
Hits struct {
79+
Hits []struct {
80+
Source map[string]interface{} `json:"_source"`
81+
} `json:"hits"`
82+
} `json:"hits"`
83+
}
84+
85+
var result searchResponse
86+
if err := json.Unmarshal(respBody, &result); err != nil {
87+
return nil, fmt.Errorf("failed to decode response: %w", err)
88+
}
89+
90+
resultDocs := make([]map[string]interface{}, 0, len(result.Hits.Hits))
91+
92+
for _, hit := range result.Hits.Hits {
93+
resultDocs = append(resultDocs, hit.Source)
94+
}
95+
96+
return resultDocs, nil
97+
}
98+
99+
func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
100+
client := &http.Client{
101+
Timeout: time.Second,
102+
}
103+
104+
url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)
105+
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
106+
if err != nil {
107+
return fmt.Errorf("failed to create request: %w", err)
108+
}
109+
110+
req.Header.Set("Content-Type", "application/json")
111+
if username != "" && password != "" {
112+
req.SetBasicAuth(username, password)
113+
}
114+
115+
for i := 0; i < retries; i++ {
116+
ok, err := func() (bool, error) {
117+
resp, err := client.Do(req)
118+
if err != nil {
119+
return false, fmt.Errorf("failed to make request: %w", err)
120+
}
121+
defer func() { _ = resp.Body.Close() }()
122+
123+
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusServiceUnavailable {
124+
return false, nil
125+
}
126+
127+
if resp.StatusCode != http.StatusOK {
128+
return false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
129+
}
130+
131+
body, err := io.ReadAll(resp.Body)
132+
if err != nil {
133+
return false, fmt.Errorf("failed to read response: %w", err)
134+
}
135+
136+
var result map[string]interface{}
137+
if err := json.Unmarshal(body, &result); err != nil {
138+
return false, fmt.Errorf("failed to decode response: %w", err)
139+
}
140+
141+
if count, ok := result["count"].(float64); ok {
142+
if int(count) >= minDocs {
143+
return true, nil
144+
}
145+
} else {
146+
return false, fmt.Errorf("unexpected response structure")
147+
}
148+
149+
return false, nil
150+
}()
151+
152+
if err != nil {
153+
return err
154+
}
155+
if ok {
156+
return nil
157+
}
158+
time.Sleep(delay)
159+
}
160+
161+
return fmt.Errorf("index '%s' did not meet conditions after %d retries", indexName, retries)
162+
}

e2e/start_work_test.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ package e2e_test
44

55
import (
66
"context"
7-
"log"
8-
"strconv"
9-
"testing"
10-
"time"
11-
12-
"github.com/ozontech/file.d/cfg"
137
"github.com/ozontech/file.d/e2e/file_clickhouse"
8+
"github.com/ozontech/file.d/e2e/file_elasticsearch"
149
"github.com/ozontech/file.d/e2e/file_file"
1510
"github.com/ozontech/file.d/e2e/http_file"
1611
"github.com/ozontech/file.d/e2e/join_throttle"
1712
"github.com/ozontech/file.d/e2e/kafka_auth"
1813
"github.com/ozontech/file.d/e2e/kafka_file"
1914
"github.com/ozontech/file.d/e2e/split_join"
15+
"log"
16+
"strconv"
17+
"testing"
18+
"time"
19+
20+
"github.com/ozontech/file.d/cfg"
2021
"github.com/ozontech/file.d/fd"
2122
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
2223
_ "github.com/ozontech/file.d/plugin/action/add_host"
@@ -143,6 +144,17 @@ func TestE2EStabilityWorkCase(t *testing.T) {
143144
e2eTest: &file_clickhouse.Config{},
144145
cfgPath: "./file_clickhouse/config.yml",
145146
},
147+
{
148+
name: "file_elasticsearch",
149+
e2eTest: &file_elasticsearch.Config{
150+
Count: 10,
151+
Pipeline: "test-ingest-pipeline",
152+
Endpoint: "http://localhost:19200",
153+
Username: "elastic",
154+
Password: "elastic",
155+
},
156+
cfgPath: "./file_elasticsearch/config.yml",
157+
},
146158
}
147159

148160
for num, test := range testsList {

plugin/output/elasticsearch/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -151,5 +151,11 @@ After a non-retryable write error, fall with a non-zero exit code or not
151151

152152
<br>
153153

154+
**`ingest_pipeline`** *`string`*
155+
156+
The name of the ingest pipeline to write events to.
157+
158+
<br>
159+
154160

155161
<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*

plugin/output/elasticsearch/elasticsearch.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,11 @@ type Config struct {
186186
// >
187187
// > After a non-retryable write error, fall with a non-zero exit code or not
188188
Strict bool `json:"strict" default:"false"` // *
189+
190+
// > @3@4@5@6
191+
// >
192+
// > The name of the ingest pipeline to write events to.
193+
IngestPipeline string `json:"ingest_pipeline"` // *
189194
}
190195

191196
type KeepAliveConfig struct {
@@ -293,7 +298,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
293298

294299
func (p *Plugin) prepareClient() {
295300
config := &xhttp.ClientConfig{
296-
Endpoints: prepareEndpoints(p.config.Endpoints),
301+
Endpoints: prepareEndpoints(p.config.Endpoints, p.config.IngestPipeline),
297302
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
298303
AuthHeader: p.getAuthHeader(),
299304
KeepAlive: &xhttp.ClientKeepAliveConfig{
@@ -317,13 +322,17 @@ func (p *Plugin) prepareClient() {
317322
}
318323
}
319324

320-
func prepareEndpoints(endpoints []string) []string {
325+
func prepareEndpoints(endpoints []string, ingestPipeline string) []string {
321326
res := make([]string, 0, len(endpoints))
322327
for _, e := range endpoints {
323328
if e[len(e)-1] == '/' {
324329
e = e[:len(e)-1]
325330
}
326-
res = append(res, e+"/_bulk?_source=false")
331+
e += "/_bulk?_source=false"
332+
if ingestPipeline != "" {
333+
e += "&pipeline=" + ingestPipeline
334+
}
335+
res = append(res, e)
327336
}
328337
return res
329338
}

0 commit comments

Comments
 (0)