1
+ '''
2
+ // ##### Relay_Server.py #####
3
+ //
4
+ // Independent Relay server for WsRtd AmiBroker Data Plugin
5
+ //
6
+ // Central Websocket Python Program that runs as a Server.
7
+ // Both, CLient Application RTD Senders and the WsRTD data plugin
8
+ // connect to this server via specified IP:Port
9
+ //
10
+ // The advantage is that either Sender-Client or Receiver-client
11
+ // can restart or drop the connection without affecting the other.
12
+ // In the future, it will also serve to hook ArcticDB integration into the system.
13
+ //
14
+ ///////////////////////////////////////////////////////////////////////
15
+ // Author: NSM51
16
+ // https://github.com/ideepcoder/Rtd_Ws_AB_plugin/
17
+ // https://forum.amibroker.com/u/nsm51/summary
18
+ //
19
+ // Users and possessors of this source code are hereby granted a nonexclusive,
20
+ // royalty-free copyright license to use this code in individual and commercial software.
21
+ //
22
+ // AUTHOR ( NSM51 ) MAKES NO REPRESENTATION ABOUT THE SUITABILITY OF THIS SOURCE CODE FOR ANY PURPOSE.
23
+ // IT IS PROVIDED "AS IS" WITHOUT EXPRESS OR IMPLIED WARRANTY OF ANY KIND.
24
+ // AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOURCE CODE,
25
+ // INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
26
+ // IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL, OR
27
+ // CONSEQUENTIAL DAMAGES, OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
28
+ // WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
29
+ // ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOURCE CODE.
30
+ //
31
+ // Any use of this source code must include the above notice,
32
+ // in the user documentation and internal comments to the code.
33
+ '''
34
+
35
+
36
+ ## stable with ws-close(), handles with BaseException catch. Just prints the exception though.
37
+ ## use as middle server to recv from one client and send to another.
38
+ ## this is working code with ctrl+c issue fixed
39
+ wsport = 10101
40
+
41
+ import asyncio
42
+ import websockets
43
+ import threading
44
+ import datetime
45
+ import sys
46
+
47
+ stop_event = 0
48
+ stop_threads = False
49
+ CLIENTS = set ()
50
+ SENDERS = set ()
51
+ ctr = [0 , 0 , 0 ] # counters: clients, senders, reserved
52
+ retCode = 0
53
+
54
+
55
+ '''
56
+ Function to iterate over all CLIENTS and broadcast message received from SENDER
57
+ '''
58
+ async def broadcast_c ( message ):
59
+
60
+ Bad_WS = set ()
61
+
62
+ for websocket in CLIENTS :
63
+ try :
64
+ await websocket .send ( message )
65
+ except websockets .ConnectionClosed : Bad_WS .add ( websocket ); continue
66
+ except ConnectionResetError : Bad_WS .add ( websocket ); continue
67
+ except Exception as e : print (f"broadcast_C() { e } " ); break
68
+
69
+ if len ( Bad_WS ) > 0 :
70
+ for ws in Bad_WS :
71
+ CLIENTS .remove ( ws )
72
+
73
+
74
+
75
+ '''
76
+ Function to iterate over all SENDER(S) and broadcast message received from CLIENTS
77
+ '''
78
+ async def broadcast_s ( message ):
79
+
80
+ Bad_WS = set ()
81
+
82
+ for websocket in SENDERS :
83
+ try :
84
+ await websocket .send ( message )
85
+ except websockets .ConnectionClosed : Bad_WS .add ( websocket ); continue
86
+ except ConnectionResetError : Bad_WS .add ( websocket ); continue
87
+ except Exception as e : print (f"broadcast_S() { e } " ); break
88
+
89
+
90
+ if len ( Bad_WS ) > 0 :
91
+ for ws in Bad_WS :
92
+ SENDERS .remove ( ws )
93
+
94
+
95
+
96
+ '''
97
+ Main function that creates Handler for Each websocket connection (ie. Send() / Receive() functionality)
98
+ '''
99
+ async def handler (websocket ):
100
+ global stop_event , stop_threads , SENDERS , CLIENTS , ctr
101
+ role = l_role = 0 ## local role
102
+
103
+ try :
104
+ role = await websocket .recv ()
105
+
106
+ if str (role ).startswith ('role' ):
107
+ if str (role ).endswith ('send' ): SENDERS .add ( websocket ); l_role = 1
108
+ else : CLIENTS .add ( websocket ); l_role = 2
109
+ except : pass
110
+
111
+ ## create periodic task: ## disabled task to work as echo server. Client (fws) -> WS_server->Client C++
112
+ #asyncio.create_task(send(websocket))
113
+ try :
114
+ if l_role == 1 :
115
+ print (f"sender conn" ); ctr [1 ]+= 1 ; await stats (); await asyncio .create_task ( senders_t ( websocket ))
116
+ print (f"sender disc" ); ctr [1 ]-= 1
117
+ elif l_role == 2 :
118
+ print (f"client conn" ); ctr [0 ]+= 1 ; await stats (); await asyncio .create_task ( clients_t ( websocket ))
119
+ print (f"client disc" ); ctr [0 ]-= 1
120
+ else : print (f"Bad Auth: { role } " ); await websocket .send ('Server: Bad or No Auth' )
121
+
122
+ except TimeoutError : pass
123
+ except websockets .ConnectionClosed : return
124
+ except ConnectionResetError : return
125
+ except Exception as e :print (f"handle() Ex { e } " ); return
126
+ except BaseException as e :print (f"handle() BEx { e } " );return
127
+
128
+
129
+
130
+ '''
131
+ Individual Role-Senders RECEIVE function as task, this message is BROADCAST to all CLIENTS
132
+ '''
133
+ async def senders_t ( websocket ):
134
+ global stop_threads , stop_event
135
+ while not stop_threads :
136
+ try :
137
+ # this code is for normal recv but now above is just echo
138
+ message = await websocket .recv ()
139
+ #print(message)
140
+ await broadcast_c ( message )
141
+
142
+ except TimeoutError : pass
143
+ except websockets .ConnectionClosed : break ## connection drops
144
+ except ConnectionResetError : break
145
+ except Exception as e :print (f"senders() Ex { e } " ); break
146
+ return 0
147
+
148
+
149
+
150
+ '''
151
+ Individual Role-Clients RECEIVE function as task, this message is BROADCAST to all SENDERS
152
+ '''
153
+ async def clients_t ( websocket ):
154
+ global stop_threads , stop_event
155
+ while not stop_threads :
156
+ try :
157
+ # this code is for normal recv, can use async as well for timeout
158
+ message = await websocket .recv ()
159
+ print (message ) # print client msessages in server
160
+ await cmdMmessage ( message )
161
+
162
+ except TimeoutError : pass
163
+ except websockets .ConnectionClosed : break
164
+ except ConnectionResetError : break
165
+ except Exception as e :print (f"clients() Ex { e } " ); break
166
+ return 0
167
+
168
+
169
+
170
+ '''
171
+ utility function to parse CLIENTS commands / testing
172
+ '''
173
+ async def cmdMmessage (message ):
174
+ global stop_threads , stop_event
175
+ try :
176
+ if message == "zzz" : ## just testing
177
+ print (f"kill sig rec'd" ); stop_threads = True
178
+ await asyncio .sleep (1 ); stop_event .set ()
179
+ elif message == "close" :
180
+ print (f"recd shutdown from client" )
181
+ else :
182
+ await broadcast_s ( message )
183
+ except : pass
184
+ return 0
185
+
186
+
187
+
188
+ '''
189
+ utility function to print counts of Role-Senders & Role-Clients
190
+ '''
191
+ async def stats ():
192
+ global ctr
193
+ print (f"Clients={ ctr [0 ]} , Senders={ ctr [1 ]} " )
194
+
195
+
196
+
197
+ '''
198
+ Websocket Server init.
199
+ '''
200
+ async def ws_start ( stop ):
201
+ global stop_event , wsport , retCode
202
+ print (f"RTD Relay server started: { datetime .datetime .now ()} on localhost:{ wsport } ,\n ctrl+c once to shutdown." )
203
+ try :
204
+ async with websockets .serve (handler , "localhost" , wsport ):
205
+ await stop
206
+
207
+ except Exception as e : print (f"ws_start() Ex { e } " ); stop_event .set (); retCode = 404 ; return
208
+
209
+
210
+ '''
211
+ Main function to create Asyncio Event_Loop and start Websocket Server.
212
+ '''
213
+ def main_S ( lst = [] ):
214
+ global stop_event , stop_threads , wsport , retCode
215
+
216
+ try :
217
+ if len ( lst ) > 2 : ## sys.argv[0] = dummy
218
+ p = int ( lst [1 ] ) ## try to int
219
+ if lst [2 ] == "startserver" :
220
+ wsport = p
221
+ else : print (f"Bad args" ); exit (404 )
222
+
223
+ except : print (f"first arg is port#" ); exit (404 )
224
+
225
+ try :
226
+ loop = asyncio .new_event_loop ()
227
+ asyncio .set_event_loop ( loop )
228
+ stop_event = threading .Event ()
229
+ stop = loop .run_in_executor (None , stop_event .wait )
230
+
231
+ loop .run_until_complete ( ws_start ( stop ))
232
+ stop_threads = True ; loop .stop (); loop .close ()
233
+
234
+ except Exception as e : print (f"main() Ex { e } " ); stop_threads = True ; stop_event .set (); loop .stop ()
235
+ except BaseException as b :print (f"main() bEx { b } " ); stop_threads = True ; stop_event .set (); loop .stop ()
236
+ print (f"shutdown server. Exit { retCode } " ); exit ( retCode )
237
+
238
+
239
+
240
+ if __name__ == "__main__" :
241
+ if ( len ( sys .argv ) > 2 ): main_S ( sys .argv ) ## default port 10101
242
+ else : main_S ( ['main' , str (wsport ), 'startserver' ] )
243
+
244
+
245
+ ## Credits:
246
+ ## NSM51, Author.
0 commit comments