@@ -31,42 +31,88 @@ func (e ElasticSearch) SendNotification(ctx context.Context, message []map[strin
3131
3232 var err error
3333
34- payloadMsg := ""
35- meta := "{\" index\" :{\" _index\" :\" " + e .Config .Index + "\" }}\n "
36- for _ , payload := range message {
37- pl , err := json .Marshal (payload )
34+ // send messages to bulk api
35+ sendBulkRequest := func () error {
36+ payloadMsg := ""
37+ meta := "{\" index\" :{\" _index\" :\" " + e .Config .Index + "\" }}\n "
38+ for _ , payload := range message {
39+ pl , err := json .Marshal (payload )
40+ if err != nil {
41+ return err
42+ }
43+ payloadMsg += meta + string (pl ) + "\n "
44+ }
45+ endpointURL := strings .TrimRight (e .Config .EndpointURL , "/" )
46+ req , err := http .NewRequest (http .MethodPost , endpointURL + "/_bulk" , bytes .NewBuffer ([]byte (payloadMsg )))
3847 if err != nil {
48+ log .Error ().Err (err ).Msg ("error on create http request" )
3949 return err
4050 }
41- payloadMsg += meta + string (pl ) + "\n "
42- }
4351
44- // send message to this elasticsearch using http
45- // Set up the HTTP request.
46- endpointURL := strings .TrimRight (e .Config .EndpointURL , "/" )
47- req , err := http .NewRequest (http .MethodPost , endpointURL + "/_bulk" , bytes .NewBuffer ([]byte (payloadMsg )))
48- if err != nil {
49- log .Error ().Err (err ).Msg ("error on create http request" )
50- span .EndWithErr (err )
51- return err
52- }
52+ req .Header .Set ("Content-Type" , "application/x-ndjson" )
5353
54- req .Header .Set ("Content-Type" , "application/x-ndjson" )
54+ if e .Config .AuthHeader != "" {
55+ req .Header .Set ("Authorization" , e .Config .AuthHeader )
56+ }
5557
56- if e .Config .AuthHeader != "" {
57- req .Header .Set ("Authorization" , e .Config .AuthHeader )
58+ // Make the HTTP request.
59+ resp , err := utils .GetHTTPClient ().Do (req )
60+ if err != nil {
61+ log .Error ().Err (err ).Msg ("error on http request" )
62+ return intgerr .CheckHTTPError (err )
63+ }
64+ defer resp .Body .Close ()
65+
66+ return intgerr .CheckResponseCode (resp , http .StatusOK )
5867 }
68+ err = sendBulkRequest ()
5969
60- // Make the HTTP request.
61- resp , err := utils .GetHTTPClient ().Do (req )
6270 if err != nil {
63- log .Error ().Err (err ).Msg ("error on http request" )
64- span .EndWithErr (err )
65- return intgerr .CheckHTTPError (err )
66- }
67- defer resp .Body .Close ()
71+ log .Warn ().Err (err ).Msg ("error sending to elasticsearch using bulk api, switching to individual requests" )
72+
73+ // Try sending the messages individually
74+ endpointURL := strings .TrimRight (e .Config .EndpointURL , "/" )
75+ postDocumentURL := fmt .Sprintf ("%s/%s/_doc" , endpointURL , e .Config .Index )
76+
77+ postDocumentRequest := func (message map [string ]interface {}) error {
78+ payloadMsg , err := json .Marshal (message )
79+ if err != nil {
80+ return err
81+ }
82+
83+ req , err := http .NewRequest (http .MethodPost , postDocumentURL , bytes .NewBuffer (payloadMsg ))
84+ if err != nil {
85+ log .Error ().Err (err ).Msg ("error on create http request" )
86+ return err
87+ }
88+
89+ req .Header .Set ("Content-Type" , "application/json" )
90+
91+ if e .Config .AuthHeader != "" {
92+ req .Header .Set ("Authorization" , e .Config .AuthHeader )
93+ }
94+
95+ // Make the HTTP request.
96+ resp , err := utils .GetHTTPClient ().Do (req )
97+ if err != nil {
98+ log .Error ().Err (err ).Msg ("error on http request" )
99+ return intgerr .CheckHTTPError (err )
100+ }
101+ defer resp .Body .Close ()
102+
103+ return intgerr .CheckResponseCode (resp , http .StatusCreated )
104+ }
68105
69- return intgerr .CheckResponseCode (resp , http .StatusOK )
106+ for _ , msg := range message {
107+ err = postDocumentRequest (msg )
108+ if err != nil {
109+ log .Error ().Err (err ).Msg ("error sending to elasticsearch" )
110+ span .EndWithErr (err )
111+ return err
112+ }
113+ }
114+ }
115+ return nil
70116}
71117
72118func (e ElasticSearch ) IsValidCredential (ctx context.Context ) (bool , error ) {
0 commit comments