4
4
import com .datadoghq .trace .DDTraceInfo ;
5
5
import com .datadoghq .trace .Service ;
6
6
import com .fasterxml .jackson .databind .ObjectMapper ;
7
+ import com .google .common .util .concurrent .RateLimiter ;
7
8
import java .io .IOException ;
8
9
import java .io .OutputStream ;
9
10
import java .net .HttpURLConnection ;
10
11
import java .net .URL ;
11
12
import java .util .List ;
12
13
import java .util .Map ;
14
+ import java .util .concurrent .TimeUnit ;
13
15
import lombok .extern .slf4j .Slf4j ;
14
16
import org .msgpack .jackson .dataformat .MessagePackFactory ;
15
17
@@ -19,10 +21,14 @@ public class DDApi {
19
21
20
22
private static final String TRACES_ENDPOINT = "/v0.3/traces" ;
21
23
private static final String SERVICES_ENDPOINT = "/v0.3/services" ;
24
+ private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit .MINUTES .toSeconds (5 );
22
25
23
26
private final String tracesEndpoint ;
24
27
private final String servicesEndpoint ;
25
28
29
+ private final RateLimiter loggingRateLimiter =
30
+ RateLimiter .create (1.0 / SECONDS_BETWEEN_ERROR_LOG );
31
+
26
32
private final ObjectMapper objectMapper = new ObjectMapper (new MessagePackFactory ());
27
33
28
34
public DDApi (final String host , final int port ) {
@@ -37,14 +43,7 @@ public DDApi(final String host, final int port) {
37
43
* @return the staus code returned
38
44
*/
39
45
public boolean sendTraces (final List <List <DDBaseSpan <?>>> traces ) {
40
- final int status = callPUT (tracesEndpoint , traces );
41
- if (status == 200 ) {
42
- log .debug ("Succesfully sent {} traces to the DD agent." , traces .size ());
43
- return true ;
44
- } else {
45
- log .warn ("Error while sending {} traces to the DD agent. Status: {}" , traces .size (), status );
46
- return false ;
47
- }
46
+ return putContent ("traces" , tracesEndpoint , traces , traces .size ());
48
47
}
49
48
50
49
/**
@@ -56,15 +55,7 @@ public boolean sendServices(final Map<String, Service> services) {
56
55
if (services == null ) {
57
56
return true ;
58
57
}
59
- final int status = callPUT (servicesEndpoint , services );
60
- if (status == 200 ) {
61
- log .debug ("Succesfully sent {} services to the DD agent." , services .size ());
62
- return true ;
63
- } else {
64
- log .warn (
65
- "Error while sending {} services to the DD agent. Status: {}" , services .size (), status );
66
- return false ;
67
- }
58
+ return putContent ("services" , servicesEndpoint , services , services .size ());
68
59
}
69
60
70
61
/**
@@ -73,33 +64,52 @@ public boolean sendServices(final Map<String, Service> services) {
73
64
* @param content
74
65
* @return the status code
75
66
*/
76
- private int callPUT ( final String endpoint , final Object content ) {
77
- HttpURLConnection httpCon = null ;
67
+ private boolean putContent (
68
+ final String type , final String endpoint , final Object content , final int size ) {
78
69
try {
79
- httpCon = getHttpURLConnection (endpoint );
80
- } catch (final Exception e ) {
81
- log .warn ("Error thrown before PUT call to the DD agent." , e );
82
- return -1 ;
83
- }
70
+ final HttpURLConnection httpCon = getHttpURLConnection (endpoint );
84
71
85
- try {
86
72
final OutputStream out = httpCon .getOutputStream ();
87
73
objectMapper .writeValue (out , content );
88
74
out .flush ();
89
75
out .close ();
76
+
90
77
final int responseCode = httpCon .getResponseCode ();
91
- if (responseCode == 200 ) {
92
- log .debug ("Sent the payload to the DD agent." );
93
- } else {
78
+ if (responseCode != 200 ) {
79
+ if (log .isDebugEnabled ()) {
80
+ log .debug (
81
+ "Error while sending {} {} to the DD agent. Status: {}, ResponseMessage: " ,
82
+ size ,
83
+ type ,
84
+ responseCode ,
85
+ httpCon .getResponseMessage ());
86
+ } else if (loggingRateLimiter .tryAcquire ()) {
87
+ log .warn (
88
+ "Error while sending {} {} to the DD agent. Status: {} (going silent for {} seconds)" ,
89
+ size ,
90
+ type ,
91
+ responseCode ,
92
+ httpCon .getResponseMessage (),
93
+ SECONDS_BETWEEN_ERROR_LOG );
94
+ }
95
+ return false ;
96
+ }
97
+
98
+ log .debug ("Succesfully sent {} {} to the DD agent." , size , type );
99
+ return true ;
100
+
101
+ } catch (final IOException e ) {
102
+ if (log .isDebugEnabled ()) {
103
+ log .debug ("Error while sending " + size + " " + type + " to the DD agent." , e );
104
+ } else if (loggingRateLimiter .tryAcquire ()) {
94
105
log .warn (
95
- "Could not send the payload to the DD agent. Status: {} ResponseMessage: {}" ,
96
- httpCon .getResponseCode (),
97
- httpCon .getResponseMessage ());
106
+ "Error while sending {} {} to the DD agent. Message: {} (going silent for {} seconds)" ,
107
+ size ,
108
+ type ,
109
+ e .getMessage (),
110
+ SECONDS_BETWEEN_ERROR_LOG );
98
111
}
99
- return responseCode ;
100
- } catch (final Exception e ) {
101
- log .warn ("Could not send the payload to the DD agent." , e );
102
- return -1 ;
112
+ return false ;
103
113
}
104
114
}
105
115
0 commit comments