Skip to content

Commit 6889cf1

Browse files
committed
Adds proper parsing for repeating groups to keep group field order
1 parent d591447 commit 6889cf1

File tree

4 files changed

+452
-66
lines changed

4 files changed

+452
-66
lines changed

in_session.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,11 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
232232
nextSeqNum := seqNum
233233
msg := NewMessage()
234234
for _, msgBytes := range msgs {
235-
_ = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
235+
err = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
236+
if err != nil {
237+
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
238+
return // We cant continue with a message that cant be parsed correctly.
239+
}
236240
msgType, _ := msg.Header.GetBytes(tagMsgType)
237241
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
238242

message.go

Lines changed: 225 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,19 @@ import (
2727
// Header is first section of a FIX Message.
2828
type Header struct{ FieldMap }
2929

30+
// msgparser contains message parsing vars needed to parse a string into a message.
31+
type msgParser struct {
32+
msg *Message
33+
transportDataDictionary *datadictionary.DataDictionary
34+
appDataDictionary *datadictionary.DataDictionary
35+
rawBytes []byte
36+
fieldIndex int
37+
parsedFieldBytes *TagValue
38+
trailerBytes []byte
39+
foundBody bool
40+
foundTrailer bool
41+
}
42+
3043
// in the message header, the first 3 tags in the message header must be 8,9,35.
3144
func headerFieldOrdering(i, j Tag) bool {
3245
var ordering = func(t Tag) uint32 {
@@ -152,124 +165,135 @@ func ParseMessageWithDataDictionary(
152165
msg *Message,
153166
rawMessage *bytes.Buffer,
154167
transportDataDictionary *datadictionary.DataDictionary,
155-
_ *datadictionary.DataDictionary,
168+
appDataDictionary *datadictionary.DataDictionary,
156169
) (err error) {
157-
msg.Header.Clear()
158-
msg.Body.Clear()
159-
msg.Trailer.Clear()
160-
msg.rawMessage = rawMessage
170+
// Create msgparser before we go any further.
171+
mp := &msgParser{
172+
msg: msg,
173+
transportDataDictionary: transportDataDictionary,
174+
appDataDictionary: appDataDictionary,
175+
}
176+
mp.msg.rawMessage = rawMessage
177+
mp.rawBytes = rawMessage.Bytes()
161178

162-
rawBytes := rawMessage.Bytes()
179+
return doParsing(mp)
180+
}
181+
182+
// doParsing executes the message parsing process.
183+
func doParsing(mp *msgParser) (err error) {
184+
// Initialize for parsing.
185+
mp.msg.Header.Clear()
186+
mp.msg.Body.Clear()
187+
mp.msg.Trailer.Clear()
163188

164-
// Allocate fields in one chunk.
189+
// Allocate expected message fields in one chunk.
165190
fieldCount := 0
166-
for _, b := range rawBytes {
191+
for _, b := range mp.rawBytes {
167192
if b == '\001' {
168193
fieldCount++
169194
}
170195
}
171-
172196
if fieldCount == 0 {
173-
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(rawBytes))}
197+
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(mp.rawBytes))}
174198
}
175-
176-
if cap(msg.fields) < fieldCount {
177-
msg.fields = make([]TagValue, fieldCount)
199+
if cap(mp.msg.fields) < fieldCount {
200+
mp.msg.fields = make([]TagValue, fieldCount)
178201
} else {
179-
msg.fields = msg.fields[0:fieldCount]
202+
mp.msg.fields = mp.msg.fields[0:fieldCount]
180203
}
181204

182-
fieldIndex := 0
183-
184205
// Message must start with begin string, body length, msg type.
185-
if rawBytes, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawBytes); err != nil {
206+
// Get begin string.
207+
if mp.rawBytes, err = extractSpecificField(&mp.msg.fields[mp.fieldIndex], tagBeginString, mp.rawBytes); err != nil {
186208
return
187209
}
210+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
188211

189-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
190-
fieldIndex++
191-
192-
parsedFieldBytes := &msg.fields[fieldIndex]
193-
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawBytes); err != nil {
212+
// Get body length.
213+
mp.fieldIndex++
214+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
215+
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagBodyLength, mp.rawBytes); err != nil {
194216
return
195217
}
218+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
196219

197-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
198-
fieldIndex++
199-
200-
parsedFieldBytes = &msg.fields[fieldIndex]
201-
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawBytes); err != nil {
220+
// Get msg type.
221+
mp.fieldIndex++
222+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
223+
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagMsgType, mp.rawBytes); err != nil {
202224
return
203225
}
226+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
227+
204228

229+
// Start parsing.
230+
mp.fieldIndex++
205231
xmlDataLen := 0
206232
xmlDataMsg := false
207-
208-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
209-
fieldIndex++
210-
211-
trailerBytes := []byte{}
212-
foundBody := false
213-
foundTrailer := false
233+
mp.trailerBytes = []byte{}
234+
mp.foundBody = false
235+
mp.foundTrailer = false
214236
for {
215-
parsedFieldBytes = &msg.fields[fieldIndex]
237+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
216238
if xmlDataLen > 0 {
217-
rawBytes, err = extractXMLDataField(parsedFieldBytes, rawBytes, xmlDataLen)
239+
mp.rawBytes, err = extractXMLDataField(mp.parsedFieldBytes, mp.rawBytes, xmlDataLen)
218240
xmlDataLen = 0
219241
xmlDataMsg = true
220242
} else {
221-
rawBytes, err = extractField(parsedFieldBytes, rawBytes)
243+
mp.rawBytes, err = extractField(mp.parsedFieldBytes, mp.rawBytes)
222244
}
223245
if err != nil {
224246
return
225247
}
226248

227249
switch {
228-
case isHeaderField(parsedFieldBytes.tag, transportDataDictionary):
229-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
230-
case isTrailerField(parsedFieldBytes.tag, transportDataDictionary):
231-
msg.Trailer.add(msg.fields[fieldIndex : fieldIndex+1])
232-
foundTrailer = true
250+
case isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
251+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
252+
case isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
253+
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
254+
mp.foundTrailer = true
255+
case isNumInGroupField(mp.msg, []Tag{mp.parsedFieldBytes.tag}, mp.appDataDictionary):
256+
parseGroup(mp, []Tag{mp.parsedFieldBytes.tag})
233257
default:
234-
foundBody = true
235-
trailerBytes = rawBytes
236-
msg.Body.add(msg.fields[fieldIndex : fieldIndex+1])
258+
mp.foundBody = true
259+
mp.trailerBytes = mp.rawBytes
260+
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
237261
}
238-
if parsedFieldBytes.tag == tagCheckSum {
262+
if mp.parsedFieldBytes.tag == tagCheckSum {
239263
break
240264
}
241265

242-
if !foundBody {
243-
msg.bodyBytes = rawBytes
266+
if !mp.foundBody {
267+
mp.msg.bodyBytes = mp.rawBytes
244268
}
245269

246-
if parsedFieldBytes.tag == tagXMLDataLen {
247-
xmlDataLen, _ = msg.Header.GetInt(tagXMLDataLen)
270+
if mp.parsedFieldBytes.tag == tagXMLDataLen {
271+
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
248272
}
249-
fieldIndex++
273+
mp.fieldIndex++
250274
}
251275

252276
// This will happen if there are no fields in the body
253-
if foundTrailer && !foundBody {
254-
trailerBytes = rawBytes
255-
msg.bodyBytes = nil
277+
if mp.foundTrailer && !mp.foundBody {
278+
mp.trailerBytes = mp.rawBytes
279+
mp.msg.bodyBytes = nil
256280
}
257281

258282
// Body length would only be larger than trailer if fields out of order.
259-
if len(msg.bodyBytes) > len(trailerBytes) {
260-
msg.bodyBytes = msg.bodyBytes[:len(msg.bodyBytes)-len(trailerBytes)]
283+
if len(mp.msg.bodyBytes) > len(mp.trailerBytes) {
284+
mp.msg.bodyBytes = mp.msg.bodyBytes[:len(mp.msg.bodyBytes)-len(mp.trailerBytes)]
261285
}
262286

263287
length := 0
264-
for _, field := range msg.fields {
288+
for _, field := range mp.msg.fields {
265289
switch field.tag {
266290
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
267291
default:
268292
length += field.length()
269293
}
270294
}
271295

272-
bodyLength, err := msg.Header.GetInt(tagBodyLength)
296+
bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
273297
if err != nil {
274298
err = parseError{OrigError: err.Error()}
275299
} else if length != bodyLength && !xmlDataMsg {
@@ -279,6 +303,149 @@ func ParseMessageWithDataDictionary(
279303
return
280304
}
281305

306+
// parseGroup iterates through a repeating group to maintain correct order of those fields.
307+
func parseGroup(mp *msgParser, tags []Tag) {
308+
mp.foundBody = true
309+
dm := mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
310+
fields := getGroupFields(mp.msg, tags, mp.appDataDictionary)
311+
312+
for {
313+
mp.fieldIndex++
314+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
315+
mp.rawBytes, _ = extractField(mp.parsedFieldBytes, mp.rawBytes)
316+
mp.trailerBytes = mp.rawBytes
317+
318+
// Is this field a member for the group.
319+
if isGroupMember(mp.parsedFieldBytes.tag, fields) {
320+
// Is this field a nested repeating group.
321+
if isNumInGroupField(mp.msg, append(tags, mp.parsedFieldBytes.tag), mp.appDataDictionary) {
322+
dm = append(dm, *mp.parsedFieldBytes)
323+
tags = append(tags, mp.parsedFieldBytes.tag)
324+
fields = getGroupFields(mp.msg, tags, mp.appDataDictionary)
325+
continue
326+
}
327+
// Add the field member to the group.
328+
dm = append(dm, *mp.parsedFieldBytes)
329+
} else if isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
330+
// Found a header tag for some reason..
331+
mp.msg.Body.add(dm)
332+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
333+
break
334+
} else if isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
335+
// Found the trailer at the end of the message.
336+
mp.msg.Body.add(dm)
337+
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
338+
mp.foundTrailer = true
339+
break
340+
} else {
341+
// Found a body field outside the group.
342+
searchTags := []Tag{mp.parsedFieldBytes.tag}
343+
// Is this a new group not inside the existing group.
344+
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
345+
// Add the current repeating group.
346+
mp.msg.Body.add(dm)
347+
// Cycle again with the new group.
348+
dm = mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
349+
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
350+
continue
351+
} else {
352+
if len(tags) > 1 {
353+
searchTags = tags[:len(tags)-1]
354+
}
355+
// Did this tag occur after a nested group and belongs to the parent group.
356+
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
357+
// Add the field member to the group.
358+
dm = append(dm, *mp.parsedFieldBytes)
359+
// Continue parsing the parent group.
360+
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
361+
continue
362+
}
363+
// Add the repeating group.
364+
mp.msg.Body.add(dm)
365+
// Add the next body field.
366+
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
367+
}
368+
break
369+
}
370+
}
371+
}
372+
373+
// isNumInGroupField evaluates if this tag is the start of a repeating group.
374+
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
375+
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
376+
if appDataDictionary != nil {
377+
msgt, err := msg.MsgType()
378+
if err != nil {
379+
return false
380+
}
381+
mm, ok := appDataDictionary.Messages[msgt]
382+
if ok {
383+
fields := mm.Fields
384+
for idx, tag := range tags {
385+
fd, ok := fields[int(tag)]
386+
if ok {
387+
if idx == len(tags) - 1 {
388+
if len(fd.Fields) > 0 {
389+
return true
390+
}
391+
} else {
392+
// Map nested fields.
393+
newFields := make(map[int]*datadictionary.FieldDef)
394+
for _, ff := range fd.Fields {
395+
newFields[ff.Tag()] = ff
396+
}
397+
fields = newFields
398+
}
399+
}
400+
}
401+
}
402+
}
403+
return false
404+
}
405+
406+
// getGroupFields gets the relevant fields for parsing a repeating group if this tag is the start of a repeating group.
407+
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
408+
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
409+
if appDataDictionary != nil {
410+
msgt, err := msg.MsgType()
411+
if err != nil {
412+
return
413+
}
414+
mm, ok := appDataDictionary.Messages[msgt]
415+
if ok {
416+
fields := mm.Fields
417+
for idx, tag := range tags {
418+
fd, ok := fields[int(tag)]
419+
if ok {
420+
if idx == len(tags) - 1 {
421+
if len(fd.Fields) > 0 {
422+
return fd.Fields
423+
}
424+
} else {
425+
// Map nested fields.
426+
newFields := make(map[int]*datadictionary.FieldDef)
427+
for _, ff := range fd.Fields {
428+
newFields[ff.Tag()] = ff
429+
}
430+
fields = newFields
431+
}
432+
}
433+
}
434+
}
435+
}
436+
return
437+
}
438+
439+
// isGroupMember evaluates if this tag belongs to a repeating group.
440+
func isGroupMember(tag Tag, fields []*datadictionary.FieldDef) bool {
441+
for _, f := range fields{
442+
if f.Tag() == int(tag) {
443+
return true
444+
}
445+
}
446+
return false
447+
}
448+
282449
func isHeaderField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
283450
if tag.IsHeader() {
284451
return true

0 commit comments

Comments
 (0)