Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: use structured metadata rather than a JSON line #1918

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 83 additions & 47 deletions src/features/device-logs/lib/backends/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,10 @@ import {
incrementPublishCallTotal,
} from './metrics.js';
import { setTimeout } from 'timers/promises';
import { omitNanoTimestamp } from '../config.js';
import { requestAsync } from '../../../../infra/request-promise/index.js';

const { BadRequestError } = errors;

interface LokiDeviceLog extends Omit<InternalDeviceLog, 'nanoTimestamp'> {
version?: number;
createdAt?: number;
}

// invert status object for quick lookup of status identifier using status code
const statusKeys = _.transform(
loki.status,
Expand All @@ -62,7 +56,6 @@ const lokiIngesterAddress = `${LOKI_INGESTER_HOST}:${LOKI_INGESTER_GRPC_PORT}`;

const MIN_BACKOFF = 100;
const MAX_BACKOFF = 10 * 1000;
const VERSION = 2;

function createTimestampFromDate(date = new Date()) {
const timestamp = new loki.Timestamp();
Expand Down Expand Up @@ -215,21 +208,43 @@ export class LokiBackend implements DeviceLogsBackend {

return _(
body.data.result as Array<{
stream: {
application_id: string;
device_id: string;
[name: string]: string;
};
values: Array<[timestamp: string, logLine: string]>;
}>,
)
.flatMap(({ values }) => values)
.map(([timestamp, logLine]): [bigint, OutputDeviceLog] => {
const log: LokiDeviceLog = JSON.parse(logLine);
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
.flatMap(({ stream, values }) => {
const baseLog: Partial<OutputDeviceLog> = {};
for (const [key, value] of Object.entries(stream)) {
switch (key) {
case 'timestamp':
baseLog.timestamp = Number(value);
break;
case 'is_system':
baseLog.isSystem = value === 'true';
break;
case 'is_stderr':
baseLog.isStdErr = value === 'true';
break;
case 'service_id':
baseLog.serviceId = Number(value);
break;
}
}
delete log.version;
const nanoTimestamp = BigInt(timestamp);
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
return [nanoTimestamp, log as OutputDeviceLog];
return values.map(([timestamp, message]): [bigint, OutputDeviceLog] => {
const nanoTimestamp = BigInt(timestamp);
return [
nanoTimestamp,
{
...baseLog,
createdAt: Math.floor(Number(nanoTimestamp / 1000000n)),
message,
} as OutputDeviceLog,
];
});
})
.sortBy(([timestamp]) => timestamp)
.map(([, log]) => log)
Expand All @@ -238,7 +253,7 @@ export class LokiBackend implements DeviceLogsBackend {

public async publish(
ctx: LogContext,
logs: Array<InternalDeviceLog & { version?: number }>,
logs: InternalDeviceLog[],
): Promise<any> {
const logEntries = this.fromDeviceLogsToEntries(ctx, logs);

Expand Down Expand Up @@ -349,12 +364,6 @@ export class LokiBackend implements DeviceLogsBackend {
return `o${ctx.orgId}:a${ctx.appId}:d${ctx.id}`;
}

private getStructuredMetadata(ctx: LogContext): loki.LabelPairAdapter[] {
return [
new loki.LabelPairAdapter().setName('device_id').setValue(`${ctx.id}`),
];
}

private getLabels(ctx: LokiLogContext): string {
return `{fleet_id="${ctx.appId}"}`;
}
Expand All @@ -364,18 +373,32 @@ export class LokiBackend implements DeviceLogsBackend {
): OutputDeviceLog[] {
try {
return stream.getEntriesList().map((entry) => {
const log: LokiDeviceLog = JSON.parse(entry.getLine());
if (log.version !== VERSION) {
throw new Error(
`Invalid Loki serialization version: ${JSON.stringify(log)}`,
);
}
delete log.version;
const timestampEntry = entry.getTimestamp()!;
const message = entry.getLine();
const structuredMetadataList = entry.getStructuredmetadataList();
const timestamp = entry.getTimestamp()!;
const nanoTimestamp =
BigInt(timestampEntry.getSeconds()) * 1000000000n +
BigInt(timestampEntry.getNanos());
log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n));
BigInt(timestamp.getSeconds()) * 1000000000n +
BigInt(timestamp.getNanos());
const log: Partial<OutputDeviceLog> = {
createdAt: Math.floor(Number(nanoTimestamp / 1000000n)),
message,
};
for (const structuredMetadata of structuredMetadataList) {
switch (structuredMetadata.getName()) {
case 'timestamp':
log.timestamp = Number(structuredMetadata.getValue());
break;
case 'is_system':
log.isSystem = structuredMetadata.getValue() === 'true';
break;
case 'is_stderr':
log.isStdErr = structuredMetadata.getValue() === 'true';
break;
case 'service_id':
log.serviceId = Number(structuredMetadata.getValue());
break;
}
}
return log as OutputDeviceLog;
});
} catch (err) {
Expand All @@ -384,23 +407,36 @@ export class LokiBackend implements DeviceLogsBackend {
}
}

private fromDeviceLogsToEntries(
ctx: LogContext,
logs: Array<InternalDeviceLog & { version?: number }>,
) {
const structuredMetadata = this.getStructuredMetadata(ctx);
private fromDeviceLogsToEntries(ctx: LogContext, logs: InternalDeviceLog[]) {
const deviceId = new loki.LabelPairAdapter()
.setName('device_id')
.setValue(`${ctx.id}`);
return logs.map((log) => {
const timestamp = new loki.Timestamp();
timestamp.setSeconds(Math.floor(Number(log.nanoTimestamp / 1000000000n)));
timestamp.setNanos(Number(log.nanoTimestamp % 1000000000n));
// store log line as JSON
const logJson = JSON.stringify(
{ ...log, version: VERSION },
omitNanoTimestamp,
);
const structuredMetadata = [
deviceId,
new loki.LabelPairAdapter()
.setName('timestamp')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we name this piece of metadata "device_timestamp" instead, to make it clearer what it actually is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also questionable whether we should store this as structured metadata at all: every entry's value will be unique, which could make a number of other things more painful, and it is probably neither ergonomic nor even useful to query by.

.setValue(`${log.timestamp}`),
new loki.LabelPairAdapter()
.setName('is_system')
.setValue(`${log.isSystem}`),
new loki.LabelPairAdapter()
.setName('is_stderr')
.setValue(`${log.isStdErr}`),
];
if (log.serviceId) {
structuredMetadata.push(
new loki.LabelPairAdapter()
.setName('service_id')
.setValue(`${log.serviceId}`),
);
}
// create entry with labels, line and timestamp
return new loki.EntryAdapter()
.setLine(logJson)
.setLine(log.message)
.setTimestamp(timestamp)
.setStructuredmetadataList(structuredMetadata);
});
Expand Down
Loading