Skip to content

Commit 5d4e498

Browse files
committed
bulkresponse
Signed-off-by: sadath-12 <[email protected]>
1 parent 2193f47 commit 5d4e498

File tree

1 file changed

+19
-5
lines changed

1 file changed

+19
-5
lines changed

service/http/topic.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -416,20 +416,23 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
416416
return
417417
}
418418

419-
entriesInterface ,ok := ins["entries"].([]interface{})
419+
entriesInterface, ok := ins["entries"].([]interface{})
420420
if !ok {
421421
// Handle the error or return an error response
422422
http.Error(w, "Entries format error", PubSubHandlerDropStatusCode)
423423
return
424424
}
425425

426+
statuses := make([]BulkSubscribeResponseEntry, 0, len(entriesInterface))
427+
426428
var messages []common.BulkTopic
427429
for _, entry := range entriesInterface {
428430
itemMap, ok := entry.(map[string]interface{})
429431
if !ok {
430432
http.Error(w, "Entry format error", PubSubHandlerDropStatusCode)
431433
return
432434
}
435+
433436
itemJSON, err := json.Marshal(itemMap["event"])
434437
if err != nil {
435438
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
@@ -443,6 +446,7 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
443446
}
444447
data, rawData := item.getData()
445448

449+
446450
if item.PubsubName == "" {
447451
item.PubsubName = sub.PubsubName
448452
}
@@ -451,6 +455,11 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
451455
item.Topic = sub.Topic
452456
}
453457

458+
statuses = append(statuses, BulkSubscribeResponseEntry{
459+
entryId: item.EntryID,
460+
status: SubscriptionResponseStatusSuccess,
461+
},
462+
)
454463

455464
newItem := common.BulkTopic{
456465
ContentType: item.ContentType,
@@ -473,11 +482,17 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
473482

474483
messages = append(messages, newItem)
475484
}
476-
485+
resp := BulkSubscribeResponse{
486+
statuses: statuses,
487+
}
488+
responseJSON, err := json.Marshal(resp)
489+
if err != nil {
490+
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
491+
return
492+
}
477493
w.Header().Add("Content-Type", "application/json")
478-
w.WriteHeader(http.StatusOK)
494+
w.Write(responseJSON)
479495

480-
// execute user handler
481496
retry, err := fn(r.Context(), messages)
482497
if err == nil {
483498
writeStatus(w, common.SubscriptionResponseStatusSuccess)
@@ -488,7 +503,6 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
488503
writeStatus(w, common.SubscriptionResponseStatusRetry)
489504
return
490505
}
491-
492506
writeStatus(w, common.SubscriptionResponseStatusDrop)
493507
})))
494508

0 commit comments

Comments
 (0)