@@ -22,6 +22,7 @@ import (
22
22
"path"
23
23
"strconv"
24
24
"strings"
25
+ "sync"
25
26
"time"
26
27
27
28
"github.com/pkg/errors"
@@ -41,12 +42,14 @@ type fileStore struct {
41
42
sessionFname string
42
43
senderSeqNumsFname string
43
44
targetSeqNumsFname string
44
- bodyFile * os.File
45
- headerFile * os.File
46
- sessionFile * os.File
47
- senderSeqNumsFile * os.File
48
- targetSeqNumsFile * os.File
49
- fileSync bool
45
+
46
+ fileMu sync.Mutex
47
+ bodyFile * os.File
48
+ headerFile * os.File
49
+ sessionFile * os.File
50
+ senderSeqNumsFile * os.File
51
+ targetSeqNumsFile * os.File
52
+ fileSync bool
50
53
}
51
54
52
55
// NewStoreFactory returns a file-based implementation of MessageStoreFactory.
@@ -218,6 +221,9 @@ func (store *fileStore) populateCache() (creationTimePopulated bool, err error)
218
221
}
219
222
220
223
func (store * fileStore ) setSession () error {
224
+ store .fileMu .Lock ()
225
+ defer store .fileMu .Unlock ()
226
+
221
227
if _ , err := store .sessionFile .Seek (0 , io .SeekStart ); err != nil {
222
228
return fmt .Errorf ("unable to rewind file: %s: %s" , store .sessionFname , err .Error ())
223
229
}
@@ -238,6 +244,8 @@ func (store *fileStore) setSession() error {
238
244
}
239
245
240
246
func (store * fileStore ) setSeqNum (f * os.File , seqNum int ) error {
247
+ store .fileMu .Lock ()
248
+ defer store .fileMu .Unlock ()
241
249
if _ , err := f .Seek (0 , io .SeekStart ); err != nil {
242
250
return fmt .Errorf ("unable to rewind file: %s: %s" , f .Name (), err .Error ())
243
251
}
@@ -304,6 +312,8 @@ func (store *fileStore) SetCreationTime(_ time.Time) {
304
312
}
305
313
306
314
func (store * fileStore ) SaveMessage (seqNum int , msg []byte ) error {
315
+ store .fileMu .Lock ()
316
+ defer store .fileMu .Unlock ()
307
317
offset , err := store .bodyFile .Seek (0 , io .SeekEnd )
308
318
if err != nil {
309
319
return fmt .Errorf ("unable to seek to end of file: %s: %s" , store .bodyFname , err .Error ())
@@ -339,6 +349,9 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
339
349
}
340
350
341
351
func (store * fileStore ) IterateMessages (beginSeqNum , endSeqNum int , cb func ([]byte ) error ) error {
352
+ store .fileMu .Lock ()
353
+ defer store .fileMu .Unlock ()
354
+
342
355
// Sync files and seek to start of header file
343
356
if err := store .bodyFile .Sync (); err != nil {
344
357
return fmt .Errorf ("unable to flush file: %s: %s" , store .bodyFname , err .Error ())
@@ -386,19 +399,19 @@ func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error
386
399
387
400
// Close closes the store's files.
388
401
func (store * fileStore ) Close () error {
389
- if err := closeFile (store .bodyFile ); err != nil {
402
+ if err := closeSyncFile (store .bodyFile ); err != nil {
390
403
return err
391
404
}
392
- if err := closeFile (store .headerFile ); err != nil {
405
+ if err := closeSyncFile (store .headerFile ); err != nil {
393
406
return err
394
407
}
395
- if err := closeFile (store .sessionFile ); err != nil {
408
+ if err := closeSyncFile (store .sessionFile ); err != nil {
396
409
return err
397
410
}
398
- if err := closeFile (store .senderSeqNumsFile ); err != nil {
411
+ if err := closeSyncFile (store .senderSeqNumsFile ); err != nil {
399
412
return err
400
413
}
401
- if err := closeFile (store .targetSeqNumsFile ); err != nil {
414
+ if err := closeSyncFile (store .targetSeqNumsFile ); err != nil {
402
415
return err
403
416
}
404
417
0 commit comments