@@ -66,20 +66,6 @@ type topicEventJSON struct {
66
66
Topic string `json:"topic"`
67
67
// PubsubName is name of the pub/sub this message came from
68
68
PubsubName string `json:"pubsubname"`
69
- // EntryID is the unique identifier of the entry
70
- EntryID string `json:"entryId"`
71
- // Event is a map of strings that contains additional information about the event
72
- Event map [string ]string `json:"event"`
73
- // Time is the timestamp of the event
74
- Time string `json:"time"`
75
- // TraceID is the identifier of the trace
76
- TraceID string `json:"traceid"`
77
- // TraceParent is the identifier of the parent span
78
- TraceParent string `json:"traceparent"`
79
- // TraceState is the state of the trace
80
- TraceState string `json:"tracestate"`
81
- // Metadata is an interface that contains any other metadata associated with the event
82
- Metadata interface {} `json:"metadata"`
83
69
}
84
70
85
71
func (s * Server ) registerBaseHandler () {
@@ -270,8 +256,6 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
270
256
return nil
271
257
}
272
258
273
-
274
-
275
259
func (in topicEventJSON ) getData () (data any , rawData []byte ) {
276
260
var (
277
261
err error
@@ -318,6 +302,42 @@ func (in topicEventJSON) getData() (data any, rawData []byte) {
318
302
return data , rawData
319
303
}
320
304
305
+ func (in BulkTopicJson ) getData () (data any , rawData []byte ) {
306
+ var (
307
+ err error
308
+ v any
309
+ )
310
+ if len (in .Data ) > 0 {
311
+ rawData = []byte (in .Data )
312
+ data = rawData
313
+ // We can assume that rawData is valid JSON
314
+ // without checking in.DataContentType == "application/json".
315
+ if err = json .Unmarshal (rawData , & v ); err == nil {
316
+ data = v
317
+ // Handling of JSON base64 encoded or escaped in a string.
318
+ if str , ok := v .(string ); ok {
319
+ // This is the path that will most likely succeed.
320
+ var (
321
+ vString any
322
+ decoded []byte
323
+ )
324
+ if err = json .Unmarshal ([]byte (str ), & vString ); err == nil {
325
+ data = vString
326
+ } else if decoded , err = base64 .StdEncoding .DecodeString (str ); err == nil {
327
+ // Decoded Base64 encoded JSON does not seem to be in the spec
328
+ // but it is in existing unit tests so this handles that case.
329
+ var vBase64 any
330
+ if err = json .Unmarshal (decoded , & vBase64 ); err == nil {
331
+ data = vBase64
332
+ }
333
+ }
334
+ }
335
+ }
336
+ }
337
+
338
+ return data , rawData
339
+ }
340
+
321
341
type BulkTopicJson struct {
322
342
ContentType string `json:"contentType"`
323
343
EntryID string `json:"entryId"`
@@ -404,14 +424,25 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
404
424
return
405
425
}
406
426
427
+ data , rawData := item .getData ()
428
+
429
+ if item .PubsubName == "" {
430
+ item .PubsubName = sub .PubsubName
431
+ }
432
+
433
+ if item .Topic == "" {
434
+ item .Topic = sub .Topic
435
+ }
436
+
407
437
408
438
newItem := common.BulkTopic {
409
439
ContentType : item .ContentType ,
410
440
EntryID : item .EntryID ,
411
441
Event : item .Event ,
412
- Data : item .Data ,
442
+ Data : data ,
443
+ RawData : rawData ,
413
444
DataContentType : item .DataContentType ,
414
- ID : item .ID ,
445
+ ID : item .EntryID ,
415
446
PubsubName : item .PubsubName ,
416
447
Source : item .Source ,
417
448
SpecVersion : item .SpecVersion ,
@@ -447,7 +478,6 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
447
478
return nil
448
479
}
449
480
450
-
451
481
func writeStatus (w http.ResponseWriter , s string ) {
452
482
status := & common.SubscriptionResponse {Status : s }
453
483
if err := json .NewEncoder (w ).Encode (status ); err != nil {
0 commit comments