Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: Telemetry for Data.read #12385

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions distribution/lib/Standard/Base/0.0.0-dev/src/Logging.enso
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import project.Any.Any
import project.Data.Dictionary.Dictionary
import project.Data.Json.JS_Object
import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Data.Numbers.Integer
import project.Errors.Illegal_Argument
import project.Meta
import project.Nothing.Nothing
import project.Panic.Panic

polyglot java import org.enso.base.enso_cloud.telemetry.TelemetryLog
polyglot java import org.enso.base.enso_cloud.telemetry.TelemetryLog.TelemetryLogError
polyglot java import org.slf4j.LoggerFactory

## PRIVATE
Expand Down Expand Up @@ -83,6 +89,29 @@ type Progress
## Simple name of the progress
to_text self = "Progress"


## Groups the operations used to emit telemetry events.
type Telemetry
    ## The common prefix of every telemetry logger name.
    logger_root = "org.enso.telemetry"

    ## Hands a telemetry message over to `TelemetryLog.logAsync`.

       Arguments:
       - name: The suffix of the logger name. The full logger name is
         `Telemetry.logger_root`, a dot, and this `name`.
       - message: The text of the telemetry event.
       - metadata: Additional key-value pairs attached to the event. They are
         converted to a `JS_Object` before being passed to the Java side.
         Defaults to an empty dictionary.

       A `TelemetryLogError` panic raised by the Java call is re-thrown as a
       `Telemetry_Log_Error` panic. Other Java exceptions are passed through
       `Illegal_Argument.handle_java_exception` (presumably translating
       `IllegalArgumentException` - confirm against that helper).
    log self name:Text message:Text (metadata:Dictionary Text Any = Dictionary.empty) =
        logger_name = Telemetry.logger_root + "." + name
        metadata_js = JS_Object.from_pairs <| metadata.to_vector
        # Nothing is printed here on purpose: echoing the message and metadata
        # to standard output would pollute the output of user programs and
        # expose the telemetry payload.
        Illegal_Argument.handle_java_exception <|
            Telemetry_Log_Error.handle_java_exception <|
                TelemetryLog.logAsync logger_name message metadata_js


## The error raised when sending a telemetry message fails on the Java side.
type Telemetry_Log_Error
    ## Carries the message of the underlying Java exception together with the
       exception itself.
    private Error message:Text cause:Any

    ## Returns a wrapper that runs the action given to it and re-throws any
       `TelemetryLogError` panic as a `Telemetry_Log_Error.Error` panic.
    private handle_java_exception =
        rethrow_as_enso_error panic =
            java_error = panic.payload
            wrapped = Telemetry_Log_Error.Error java_error.getMessage java_error
            Panic.throw wrapped
        Panic.catch TelemetryLogError handler=rethrow_as_enso_error


## PRIVATE
type Log_Level
## Finest (Trace) level log message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.enso.base.enso_cloud.logging.LogApiAccess;

