Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add user tracking and enhance telemetry with execution context #286

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ LANGFUSE_BASEURL=
LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET_KEY=

# Configure the batch span processor delay used by the tracer set up via vercel/otel
# @see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#batch-span-processor
# OTEL_BSP_SCHEDULE_DELAY=

# OpenAI API https://openai.com/index/openai-api/
# @see https://platform.openai.com/docs/quickstart
OPENAI_API_KEY=
Expand Down
84 changes: 19 additions & 65 deletions app/(playground)/p/[agentId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import type {
GitHubTriggerEvent,
} from "@/services/external/github/types";
import { reportAgentTimeUsage } from "@/services/usage-based-billing/report-agent-time-usage";
import { putGraph } from "@giselles-ai/actions";
import { Playground } from "@giselles-ai/components/playground";
import { AgentNameProvider } from "@giselles-ai/contexts/agent-name";
import { DeveloperModeProvider } from "@giselles-ai/contexts/developer-mode";
Expand All @@ -28,6 +27,7 @@ import { PlaygroundModeProvider } from "@giselles-ai/contexts/playground-mode";
import { PropertiesPanelProvider } from "@giselles-ai/contexts/properties-panel";
import { ToastProvider } from "@giselles-ai/contexts/toast";
import { ToolbarContextProvider } from "@giselles-ai/contexts/toolbar";
import { persistGraph } from "@giselles-ai/giselle-provider/graph/persist";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

import {
executeNode,
executeStep,
Expand All @@ -40,7 +40,6 @@ import {
import { isLatestVersion, migrateGraph } from "@giselles-ai/lib/graph";
import {
buildGraphExecutionPath,
buildGraphFolderPath,
createGithubIntegrationSettingId,
} from "@giselles-ai/lib/utils";
import type {
Expand All @@ -55,7 +54,7 @@ import type {
NodeId,
StepId,
} from "@giselles-ai/types";
import { del, list, put } from "@vercel/blob";
import { put } from "@vercel/blob";
import { ReactFlowProvider } from "@xyflow/react";
import { eq } from "drizzle-orm";
import { notFound } from "next/navigation";
Expand Down Expand Up @@ -89,70 +88,17 @@ export default async function Page({
let graph = await fetch(agent.graphUrl).then(
(res) => res.json() as unknown as Graph,
);
let graphUrl = agent.graphUrl;
if (!isLatestVersion(graph)) {
graph = migrateGraph(graph);
graphUrl = await persistGraph({ graph, agentId });
}

const gitHubIntegrationState = await getGitHubIntegrationState(agent.dbId);

async function persistGraph(graph: Graph) {
async function persistGraphAction(graph: Graph) {
"use server";
const startTime = Date.now();
const logger = createLogger("persistGraph");
const { url } = await putGraph(graph);
const { blobList } = await withCountMeasurement(
logger,
async () => {
const result = await list({
prefix: buildGraphFolderPath(graph.id),
mode: "folded",
});
const size = result.blobs.reduce((sum, blob) => sum + blob.size, 0);
return {
blobList: result,
size,
};
},
ExternalServiceName.VercelBlob,
startTime,
VercelBlobOperation.List,
);

const oldBlobs = blobList.blobs
.filter((blob) => blob.url !== url)
.map((blob) => ({
url: blob.url,
size: blob.size,
}));

if (oldBlobs.length > 0) {
await withCountMeasurement(
logger,
async () => {
await del(oldBlobs.map((blob) => blob.url));
const totalSize = oldBlobs.reduce((sum, blob) => sum + blob.size, 0);
return {
size: totalSize,
};
},
ExternalServiceName.VercelBlob,
startTime,
VercelBlobOperation.Del,
);
waitForTelemetryExport();
}

await db
.update(agents)
.set({
graphUrl: url,
})
.where(eq(agents.id, agentId));

return url;
}

let graphUrl = agent.graphUrl;
if (!isLatestVersion(graph)) {
graph = migrateGraph(graph);
graphUrl = await persistGraph(graph);
return persistGraph({ graph, agentId });
}

async function updateAgentName(agentName: string) {
Expand Down Expand Up @@ -180,6 +126,7 @@ export default async function Page({
stepId,
artifacts,
stream: true,
userId: user.id,
});
}
async function putExecutionAction(executionSnapshot: ExecutionSnapshot) {
Expand Down Expand Up @@ -223,12 +170,19 @@ export default async function Page({
stepId,
artifacts,
stream: true,
userId: user.id,
});
}

async function executeNodeAction(executionId: ExecutionId, nodeId: NodeId) {
"use server";
return await executeNode({ agentId, executionId, nodeId, stream: true });
return await executeNode({
agentId,
executionId,
nodeId,
stream: true,
userId: user.id,
});
}

async function onFinishPerformExecutionAction(
Expand Down Expand Up @@ -351,7 +305,7 @@ export default async function Page({
<DeveloperModeProvider developerMode={developerMode}>
<GraphContextProvider
defaultGraph={graph}
onPersistAction={persistGraph}
onPersistAction={persistGraphAction}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

defaultGraphUrl={graphUrl}
>
<GitHubIntegrationProvider
Expand Down
Binary file modified bun.lockb
Binary file not shown.
6 changes: 6 additions & 0 deletions instrumentation.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ import {
noopSpanProcessor,
} from "@/lib/opentelemetry";
import { registerOTel } from "@vercel/otel";
import { LangfuseExporter } from "langfuse-vercel";

registerOTel({
serviceName: "giselle",
spanProcessors: [noopSpanProcessor],
metricReader,
logRecordProcessor,
traceExporter: new LangfuseExporter({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't tell where the telemetry is sent because this initialization does not specify a target endpoint.
But reading the Langfuse documentation, this configuration seems OK.
Let's merge and then verify that the new fields are included in the telemetry arriving in our Langfuse account

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having a "staging" environment is good if we accept pull requests from forked repos: we can check new functionality before merging it into main

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rindrics It deployed to staging. please verify!

flushInterval: Number.parseInt(
process.env.LANGFUSE_FLUSH_INTERVAL ?? "1000",
),
}),
});
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"import-in-the-middle": "1.11.0",
"input-otp": "^1.2.4",
"langfuse": "3.26.0",
"langfuse-vercel": "3.32.0",
"lucide-react": "0.417.0",
"next": "^15.1.2",
"next-auth": "^5.0.0-beta.20",
Expand Down
97 changes: 97 additions & 0 deletions packages/giselle-provider/graph/persist.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"use server";

import { agents, db } from "@/drizzle";
import {
ExternalServiceName,
VercelBlobOperation,
createLogger,
waitForTelemetryExport,
withCountMeasurement,
} from "@/lib/opentelemetry";
import { buildGraphFolderPath, buildGraphPath } from "@giselles-ai/lib/utils";
import type { AgentId, Graph } from "@giselles-ai/types";
import { del, list, put } from "@vercel/blob";
import { eq } from "drizzle-orm";

/**
 * Serializes a graph and uploads it to Vercel Blob storage, recording the
 * call (duration and payload byte size) through withCountMeasurement.
 *
 * Telemetry export is triggered (fire-and-forget) before returning.
 *
 * @param graph - The graph to serialize and store.
 * @returns The Vercel Blob `put` result for the uploaded snapshot.
 */
export async function putGraph(graph: Graph) {
  const measurementStart = Date.now();
  const serialized = JSON.stringify(graph);
  const logger = createLogger("put-graph");

  const measured = await withCountMeasurement(
    logger,
    async () => {
      const blob = await put(buildGraphPath(graph.id), serialized, {
        access: "public",
      });
      // Report payload size in bytes (UTF-8), not UTF-16 code units.
      const size = new TextEncoder().encode(serialized).length;
      return { blob, size };
    },
    ExternalServiceName.VercelBlob,
    measurementStart,
    VercelBlobOperation.Put,
  );

  waitForTelemetryExport();
  return measured.blob;
}

/**
 * Persists a graph snapshot for an agent: uploads the new blob, repoints the
 * agent's `graphUrl` row at it, then garbage-collects every other blob left
 * under the graph's folder. Both blob operations are measured for telemetry.
 *
 * @param graph - The graph to persist.
 * @param agentId - The agent whose `graphUrl` should reference the new blob.
 * @returns The URL of the newly stored graph blob.
 */
export async function persistGraph({
  graph,
  agentId,
}: { graph: Graph; agentId: AgentId }) {
  const startedAt = Date.now();

  // Upload first so the DB never points at a blob that does not exist yet.
  const { url } = await putGraph(graph);

  // Point the agent row at the freshly uploaded snapshot.
  await db
    .update(agents)
    .set({ graphUrl: url })
    .where(eq(agents.id, agentId));

  const logger = createLogger("persistGraph");

  // Enumerate every blob stored under this graph's folder, measuring the call.
  const { blobList } = await withCountMeasurement(
    logger,
    async () => {
      const listed = await list({
        prefix: buildGraphFolderPath(graph.id),
        mode: "folded",
      });
      let totalBytes = 0;
      for (const blob of listed.blobs) {
        totalBytes += blob.size;
      }
      return { blobList: listed, size: totalBytes };
    },
    ExternalServiceName.VercelBlob,
    startedAt,
    VercelBlobOperation.List,
  );

  // Everything except the snapshot we just wrote is stale.
  const staleBlobs = blobList.blobs
    .filter((blob) => blob.url !== url)
    .map((blob) => ({ url: blob.url, size: blob.size }));

  if (staleBlobs.length > 0) {
    await withCountMeasurement(
      logger,
      async () => {
        await del(staleBlobs.map((blob) => blob.url));
        const reclaimedBytes = staleBlobs.reduce(
          (sum, blob) => sum + blob.size,
          0,
        );
        return { size: reclaimedBytes };
      },
      ExternalServiceName.VercelBlob,
      // NOTE(review): same start time as the List measurement above, so the
      // Del duration includes the List call — confirm this is intended.
      startedAt,
      VercelBlobOperation.Del,
    );
    waitForTelemetryExport();
  }

  return url;
}
Loading
Loading