-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathMessanger.pas
2058 lines (1770 loc) · 70.1 KB
/
Messanger.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
{-------------------------------------------------------------------------------
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
-------------------------------------------------------------------------------}
{===============================================================================
Messanger
Small library for message-based intraprocess (ie. between threads of single
process) communication.
To use this library, create an instance of TMessanger class. This object
is managing all communication endpoints and their connections. It should be
created and completely managed only in one thread, do not access it in
other threads.
For each thread that wants to communicate, create exactly one endpoint
instance (instance of class TMessangerEndpoint) - refrain from using more
than one endpoint in a single thread.
Do NOT create these endpoints directly, always use TMessanger instance and
its method CreateEndpoint to create the endpoint object - do this in the
thread that is managing the messanger. Then pass this object by whatever
means to the thread that will be using it.
On the other hand, always free the endpoint from the thread that is using
it, do not pass it back to thread managing the messanger.
WARNING - all endpoint objects must be freed before you destroy the
messanger, otherwise an exception will be raised while freeing
the messanger instance.
To communicate, use methods of endpoints to do so. SendMessage for
synchronous sending, PostMessage for asynchronous sending and so on.
To receive messages, you have to call methods WaitForMessage,
FetchMessages, and DispatchMessages repeatedly - or call method Cycle to
do so in one call.
If you want to process the incoming messages, assign one of the OnMessage*
handlers - the endpoint will be dispatching all received messages to the
handler for processing.
Version 2.0.4 (2024-05-03)
Last change 2024-10-04
©2016-2024 František Milt
Contacts:
František Milt: [email protected]
Support:
If you find this code useful, please consider supporting its author(s) by
making a small donation using the following link(s):
https://www.paypal.me/FMilt
Changelog:
For detailed changelog and history please refer to this git repository:
github.com/TheLazyTomcat/Lib.Messanger
Dependencies:
AuxClasses - github.com/TheLazyTomcat/Lib.AuxClasses
* AuxExceptions - github.com/TheLazyTomcat/Lib.AuxExceptions
AuxTypes - github.com/TheLazyTomcat/Lib.AuxTypes
CrossSyncObjs - github.com/TheLazyTomcat/Lib.CrossSyncObjs
MemVector - github.com/TheLazyTomcat/Lib.MemVector
Library AuxExceptions is required only when rebasing local exception classes
(see symbol Messanger_UseAuxExceptions for details).
Library AuxExceptions might also be required as an indirect dependency.
Indirect dependencies:
BasicUIM - github.com/TheLazyTomcat/Lib.BasicUIM
BinaryStreamingLite - github.com/TheLazyTomcat/Lib.BinaryStreamingLite
BitOps - github.com/TheLazyTomcat/Lib.BitOps
BitVector - github.com/TheLazyTomcat/Lib.BitVector
HashBase - github.com/TheLazyTomcat/Lib.HashBase
InterlockedOps - github.com/TheLazyTomcat/Lib.InterlockedOps
LinSyncObjs - github.com/TheLazyTomcat/Lib.LinSyncObjs
ListSorters - github.com/TheLazyTomcat/Lib.ListSorters
NamedSharedItems - github.com/TheLazyTomcat/Lib.NamedSharedItems
SHA1 - github.com/TheLazyTomcat/Lib.SHA1
SharedMemoryStream - github.com/TheLazyTomcat/Lib.SharedMemoryStream
SimpleCPUID - github.com/TheLazyTomcat/Lib.SimpleCPUID
SimpleFutex - github.com/TheLazyTomcat/Lib.SimpleFutex
StaticMemoryStream - github.com/TheLazyTomcat/Lib.StaticMemoryStream
StrRect - github.com/TheLazyTomcat/Lib.StrRect
UInt64Utils - github.com/TheLazyTomcat/Lib.UInt64Utils
WinFileInfo - github.com/TheLazyTomcat/Lib.WinFileInfo
WinSyncObjs - github.com/TheLazyTomcat/Lib.WinSyncObjs
===============================================================================}
unit Messanger;
{
Messanger_UseAuxExceptions
If you want library-specific exceptions to be based on more advanced classes
provided by AuxExceptions library instead of basic Exception class, and don't
want to or cannot change code in this unit, you can define global symbol
Messanger_UseAuxExceptions to achieve this.
}
{$IF Defined(Messanger_UseAuxExceptions)}
{$DEFINE UseAuxExceptions}
{$IFEND}
//------------------------------------------------------------------------------
{$IF Defined(WINDOWS) or Defined(MSWINDOWS)}
{$DEFINE Windows}
{$ELSEIF Defined(LINUX) and Defined(FPC)}
{$DEFINE Linux}
{$ELSE}
{$MESSAGE FATAL 'Unsupported operating system.'}
{$IFEND}
{$IFDEF FPC}
{$MODE ObjFPC}
{$MODESWITCH DuplicateLocals+}
{$ENDIF}
{$H+}
interface
uses
SysUtils, Classes,{$IFNDEF Windows} BaseUnix,{$ENDIF}
AuxTypes, AuxClasses, MemVector, CrossSyncObjs
{$IFDEF UseAuxExceptions}, AuxExceptions{$ENDIF};
{===============================================================================
Library-specific exceptions
===============================================================================}
type
EMsgrException = class({$IFDEF UseAuxExceptions}EAEGeneralException{$ELSE}Exception{$ENDIF});
EMsgrTimestampError = class(EMsgrException);
EMsgrIndexOutOfBounds = class(EMsgrException);
EMsgrInvalidValue = class(EMsgrException);
EMsgrInvalidState = class(EMsgrException);
EMsgrNoResources = class(EMsgrException);
{===============================================================================
Common types and constants
===============================================================================}
type
TMsgrEndpointID = UInt16; PMsgrEndpointID = ^TMsgrEndpointID;
TMsgrPriority = Int32; PMsgrPriority = ^TMsgrPriority;
TMsgrTimestamp = Int64; PMsgrTimestamp = ^TMsgrTimestamp;
TMsgrParam = PtrInt; PMsgrParam = ^TMsgrParam;
TMsgrSendParam = Pointer; PMsgrSendParam = ^TMsgrSendParam;
type
{
TMsgrMessage is used only internally, do not use it anywhere else.
}
TMsgrMessage = packed record
Sender: TMsgrEndpointID;
Recipient: TMsgrEndpointID;
Priority: TMsgrPriority;
Timestamp: TMsgrTimestamp;
Parameter1: TMsgrParam;
Parameter2: TMsgrParam;
Parameter3: TMsgrParam;
Parameter4: TMsgrParam;
SendParam: TMsgrSendParam; // used for synchronous communication
end;
PMsgrMessage = ^TMsgrMessage;
{
TMsgrMessageIn is used in places where an incoming messages is passed for
processing by the user.
}
TMsgrMessageIn = record
Sender: TMsgrEndpointID;
Parameter1: TMsgrParam;
Parameter2: TMsgrParam;
Parameter3: TMsgrParam;
Parameter4: TMsgrParam;
end;
{
TMsgrMessageOut is used in sending of messages, where user can pass this
structure in place of number of individual parameters.
}
TMsgrMessageOut = record
Recipient: TMsgrEndpointID;
Priority: TMsgrPriority;
Parameter1: TMsgrParam;
Parameter2: TMsgrParam;
Parameter3: TMsgrParam;
Parameter4: TMsgrParam;
end;
const
{
When selecting MSGR_ID_BROADCAST as a recipient, the message will be posted
to all existing endpoints (including the sender).
WARNING - this is only possible for posted messages, trying to broadcast
synchronous message will always fail.
}
MSGR_ID_BROADCAST = TMsgrEndpointID(High(TMsgrEndpointID)); // $FFFF
{
Note that, when received messages are sorted for processing, the priority
takes precedence over timestamp, meaning messages with higher priority are
processed sooner than messages sent before them but with lower priority.
}
MSGR_PRIORITY_MINIMAL = TMsgrPriority(-100000);
MSGR_PRIORITY_EXTREME_LOW = TMsgrPriority(-10000);
MSGR_PRIORITY_VERY_LOW = TMsgrPriority(-1000);
MSGR_PRIORITY_LOW = TMsgrPriority(-100);
MSGR_PRIORITY_BELOW_NORMAL = TMsgrPriority(-10);
MSGR_PRIORITY_NORMAL = TMsgrPriority(0);
MSGR_PRIORITY_ABOVE_NORMAL = TMsgrPriority(10);
MSGR_PRIORITY_HIGH = TMsgrPriority(100);
MSGR_PRIORITY_VERY_HIGH = TMsgrPriority(1000);
MSGR_PRIORITY_EXTREME_HIGH = TMsgrPriority(10000);
MSGR_PRIORITY_ABSOLUTE = TMsgrPriority(100000);
MSGR_PRIORITY_MIN = MSGR_PRIORITY_MINIMAL;
MSGR_PRIORITY_MAX = MSGR_PRIORITY_ABSOLUTE;
// infinite timeout
INFINITE = UInt32(-1);
{===============================================================================
--------------------------------------------------------------------------------
TMsgrMessageVector
--------------------------------------------------------------------------------
===============================================================================}
{
Class TMsgrMessageVector is only for internal purposes.
}
{===============================================================================
TMsgrMessageVector - class declaration
===============================================================================}
type
TMsgrMessageVector = class(TMemVector)
protected
Function GetItem(Index: Integer): TMsgrMessage; virtual;
procedure SetItem(Index: Integer; Value: TMsgrMessage); virtual;
Function ItemCompare(Item1,Item2: Pointer): Integer; override;
public
constructor Create; overload;
constructor Create(Memory: Pointer; Count: Integer); overload;
Function First: TMsgrMessage; reintroduce;
Function Last: TMsgrMessage; reintroduce;
Function IndexOf(Item: TMsgrMessage): Integer; reintroduce;
Function Find(Item: TMsgrMessage; out Index: Integer): Boolean; reintroduce;
Function Add(Item: TMsgrMessage): Integer; reintroduce;
procedure Insert(Index: Integer; Item: TMsgrMessage); reintroduce;
Function Remove(Item: TMsgrMessage): Integer; reintroduce;
Function Extract(Item: TMsgrMessage): TMsgrMessage; reintroduce;
property Items[Index: Integer]: TMsgrMessage read GetItem write SetItem; default;
end;
{===============================================================================
--------------------------------------------------------------------------------
TMsgrBufferedMessagesVector
--------------------------------------------------------------------------------
===============================================================================}
{
Class TMsgrBufferedMessagesVector is only for internal purposes.
}
{===============================================================================
TMsgrBufferedMessagesVector - class declaration
===============================================================================}
type
TMsgrBufferedMessageVector = class(TMsgrMessageVector)
protected
Function ItemCompare(Item1,Item2: Pointer): Integer; override;
end;
{===============================================================================
--------------------------------------------------------------------------------
TMsgrWaiter
--------------------------------------------------------------------------------
===============================================================================}
{
Class TMsgrWaiter is only for internal purposes.
}
type
TMsgrWaiterResult = (wtrMessage,wtrTimeout,wtrError,wtrSentMessage,
wtrSendReleased,wtrSendProcessed);
TMsgrWaiterResults = set of TMsgrWaiterResult;
{===============================================================================
TMsgrWaiter - class declaration
===============================================================================}
type
TMsgrWaiter = class(TObject)
protected
fInterlock: TCriticalSectionRTL;
fEvent: TEvent;
fFlags: UInt32;
procedure Initialize; virtual;
procedure Finalize; virtual;
public
constructor Create;
destructor Destroy; override;
procedure SentMessageReceived; virtual;
procedure PostedMessageReceived; virtual;
procedure ReleaseSendMessage(var LocalFlags: UInt32; Processed: Boolean); virtual;
Function WaitForMessage(Timeout: UInt32): TMsgrWaiterResult; virtual;
Function WaitForRelease(var LocalFlags: UInt32): TMsgrWaiterResults; virtual;
end;
{===============================================================================
--------------------------------------------------------------------------------
TMessangerEndpoint
--------------------------------------------------------------------------------
===============================================================================}
type
{
TMsgrDispatchFlag
Dispatch flags are serving two purposes - on enter to the handler, they can
contain flags the handler can use to discern details about the currently
processed message and also about the state of the messanger endpoint.
Secondly, the handler can include or exclude some flags from the set - some
flags are checked after the handle returns, and their presence or absence is
used to control further dispatching or workings of the endpoint.
Individual flags can be in, out, or in-out in nature.
In flags are included before the handler is called, so it can probe them.
Removing or adding them has no effect as they are ignored after the handler
returns.
Out flags are never included before the call, but handler can include them
to control further processing.
In-out flags can be both included before the call and also excluded or
included to the set by the handler.
mdfSentMessage (in) - informs the handler that the current message
was sent synchronously (ie. using Send* method,
not Post* methods) and sender is waiting for
its processing
mdfUndeliveredMessage (in) - currently processed message is an undelivered
buffered message
mdfSendBlocked (in) - any sending or posting is blocked and will fail
mdfStopDispatching (out) - when added by the handler, orders the dispatcher
to stop dispatching further messages after the
current one
mdfAutoCycle (in-out) - when included on enter, informs the handler
that the endpoint is running the autocycle, by
removing this flag, handler can break the
autocycle, note that adding this flag when it
is not present will NOT activate the autocycle
mdfDirectDispatch (in) - processed message was sent to this endpoint
(ie. the sender is the same as recipient)
}
TMsgrDispatchFlag = (mdfSentMessage,mdfUndeliveredMessage,mdfSendBlocked,
mdfStopDispatching,mdfAutoCycle,mdfDirectDispatch);
TMsgrDispatchFlags = set of TMsgrDispatchFlag;
TMsgrWaitResult = (mwrMessage,mwrTimeout,mwrError);
TMsgrMessageInEvent = procedure(Sender: TObject; Msg: TMsgrMessageIn; var Flags: TMsgrDispatchFlags) of object;
TMsgrMessageInCallback = procedure(Sender: TObject; Msg: TMsgrMessageIn; var Flags: TMsgrDispatchFlags);
TMsgrMessageOutEvent = procedure(Sender: TObject; Msg: TMsgrMessageOut; var Flags: TMsgrDispatchFlags) of object;
TMsgrMessageOutCallback = procedure(Sender: TObject; Msg: TMsgrMessageOut; var Flags: TMsgrDispatchFlags);
{===============================================================================
TMessangerEndpoint - class declaration
===============================================================================}
type
TMessangerEndpoint = class(TCustomObject)
protected
fWorkingThread: {$IFDEF Windows}DWORD{$ELSE}pid_t{$ENDIF};
fMessanger: TObject; // should be TMessanger class
fEndpointID: TMsgrEndpointID;
fAutoBuffSend: Boolean;
fSendLevelMax: Integer;
// runtime variables
fSendLevel: Integer;
fAutoCycle: Boolean;
fSendBlocked: Boolean;
// synchronizers
fIncomingSynchronizer: TCriticalSectionRTL; // protects vector of incoming messages
fWaiter: TMsgrWaiter;
// message storage vectors
fIncomingSentMessages: TMsgrMessageVector;
fIncomingPostedMessages: TMsgrMessageVector;
fReceivedSentMessages: TMsgrMessageVector;
fReceivedPostedMessages: TMsgrMessageVector;
fBufferedMessages: TMsgrBufferedMessageVector;
fUndeliveredMessages: TMsgrMessageVector; // stores undelivered buffered messages
fVectorsReady: Boolean; // used to indicate successful creation
// events
fOnMessageEvent: TMsgrMessageInEvent;
fOnMessageCallback: TMsgrMessageInCallback;
fOnUndeliveredEvent: TMsgrMessageOutEvent;
fOnUndeliveredCallback: TMsgrMessageOutCallback;
fOnDestroyingEvent: TNotifyEvent;
fOnDestroyingCallback: TNotifyCallback;
// getters, setters
Function GetMessageCount: Integer;
Function GetMessage(Index: Integer): TMsgrMessage;
// methods called from other (sender) threads
procedure ReceiveSentMessages(Messages: PMsgrMessage; Count: Integer); virtual;
procedure ReceivePostedMessages(Messages: PMsgrMessage; Count: Integer); virtual;
// message fetching
procedure SentMessagesFetch; virtual;
// message dispatching
procedure SentMessagesDispatch; virtual;
procedure PostedMessagesDispatch; virtual;
procedure UndeliveredDispatch; virtual;
Function DirectDispatch(Msg: TMsgrMessage): Boolean; virtual;
// events firing
procedure DoMessage(Msg: TMsgrMessageIn; var Flags: TMsgrDispatchFlags); virtual;
procedure DoUndelivered(Msg: TMsgrMessageOut; var Flags: TMsgrDispatchFlags); virtual;
procedure DoDestroying; virtual;
// init/final
procedure Initialize(EndpointID: TMsgrEndpointID; Messanger: TObject); virtual;
procedure Finalize; virtual;
// utility
Function MsgOutToMsg(Msg: TMsgrMessageOut; SendParamPtr: PMsgrSendParam): TMsgrMessage; virtual;
Function MsgToMsgIn(Msg: TMsgrMessage): TMsgrMessageIn; virtual;
Function MsgToMsgOut(Msg: TMsgrMessage): TMsgrMessageOut; virtual;
public
{
Create
Do not call the constructor, use TMessanger instance to create a new
endpoint object.
}
constructor Create(EndpointID: TMsgrEndpointID; Messanger: TObject);
{
Destroy
During the object destruction, it is possible that events/callbacks
OnMessage* and OnUndelivered* will be fired. It is to process messages
received from last dispatching and unposted buffered messages respectively.
Note that for both the SendBlocked is true.
After that, OnDestroying* event/callback is fired - if both are assigned,
then only the event will be called.
}
destructor Destroy; override;
{
ThreadInit
If you want to use more than one instance of TMessangerEndpoint in a single
thread, call this method as soon as possible at least once on all intances
within the context of the thread that will be using them.
}
procedure ThreadInit; virtual;
// message sending methods
{
SendMessage
SendMessage will send the passed message and then waits - it will not
return until the message has been processed by the recipient.
It will return true only when the message was received AND processed
(passed to message handler) by the recipient. So, in case the message was
delivered, but not processed (eg. because the recipient was destroyed),
it will return false.
The endpoint can still receive and dispatch other sent messages (but not
the posted ones).
This means that, while you are waiting for the SendMessage to return, the
endpoint can and will call message handler.
It is to prevent a deadlock when the recipient responds by sending another
synchronous message back to the sender within its message dispatch.
It is allowed to send another synchronous message from a message handler,
but as this creates a complex recursion, it is limited - see SendLevel and
SendLevelMaximum properties.
It is not allowed to use MSGR_ID_BROADCAST as a recipient - synchronous
messages cannot be broadcasted.
If AutomaticBufferedSend is set to true and message sending is not blocked
(see property SendBlocked), then all buffered messages are posted before
the actual sending is performed.
}
Function SendMessage(Recipient: TMsgrEndpointID; P1,P2,P3,P4: TMsgrParam; Priority: TMsgrPriority = MSGR_PRIORITY_NORMAL): Boolean; overload; virtual;
Function SendMessage(Msg: TMsgrMessageOut): Boolean; overload; virtual;
{
PostMessage
PostMessage adds the passed message to the recipient's incoming messages
and immediately exits.
Use MSGR_ID_BROADCAST as a recipient to post the message to all existing
endpoints (sender also receives a copy).
If AutomaticBufferedSend is set to true and message sending is not blocked
(see property SendBlocked), then all buffered messages are posted before
the actual posting is performed.
}
Function PostMessage(Recipient: TMsgrEndpointID; P1,P2,P3,P4: TMsgrParam; Priority: TMsgrPriority = MSGR_PRIORITY_NORMAL): Boolean; overload; virtual;
Function PostMessage(Msg: TMsgrMessageOut): Boolean; overload; virtual;
{
BufferMessage
BufferMessage merely adds the passed message to internal storage, preparing
it for future posting.
This is here to optimize rapid posting of large number of messages - they
can be buffered and then posted all at once, significantly reducing the
operation overhead.
MSGR_ID_BROADCAST can be used as s recipient to post the message to all
existing endpoints (including the sender).
}
procedure BufferMessage(Recipient: TMsgrEndpointID; P1,P2,P3,P4: TMsgrParam; Priority: TMsgrPriority = MSGR_PRIORITY_NORMAL); overload; virtual;
procedure BufferMessage(Msg: TMsgrMessageOut); overload; virtual;
{
PostBufferedMessages
PostBufferedMessages posts all buffered messages at once to their respective
recipients.
If some of the messages cannot be delivered, they are passed at the end of
the processing to the OnUndelivered* event/callback.
If both OnUndeliveredCallback and OnUndeliveredEvent (equivalent to
OnUndelivered) are assigned, then only the event is called. If none is
assigned, then the messages are dropped an deleted.
Note that while in the OnUndelivered* handler, all message sending and
posting is blocked (property SendBlocked is true).
}
procedure PostBufferedMessages; virtual;
// operation methods
{
WaitForMessage
WaitForMessage waits until a new message is received, timeout elapses (you
can use INFINITE constant for infinite wait) or an error occurs.
Whichever happened is indicated by the result.
}
Function WaitForMessage(Timeout: UInt32): TMsgrWaitResult; virtual;
{
FetchMessages
FetchMessages moves incoming messages (those deposited to a shared location
by other endpoints) into local storage (received messages), where they can
be accessed without the overhead of thread synchronization and locking.
}
procedure FetchMessages; virtual;
{
DispatchMessages
When any of OnMessage(Event) or OnMessageCallback is assigned, it will
traverse all received messages, passing each of them to the handler for
processing (this process is called dispatching).
All sent messages are dispatched first, the posted right after them. They
are dispatched in order of ther priority (from higher to lower) and time of
sending/posting (from oldest to newest). Note that priority takes precedence
over the time.
If both OnMessageEvent (which is equivalent to OnMessage) and
OnMessageCallback are assigned, then only the OnMessageEvent is called.
When no event or callback is assigned, it only clears all received messages.
}
procedure DispatchMessages; virtual;
{
ClearMessages
Deletes all received messags without dispatching them.
Note that senders waiting for sent messages to be processed will be
released but the messages will NOT be marked as processed, meaning the
respective SendMessage methods will return false.
}
procedure ClearMessages; virtual;
{
Cycle
Calling this method is equivalent to the following sequence:
FetchMessages;
DispatchMessages;
If WaitForMessage(Timeout) = mwrMessage then
begin
FetchMessages;
DispatchMessages;
end;
}
procedure Cycle(Timeout: UInt32); virtual;
{
AutoCycle
By calling this method, the endpoint enters the autocycle. It sets
InAutoCycle property to true and then repeatedly calls method Cycle with
the provided timeout as long as the InAutoCycle is true.
The autocyle can be exited by calling BreakAutoCycle or by excluding
mdfAutoCycle from dispatch flags within any dispatch handler.
If autocycle is already running when calling this method, it will do
nothing and immediately returns.
}
procedure AutoCycle(Timeout: UInt32); virtual;
{
BreakAutoCycle
Sets InAutoCycle to false, effectively breaking out of the autocycle.
Note that the autocycle might not end immediately, depending on multiple
factors, mainly of depth of send recursion (see SendLevelMaximum and
SendLevel properties).
}
procedure BreakAutoCycle; virtual;
// properties
property EndpointID: TMsgrEndpointID read fEndpointID;
{
AutomaticBufferedSend
When AutomaticBufferedSend is set to true, all buffered messages are
automatically posted whenever you send or post another message (before it).
When false, you have to explicitly call PostBufferedMessages method to post
buffered messages.
If SendBlocked is true, this automatic posting is blocked too.
By default enabled.
}
property AutomaticBufferedSend: Boolean read fAutoBuffSend write fAutoBuffSend;
{
SendLevelMaximum
Maximum depth of send recursion, see property SendLevel for details.
}
property SendLevelMaximum: Integer read fSendLevelMax write fSendLevelMax;
{
SendLevel
When you send a synchronous message (methods SendMessage), the endpoint
will, while waiting for the message to be processed, fetch and dispatch
any incoming synchronous (sent) messages.
This means that, while waiting for SendMessage to return, the endpoint can
and will call OnMessage* handler. It is possible to again call SendMessage
from this handler where this call can again result in dispatch and so on.
This creates somewhat complex indirect recursion - if unchecked, it can
lead to stack overflow and other problems.
To limit a posibility of problems, each SendMessage increments SendLevel
counter. When this counter is equal to or above SendLevelMaximum before
the increment, the call to SendMessage will fail (actual send is not even
attempted). This limits possible depth of recursion this process can go
into.
This counter is exposed so you can check whether SendMessage failing was
due to this protection or not (SendLevel < SendLevelMaximum means it was
not), and then act accordingly.
}
property SendLevel: Integer read fSendLevel;
{
InAutoCycle
This property indicates whether the endpoint is running within the
autocycle or not.
}
property InAutoCycle: Boolean read fAutoCycle;
{
SendBlocked
When true, all sending and posting is blocked. It affects all SendMessage,
PostMessage and also PostBufferedMessages methods. BufferMessage is not
affected.
The effect of blocking is that when you try to post/send a message, the
respective method will just exit and return false.
As for PostBufferedMessages - it will not technically fail, so all currently
buffered messages stay buffered and undelivered handlers are NOT called.
}
property SendBlocked: Boolean read fSendBlocked;
{
MessageCount
Number of messages currently received - it does not include incoming but
not yet fetched messages.
}
property MessageCount: Integer read GetMessageCount;
{
Messages
All received messages, both posted and sent.
}
property Messages[Index: Integer]: TMsgrMessage read GetMessage;
// event/callback properties
property OnMessageCallback: TMsgrMessageInCallback read fOnMessageCallback write fOnMessageCallback;
property OnMessageEvent: TMsgrMessageInEvent read fOnMessageEvent write fOnMessageEvent;
property OnMessage: TMsgrMessageInEvent read fOnMessageEvent write fOnMessageEvent;
property OnUndeliveredCallback: TMsgrMessageOutCallback read fOnUndeliveredCallback write fOnUndeliveredCallback;
property OnUndeliveredEvent: TMsgrMessageOutEvent read fOnUndeliveredEvent write fOnUndeliveredEvent;
property OnUndelivered: TMsgrMessageOutEvent read fOnUndeliveredEvent write fOnUndeliveredEvent;
end;
{===============================================================================
--------------------------------------------------------------------------------
TMessanger
--------------------------------------------------------------------------------
===============================================================================}
type
TMsgrSendResult = (msrFail,msrSent,msrReleased,msrProcessed);
{===============================================================================
TMessanger - class declaration
===============================================================================}
type
TMessanger = class(TCustomObject)
protected
fEndpoints: array of TMessangerEndpoint;
fSynchronizer: TMultiReadExclusiveWriteSynchronizerRTL;
// getters, setters
Function GetEndpointCapacity: Integer;
Function GetEndpointCount: Integer;
Function GetEndpoint(Index: Integer): TMessangerEndpoint;
// methods called from endpoint
procedure RemoveEndpoint(EndpointID: TMsgrEndpointID); virtual;
Function SendMessage(Msg: TMsgrMessage): TMsgrSendResult; virtual;
Function PostMessage(Msg: TMsgrMessage): Boolean; virtual;
procedure PostBufferedMessages(Messages,Undelivered: TMsgrMessageVector); virtual;
// init, final
procedure Initialize(EndpointCapacity: TMsgrEndpointID); virtual;
procedure Finalize; virtual;
public
constructor Create(EndpointCapacity: TMsgrEndpointID = 128);
destructor Destroy; override;
{
IDAvailable
Returns true when endpoint with the given ID does not exist (ie. the ID is
not taken). False otherwise.
}
Function IDAvailable(EndpointID: TMsgrEndpointID): Boolean; virtual;
{
CreateEndpoint
Creates new endpoint. First overload uses first unused ID.
Second overload will create new endpoint with exactly the ID given. If the
reaquested ID is not available (is out of allocated capacity or is already
taken), then an exception is raised.
}
Function CreateEndpoint: TMessangerEndpoint; overload; virtual;
Function CreateEndpoint(EndpointID: TMsgrEndpointID): TMessangerEndpoint; overload; virtual;
// properties
property Endpoints[Index: Integer]: TMessangerEndpoint read GetEndpoint;
property EndpointCapacity: Integer read GetEndpointCapacity;
property EndpointCount: Integer read GetEndpointCount;
end;
{===============================================================================
Auxiliary functions - declaration
===============================================================================}
Function GetTimestamp: TMsgrTimestamp;
Function BuildMessage(Recipient: TMsgrEndpointID; P1,P2,P3,P4: TMsgrParam; Priority: TMsgrPriority = MSGR_PRIORITY_NORMAL): TMsgrMessageOut;
implementation
uses
{$IFDEF Windows}Windows{$ELSE}Linux, SysCall{$ENDIF};
{===============================================================================
Auxiliary functions - implementation
===============================================================================}
Function GetTimestamp: TMsgrTimestamp;
{$IFNDEF Windows}
var
Time: TTimeSpec;
begin
If clock_gettime(CLOCK_MONOTONIC_RAW,@Time) = 0 then
Result := (Int64(Time.tv_sec) * 1000000000) + Time.tv_nsec
else
raise EMsgrTimestampError.CreateFmt('GetTimestamp: Cannot obtain time stamp (%d).',[errno]);
{$ELSE}
begin
Result := 0;
If not QueryPerformanceCounter(Result) then
raise EMsgrTimestampError.CreateFmt('GetTimestamp: Cannot obtain time stamp (%d).',[GetLastError]);
{$ENDIF}
Result := Result and $7FFFFFFFFFFFFFFF; // mask out sign bit
end;
//------------------------------------------------------------------------------
Function GetElapsedMillis(StartTime: TMsgrTimestamp): UInt32;
var
CurrentTime: TMsgrTimestamp;
Temp: Int64;
begin
CurrentTime := GetTimestamp;
If CurrentTime >= StartTime then
begin
{$IFDEF Windows}
Temp := 1;
If QueryPerformanceFrequency(Temp) then
Temp := Trunc(((CurrentTime - StartTime) / Temp) * 1000)
else
raise EMsgrTimestampError.CreateFmt('GetElapsedMillis: Failed to obtain timer frequency (%d).',[GetLastError]);
{$ELSE}
Temp := (CurrentTime - StartTime) div 1000000; // stamps are in ns, convert to ms
{$ENDIF}
If Temp < INFINITE then
Result := UInt32(Temp)
else
Result := INFINITE;
end
else Result := INFINITE;
end;
//------------------------------------------------------------------------------
Function BuildMessage(Recipient: TMsgrEndpointID; P1,P2,P3,P4: TMsgrParam; Priority: TMsgrPriority = MSGR_PRIORITY_NORMAL): TMsgrMessageOut;
begin
Result.Recipient := Recipient;
Result.Priority := Priority;
Result.Parameter1 := P1;
Result.Parameter2 := P2;
Result.Parameter3 := P3;
Result.Parameter4 := P4;
end;
//------------------------------------------------------------------------------
{$IFNDEF Windows}
Function gettid: pid_t;
begin
Result := do_syscall(syscall_nr_gettid);
end;
{$ENDIF}
{===============================================================================
--------------------------------------------------------------------------------
TMsgrMessageVector
--------------------------------------------------------------------------------
===============================================================================}
{===============================================================================
TMsgrMessageVector - class implementation
===============================================================================}
{-------------------------------------------------------------------------------
TMsgrMessageVector - protected methods
-------------------------------------------------------------------------------}
Function TMsgrMessageVector.GetItem(Index: Integer): TMsgrMessage;
begin
Result := TMsgrMessage(GetItemPtr(Index)^);
end;
//------------------------------------------------------------------------------
procedure TMsgrMessageVector.SetItem(Index: Integer; Value: TMsgrMessage);
begin
SetItemPtr(Index,@Value);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.ItemCompare(Item1,Item2: Pointer): Integer;
begin
{
Because messages are traversed from high index to low, the sorting is done in
reversed order.
}
Result := 0;
If Assigned(TMsgrMessage(Item1^).SendParam) xor Assigned(TMsgrMessage(Item2^).SendParam) then
begin
If Assigned(TMsgrMessage(Item1^).SendParam) then
Result := +1 // first is assigned, second is not => change order
else
Result := -1; // first in not assigned, second is => keep order
end
else
begin
{
Both are assigned or not assigned, decide upon other parameters - first
take priority (must go up), then Timestamp (down).
}
If TMsgrMessage(Item1^).Priority > TMsgrMessage(Item2^).Priority then
Result := +1
else If TMsgrMessage(Item1^).Priority < TMsgrMessage(Item2^).Priority then
Result := -1
else
begin
If TMsgrMessage(Item1^).Timestamp < TMsgrMessage(Item2^).Timestamp then
Result := +1
else If TMsgrMessage(Item1^).Timestamp > TMsgrMessage(Item2^).Timestamp then
Result := -1;
end;
end;
end;
{-------------------------------------------------------------------------------
TMsgrMessageVector - public methods
-------------------------------------------------------------------------------}
constructor TMsgrMessageVector.Create;
begin
inherited Create(SizeOf(TMsgrMessage));
ShrinkMode := smKeepCap;
end;
// --- --- --- --- --- --- --- --- --- --- --- --- ---
constructor TMsgrMessageVector.Create(Memory: Pointer; Count: Integer);
begin
inherited Create(Memory,Count,SizeOf(TMsgrMessage));
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.First: TMsgrMessage;
begin
Result := TMsgrMessage(inherited First^);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.Last: TMsgrMessage;
begin
Result := TMsgrMessage(inherited Last^);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.IndexOf(Item: TMsgrMessage): Integer;
begin
Result := inherited IndexOf(@Item);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.Find(Item: TMsgrMessage; out Index: Integer): Boolean;
begin
Result := inherited Find(@Item,Index);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.Add(Item: TMsgrMessage): Integer;
begin
Result := inherited Add(@Item);
end;
//------------------------------------------------------------------------------
procedure TMsgrMessageVector.Insert(Index: Integer; Item: TMsgrMessage);
begin
inherited Insert(Index,@Item);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.Remove(Item: TMsgrMessage): Integer;
begin
Result := inherited Remove(@Item);
end;
//------------------------------------------------------------------------------
Function TMsgrMessageVector.Extract(Item: TMsgrMessage): TMsgrMessage;
var
TempPtr: Pointer;
begin
TempPtr := inherited Extract(@Item);
If Assigned(TempPtr) then
Result := TMsgrMessage(TempPtr^)
else
FillChar(Addr(Result)^,SizeOf(Result),0);
end;
{===============================================================================
--------------------------------------------------------------------------------
TMsgrBufferedMessagesVector
--------------------------------------------------------------------------------
===============================================================================}
{===============================================================================
TMsgrBufferedMessageVector - class implementation
===============================================================================}