/**
* The high-level API for logging audit events.
Expand All @@ -13,16 +14,18 @@
* the meantime, all waiting messages (up to some limit) will be sent in a single request.
*/
public final class AuditLog {
private AuditLog() {}

/** Schedules the log message to be sent in the next batch, and returns immediately. */
public static void logAsync(String type, String message, ObjectNode metadata) {
var event = new AuditLogMessage(type, message, metadata);
AuditLogApiAccess.INSTANCE.logWithoutConfirmation(event);
var event = AuditLogMessage.create(type, message, metadata);
LogApiAccess.INSTANCE.logWithoutConfirmation(event);
}

/** Schedules the log message to be sent in the next batch, and waits until it has been sent. */
public static void logSynchronously(String type, String message, ObjectNode metadata) {
var event = new AuditLogMessage(type, message, metadata);
Future<Void> future = AuditLogApiAccess.INSTANCE.logWithConfirmation(event);
var event = AuditLogMessage.create(type, message, metadata);
Future<Void> future = LogApiAccess.INSTANCE.logWithConfirmation(event);
try {
future.get();
} catch (ExecutionException | InterruptedException e) {
Expand All @@ -37,6 +40,6 @@ public AuditLogError(String message, Throwable cause) {
}

public static void resetCache() {
AuditLogApiAccess.INSTANCE.resetCache();
LogApiAccess.INSTANCE.resetCache();
}
}
Original file line number Diff line number Diff line change
@@ -1,81 +1,29 @@
package org.enso.base.enso_cloud.audit;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.Objects;
import org.enso.base.CurrentEnsoProject;
import org.enso.base.enso_cloud.CloudAPI;
import org.enso.base.enso_cloud.logging.LogMessage;

class AuditLogMessage implements AuditLogApiAccess.LogMessage {

/**
* A reserved field that is currently added by the cloud backend. Duplicating it will lead to
* internal server errors and log messages being discarded.
*/
private static final String RESERVED_TYPE = "type";
final class AuditLogMessage extends LogMessage {

private static final String OPERATION = "operation";
private static final String PROJECT_NAME = "projectName";
private static final String PROJECT_ID = "projectId";
private static final String PROJECT_SESSION_ID = "projectSessionId";
private static final String LOCAL_TIMESTAMP = "localTimestamp";

private final String projectId;
private final String projectName;
private final String operation;
private final String message;
private final ObjectNode metadata;

public AuditLogMessage(String operation, String message, ObjectNode metadata) {
this.operation = Objects.requireNonNull(operation);
this.message = Objects.requireNonNull(message);
this.metadata = Objects.requireNonNull(metadata);
checkNoRestrictedField(metadata, RESERVED_TYPE);
checkNoRestrictedField(metadata, OPERATION);
checkNoRestrictedField(metadata, PROJECT_NAME);
checkNoRestrictedField(metadata, PROJECT_SESSION_ID);
checkNoRestrictedField(metadata, LOCAL_TIMESTAMP);

this.projectId = CloudAPI.getCloudProjectId();

var currentProject = CurrentEnsoProject.get();
this.projectName = currentProject == null ? null : currentProject.fullName();
private AuditLogMessage(String message, ObjectNode metadata) {
super(message, metadata);
}

private static void checkNoRestrictedField(ObjectNode metadata, String fieldName) {
if (metadata.has(fieldName)) {
throw new IllegalArgumentException(
"Metadata cannot contain a field named '" + fieldName + "'");
}
}

private ObjectNode computedMetadata() {
public static AuditLogMessage create(String operation, String message, ObjectNode metadata) {
Objects.requireNonNull(operation);
Objects.requireNonNull(message);
Objects.requireNonNull(metadata);
var copy = metadata.deepCopy();
copy.set(OPERATION, TextNode.valueOf(operation));

// The project name may be null if a script is run outside a project.
if (projectName != null) {
copy.set(PROJECT_NAME, TextNode.valueOf(projectName));
}

String projectSessionId = CloudAPI.getCloudSessionId();
if (projectSessionId != null) {
copy.set(PROJECT_SESSION_ID, TextNode.valueOf(projectSessionId));
}

return copy;
return new AuditLogMessage(message, copy);
}

@Override
public String payload() {
var payload = new ObjectNode(JsonNodeFactory.instance);
payload.set("message", TextNode.valueOf(message));
payload.set(
PROJECT_ID, projectId == null ? NullNode.getInstance() : TextNode.valueOf(projectId));
payload.set("metadata", computedMetadata());
payload.set("kind", TextNode.valueOf("Lib"));
return payload.toString();
protected String kind() {
return "Lib";
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.enso.base.enso_cloud.audit;
package org.enso.base.enso_cloud.logging;

import java.io.IOException;
import java.net.URI;
Expand All @@ -23,24 +23,23 @@
* Gives access to the low-level log event API in the Cloud and manages asynchronously submitting
* the logs.
*/
class AuditLogApiAccess {
private static final Logger logger = Logger.getLogger(AuditLogApiAccess.class.getName());

public final class LogApiAccess {
/**
* We still want to limit the batch size to some reasonable number - sending too many logs in one
* request could also be problematic.
*/
private static final int MAX_BATCH_SIZE = 100;

private static final int MAX_RETRIES = 5;

public static AuditLogApiAccess INSTANCE = new AuditLogApiAccess();
private static final Logger LOGGER = Logger.getLogger(LogApiAccess.class.getName());
public static final LogApiAccess INSTANCE = new LogApiAccess();

private HttpClient httpClient;
private final LogJobsQueue logQueue = new LogJobsQueue();
private final ThreadPoolExecutor backgroundThreadService;
private RequestConfig cachedRequestConfig = null;

private AuditLogApiAccess() {
private LogApiAccess() {
// We set-up a thread 'pool' that will contain at most one thread.
// If the thread is idle for 60 seconds, it will be shut down.
backgroundThreadService =
Expand All @@ -59,6 +58,10 @@ public void logWithoutConfirmation(LogMessage message) {
enqueueJob(new LogJob(message, null, currentRequestConfig));
}

public void resetCache() {
cachedRequestConfig = null;
}

private void enqueueJob(LogJob job) {
int queuedJobs = logQueue.enqueue(job);
if (queuedJobs == 1 && backgroundThreadService.getQueue().isEmpty()) {
Expand Down Expand Up @@ -109,10 +112,10 @@ private void sendBatch(List<LogJob> batch) {
assert !batch.isEmpty() : "The batch must not be empty.";
// We use the request config from the first message - all messages in the batch should have the
// same request config.
var requestConfig = batch.get(0).requestConfig();
var requestConfig = batch.get(0).getRequestConfig();
assert requestConfig != null
: "The request configuration must be set before building a request.";
assert batch.stream().allMatch(job -> job.requestConfig().equals(requestConfig))
assert batch.stream().allMatch(job -> job.getRequestConfig().equals(requestConfig))
: "All messages in a batch must have the same request configuration.";

try {
Expand All @@ -129,10 +132,10 @@ private void sendBatch(List<LogJob> batch) {
* configs (when the config changes between tests). To send each message where it is intended, we
* split up the batch by the config.
*/
Collection<List<LogJob>> splitMessagesByConfig(List<LogJob> messages) {
private Collection<List<LogJob>> splitMessagesByConfig(List<LogJob> messages) {
HashMap<RequestConfig, List<LogJob>> hashMap = new HashMap<>();
for (var message : messages) {
var list = hashMap.computeIfAbsent(message.requestConfig(), k -> new ArrayList<>());
var list = hashMap.computeIfAbsent(message.getRequestConfig(), k -> new ArrayList<>());
list.add(message);
}

Expand All @@ -141,16 +144,16 @@ Collection<List<LogJob>> splitMessagesByConfig(List<LogJob> messages) {

private void notifyJobsAboutSuccess(List<LogJob> jobs) {
for (var job : jobs) {
if (job.completionNotification() != null) {
job.completionNotification().complete(null);
if (job.getCompletionNotification() != null) {
job.getCompletionNotification().complete(null);
}
}
}

private void notifyJobsAboutFailure(List<LogJob> jobs, RequestFailureException e) {
for (var job : jobs) {
if (job.completionNotification() != null) {
job.completionNotification().completeExceptionally(e);
if (job.getCompletionNotification() != null) {
job.getCompletionNotification().completeExceptionally(e);
}
}
}
Expand All @@ -160,8 +163,8 @@ private HttpRequest buildRequest(RequestConfig requestConfig, List<LogJob> messa
: "The request configuration must be set before building a request.";
var payload = buildPayload(messages);
return HttpRequest.newBuilder()
.uri(requestConfig.apiUri)
.header("Authorization", "Bearer " + requestConfig.accessToken)
.uri(requestConfig.apiUri())
.header("Authorization", "Bearer " + requestConfig.accessToken())
.POST(HttpRequest.BodyPublishers.ofString(payload, StandardCharsets.UTF_8))
.build();
}
Expand All @@ -170,7 +173,7 @@ private String buildPayload(List<LogJob> messages) {
var payload = new StringBuilder();
payload.append("{\"logs\": [");
for (var message : messages) {
payload.append(message.message().payload()).append(",");
payload.append(message.getLogMessage().payload()).append(",");
}

// Remove the trailing comma.
Expand All @@ -179,8 +182,6 @@ private String buildPayload(List<LogJob> messages) {
return payload.toString();
}

private RequestConfig cachedRequestConfig = null;

/**
* Builds a request configuration based on runtime information.
*
Expand All @@ -193,26 +194,12 @@ private RequestConfig getRequestConfig() {
if (cachedRequestConfig != null) {
return cachedRequestConfig;
}

var uri = URI.create(CloudAPI.getAPIRootURI() + "logs");
var config = new RequestConfig(uri, AuthenticationProvider.getAccessToken());
cachedRequestConfig = config;
return config;
}

/**
* Contains information needed to build a request to the Cloud Logs API.
*
* <p>This information must be gathered on the main Enso thread, as only there we have access to
* the {@link AuthenticationProvider}.
*
* <p>We associate an instance with every message to be sent. When sending multiple messages in a
* batch, we will use the config from one of them. This should not matter as in normal operations
* the configs will be the same, they only change during testing. Tests should take this into account,
* by sending the last message in synchronous mode.
*/
private record RequestConfig(URI apiUri, String accessToken) {}

private void sendLogRequest(HttpRequest request, int retryCount) throws RequestFailureException {
try {
try {
Expand All @@ -232,37 +219,12 @@ private void sendLogRequest(HttpRequest request, int retryCount) throws RequestF
}
} catch (RequestFailureException e) {
if (retryCount < 0) {
logger.severe("Failed to send log messages after retrying: " + e.getMessage());
LOGGER.severe("Failed to send log messages after retrying: " + e.getMessage());
throw e;
} else {
logger.warning("Exception when sending log messages: " + e.getMessage() + ". Retrying...");
LOGGER.warning("Exception when sending log messages: " + e.getMessage() + ". Retrying...");
sendLogRequest(request, retryCount - 1);
}
}
}

interface LogMessage {
String payload();
}

static class RequestFailureException extends RuntimeException {
public RequestFailureException(String message, Throwable cause) {
super(message, cause);
}
}

/**
* A record that represents a single log to be sent.
*
* <p>It may contain the `completionNotification` future that will be completed when the log is
* sent. If no-one is listening for confirmation, that field will be `null`.
*/
record LogJob(
LogMessage message,
CompletableFuture<Void> completionNotification,
RequestConfig requestConfig) {}

void resetCache() {
cachedRequestConfig = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.enso.base.enso_cloud.logging;

import java.util.concurrent.CompletableFuture;

/**
 * A single log message waiting to be sent, bundled with the data needed to deliver it.
 *
 * <p>The job may carry a completion future that the sender completes once the message has been
 * sent, or completes exceptionally if sending failed. When nobody is waiting for a confirmation,
 * that future is {@code null}.
 */
public final class LogJob {
  private final LogMessage message;
  private final CompletableFuture<Void> onCompletion;
  private final RequestConfig config;

  /**
   * Creates a job for one log message.
   *
   * @param logMessage the message to send
   * @param completionNotification the future to notify about the outcome, or {@code null} if no
   *     confirmation is requested
   * @param requestConfig the request configuration to use when sending this message
   */
  public LogJob(
      LogMessage logMessage,
      CompletableFuture<Void> completionNotification,
      RequestConfig requestConfig) {
    message = logMessage;
    onCompletion = completionNotification;
    config = requestConfig;
  }

  /** Returns the message to be sent. */
  public LogMessage getLogMessage() {
    return message;
  }

  /**
   * Returns the future to notify about the outcome of sending, or {@code null} if no confirmation
   * was requested.
   */
  public CompletableFuture<Void> getCompletionNotification() {
    return onCompletion;
  }

  /** Returns the request configuration associated with this job. */
  public RequestConfig getRequestConfig() {
    return config;
  }
}
Loading