diff --git a/api/channel_conversation_service.go b/api/channel_conversation_service.go index cc9914f..0244899 100644 --- a/api/channel_conversation_service.go +++ b/api/channel_conversation_service.go @@ -3,6 +3,7 @@ package main import ( "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "net/http" "strings" @@ -14,18 +15,21 @@ import ( ) const ( - channelConversationRouteLabelKey = "spritz.sh/channel-route" - channelConversationPrincipalAnnotationKey = "spritz.sh/channel-principal-id" - channelConversationProviderAnnotationKey = "spritz.sh/channel-provider" - channelConversationExternalScopeTypeAnnotationKey = "spritz.sh/channel-external-scope-type" - channelConversationExternalTenantIDAnnotationKey = "spritz.sh/channel-external-tenant-id" - channelConversationExternalChannelIDAnnotationKey = "spritz.sh/channel-external-channel-id" - channelConversationExternalConversationIDAnnotationKey = "spritz.sh/channel-external-conversation-id" + channelConversationRouteLabelKey = "spritz.sh/channel-route" + channelConversationPrincipalAnnotationKey = "spritz.sh/channel-principal-id" + channelConversationProviderAnnotationKey = "spritz.sh/channel-provider" + channelConversationExternalScopeTypeAnnotationKey = "spritz.sh/channel-external-scope-type" + channelConversationExternalTenantIDAnnotationKey = "spritz.sh/channel-external-tenant-id" + channelConversationExternalChannelIDAnnotationKey = "spritz.sh/channel-external-channel-id" + channelConversationExternalConversationIDAnnotationKey = "spritz.sh/channel-external-conversation-id" + channelConversationExternalConversationAliasesAnnotationKey = "spritz.sh/channel-external-conversation-aliases" + channelConversationBaseRouteLabelKey = "spritz.sh/channel-route-base" ) type channelConversationUpsertRequest struct { RequestID string `json:"requestId,omitempty"` Namespace string `json:"namespace,omitempty"` + ConversationID string `json:"conversationId,omitempty"` PrincipalID string `json:"principalId"` InstanceID string `json:"instanceId"` OwnerID string `json:"ownerId"` @@ -52,6 +56,7 @@ type normalizedChannelConversationIdentity struct { func normalizeChannelConversationUpsertRequest(body channelConversationUpsertRequest) (channelConversationUpsertRequest, normalizedChannelConversationIdentity, error) { body.RequestID = strings.TrimSpace(body.RequestID) body.Namespace = strings.TrimSpace(body.Namespace) + body.ConversationID = strings.TrimSpace(body.ConversationID) body.PrincipalID = strings.TrimSpace(body.PrincipalID) body.InstanceID = sanitizeSpritzNameToken(body.InstanceID) body.OwnerID = strings.TrimSpace(body.OwnerID) @@ -113,6 +118,19 @@ func channelConversationRouteHash(identity normalizedChannelConversationIdentity return hex.EncodeToString(sum[:16]) } +func channelConversationBaseRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string { + sum := sha256.Sum256([]byte(strings.Join([]string{ + identity.principalID, + identity.provider, + identity.externalScopeType, + identity.externalTenantID, + identity.externalChannelID, + strings.TrimSpace(ownerID), + strings.TrimSpace(instanceID), + }, "\n"))) + return hex.EncodeToString(sum[:16]) +} + func channelConversationName(spritzName, ownerID string, identity normalizedChannelConversationIdentity) string { prefix := strings.ToLower(strings.TrimSpace(spritzName)) prefix = strings.Trim(prefix, "-") @@ -137,7 +155,7 @@ func channelConversationName(spritzName, ownerID string, identity normalizedChan return fmt.Sprintf("%s-%s", prefix, suffix) } -func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { +func channelConversationMatchesBaseIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { if conversation == nil { return false } @@ -145,8 +163,102 @@ func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversatio strings.TrimSpace(conversation.Annotations[channelConversationProviderAnnotationKey]) == identity.provider && strings.TrimSpace(conversation.Annotations[channelConversationExternalScopeTypeAnnotationKey]) == identity.externalScopeType && strings.TrimSpace(conversation.Annotations[channelConversationExternalTenantIDAnnotationKey]) == identity.externalTenantID && - strings.TrimSpace(conversation.Annotations[channelConversationExternalChannelIDAnnotationKey]) == identity.externalChannelID && - strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) == identity.externalConversationID + strings.TrimSpace(conversation.Annotations[channelConversationExternalChannelIDAnnotationKey]) == identity.externalChannelID +} + +func channelConversationExternalConversationAliases(conversation *spritzv1.SpritzConversation) []string { + if conversation == nil { + return nil + } + raw := strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationAliasesAnnotationKey]) + if raw == "" { + return nil + } + var payload []string + if err := json.Unmarshal([]byte(raw), &payload); err != nil { + return nil + } + primary := strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) + aliases := make([]string, 0, len(payload)) + seen := map[string]struct{}{} + for _, candidate := range payload { + candidate = strings.TrimSpace(candidate) + if candidate == "" || candidate == primary { + continue + } + if _, ok := seen[candidate]; ok { + continue + } + seen[candidate] = struct{}{} + aliases = append(aliases, candidate) + } + return aliases +} + +func channelConversationHasExternalConversationID(conversation *spritzv1.SpritzConversation, externalConversationID string) bool { + externalConversationID = strings.TrimSpace(externalConversationID) + if externalConversationID == "" || conversation == nil { + return false + } + if strings.TrimSpace(conversation.Annotations[channelConversationExternalConversationIDAnnotationKey]) == externalConversationID { + return true + } + for _, alias := range channelConversationExternalConversationAliases(conversation) { + if alias == externalConversationID { + return true + } + } + return false +} + +func channelConversationMatchesIdentity(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity) bool { + return channelConversationMatchesBaseIdentity(conversation, identity) && + channelConversationHasExternalConversationID(conversation, identity.externalConversationID) +} + +func channelConversationBelongsToSpritz(conversation *spritzv1.SpritzConversation, spritz *spritzv1.Spritz) bool { + if conversation == nil || spritz == nil { + return false + } + return strings.TrimSpace(conversation.Spec.SpritzName) == spritz.Name && + strings.TrimSpace(conversation.Spec.Owner.ID) == spritz.Spec.Owner.ID && + strings.TrimSpace(conversation.Labels[acpConversationSpritzLabelKey]) == spritz.Name && + strings.TrimSpace(conversation.Labels[acpConversationOwnerLabelKey]) == ownerLabelValue(spritz.Spec.Owner.ID) +} + +func appendChannelConversationAlias(conversation *spritzv1.SpritzConversation, externalConversationID string) (bool, error) { + externalConversationID = strings.TrimSpace(externalConversationID) + if externalConversationID == "" || conversation == nil { + return false, nil + } + if conversation.Annotations == nil { + conversation.Annotations = map[string]string{} + } + if channelConversationHasExternalConversationID(conversation, externalConversationID) { + return false, nil + } + aliases := append(channelConversationExternalConversationAliases(conversation), externalConversationID) + payload, err := json.Marshal(aliases) + if err != nil { + return false, err + } + conversation.Annotations[channelConversationExternalConversationAliasesAnnotationKey] = string(payload) + return true, nil +} + +func ensureChannelConversationBaseRouteLabel(conversation *spritzv1.SpritzConversation, identity normalizedChannelConversationIdentity, spritz *spritzv1.Spritz) bool { + if conversation == nil || spritz == nil { + return false + } + if conversation.Labels == nil { + conversation.Labels = map[string]string{} + } + expected := channelConversationBaseRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) + if strings.TrimSpace(conversation.Labels[channelConversationBaseRouteLabelKey]) == expected { + return false + } + conversation.Labels[channelConversationBaseRouteLabelKey] = expected + return true } func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanceID, ownerID string) (*spritzv1.Spritz, error) { @@ -164,10 +276,10 @@ func (s *server) getAdminScopedACPReadySpritz(c echo.Context, namespace, instanc } func (s *server) findChannelConversation(c echo.Context, namespace string, spritz *spritzv1.Spritz, identity normalizedChannelConversationIdentity) (*spritzv1.SpritzConversation, bool, error) { - list := &spritzv1.SpritzConversationList{} + exactList := &spritzv1.SpritzConversationList{} if err := s.client.List( c.Request().Context(), - list, + exactList, client.InNamespace(namespace), client.MatchingLabels{ acpConversationLabelKey: acpConversationLabelValue, @@ -179,8 +291,8 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit return nil, false, err } var match *spritzv1.SpritzConversation - for i := range list.Items { - item := &list.Items[i] + for i := range exactList.Items { + item := &exactList.Items[i] if !channelConversationMatchesIdentity(item, identity) { continue } @@ -189,6 +301,38 @@ func (s *server) findChannelConversation(c echo.Context, namespace string, sprit } match = item.DeepCopy() } + + baseList := &spritzv1.SpritzConversationList{} + if err := s.client.List( + c.Request().Context(), + baseList, + client.InNamespace(namespace), + client.MatchingLabels{ + acpConversationLabelKey: acpConversationLabelValue, + acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID), + acpConversationSpritzLabelKey: spritz.Name, + channelConversationBaseRouteLabelKey: channelConversationBaseRouteHash( + identity, + spritz.Spec.Owner.ID, + spritz.Name, + ), + }, + ); err != nil { + return nil, false, err + } + for i := range baseList.Items { + item := &baseList.Items[i] + if !channelConversationMatchesIdentity(item, identity) { + continue + } + if match != nil && item.Name == match.Name { + continue + } + if match != nil { + return nil, true, echo.NewHTTPError(http.StatusConflict, "channel conversation is ambiguous") + } + match = item.DeepCopy() + } if match == nil { return nil, false, nil } @@ -203,6 +347,7 @@ func applyChannelConversationMetadata(conversation *spritzv1.SpritzConversation, conversation.Labels[acpConversationSpritzLabelKey] = spritz.Name conversation.Labels[acpConversationLabelKey] = acpConversationLabelValue conversation.Labels[channelConversationRouteLabelKey] = channelConversationRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) + conversation.Labels[channelConversationBaseRouteLabelKey] = channelConversationBaseRouteHash(identity, spritz.Spec.Owner.ID, spritz.Name) if conversation.Annotations == nil { conversation.Annotations = map[string]string{} diff --git a/api/channel_conversations.go b/api/channel_conversations.go index a0d672b..9e2c6f7 100644 --- a/api/channel_conversations.go +++ b/api/channel_conversations.go @@ -53,6 +53,44 @@ func (s *server) upsertChannelConversation(c echo.Context) error { return s.writeACPResourceError(c, err) } } + if normalizedBody.ConversationID != "" { + existing := &spritzv1.SpritzConversation{} + if err := s.client.Get(c.Request().Context(), clientKey(namespace, normalizedBody.ConversationID), existing); err != nil { + return s.writeACPResourceError(c, err) + } + if !channelConversationMatchesBaseIdentity(existing, identity) || !channelConversationBelongsToSpritz(existing, spritz) { + return writeError(c, http.StatusConflict, "channel conversation is ambiguous") + } + conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity) + if err != nil { + if httpErr, ok := err.(*echo.HTTPError); ok { + return writeError(c, httpErr.Code, httpErr.Message.(string)) + } + return writeError(c, http.StatusInternalServerError, err.Error()) + } + if found && conversation.Name != existing.Name { + return writeError(c, http.StatusConflict, "channel conversation is ambiguous") + } + changed := ensureChannelConversationBaseRouteLabel(existing, identity, spritz) + aliasChanged, err := appendChannelConversationAlias(existing, identity.externalConversationID) + if err != nil { + return writeError(c, http.StatusInternalServerError, err.Error()) + } + changed = changed || aliasChanged + if normalizedBody.RequestID != "" { + if existing.Annotations == nil { + existing.Annotations = map[string]string{} + } + existing.Annotations[requestIDAnnotationKey] = normalizedBody.RequestID + changed = true + } + if changed { + if err := s.client.Update(c.Request().Context(), existing); err != nil { + return s.writeACPResourceError(c, err) + } + } + return writeJSON(c, http.StatusOK, map[string]any{"created": false, "conversation": existing}) + } conversation, found, err := s.findChannelConversation(c, namespace, spritz, identity) if err != nil { if httpErr, ok := err.(*echo.HTTPError); ok { diff --git a/api/channel_conversations_test.go b/api/channel_conversations_test.go index 348c613..779a4d0 100644 --- a/api/channel_conversations_test.go +++ b/api/channel_conversations_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "net/http" "net/http/httptest" @@ -186,6 +188,316 @@ func TestUpsertChannelConversationReusesExistingConversation(t *testing.T) { } } +func TestUpsertChannelConversationPersistsAndResolvesReplyAliases(t *testing.T) { + s := newChannelConversationsTestServer(t, readyACPSpritz("zeno-acme", "owner-123")) + e := echo.New() + s.registerRoutes(e) + + createRec := httptest.NewRecorder() + e.ServeHTTP(createRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387375.000100", + "title":"Slack concierge" + }`)) + if createRec.Code != http.StatusCreated { + t.Fatalf("expected first request to create, got %d: %s", createRec.Code, createRec.Body.String()) + } + var createPayload struct { + Data struct { + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(createRec.Body.Bytes(), &createPayload); err != nil { + t.Fatalf("failed to decode create response: %v", err) + } + + aliasRec := httptest.NewRecorder() + e.ServeHTTP(aliasRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+createPayload.Data.Conversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if aliasRec.Code != http.StatusOK { + t.Fatalf("expected alias request to reuse, got %d: %s", aliasRec.Code, aliasRec.Body.String()) + } + + reuseRec := httptest.NewRecorder() + e.ServeHTTP(reuseRec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if reuseRec.Code != http.StatusOK { + t.Fatalf("expected alias lookup to reuse, got %d: %s", reuseRec.Code, reuseRec.Body.String()) + } + + var reusePayload struct { + Data struct { + Created bool `json:"created"` + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(reuseRec.Body.Bytes(), &reusePayload); err != nil { + t.Fatalf("failed to decode reuse response: %v", err) + } + if reusePayload.Data.Created { + t.Fatalf("expected alias lookup to reuse the original conversation") + } + if reusePayload.Data.Conversation.Name != createPayload.Data.Conversation.Name { + t.Fatalf("expected alias lookup to reuse %q, got %q", createPayload.Data.Conversation.Name, reusePayload.Data.Conversation.Name) + } + aliases := channelConversationExternalConversationAliases(&reusePayload.Data.Conversation) + if len(aliases) != 1 || aliases[0] != "1711387376.000100" { + t.Fatalf("expected persisted alias, got %#v", aliases) + } +} + +func legacyChannelConversationRouteHash(identity normalizedChannelConversationIdentity, ownerID, instanceID string) string { + sum := sha256.Sum256([]byte(strings.Join([]string{ + identity.principalID, + identity.provider, + identity.externalScopeType, + identity.externalTenantID, + identity.externalChannelID, + identity.externalConversationID, + strings.TrimSpace(ownerID), + strings.TrimSpace(instanceID), + }, "\n"))) + return hex.EncodeToString(sum[:16]) +} + +func TestUpsertChannelConversationReusesLegacyConversationWithoutBaseRouteLabel(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + identity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + conversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build conversation: %v", err) + } + conversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, identity) + conversation.Spec.Owner = spritz.Spec.Owner + conversation.Spec.SpritzName = spritz.Name + conversation.Labels = map[string]string{ + acpConversationLabelKey: acpConversationLabelValue, + acpConversationOwnerLabelKey: ownerLabelValue(spritz.Spec.Owner.ID), + acpConversationSpritzLabelKey: spritz.Name, + channelConversationRouteLabelKey: legacyChannelConversationRouteHash( + identity, + spritz.Spec.Owner.ID, + spritz.Name, + ), + } + conversation.Annotations = map[string]string{ + channelConversationPrincipalAnnotationKey: identity.principalID, + channelConversationProviderAnnotationKey: identity.provider, + channelConversationExternalScopeTypeAnnotationKey: identity.externalScopeType, + channelConversationExternalTenantIDAnnotationKey: identity.externalTenantID, + channelConversationExternalChannelIDAnnotationKey: identity.externalChannelID, + channelConversationExternalConversationIDAnnotationKey: identity.externalConversationID, + requestIDAnnotationKey: "legacy-request", + } + + s := newChannelConversationsTestServer(t, spritz, conversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387375.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusOK { + t.Fatalf("expected legacy conversation reuse, got %d: %s", rec.Code, rec.Body.String()) + } + var payload struct { + Data struct { + Created bool `json:"created"` + Conversation spritzv1.SpritzConversation `json:"conversation"` + } `json:"data"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode response: %v", err) + } + if payload.Data.Created { + t.Fatalf("expected legacy conversation to be reused") + } + if payload.Data.Conversation.Name != conversation.Name { + t.Fatalf("expected legacy conversation %q, got %q", conversation.Name, payload.Data.Conversation.Name) + } +} + +func TestUpsertChannelConversationRejectsAliasForWrongSpritzConversation(t *testing.T) { + targetSpritz := readyACPSpritz("zeno-acme", "owner-123") + otherSpritz := readyACPSpritz("zeno-other", "owner-999") + identity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + otherConversation, err := buildACPConversationResource(otherSpritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build conversation: %v", err) + } + otherConversation.Name = channelConversationName(otherSpritz.Name, otherSpritz.Spec.Owner.ID, identity) + applyChannelConversationMetadata(otherConversation, identity, "other-request", otherSpritz) + + s := newChannelConversationsTestServer(t, targetSpritz, otherSpritz, otherConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+otherConversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected alias against wrong spritz to conflict, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestUpsertChannelConversationRejectsAliasWhenAnotherConversationAlreadyOwnsIt(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + rootIdentity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387375.000100", + } + aliasIdentity := rootIdentity + aliasIdentity.externalConversationID = "1711387376.000100" + + rootConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build root conversation: %v", err) + } + rootConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, rootIdentity) + applyChannelConversationMetadata(rootConversation, rootIdentity, "root-request", spritz) + + aliasConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build alias conversation: %v", err) + } + aliasConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, aliasIdentity) + applyChannelConversationMetadata(aliasConversation, aliasIdentity, "alias-request", spritz) + + s := newChannelConversationsTestServer(t, spritz, rootConversation, aliasConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "conversationId":"`+rootConversation.Name+`", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected alias conflict when another conversation already owns it, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestUpsertChannelConversationRejectsWhenExactAndAliasedMatchesConflict(t *testing.T) { + spritz := readyACPSpritz("zeno-acme", "owner-123") + exactIdentity := normalizedChannelConversationIdentity{ + principalID: "shared-slack-gateway", + provider: "slack", + externalScopeType: "workspace", + externalTenantID: "T_workspace_1", + externalChannelID: "C_channel_1", + externalConversationID: "1711387376.000100", + } + aliasedIdentity := exactIdentity + aliasedIdentity.externalConversationID = "1711387375.000100" + + exactConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build exact conversation: %v", err) + } + exactConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, exactIdentity) + applyChannelConversationMetadata(exactConversation, exactIdentity, "exact-request", spritz) + + aliasedConversation, err := buildACPConversationResource(spritz, "Slack concierge", "") + if err != nil { + t.Fatalf("build aliased conversation: %v", err) + } + aliasedConversation.Name = channelConversationName(spritz.Name, spritz.Spec.Owner.ID, aliasedIdentity) + applyChannelConversationMetadata(aliasedConversation, aliasedIdentity, "aliased-request", spritz) + if _, err := appendChannelConversationAlias(aliasedConversation, exactIdentity.externalConversationID); err != nil { + t.Fatalf("append alias: %v", err) + } + + s := newChannelConversationsTestServer(t, spritz, exactConversation, aliasedConversation) + e := echo.New() + s.registerRoutes(e) + + rec := httptest.NewRecorder() + e.ServeHTTP(rec, newChannelConversationsRequest(`{ + "principalId":"shared-slack-gateway", + "instanceId":"zeno-acme", + "ownerId":"owner-123", + "provider":"slack", + "externalScopeType":"workspace", + "externalTenantId":"T_workspace_1", + "externalChannelId":"C_channel_1", + "externalConversationId":"1711387376.000100", + "title":"Slack concierge" + }`)) + if rec.Code != http.StatusConflict { + t.Fatalf("expected ambiguity conflict when exact and aliased matches disagree, got %d: %s", rec.Code, rec.Body.String()) + } +} + func TestUpsertChannelConversationPreservesExistingTitleAndCWD(t *testing.T) { s := newChannelConversationsTestServer(t, readyACPSpritz("zeno-acme", "owner-123")) e := echo.New() diff --git a/docs/2026-03-24-slack-channel-gateway-implementation-plan.md b/docs/2026-03-24-slack-channel-gateway-implementation-plan.md index bef1134..9447740 100644 --- a/docs/2026-03-24-slack-channel-gateway-implementation-plan.md +++ b/docs/2026-03-24-slack-channel-gateway-implementation-plan.md @@ -207,7 +207,8 @@ product requirement says otherwise: - event type - `channel_type` - channel id - - message ts or thread ts + - message ts + - thread ts when present - external sender id 4. Gateway rejects the request if `api_app_id` or `team_id` do not match the expected shared Slack app installation. @@ -306,13 +307,20 @@ include it directly in the inbound payload. Phase 1 should keep channel behavior predictable: - direct-message conversations reply inline -- channel conversations reply in thread by default -- if inbound Slack payload already has `thread_ts`, reuse it -- if inbound channel message is not already threaded, use the source message - `ts` as `thread_ts` - -That keeps public channels cleaner and gives the concierge a consistent reply -target. +- top-level channel turns reply top-level by default +- top-level channel turns use the source Slack message ts as the conversation + identity +- threaded channel turns use the thread root `thread_ts` as the conversation + identity +- if the gateway posts a visible top-level assistant reply, later user replies + threaded off that bot message must map back to the original source-message + conversation instead of forking a new one +- if inbound Slack payload already has `thread_ts`, reuse it for the outbound + reply so existing threaded follow-ups stay in that thread + +That matches the desired Zenobot-style room behavior: visible top-level replies +for normal channel turns, with stable follow-up context only when the user is +already continuing the same Slack root message or thread. ## Persisted Metadata diff --git a/integrations/slack-gateway/backend_client.go b/integrations/slack-gateway/backend_client.go index b4b99df..ab271db 100644 --- a/integrations/slack-gateway/backend_client.go +++ b/integrations/slack-gateway/backend_client.go @@ -110,9 +110,10 @@ func (g *slackGateway) exchangeChannelSession(ctx context.Context, teamID string }, nil } -func (g *slackGateway) upsertChannelConversation(ctx context.Context, session channelSession, event slackEventInner, teamID string) (string, error) { +func (g *slackGateway) upsertChannelConversation(ctx context.Context, session channelSession, event slackEventInner, teamID, conversationID, externalConversationID string) (string, error) { body := map[string]any{ "namespace": session.Namespace, + "conversationId": strings.TrimSpace(conversationID), "principalId": g.cfg.PrincipalID, "instanceId": session.InstanceID, "ownerId": session.OwnerAuthID, @@ -120,7 +121,7 @@ func (g *slackGateway) upsertChannelConversation(ctx context.Context, session ch "externalScopeType": slackWorkspaceScope, "externalTenantId": strings.TrimSpace(teamID), "externalChannelId": strings.TrimSpace(event.Channel), - "externalConversationId": slackExternalConversationID(event), + "externalConversationId": strings.TrimSpace(externalConversationID), "title": fmt.Sprintf("Slack %s", strings.TrimSpace(event.Channel)), "cwd": defaultConversationCWD, } @@ -150,7 +151,7 @@ func (g *slackGateway) bootstrapConversation(ctx context.Context, serviceToken, return sessionID, cwd, nil } -func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, text, threadTS string) error { +func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, text, threadTS string) (string, error) { body := map[string]any{ "channel": strings.TrimSpace(channel), "text": strings.TrimSpace(text), @@ -161,33 +162,34 @@ func (g *slackGateway) postSlackMessage(ctx context.Context, token, channel, tex target := g.cfg.SlackAPIBaseURL + "/chat.postMessage" payload, err := json.Marshal(body) if err != nil { - return err + return "", err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, bytes.NewReader(payload)) if err != nil { - return err + return "", err } req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(token)) req.Header.Set("Content-Type", "application/json") resp, err := g.httpClient.Do(req) if err != nil { - return err + return "", err } defer resp.Body.Close() if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { - return fmt.Errorf("slack chat.postMessage failed: %s", resp.Status) + return "", fmt.Errorf("slack chat.postMessage failed: %s", resp.Status) } var result struct { OK bool `json:"ok"` + TS string `json:"ts,omitempty"` Error string `json:"error,omitempty"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return err + return "", err } if !result.OK { - return fmt.Errorf("slack chat.postMessage failed: %s", strings.TrimSpace(result.Error)) + return "", fmt.Errorf("slack chat.postMessage failed: %s", strings.TrimSpace(result.Error)) } - return nil + return strings.TrimSpace(result.TS), nil } func (g *slackGateway) postBackendJSON(ctx context.Context, path string, body any, target any) error { diff --git a/integrations/slack-gateway/gateway_test.go b/integrations/slack-gateway/gateway_test.go index c9a4f34..b251597 100644 --- a/integrations/slack-gateway/gateway_test.go +++ b/integrations/slack-gateway/gateway_test.go @@ -345,8 +345,8 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { } var channelConversationCall struct { sync.Mutex - authHeader string - payload map[string]any + authHeaders []string + payloads []map[string]any } slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/chat.postMessage" { @@ -357,7 +357,7 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { slackCalls.Lock() slackCalls.payloads = append(slackCalls.payloads, payload) slackCalls.Unlock() - writeJSON(w, http.StatusOK, map[string]any{"ok": true}) + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": "1711387376.000100"}) return } t.Fatalf("unexpected slack path %s", r.URL.Path) @@ -396,13 +396,19 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { t.Fatalf("decode channel conversation body: %v", err) } channelConversationCall.Lock() - channelConversationCall.authHeader = r.Header.Get("Authorization") - channelConversationCall.payload = payload + channelConversationCall.authHeaders = append(channelConversationCall.authHeaders, r.Header.Get("Authorization")) + channelConversationCall.payloads = append(channelConversationCall.payloads, payload) channelConversationCall.Unlock() - writeJSON(w, http.StatusCreated, map[string]any{ + statusCode := http.StatusCreated + created := true + if strings.TrimSpace(fmt.Sprint(payload["conversationId"])) != "" { + statusCode = http.StatusOK + created = false + } + writeJSON(w, statusCode, map[string]any{ "status": "success", "data": map[string]any{ - "created": true, + "created": created, "conversation": map[string]any{ "metadata": map[string]any{"name": "conv-1"}, "spec": map[string]any{"cwd": "/home/dev"}, @@ -519,8 +525,8 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { if !strings.Contains(fmt.Sprint(payload["text"]), "Hello from concierge") { t.Fatalf("expected assistant reply, got %#v", payload["text"]) } - if payload["thread_ts"] != "1711387375.000100" { - t.Fatalf("expected thread reply, got %#v", payload["thread_ts"]) + if _, ok := payload["thread_ts"]; ok { + t.Fatalf("expected top-level channel reply, got %#v", payload["thread_ts"]) } acpAuthHeaders.Lock() defer acpAuthHeaders.Unlock() @@ -534,11 +540,25 @@ func TestSlackEventRoutesToConversationAndReplies(t *testing.T) { } channelConversationCall.Lock() defer channelConversationCall.Unlock() - if channelConversationCall.authHeader != "Bearer owner-token" { - t.Fatalf("expected owner token for channel conversation upsert, got %q", channelConversationCall.authHeader) + if len(channelConversationCall.authHeaders) != 2 { + t.Fatalf("expected root upsert plus alias upsert, got %#v", channelConversationCall.authHeaders) + } + for _, authHeader := range channelConversationCall.authHeaders { + if authHeader != "Bearer owner-token" { + t.Fatalf("expected owner token for channel conversation upsert, got %q", authHeader) + } + } + if channelConversationCall.payloads[0]["principalId"] != "shared-slack-gateway" { + t.Fatalf("expected shared gateway principal in first channel conversation payload, got %#v", channelConversationCall.payloads[0]["principalId"]) } - if channelConversationCall.payload["principalId"] != "shared-slack-gateway" { - t.Fatalf("expected shared gateway principal in channel conversation payload, got %#v", channelConversationCall.payload["principalId"]) + if channelConversationCall.payloads[0]["externalConversationId"] != "1711387375.000100" { + t.Fatalf("expected root-message conversation identity, got %#v", channelConversationCall.payloads[0]["externalConversationId"]) + } + if channelConversationCall.payloads[1]["conversationId"] != "conv-1" { + t.Fatalf("expected alias upsert to target the created conversation, got %#v", channelConversationCall.payloads[1]["conversationId"]) + } + if channelConversationCall.payloads[1]["externalConversationId"] != "1711387376.000100" { + t.Fatalf("expected alias upsert to persist the bot reply ts, got %#v", channelConversationCall.payloads[1]["externalConversationId"]) } return } @@ -1256,6 +1276,8 @@ func TestUpsertChannelConversationUsesChannelForDirectMessages(t *testing.T) { TS: "1711387375.000100", }, "T_workspace_1", + "", + "D_workspace_bot", ) if err != nil { t.Fatalf("upsert channel conversation failed: %v", err) @@ -1399,6 +1421,33 @@ func TestSlackDirectMessageHelpersReuseSharedDetection(t *testing.T) { if slackReplyThreadTS(groupDM) != "" { t.Fatalf("expected mpim replies to stay inline") } + + topLevelChannel := slackEventInner{ + Type: "app_mention", + Channel: "C_workspace_channel", + ChannelType: "channel", + TS: "1711387375.000100", + } + if slackExternalConversationID(topLevelChannel) != "1711387375.000100" { + t.Fatalf("expected top-level channel messages to key by root message ts") + } + if slackReplyThreadTS(topLevelChannel) != "" { + t.Fatalf("expected top-level channel mentions to reply inline") + } + + threadedChannel := slackEventInner{ + Type: "app_mention", + Channel: "C_workspace_channel", + ChannelType: "channel", + ThreadTS: "1711387375.000100", + TS: "1711387376.000100", + } + if slackExternalConversationID(threadedChannel) != "1711387375.000100" { + t.Fatalf("expected threaded channel messages to key by thread root ts") + } + if slackReplyThreadTS(threadedChannel) != "1711387375.000100" { + t.Fatalf("expected threaded channel mentions to reply in-thread") + } } func TestPromptConversationRejectsInteractivePermissionRequests(t *testing.T) { @@ -1676,8 +1725,179 @@ func TestProcessMessageEventPostsFallbackAfterPromptTimeout(t *testing.T) { if got := slackPayloads.items[0]["text"]; got != "I hit an internal error while processing that request." { t.Fatalf("expected fallback reply text, got %#v", got) } - if got := slackPayloads.items[0]["thread_ts"]; got != "1711387375.000100" { - t.Fatalf("expected threaded fallback reply, got %#v", got) + if _, ok := slackPayloads.items[0]["thread_ts"]; ok { + t.Fatalf("expected top-level fallback reply, got %#v", slackPayloads.items[0]["thread_ts"]) + } +} + +func TestProcessMessageEventPersistsReplyAliasAfterPromptTimeout(t *testing.T) { + var channelConversationCalls struct { + sync.Mutex + payloads []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": "1711387376.000100"}) + })) + defer slackAPI.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode channel conversation payload: %v", err) + } + channelConversationCalls.Lock() + channelConversationCalls.payloads = append(channelConversationCalls.payloads, payload) + channelConversationCalls.Unlock() + statusCode := http.StatusCreated + created := true + if strings.TrimSpace(fmt.Sprint(payload["conversationId"])) != "" { + statusCode = http.StatusOK + created = false + } + writeJSON(w, statusCode, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": created, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "partial reply", + }}, + }, + }, + }) + time.Sleep(40 * time.Millisecond) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected fallback reply flow to succeed, got %v", err) + } + + channelConversationCalls.Lock() + defer channelConversationCalls.Unlock() + if len(channelConversationCalls.payloads) != 2 { + t.Fatalf("expected root upsert plus alias persistence, got %#v", channelConversationCalls.payloads) + } + if channelConversationCalls.payloads[1]["conversationId"] != "conv-1" { + t.Fatalf("expected alias persistence to target conv-1, got %#v", channelConversationCalls.payloads[1]["conversationId"]) + } + if channelConversationCalls.payloads[1]["externalConversationId"] != "1711387376.000100" { + t.Fatalf("expected alias persistence to use the bot reply ts, got %#v", channelConversationCalls.payloads[1]["externalConversationId"]) } } diff --git a/integrations/slack-gateway/slack_events.go b/integrations/slack-gateway/slack_events.go index 74d628f..0eb5abe 100644 --- a/integrations/slack-gateway/slack_events.go +++ b/integrations/slack-gateway/slack_events.go @@ -211,7 +211,8 @@ func (g *slackGateway) processMessageEventWithDelivery( return nil } - conversationID, err := g.upsertChannelConversation(ctx, session, event, envelope.TeamID) + externalConversationID := slackExternalConversationID(event) + conversationID, err := g.upsertChannelConversation(ctx, session, event, envelope.TeamID, "", externalConversationID) if err != nil { return err } @@ -227,14 +228,37 @@ func (g *slackGateway) processMessageEventWithDelivery( reply = "I hit an internal error while processing that request." g.logger.Error("acp prompt failed", "error", err, "conversation_id", conversationID) } + replyThreadTS := slackReplyThreadTS(event) replyCtx, cancelReply := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) defer cancelReply() - if err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, reply, slackReplyThreadTS(event)); err != nil { + replyMessageTS, err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, reply, replyThreadTS) + if err != nil { // Once the ACP prompt has already been delivered, suppress duplicate // Slack retries from re-running the same agent side effects. success = promptSent return err } + if replyThreadTS == "" && !isSlackDirectMessageEvent(event) && strings.TrimSpace(replyMessageTS) != "" { + aliasCtx, cancelAlias := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) + if _, err := g.upsertChannelConversation( + aliasCtx, + session, + event, + envelope.TeamID, + conversationID, + replyMessageTS, + ); err != nil { + cancelAlias() + g.logger.Error( + "slack reply alias persistence failed", + "error", err, + "conversation_id", conversationID, + "reply_message_ts", replyMessageTS, + ) + } else { + cancelAlias() + } + } success = true return nil } @@ -280,17 +304,17 @@ func slackReplyThreadTS(event slackEventInner) string { if strings.TrimSpace(event.ThreadTS) != "" { return strings.TrimSpace(event.ThreadTS) } - if isSlackDirectMessageEvent(event) { - return "" - } - return strings.TrimSpace(event.TS) + return "" } func slackExternalConversationID(event slackEventInner) string { if isSlackDirectMessageEvent(event) { return strings.TrimSpace(event.Channel) } - return firstNonEmpty(strings.TrimSpace(event.ThreadTS), strings.TrimSpace(event.TS)) + if threadTS := strings.TrimSpace(event.ThreadTS); threadTS != "" { + return threadTS + } + return strings.TrimSpace(event.TS) } func (g *slackGateway) verifySlackSignature(header http.Header, body []byte) error {