Skip to content

Commit 86e3362

Browse files
authored
observability: add configurable value for flush from configuration (grpc#9034)
1 parent b20ce17 commit 86e3362

File tree

10 files changed

+43
-245
lines changed

10 files changed

+43
-245
lines changed

gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public static synchronized GcpObservability grpcInit() throws IOException {
4646
GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags();
4747
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
4848
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
49-
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), 10);
49+
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
50+
observabilityConfig.getFlushMessageCount());
5051
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
5152
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
5253
instance = grpcInit(sink,

gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ public interface ObservabilityConfig {
2626
/** Get destination project ID - where logs will go. */
2727
String getDestinationProjectId();
2828

29+
/** Get message count threshold to flush - flush once message count is reached. */
30+
Long getFlushMessageCount();
31+
2932
/** Get filters set for logging. */
3033
List<LogFilter> getLogFilters();
3134

gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
3434

3535
private boolean enableCloudLogging = true;
3636
private String destinationProjectId = null;
37+
private Long flushMessageCount = null;
3738
private List<LogFilter> logFilters;
3839
private List<EventType> eventTypes;
3940

@@ -56,6 +57,7 @@ private void parseLoggingConfig(Map<String, ?> loggingConfig) {
5657
enableCloudLogging = value;
5758
}
5859
destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id");
60+
flushMessageCount = JsonUtil.getNumberAsLong(loggingConfig, "flush_message_count");
5961
List<?> rawList = JsonUtil.getList(loggingConfig, "log_filters");
6062
if (rawList != null) {
6163
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
@@ -116,6 +118,11 @@ public String getDestinationProjectId() {
116118
return destinationProjectId;
117119
}
118120

121+
@Override
122+
public Long getFlushMessageCount() {
123+
return flushMessageCount;
124+
}
125+
119126
@Override
120127
public List<LogFilter> getLogFilters() {
121128
return logFilters;

gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelper.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,6 @@ public boolean isEventToBeLogged(EventType event) {
210210
if (logEventTypeSet == null) {
211211
return true;
212212
}
213-
boolean logEvent;
214-
if (logEventTypeSet.isEmpty()) {
215-
logEvent = false;
216-
} else {
217-
logEvent = logEventTypeSet.contains(event);
218-
}
219-
return logEvent;
213+
return logEventTypeSet.contains(event);
220214
}
221215
}

gcp-observability/src/main/java/io/grpc/gcp/observability/logging/CloudLoggingHandler.java

Lines changed: 0 additions & 181 deletions
This file was deleted.

gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,12 @@ public class GcpLogSink implements Sink {
4949
private static final Set<String> kubernetesResourceLabelSet
5050
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
5151
"pod_name", "container_name");
52-
private static final int FALLBACK_FLUSH_LIMIT = 100;
52+
private static final long FALLBACK_FLUSH_LIMIT = 100L;
5353
private final Map<String, String> customTags;
5454
private final Logging gcpLoggingClient;
5555
private final MonitoredResource kubernetesResource;
56-
private final int flushLimit;
57-
private int flushCounter;
56+
private final Long flushLimit;
57+
private long flushCounter;
5858

5959
private static Logging createLoggingClient(String projectId) {
6060
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
@@ -70,19 +70,19 @@ private static Logging createLoggingClient(String projectId) {
7070
* @param destinationProjectId cloud project id to write logs
7171
*/
7272
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
73-
Map<String, String> customTags, int flushLimit) {
73+
Map<String, String> customTags, Long flushLimit) {
7474
this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit);
7575

7676
}
7777

7878
@VisibleForTesting
7979
GcpLogSink(Logging client, Map<String, String> locationTags, Map<String, String> customTags,
80-
int flushLimit) {
80+
Long flushLimit) {
8181
this.gcpLoggingClient = client;
8282
this.customTags = customTags != null ? customTags : new HashMap<>();
8383
this.kubernetesResource = getResource(locationTags);
84-
this.flushLimit = flushLimit != 0 ? flushLimit : FALLBACK_FLUSH_LIMIT;
85-
this.flushCounter = 0;
84+
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
85+
this.flushCounter = 0L;
8686
}
8787

8888
/**
@@ -116,10 +116,10 @@ public void write(GrpcLogRecord logProto) {
116116
synchronized (this) {
117117
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event);
118118
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
119-
flushCounter += 1;
119+
flushCounter = ++flushCounter;
120120
if (flushCounter >= flushLimit) {
121121
gcpLoggingClient.flush();
122-
flushCounter = 0;
122+
flushCounter = 0L;
123123
}
124124
}
125125
} catch (Exception e) {

gcp-observability/src/main/java/io/grpc/gcp/observability/logging/LogRecordExtension.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class LoggingTest {
6565
private static final Map<String, String> customTags = ImmutableMap.of(
6666
"KEY1", "Value1",
6767
"KEY2", "VALUE2");
68-
private static final int flushLimit = 100;
68+
private static final long flushLimit = 100L;
6969

7070
/**
7171
* Cloud logging test using LoggingChannelProvider and LoggingServerProvider.

0 commit comments

Comments
 (0)