Skip to content

Commit 86c4b28

Browse files
committed
fix(spritz): harden slack recovery polling
1 parent 86f2199 commit 86c4b28

File tree

2 files changed

+287
-4
lines changed

2 files changed

+287
-4
lines changed

integrations/slack-gateway/gateway_test.go

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3147,6 +3147,258 @@ func TestProcessMessageEventPostsWakeUpDuringSlowPromptExecution(t *testing.T) {
31473147
}
31483148
}
31493149

3150+
func TestProcessMessageEventKeepsRecoveringAfterTransientExchangeError(t *testing.T) {
3151+
var slackPayloads struct {
3152+
sync.Mutex
3153+
items []map[string]any
3154+
}
3155+
slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3156+
if r.URL.Path != "/chat.postMessage" {
3157+
t.Fatalf("unexpected slack path %s", r.URL.Path)
3158+
}
3159+
var payload map[string]any
3160+
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
3161+
t.Fatalf("decode slack post body: %v", err)
3162+
}
3163+
slackPayloads.Lock()
3164+
slackPayloads.items = append(slackPayloads.items, payload)
3165+
slackPayloads.Unlock()
3166+
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))})
3167+
}))
3168+
defer slackAPI.Close()
3169+
3170+
var sessionExchangeCalls atomic.Int32
3171+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3172+
if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" {
3173+
t.Fatalf("unexpected backend path %s", r.URL.Path)
3174+
}
3175+
call := sessionExchangeCalls.Add(1)
3176+
switch call {
3177+
case 1:
3178+
writeJSON(w, http.StatusServiceUnavailable, map[string]any{
3179+
"status": "unavailable",
3180+
"providerAuth": map[string]any{
3181+
"providerInstallRef": "cred_slack_workspace_1",
3182+
"apiAppId": "A_app_1",
3183+
"teamId": "T_workspace_1",
3184+
"botUserId": "U_bot",
3185+
"botAccessToken": "xoxb-installed",
3186+
},
3187+
})
3188+
case 2:
3189+
http.Error(w, "backend unavailable", http.StatusInternalServerError)
3190+
default:
3191+
writeJSON(w, http.StatusOK, map[string]any{
3192+
"status": "resolved",
3193+
"session": map[string]any{
3194+
"accessToken": "owner-token",
3195+
"ownerAuthId": "owner-123",
3196+
"namespace": "spritz-staging",
3197+
"instanceId": "zeno-acme",
3198+
"providerAuth": map[string]any{
3199+
"providerInstallRef": "cred_slack_workspace_1",
3200+
"apiAppId": "A_app_1",
3201+
"teamId": "T_workspace_1",
3202+
"botUserId": "U_bot",
3203+
"botAccessToken": "xoxb-installed",
3204+
},
3205+
},
3206+
})
3207+
}
3208+
}))
3209+
defer backend.Close()
3210+
3211+
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
3212+
spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3213+
switch r.URL.Path {
3214+
case "/api/channel-conversations/upsert":
3215+
writeJSON(w, http.StatusCreated, map[string]any{
3216+
"status": "success",
3217+
"data": map[string]any{
3218+
"created": true,
3219+
"conversation": map[string]any{
3220+
"metadata": map[string]any{"name": "conv-1"},
3221+
"spec": map[string]any{"cwd": "/home/dev"},
3222+
},
3223+
},
3224+
})
3225+
case "/api/acp/conversations/conv-1/bootstrap":
3226+
writeJSON(w, http.StatusOK, map[string]any{
3227+
"status": "success",
3228+
"data": map[string]any{
3229+
"effectiveSessionId": "session-1",
3230+
"conversation": map[string]any{
3231+
"metadata": map[string]any{"name": "conv-1"},
3232+
"spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"},
3233+
},
3234+
},
3235+
})
3236+
case "/api/acp/conversations/conv-1/connect":
3237+
conn, err := upgrader.Upgrade(w, r, nil)
3238+
if err != nil {
3239+
t.Fatalf("upgrade failed: %v", err)
3240+
}
3241+
defer conn.Close()
3242+
for {
3243+
_, payload, err := conn.ReadMessage()
3244+
if err != nil {
3245+
return
3246+
}
3247+
var message map[string]any
3248+
if err := json.Unmarshal(payload, &message); err != nil {
3249+
t.Fatalf("decode ws payload: %v", err)
3250+
}
3251+
switch message["method"] {
3252+
case "initialize":
3253+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}})
3254+
case "session/load":
3255+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}})
3256+
case "session/prompt":
3257+
_ = conn.WriteJSON(map[string]any{
3258+
"jsonrpc": "2.0",
3259+
"method": "session/update",
3260+
"params": map[string]any{
3261+
"update": map[string]any{
3262+
"sessionUpdate": "agent_message_chunk",
3263+
"content": []map[string]any{{
3264+
"type": "text",
3265+
"text": "Hello after transient exchange failure",
3266+
}},
3267+
},
3268+
},
3269+
})
3270+
_ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}})
3271+
return
3272+
default:
3273+
t.Fatalf("unexpected ACP method %#v", message["method"])
3274+
}
3275+
}
3276+
default:
3277+
t.Fatalf("unexpected spritz path %s", r.URL.Path)
3278+
}
3279+
}))
3280+
defer spritz.Close()
3281+
3282+
cfg := config{
3283+
SlackAPIBaseURL: slackAPI.URL,
3284+
BackendBaseURL: backend.URL,
3285+
BackendInternalToken: "backend-internal-token",
3286+
SpritzBaseURL: spritz.URL,
3287+
SpritzServiceToken: "spritz-service-token",
3288+
PrincipalID: "shared-slack-gateway",
3289+
HTTPTimeout: 200 * time.Millisecond,
3290+
DedupeTTL: time.Minute,
3291+
StatusMessageDelay: 5 * time.Millisecond,
3292+
SessionRetryInterval: 10 * time.Millisecond,
3293+
ProcessingTimeout: 250 * time.Millisecond,
3294+
}
3295+
gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)))
3296+
3297+
envelope := slackEnvelope{
3298+
APIAppID: "A_app_1",
3299+
TeamID: "T_workspace_1",
3300+
Event: slackEventInner{
3301+
Type: "app_mention",
3302+
User: "U_user",
3303+
Text: "<@U_bot> hello",
3304+
Channel: "C_channel_1",
3305+
ChannelType: "channel",
3306+
TS: "1711387375.000100",
3307+
},
3308+
}
3309+
delivery, process, err := gateway.beginMessageEventDelivery(envelope)
3310+
if err != nil {
3311+
t.Fatalf("beginMessageEventDelivery returned error: %v", err)
3312+
}
3313+
if !process {
3314+
t.Fatal("expected app mention to be processed")
3315+
}
3316+
3317+
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
3318+
defer cancel()
3319+
if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil {
3320+
t.Fatalf("expected recovery after transient exchange error to succeed, got %v", err)
3321+
}
3322+
3323+
slackPayloads.Lock()
3324+
defer slackPayloads.Unlock()
3325+
if len(slackPayloads.items) != 2 {
3326+
t.Fatalf("expected wake-up status and final reply, got %#v", slackPayloads.items)
3327+
}
3328+
if got := slackPayloads.items[0]["text"]; got != slackRecoveryStatusText {
3329+
t.Fatalf("expected wake-up status text, got %#v", got)
3330+
}
3331+
if got := slackPayloads.items[1]["text"]; got != "Hello after transient exchange failure" {
3332+
t.Fatalf("expected final reply text, got %#v", got)
3333+
}
3334+
if sessionExchangeCalls.Load() != 3 {
3335+
t.Fatalf("expected recovery polling to continue through transient errors, got %d exchange attempts", sessionExchangeCalls.Load())
3336+
}
3337+
}
3338+
3339+
func TestProcessMessageEventIgnoresMentionOnlyBeforeRecoveryStarts(t *testing.T) {
3340+
var slackPostCalls atomic.Int32
3341+
slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3342+
slackPostCalls.Add(1)
3343+
t.Fatalf("did not expect slack post for mention-only event")
3344+
}))
3345+
defer slackAPI.Close()
3346+
3347+
var sessionExchangeCalls atomic.Int32
3348+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3349+
sessionExchangeCalls.Add(1)
3350+
t.Fatalf("did not expect session exchange for mention-only event")
3351+
}))
3352+
defer backend.Close()
3353+
3354+
cfg := config{
3355+
SlackAPIBaseURL: slackAPI.URL,
3356+
BackendBaseURL: backend.URL,
3357+
BackendInternalToken: "backend-internal-token",
3358+
SpritzBaseURL: "https://spritz.example.test",
3359+
SpritzServiceToken: "spritz-service-token",
3360+
PrincipalID: "shared-slack-gateway",
3361+
HTTPTimeout: 200 * time.Millisecond,
3362+
DedupeTTL: time.Minute,
3363+
StatusMessageDelay: 5 * time.Millisecond,
3364+
SessionRetryInterval: 10 * time.Millisecond,
3365+
ProcessingTimeout: 200 * time.Millisecond,
3366+
}
3367+
gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil)))
3368+
3369+
envelope := slackEnvelope{
3370+
APIAppID: "A_app_1",
3371+
TeamID: "T_workspace_1",
3372+
Event: slackEventInner{
3373+
Type: "app_mention",
3374+
User: "U_user",
3375+
Text: "<@U_bot>",
3376+
Channel: "C_channel_1",
3377+
ChannelType: "channel",
3378+
TS: "1711387375.000100",
3379+
},
3380+
}
3381+
delivery, process, err := gateway.beginMessageEventDelivery(envelope)
3382+
if err != nil {
3383+
t.Fatalf("beginMessageEventDelivery returned error: %v", err)
3384+
}
3385+
if !process {
3386+
t.Fatal("expected app mention to be processed")
3387+
}
3388+
3389+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
3390+
defer cancel()
3391+
if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil {
3392+
t.Fatalf("expected mention-only app mention to be ignored cleanly, got %v", err)
3393+
}
3394+
if sessionExchangeCalls.Load() != 0 {
3395+
t.Fatalf("expected no session exchange attempts, got %d", sessionExchangeCalls.Load())
3396+
}
3397+
if slackPostCalls.Load() != 0 {
3398+
t.Fatalf("expected no slack posts, got %d", slackPostCalls.Load())
3399+
}
3400+
}
3401+
31503402
func TestProcessMessageEventPostsTerminalErrorAfterRecoveryTimeout(t *testing.T) {
31513403
var slackPayloads struct {
31523404
sync.Mutex

integrations/slack-gateway/slack_events.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"net/http"
13+
"regexp"
1314
"strconv"
1415
"strings"
1516
"sync"
@@ -21,6 +22,8 @@ const (
2122
slackRecoveryFailureText = "I could not recover the channel runtime. Please try again."
2223
)
2324

25+
var slackMentionTokenPattern = regexp.MustCompile(`<@[^>]+>`)
26+
2427
type slackEnvelope struct {
2528
Type string `json:"type"`
2629
Challenge string `json:"challenge,omitempty"`
@@ -80,6 +83,15 @@ func (state *channelSessionRecoveryState) remainingStatusDelay(delay time.Durati
8083
return remaining
8184
}
8285

86+
func (state *channelSessionRecoveryState) hasProviderAuth() bool {
87+
if state == nil {
88+
return false
89+
}
90+
state.mu.Lock()
91+
defer state.mu.Unlock()
92+
return state.statusAuthReady
93+
}
94+
8395
func (state *channelSessionRecoveryState) maybePostStatus(
8496
ctx context.Context,
8597
g *slackGateway,
@@ -353,6 +365,10 @@ func (g *slackGateway) processMessageEventWithDelivery(
353365
}()
354366

355367
event := envelope.Event
368+
if normalizeSlackPromptText(event.Type, event.Text, "") == "" {
369+
success = true
370+
return nil
371+
}
356372

357373
recoveryState := newChannelSessionRecoveryState()
358374
session, terminalHandled, err := g.awaitChannelSession(ctx, envelope, event, recoveryState)
@@ -373,6 +389,7 @@ func (g *slackGateway) processMessageEventWithDelivery(
373389
session.ProviderAuth.BotUserID,
374390
)
375391
if promptText == "" {
392+
success = true
376393
return nil
377394
}
378395

@@ -428,6 +445,7 @@ func (g *slackGateway) processMessageEventWithDelivery(
428445
session.ProviderAuth.BotUserID,
429446
)
430447
if promptText == "" {
448+
success = true
431449
return nil
432450
}
433451
stopPromptStatusTimer = g.startPromptStatusTimer(ctx, event, recoveryState)
@@ -541,9 +559,19 @@ func (g *slackGateway) awaitChannelSession(
541559
if err == nil {
542560
recoveryState.rememberProviderAuth(session.ProviderAuth)
543561
return session, false, nil
544-
} else {
545-
providerAuth, recoverable := channelSessionUnavailableProviderAuth(err)
546-
if !recoverable {
562+
}
563+
564+
providerAuth, recoverable := channelSessionUnavailableProviderAuth(err)
565+
if !recoverable {
566+
if recoveryState.hasProviderAuth() {
567+
g.logger.Error(
568+
"slack session recovery poll failed",
569+
"error", err,
570+
"team_id", strings.TrimSpace(envelope.TeamID),
571+
"channel_id", strings.TrimSpace(event.Channel),
572+
"message_ts", strings.TrimSpace(event.TS),
573+
)
574+
} else {
547575
if terminalHandled, postErr := recoveryState.maybePostFailure(ctx, g, event); postErr != nil {
548576
g.logger.Error(
549577
"slack recovery failure reply failed",
@@ -558,6 +586,7 @@ func (g *slackGateway) awaitChannelSession(
558586
}
559587
return channelSession{}, false, err
560588
}
589+
} else {
561590
recoveryState.rememberProviderAuth(providerAuth)
562591
}
563592

@@ -621,9 +650,11 @@ func normalizeSlackPromptText(eventType, text, botUserID string) string {
621650
normalized[:index] + normalized[index+len(mentionToken):],
622651
)
623652
}
653+
} else {
654+
normalized = slackMentionTokenPattern.ReplaceAllString(normalized, " ")
624655
}
625656
}
626-
return normalized
657+
return strings.TrimSpace(normalized)
627658
}
628659

629660
type slackPromptContext struct {

0 commit comments

Comments
 (0)