diff --git a/packages/auth-server/src/app/api/cloud-dev/create/route.ts b/packages/auth-server/src/app/api/cloud-dev/create/route.ts index 1a8904e..58ae128 100644 --- a/packages/auth-server/src/app/api/cloud-dev/create/route.ts +++ b/packages/auth-server/src/app/api/cloud-dev/create/route.ts @@ -189,13 +189,34 @@ export async function POST(req: NextRequest) { throw new Error(`IPv4 allocation failed — machine would be unreachable`); } - // 4. Set secrets (env vars) — staged so they apply when machine starts - const secrets = { + // 4. Insert into dev_machines + dev_machine_members (before secrets so we can include WORKSPACE_ID) + const insertResult = await db.query( + `INSERT INTO dev_machines (app_name, fly_app_name, app_url, created_by, ssh_private_key) + VALUES ($1, $2, $3, $4, $5) + RETURNING id`, + [appName, flyAppName, appUrl, userId, sshKeypair.privateKey], + ); + + const machineDbId = insertResult.rows[0].id as number; + + await db.query( + `INSERT INTO dev_machine_members (machine_id, user_id, role, linux_user) + VALUES ($1, $2, 'owner', $3)`, + [machineDbId, userId, linuxUser], + ); + + // 5. Set secrets (env vars) — staged so they apply when machine starts + const secrets: Record = { ...envVars, APP_NAME: appName, DEV_USER: linuxUser, SSH_PUBLIC_KEY: sshKeypair.publicKey, + WORKSPACE_ID: String(machineDbId), }; + // Inject auth-server's public URL so the cloud machine calls back to us + if (process.env.PUBLIC_URL) { + secrets.OPFLOW_SERVER_URL = process.env.PUBLIC_URL; + } const secretsFile = join(tmpdir(), `secrets-${flyAppName}.env`); const secretsContent = Object.entries(secrets) .map(([k, v]) => `${k}=${v}`) @@ -219,7 +240,7 @@ export async function POST(req: NextRequest) { try { unlinkSync(secretsFile); } catch { /* ignore */ } } - // 5. Create machine via Fly Machines API + // 6. Create machine via Fly Machines API const machineConfig: CreateMachineConfig = { image: CLOUD_DEV_IMAGE, services: [ @@ -261,22 +282,6 @@ export async function POST(req: NextRequest) { `[cloud-dev/create] Machine created: ${machine.id} for ${flyAppName}`, ); - // 6. Insert into dev_machines + dev_machine_members - const insertResult = await db.query( - `INSERT INTO dev_machines (app_name, fly_app_name, app_url, created_by, ssh_private_key) - VALUES ($1, $2, $3, $4, $5) - RETURNING id`, - [appName, flyAppName, appUrl, userId, sshKeypair.privateKey], - ); - - const machineDbId = insertResult.rows[0].id as number; - - await db.query( - `INSERT INTO dev_machine_members (machine_id, user_id, role, linux_user) - VALUES ($1, $2, 'owner', $3)`, - [machineDbId, userId, linuxUser], - ); - return NextResponse.json({ data: { appUrl, flyAppName, sshPrivateKey: sshKeypair.privateKey }, }); diff --git a/packages/auth-server/src/app/api/connections/credentials/route.ts b/packages/auth-server/src/app/api/connections/credentials/route.ts new file mode 100644 index 0000000..80fa0f0 --- /dev/null +++ b/packages/auth-server/src/app/api/connections/credentials/route.ts @@ -0,0 +1,87 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { authenticateRequest } from "@/lib/auth"; +import { getNango } from "@/lib/nango"; +import { verifyWorkspaceMembership } from "@/lib/workspace"; + +/** + * GET /api/connections/credentials?integration_id=X&connection_id=Y + * Fetch actual credentials from Nango for a connection. + * Authorization is based on workspace membership via connection tags. + * Returns { token, connectionConfig, raw }. + */ +export async function GET(req: NextRequest) { + const auth = await authenticateRequest(req); + if (auth instanceof NextResponse) return auth; + + const integrationId = req.nextUrl.searchParams.get("integration_id"); + const connectionId = req.nextUrl.searchParams.get("connection_id"); + + if (!integrationId) { + return NextResponse.json( + { error: "integration_id query parameter is required" }, + { status: 400 }, + ); + } + + if (!connectionId) { + return NextResponse.json( + { error: "connection_id query parameter is required" }, + { status: 400 }, + ); + } + + try { + const nango = getNango(); + const connection = await nango.getConnection(integrationId, connectionId); + + // Extract workspace-id from connection tags and verify membership + const connAny = connection as unknown as { + tags?: Record; + }; + const workspaceId = connAny.tags?.["workspace-id"]; + + if (!workspaceId) { + return NextResponse.json( + { error: "Connection not found" }, + { status: 404 }, + ); + } + + const membership = await verifyWorkspaceMembership( + auth.userId, + workspaceId, + ); + if (!membership) { + return NextResponse.json( + { error: "Connection not found" }, + { status: 404 }, + ); + } + + const creds = (connection.credentials ?? {}) as Record; + const token = + (creds.access_token ?? + creds.api_key ?? + creds.apiKey ?? + creds.token ?? + "") as string; + + return NextResponse.json({ + data: { + token, + connectionConfig: connection.connection_config ?? {}, + raw: creds, + }, + }); + } catch (err) { + return NextResponse.json( + { + error: + err instanceof Error + ? err.message + : "Failed to fetch credentials", + }, + { status: 500 }, + ); + } +} diff --git a/packages/auth-server/src/app/api/credentials/[integrationId]/route.ts b/packages/auth-server/src/app/api/credentials/[integrationId]/route.ts deleted file mode 100644 index b133cdc..0000000 --- a/packages/auth-server/src/app/api/credentials/[integrationId]/route.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { type NextRequest, NextResponse } from "next/server"; -import { authenticateRequest } from "@/lib/auth"; -import { getNango } from "@/lib/nango"; - -/** - * GET /api/credentials/{integrationId}?connection_id=X - * Core proxy: fetch actual credentials from Nango for a connection. - * Returns { token, connectionConfig, raw }. - */ -export async function GET( - req: NextRequest, - { params }: { params: Promise<{ integrationId: string }> }, -) { - const auth = await authenticateRequest(req); - if (auth instanceof NextResponse) return auth; - - const { integrationId } = await params; - const connectionId = new URL(req.url).searchParams.get("connection_id"); - - if (!connectionId) { - return NextResponse.json( - { error: "connection_id query parameter is required" }, - { status: 400 }, - ); - } - - try { - const nango = getNango(); - const connection = await nango.getConnection(integrationId, connectionId); - - // Verify the connection belongs to the requesting user - const connAny = connection as unknown as { end_user?: { id: string } | null }; - const endUserId = connAny.end_user?.id; - if (endUserId && endUserId !== auth.userId) { - return NextResponse.json( - { error: "Connection not found" }, - { status: 404 }, - ); - } - - const creds = (connection.credentials ?? {}) as Record; - const token = - (creds.access_token ?? - creds.api_key ?? - creds.apiKey ?? - creds.token ?? - "") as string; - - return NextResponse.json({ - data: { - token, - connectionConfig: connection.connection_config ?? {}, - raw: creds, - }, - }); - } catch (err) { - return NextResponse.json( - { error: err instanceof Error ? err.message : "Failed to fetch credentials" }, - { status: 500 }, - ); - } -} diff --git a/packages/auth-server/src/app/api/integrations/[id]/connections/route.ts b/packages/auth-server/src/app/api/integrations/[id]/connections/route.ts deleted file mode 100644 index 5d7e152..0000000 --- a/packages/auth-server/src/app/api/integrations/[id]/connections/route.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { type NextRequest, NextResponse } from "next/server"; -import { authenticateRequest } from "@/lib/auth"; -import { getNango } from "@/lib/nango"; - -/** - * GET /api/integrations/{id}/connections - * List Nango connections for an integration. Requires Bearer token. - */ -export async function GET( - req: NextRequest, - { params }: { params: Promise<{ id: string }> }, -) { - const auth = await authenticateRequest(req); - if (auth instanceof NextResponse) return auth; - - const { id: integrationId } = await params; - - try { - const nango = getNango(); - const result = await nango.listConnections(); - // Filter by integration AND by end_user to enforce tenant isolation - const filtered = (result.connections ?? []).filter( - (c: { provider_config_key: string; end_user?: { id: string } | null }) => - c.provider_config_key === integrationId && - c.end_user?.id === auth.userId, - ); - - return NextResponse.json({ data: filtered }); - } catch (err) { - return NextResponse.json( - { error: err instanceof Error ? err.message : "Failed to list connections" }, - { status: 500 }, - ); - } -} diff --git a/packages/auth-server/src/app/api/nango/connect-session/route.ts b/packages/auth-server/src/app/api/nango/connect-session/route.ts deleted file mode 100644 index 9b3088d..0000000 --- a/packages/auth-server/src/app/api/nango/connect-session/route.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { type NextRequest, NextResponse } from "next/server"; -import { authenticateRequest } from "@/lib/auth"; -import { getNango } from "@/lib/nango"; - -/** - * POST /api/nango/connect-session - * Create a Nango Connect session for OAuth setup. - * Body: { integration_id: string } - * Returns: { token: string } - */ -export async function POST(req: NextRequest) { - const auth = await authenticateRequest(req); - if (auth instanceof NextResponse) return auth; - - const body = (await req.json()) as { integration_id?: string }; - - if (!body.integration_id) { - return NextResponse.json( - { error: "integration_id is required" }, - { status: 400 }, - ); - } - - try { - const nango = getNango(); - const session = await nango.createConnectSession({ - end_user: { id: auth.userId }, - allowed_integrations: [body.integration_id], - }); - - return NextResponse.json({ - data: { token: session.data.token }, - }); - } catch (err) { - return NextResponse.json( - { error: err instanceof Error ? err.message : "Failed to create connect session" }, - { status: 500 }, - ); - } -} diff --git a/packages/auth-server/src/app/api/workspaces/[workspaceId]/connections/route.ts b/packages/auth-server/src/app/api/workspaces/[workspaceId]/connections/route.ts new file mode 100644 index 0000000..abb7153 --- /dev/null +++ b/packages/auth-server/src/app/api/workspaces/[workspaceId]/connections/route.ts @@ -0,0 +1,96 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { authenticateRequest } from "@/lib/auth"; +import { getNango, listConnectionsByTags } from "@/lib/nango"; +import { verifyWorkspaceMembership } from "@/lib/workspace"; + +type RouteContext = { params: Promise<{ workspaceId: string }> }; + +/** + * GET /api/workspaces/{workspaceId}/connections + * List all connections for a workspace across all integrations. + */ +export async function GET(req: NextRequest, { params }: RouteContext) { + const auth = await authenticateRequest(req); + if (auth instanceof NextResponse) return auth; + + const { workspaceId } = await params; + + const membership = await verifyWorkspaceMembership(auth.userId, workspaceId); + if (!membership) { + return NextResponse.json( + { error: "Not a member of this workspace" }, + { status: 403 }, + ); + } + + try { + const connections = await listConnectionsByTags({ + "workspace-id": workspaceId, + }); + + return NextResponse.json({ data: connections }); + } catch (err) { + return NextResponse.json( + { + error: + err instanceof Error ? err.message : "Failed to list connections", + }, + { status: 500 }, + ); + } +} + +/** + * POST /api/workspaces/{workspaceId}/connections + * Create a new connection via Nango Connect OAuth session. + * Body: { integration_id: string } + * Returns: { token: string } + */ +export async function POST(req: NextRequest, { params }: RouteContext) { + const auth = await authenticateRequest(req); + if (auth instanceof NextResponse) return auth; + + const { workspaceId } = await params; + + const membership = await verifyWorkspaceMembership(auth.userId, workspaceId); + if (!membership) { + return NextResponse.json( + { error: "Not a member of this workspace" }, + { status: 403 }, + ); + } + + const body = (await req.json()) as { integration_id?: string }; + + if (!body.integration_id) { + return NextResponse.json( + { error: "integration_id is required" }, + { status: 400 }, + ); + } + + try { + const nango = getNango(); + const session = await nango.createConnectSession({ + tags: { + "user-id": auth.userId, + "workspace-id": workspaceId, + }, + allowed_integrations: [body.integration_id], + }); + + return NextResponse.json({ + data: { token: session.data.token }, + }); + } catch (err) { + return NextResponse.json( + { + error: + err instanceof Error + ? err.message + : "Failed to create connect session", + }, + { status: 500 }, + ); + } +} diff --git a/packages/auth-server/src/lib/nango.ts b/packages/auth-server/src/lib/nango.ts index e4e6497..3c2271f 100644 --- a/packages/auth-server/src/lib/nango.ts +++ b/packages/auth-server/src/lib/nango.ts @@ -12,3 +12,67 @@ export function getNango(): Nango { } return nangoInstance; } + +/** + * List connections from Nango filtered by tags. + * + * The SDK's listConnections() doesn't support tag filtering, + * so we call the Nango REST API directly: GET /connection?tags[key]=value + */ +export async function listConnectionsByTags( + tags: Record, + integrationId?: string, +): Promise< + Array<{ + id: number; + connection_id: string; + provider_config_key: string; + provider: string; + created: string; + end_user: { id: string; display_name: string | null; email: string | null } | null; + tags: Record; + metadata: Record | null; + }> +> { + const secretKey = process.env.NANGO_SECRET_KEY; + if (!secretKey) { + throw new Error("NANGO_SECRET_KEY environment variable is required"); + } + + const nangoHost = process.env.NANGO_HOST || "https://api.nango.dev"; + const url = new URL(`${nangoHost}/connection`); + + for (const [key, value] of Object.entries(tags)) { + url.searchParams.append(`tags[${key}]`, value); + } + + if (integrationId) { + url.searchParams.append("integrationId", integrationId); + } + + const response = await fetch(url.toString(), { + headers: { + Authorization: `Bearer ${secretKey}`, + }, + }); + + if (!response.ok) { + throw new Error( + `Nango API error: ${response.status} ${response.statusText}`, + ); + } + + const data = (await response.json()) as { + connections: Array<{ + id: number; + connection_id: string; + provider_config_key: string; + provider: string; + created: string; + end_user: { id: string; display_name: string | null; email: string | null } | null; + tags: Record; + metadata: Record | null; + }>; + }; + return data.connections ?? []; +} diff --git a/packages/auth-server/src/lib/workspace.ts b/packages/auth-server/src/lib/workspace.ts new file mode 100644 index 0000000..476431a --- /dev/null +++ b/packages/auth-server/src/lib/workspace.ts @@ -0,0 +1,31 @@ +import { getPool } from "./db"; + +export interface WorkspaceMembership { + machineId: number; + role: string; +} + +/** + * Verify that a user is a member of the given workspace. + * Returns the membership record if valid, null otherwise. + * + * "workspace_id" maps to dev_machines.id (the numeric PK). + */ +export async function verifyWorkspaceMembership( + userId: string, + workspaceId: string, +): Promise { + const db = await getPool(); + const result = await db.query( + `SELECT dmm.machine_id, dmm.role + FROM dev_machine_members dmm + WHERE dmm.user_id = $1 AND dmm.machine_id = $2`, + [userId, workspaceId], + ); + + if (result.rows.length === 0) return null; + return { + machineId: result.rows[0].machine_id as number, + role: result.rows[0].role as string, + }; +} diff --git a/packages/core/src/connections/cloud-integration-provider.ts b/packages/core/src/connections/cloud-integration-provider.ts index 4dedf4f..64927e1 100644 --- a/packages/core/src/connections/cloud-integration-provider.ts +++ b/packages/core/src/connections/cloud-integration-provider.ts @@ -14,7 +14,7 @@ export class CloudIntegrationProvider implements IntegrationProvider { ): Promise { const data = (await apiCall( "GET", - `/api/credentials/${encodeURIComponent(integrationId)}?connection_id=${encodeURIComponent(connectionId)}`, + `/api/connections/credentials?integration_id=${encodeURIComponent(integrationId)}&connection_id=${encodeURIComponent(connectionId)}`, )) as ConnectionCredentials; return data; @@ -30,11 +30,21 @@ export class CloudIntegrationProvider implements IntegrationProvider { async listConnections( integrationId: string, + workspaceId?: string, ): Promise> { - const data = (await apiCall( + if (!workspaceId) { + return []; + } + + const allConnections = (await apiCall( "GET", - `/api/integrations/${encodeURIComponent(integrationId)}/connections`, + `/api/workspaces/${encodeURIComponent(workspaceId)}/connections`, )) as Array<{ connection_id: string; provider_config_key: string }>; + + const data = allConnections.filter( + (c) => c.provider_config_key === integrationId, + ); + return Promise.all( data.map(async (c) => { let displayName = c.connection_id; @@ -55,10 +65,16 @@ export class CloudIntegrationProvider implements IntegrationProvider { async createConnectSession( integrationId: string, + workspaceId?: string, ): Promise<{ token: string }> { - const data = (await apiCall("POST", "/api/nango/connect-session", { - integration_id: integrationId, - })) as { token: string }; + if (!workspaceId) { + throw new Error("workspace_id is required for cloud mode"); + } + const data = (await apiCall( + "POST", + `/api/workspaces/${encodeURIComponent(workspaceId)}/connections`, + { integration_id: integrationId }, + )) as { token: string }; return data; } } diff --git a/packages/core/src/connections/integration-provider.ts b/packages/core/src/connections/integration-provider.ts index f28d665..fdb2d6e 100644 --- a/packages/core/src/connections/integration-provider.ts +++ b/packages/core/src/connections/integration-provider.ts @@ -21,15 +21,16 @@ export interface IntegrationProvider { Array<{ id: string; provider: string }> >; - /** List connections for an integration */ + /** List connections for an integration, optionally scoped to a workspace */ listConnections( integrationId: string, + workspaceId?: string, ): Promise>; - /** Create a Connect session for OAuth setup */ + /** Create a Connect session for OAuth setup, optionally scoped to a workspace */ createConnectSession( integrationId: string, - endUserId?: string, + workspaceId?: string, ): Promise<{ token: string }>; } diff --git a/packages/core/src/connections/local-integration-provider.ts b/packages/core/src/connections/local-integration-provider.ts index 6a8ea12..c1cde9d 100644 --- a/packages/core/src/connections/local-integration-provider.ts +++ b/packages/core/src/connections/local-integration-provider.ts @@ -74,12 +74,23 @@ export class LocalIntegrationProvider implements IntegrationProvider { async createConnectSession( integrationId: string, - endUserId?: string, + workspaceId?: string, ): Promise<{ token: string }> { - const session = await this.nango.createConnectSession({ - end_user: { id: endUserId ?? "dev-ui-user" }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const sessionProps: any = { allowed_integrations: [integrationId], - }); + }; + + if (workspaceId) { + sessionProps.tags = { + "user-id": "local-dev", + "workspace-id": workspaceId, + }; + } else { + sessionProps.end_user = { id: "dev-ui-user" }; + } + + const session = await this.nango.createConnectSession(sessionProps); return { token: session.data.token }; } } diff --git a/packages/core/src/dev-ui/api.ts b/packages/core/src/dev-ui/api.ts index 4bd2ee4..71616c4 100644 --- a/packages/core/src/dev-ui/api.ts +++ b/packages/core/src/dev-ui/api.ts @@ -21,6 +21,8 @@ export interface ApiContext { appSchema: string; /** Project root directory for CLI subprocess execution */ projectRoot: string; + /** Workspace ID for cloud mode (from WORKSPACE_ID env var) */ + workspaceId?: string; } function jsonResponse(res: ServerResponse, status: number, data: unknown): void { @@ -138,7 +140,7 @@ export async function handleApiRequest( if (nangoConnectionsMatch && method === "GET") { const integrationId = decodeURIComponent(nangoConnectionsMatch[1]); try { - const connections = await ctx.integrationProvider.listConnections(integrationId); + const connections = await ctx.integrationProvider.listConnections(integrationId, ctx.workspaceId); jsonResponse(res, 200, connections); } catch (err) { jsonResponse(res, 500, { @@ -158,7 +160,7 @@ export async function handleApiRequest( return true; } try { - const session = await ctx.integrationProvider.createConnectSession(body.integration_id); + const session = await ctx.integrationProvider.createConnectSession(body.integration_id, ctx.workspaceId); jsonResponse(res, 200, session); } catch (err) { jsonResponse(res, 500, { diff --git a/packages/core/src/dev-ui/dev-server.ts b/packages/core/src/dev-ui/dev-server.ts index 084ae65..22dfabe 100644 --- a/packages/core/src/dev-ui/dev-server.ts +++ b/packages/core/src/dev-ui/dev-server.ts @@ -125,6 +125,7 @@ export async function startDevServer(options: DevServerOptions) { schema: dbosSchema, appSchema, projectRoot, + workspaceId: process.env.WORKSPACE_ID, }); if (handled) return; } catch (err) { diff --git a/scripts/dev-auth.sh b/scripts/dev-auth.sh new file mode 100755 index 0000000..4b5ab29 --- /dev/null +++ b/scripts/dev-auth.sh @@ -0,0 +1,50 @@ +#!/bin/bash +set -e + +# Dev script: starts ngrok + local auth-server. +# The ngrok URL is injected as PUBLIC_URL so the create route +# sets OPFLOW_SERVER_URL on new Fly machines. +# +# Usage: ./scripts/dev-auth.sh +# +# Prerequisites: +# - ngrok installed (brew install ngrok) +# - docker compose up -d in packages/auth-server/ +# - .env.local configured in packages/auth-server/ + +NGROK_PID="" +cleanup() { + echo "" + echo "Shutting down..." + if [ -n "$NGROK_PID" ]; then + kill "$NGROK_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT INT TERM + +# 1. Start ngrok in background +echo "Starting ngrok..." +ngrok http 3000 --log=stdout > /dev/null 2>&1 & +NGROK_PID=$! +sleep 2 + +# 2. Extract public URL from ngrok local API +PUBLIC_URL=$(curl -s http://127.0.0.1:4040/api/tunnels | python3 -c "import sys,json; tunnels=json.load(sys.stdin)['tunnels']; print(next(t['public_url'] for t in tunnels if t['public_url'].startswith('https')))" 2>/dev/null) + +if [ -z "$PUBLIC_URL" ]; then + echo "ERROR: Failed to get ngrok URL. Is ngrok running?" + exit 1 +fi + +echo "" +echo "===================================" +echo " ngrok URL: $PUBLIC_URL" +echo "===================================" +echo "" +echo "To create a cloud workspace against this local server:" +echo " OPFLOW_SERVER_URL=$PUBLIC_URL pnpm --filter 0pflow exec 0pflow cloud run" +echo "" + +# 3. Start auth-server with PUBLIC_URL injected +cd "$(dirname "$0")/../packages/auth-server" +PUBLIC_URL="$PUBLIC_URL" pnpm dev