8
8
"fmt"
9
9
"io"
10
10
"net/http"
11
+ "strconv"
11
12
"time"
12
13
13
14
"github.com/ozontech/file.d/cfg"
@@ -37,7 +38,7 @@ type Plugin struct {
37
38
controller pipeline.OutputPluginController
38
39
39
40
// plugin metrics
40
- sendErrorMetric prometheus.Counter
41
+ sendErrorMetric * prometheus.CounterVec
41
42
}
42
43
43
44
// ! config-params
@@ -172,7 +173,11 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
172
173
}
173
174
174
175
func (p * Plugin ) registerMetrics (ctl * metric.Ctl ) {
175
- p .sendErrorMetric = ctl .RegisterCounter ("output_splunk_send_error" , "Total splunk send errors" )
176
+ p .sendErrorMetric = ctl .RegisterCounterVec (
177
+ "output_splunk_send_error" ,
178
+ "Total splunk send errors" ,
179
+ "status_code" ,
180
+ )
176
181
}
177
182
178
183
func (p * Plugin ) Stop () {
@@ -209,10 +214,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
209
214
210
215
p .logger .Debugf ("trying to send: %s" , outBuf )
211
216
212
- err := p .send (outBuf )
217
+ code , err := p .send (outBuf )
213
218
if err != nil {
214
- p .sendErrorMetric .Inc ()
219
+ p .sendErrorMetric .WithLabelValues ( strconv . Itoa ( code )). Inc ()
215
220
p .logger .Errorf ("can't send data to splunk address=%s: %s" , p .config .Endpoint , err .Error ())
221
+
222
+ // skip retries for bad request
223
+ if code == http .StatusBadRequest {
224
+ return nil
225
+ }
216
226
} else {
217
227
p .logger .Debugf ("successfully sent: %s" , outBuf )
218
228
}
@@ -234,46 +244,46 @@ func (p *Plugin) newClient(timeout time.Duration) http.Client {
234
244
}
235
245
}
236
246
237
- func (p * Plugin ) send (data []byte ) error {
247
+ func (p * Plugin ) send (data []byte ) ( int , error ) {
238
248
r := bytes .NewReader (data )
239
249
// todo pass context from parent.
240
250
req , err := http .NewRequestWithContext (context .Background (), http .MethodPost , p .config .Endpoint , r )
241
251
if err != nil {
242
- return fmt .Errorf ("can't create request: %w" , err )
252
+ return 0 , fmt .Errorf ("can't create request: %w" , err )
243
253
}
244
254
245
255
req .Header .Set ("Authorization" , "Splunk " + p .config .Token )
246
256
resp , err := p .client .Do (req )
247
257
if err != nil {
248
- return fmt .Errorf ("can't send request: %w" , err )
258
+ return 0 , fmt .Errorf ("can't send request: %w" , err )
249
259
}
250
260
defer func (Body io.ReadCloser ) {
251
261
_ = Body .Close ()
252
262
}(resp .Body )
253
263
254
- if resp .StatusCode != http .StatusOK {
255
- return fmt .Errorf ("can't send request: %s" , resp .Status )
256
- }
257
-
258
264
b , err := io .ReadAll (resp .Body )
259
265
if err != nil {
260
- return fmt .Errorf ("can't read response: %w" , err )
266
+ return resp .StatusCode , fmt .Errorf ("can't read response: %w" , err )
267
+ }
268
+
269
+ if resp .StatusCode != http .StatusOK {
270
+ return resp .StatusCode , fmt .Errorf ("bad response: code=%s, body=%s" , resp .Status , b )
261
271
}
262
272
263
273
root , err := insaneJSON .DecodeBytes (b )
264
274
defer insaneJSON .Release (root )
265
275
if err != nil {
266
- return fmt .Errorf ("can't decode response: %w" , err )
276
+ return resp . StatusCode , fmt .Errorf ("can't decode response: %w" , err )
267
277
}
268
278
269
279
code := root .Dig ("code" )
270
280
if code == nil {
271
- return fmt .Errorf ("invalid response format, expecting json with 'code' field, got: %s" , string (b ))
281
+ return resp . StatusCode , fmt .Errorf ("invalid response format, expecting json with 'code' field, got: %s" , string (b ))
272
282
}
273
283
274
284
if code .AsInt () > 0 {
275
- return fmt .Errorf ("error while sending to splunk: %s" , string (b ))
285
+ return resp . StatusCode , fmt .Errorf ("error while sending to splunk: %s" , string (b ))
276
286
}
277
287
278
- return nil
288
+ return resp . StatusCode , nil
279
289
}
0 commit comments