diff --git a/.env.example b/.env.example index 2a117e54..35d248cc 100644 --- a/.env.example +++ b/.env.example @@ -43,6 +43,10 @@ LANGFUSE_BASEURL= LANGFUSE_PUBLIC_KEY= LANGFUSE_SECRET_KEY= +# Configure the schedule delay for the batch span processor used by the @vercel/otel tracer +# @see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#batch-span-processor +# OTEL_BSP_SCHEDULE_DELAY= + # OpenAI API https://openai.com/index/openai-api/ # @see https://platform.openai.com/docs/quickstart OPENAI_API_KEY= diff --git a/app/(playground)/p/[agentId]/page.tsx b/app/(playground)/p/[agentId]/page.tsx index 36956ace..af48ea25 100644 --- a/app/(playground)/p/[agentId]/page.tsx +++ b/app/(playground)/p/[agentId]/page.tsx @@ -16,7 +16,6 @@ import type { GitHubTriggerEvent, } from "@/services/external/github/types"; import { reportAgentTimeUsage } from "@/services/usage-based-billing/report-agent-time-usage"; -import { putGraph } from "@giselles-ai/actions"; import { Playground } from "@giselles-ai/components/playground"; import { AgentNameProvider } from "@giselles-ai/contexts/agent-name"; import { DeveloperModeProvider } from "@giselles-ai/contexts/developer-mode"; @@ -28,6 +27,7 @@ import { PlaygroundModeProvider } from "@giselles-ai/contexts/playground-mode"; import { PropertiesPanelProvider } from "@giselles-ai/contexts/properties-panel"; import { ToastProvider } from "@giselles-ai/contexts/toast"; import { ToolbarContextProvider } from "@giselles-ai/contexts/toolbar"; +import { persistGraph } from "@giselles-ai/giselle-provider/graph/persist"; import { executeNode, executeStep, @@ -40,7 +40,6 @@ import { import { isLatestVersion, migrateGraph } from "@giselles-ai/lib/graph"; import { buildGraphExecutionPath, - buildGraphFolderPath, createGithubIntegrationSettingId, } from "@giselles-ai/lib/utils"; import type { @@ -55,7 +54,7 @@ import type { NodeId, StepId, } from "@giselles-ai/types"; -import { del, list, put } from "@vercel/blob"; +import { put } from "@vercel/blob"; import { 
ReactFlowProvider } from "@xyflow/react"; import { eq } from "drizzle-orm"; import { notFound } from "next/navigation"; @@ -89,70 +88,17 @@ export default async function Page({ let graph = await fetch(agent.graphUrl).then( (res) => res.json() as unknown as Graph, ); + let graphUrl = agent.graphUrl; + if (!isLatestVersion(graph)) { + graph = migrateGraph(graph); + graphUrl = await persistGraph({ graph, agentId }); + } const gitHubIntegrationState = await getGitHubIntegrationState(agent.dbId); - async function persistGraph(graph: Graph) { + async function persistGraphAction(graph: Graph) { "use server"; - const startTime = Date.now(); - const logger = createLogger("persistGraph"); - const { url } = await putGraph(graph); - const { blobList } = await withCountMeasurement( - logger, - async () => { - const result = await list({ - prefix: buildGraphFolderPath(graph.id), - mode: "folded", - }); - const size = result.blobs.reduce((sum, blob) => sum + blob.size, 0); - return { - blobList: result, - size, - }; - }, - ExternalServiceName.VercelBlob, - startTime, - VercelBlobOperation.List, - ); - - const oldBlobs = blobList.blobs - .filter((blob) => blob.url !== url) - .map((blob) => ({ - url: blob.url, - size: blob.size, - })); - - if (oldBlobs.length > 0) { - await withCountMeasurement( - logger, - async () => { - await del(oldBlobs.map((blob) => blob.url)); - const totalSize = oldBlobs.reduce((sum, blob) => sum + blob.size, 0); - return { - size: totalSize, - }; - }, - ExternalServiceName.VercelBlob, - startTime, - VercelBlobOperation.Del, - ); - waitForTelemetryExport(); - } - - await db - .update(agents) - .set({ - graphUrl: url, - }) - .where(eq(agents.id, agentId)); - - return url; - } - - let graphUrl = agent.graphUrl; - if (!isLatestVersion(graph)) { - graph = migrateGraph(graph); - graphUrl = await persistGraph(graph); + return persistGraph({ graph, agentId }); } async function updateAgentName(agentName: string) { @@ -180,6 +126,7 @@ export default async function 
Page({ stepId, artifacts, stream: true, + userId: user.id, }); } async function putExecutionAction(executionSnapshot: ExecutionSnapshot) { @@ -223,12 +170,19 @@ export default async function Page({ stepId, artifacts, stream: true, + userId: user.id, }); } async function executeNodeAction(executionId: ExecutionId, nodeId: NodeId) { "use server"; - return await executeNode({ agentId, executionId, nodeId, stream: true }); + return await executeNode({ + agentId, + executionId, + nodeId, + stream: true, + userId: user.id, + }); } async function onFinishPerformExecutionAction( @@ -351,7 +305,7 @@ export default async function Page({ { + const result = await put(buildGraphPath(graph.id), stringifiedGraph, { + access: "public", + }); + + return { + blob: result, + size: new TextEncoder().encode(stringifiedGraph).length, + }; + }, + ExternalServiceName.VercelBlob, + startTime, + VercelBlobOperation.Put, + ); + waitForTelemetryExport(); + return result.blob; +} + +export async function persistGraph({ + graph, + agentId, +}: { graph: Graph; agentId: AgentId }) { + const startTime = Date.now(); + const { url } = await putGraph(graph); + + await db + .update(agents) + .set({ + graphUrl: url, + }) + .where(eq(agents.id, agentId)); + + const logger = createLogger("persistGraph"); + const { blobList } = await withCountMeasurement( + logger, + async () => { + const result = await list({ + prefix: buildGraphFolderPath(graph.id), + mode: "folded", + }); + const size = result.blobs.reduce((sum, blob) => sum + blob.size, 0); + return { + blobList: result, + size, + }; + }, + ExternalServiceName.VercelBlob, + startTime, + VercelBlobOperation.List, + ); + + const oldBlobs = blobList.blobs + .filter((blob) => blob.url !== url) + .map((blob) => ({ + url: blob.url, + size: blob.size, + })); + + if (oldBlobs.length > 0) { + await withCountMeasurement( + logger, + async () => { + await del(oldBlobs.map((blob) => blob.url)); + const totalSize = oldBlobs.reduce((sum, blob) => sum + blob.size, 
0); + return { + size: totalSize, + }; + }, + ExternalServiceName.VercelBlob, + startTime, + VercelBlobOperation.Del, + ); + waitForTelemetryExport(); + } + + return url; +} diff --git a/packages/lib/execution.ts b/packages/lib/execution.ts index 32bb8bd3..11a42ebc 100644 --- a/packages/lib/execution.ts +++ b/packages/lib/execution.ts @@ -23,7 +23,7 @@ import { createStreamableValue } from "ai/rsc"; import { MockLanguageModelV1, simulateReadableStream } from "ai/test"; import { and, eq } from "drizzle-orm"; import HandleBars from "handlebars"; -import Langfuse from "langfuse"; +import { after } from "next/server"; import * as v from "valibot"; import type { AgentId, @@ -44,7 +44,11 @@ import type { } from "../types"; import { AgentTimeNotAvailableError } from "./errors"; import { textGenerationPrompt } from "./prompts"; -import { langfuseModel, toErrorWithMessage } from "./utils"; +import { + langfuseModel, + toErrorWithMessage, + waitForLangfuseFlush, +} from "./utils"; function resolveLanguageModel( llm: TextGenerateActionContent["llm"], @@ -275,6 +279,7 @@ interface ExecutionContext { nodes: Node[]; connections: Connection[]; stream?: boolean; + userId?: string; } async function performFlowExecution( @@ -285,10 +290,6 @@ async function performFlowExecution( throw new AgentTimeNotAvailableError(); } const startTime = Date.now(); - const lf = new Langfuse(); - const trace = lf.trace({ - sessionId: context.executionId, - }); const node = context.node; switch (node.content.type) { @@ -310,20 +311,6 @@ async function performFlowExecution( const topP = node.content.topP; const temperature = node.content.temperature; - trace.update({ - input: prompt, - }); - - const generationTracer = trace.generation({ - name: "generate-text", - input: prompt, - model: langfuseModel(node.content.llm), - modelParameters: { - topP: node.content.topP, - temperature: node.content.temperature, - }, - }); - if (context.stream) { const streamableValue = createStreamableValue(); (async () => 
{ @@ -335,6 +322,11 @@ async function performFlowExecution( ), topP, temperature, + experimental_telemetry: { + isEnabled: true, + functionId: "giselles-ai.lib.performFlowExecution", + metadata: parseExecutionContextToTelemetryMetadata(context), + }, }); for await (const partialObject of partialObjectStream) { @@ -353,21 +345,15 @@ async function performFlowExecution( await withTokenMeasurement( createLogger(node.content.type), async () => { - generationTracer.end({ output: result }); - trace.update({ output: result }); - await lf.shutdownAsync(); waitForTelemetryExport(); return { usage: await usage }; }, model, startTime, ); + after(waitForLangfuseFlush); streamableValue.done(); })().catch((error) => { - generationTracer.update({ - level: "ERROR", - statusMessage: toErrorWithMessage(error).message, - }); streamableValue.error(error); }); @@ -381,14 +367,16 @@ async function performFlowExecution( ), topP, temperature, + experimental_telemetry: { + isEnabled: true, + functionId: "giselles-ai.lib.performFlowExecution.generateObject", + metadata: parseExecutionContextToTelemetryMetadata(context), + }, }); waitUntil( withTokenMeasurement( createLogger(node.content.type), async () => { - generationTracer.end({ output: object }); - trace.update({ output: object }); - await lf.shutdownAsync(); waitForTelemetryExport(); return { usage }; }, @@ -396,6 +384,7 @@ async function performFlowExecution( startTime, ), ); + after(waitForLangfuseFlush); return { type: "text", title: object.title, @@ -422,6 +411,7 @@ interface ExecuteStepParams { stepId: StepId; artifacts: Artifact[]; stream?: boolean; + userId?: string; overrideData?: OverrideData[]; } export async function executeStep({ @@ -431,6 +421,7 @@ export async function executeStep({ stepId, artifacts, stream, + userId, overrideData, }: ExecuteStepParams) { const agent = await db.query.agents.findFirst({ @@ -501,6 +492,7 @@ export async function executeStep({ artifacts, nodes: graph.nodes, connections: graph.connections, + 
userId, stream, }; @@ -514,6 +506,7 @@ interface RetryStepParams { stepId: StepId; artifacts: Artifact[]; stream?: boolean; + userId?: string; } export async function retryStep({ agentId, @@ -522,6 +515,7 @@ export async function retryStep({ stepId, artifacts, stream, + userId, }: RetryStepParams) { const executionSnapshot = await fetch(retryExecutionSnapshotUrl).then( (res) => res.json() as unknown as ExecutionSnapshot, @@ -548,6 +542,7 @@ export async function retryStep({ nodes: executionSnapshot.nodes, connections: executionSnapshot.connections, stream, + userId, }; return performFlowExecution(context); @@ -558,12 +553,14 @@ interface ExecuteNodeParams { executionId: ExecutionId; nodeId: NodeId; stream?: boolean; + userId?: string; } export async function executeNode({ agentId, executionId, nodeId, stream, + userId, }: ExecuteNodeParams) { const agent = await db.query.agents.findFirst({ where: (agents, { eq }) => eq(agents.id, agentId), @@ -590,6 +587,7 @@ export async function executeNode({ nodes: graph.nodes, connections: graph.connections, stream, + userId, }; return performFlowExecution(context); @@ -623,3 +621,13 @@ async function canPerformFlowExecution(agentId: AgentId) { const team = res[0]; return await isAgentTimeAvailable(team); } + +function parseExecutionContextToTelemetryMetadata( + executionContext: ExecutionContext, +): Record { + return { + sessionId: executionContext.executionId, + agentId: executionContext.agentId, + ...(executionContext.userId && { userId: executionContext.userId }), + }; +} diff --git a/packages/lib/utils.ts b/packages/lib/utils.ts index 49e88c2c..dc6d8b21 100644 --- a/packages/lib/utils.ts +++ b/packages/lib/utils.ts @@ -410,3 +410,14 @@ export function isStreamableValue(value: unknown): value is StreamableValue { value.type === STREAMABLE_VALUE_TYPE ); } + +const otelBspScheduleDelay = Number.parseInt( + process.env.OTEL_BSP_SCHEDULE_DELAY ?? 
"5000", +); +const langfuseFlushInterval = Number.parseInt( + process.env.LANGFUSE_FLUSH_INTERVAL ?? "1000", +); +export const waitForLangfuseFlush = () => + new Promise((resolve) => + setTimeout(resolve, otelBspScheduleDelay + langfuseFlushInterval), + );