4
4
import time
5
5
6
6
parsedFlows = {
7
- 'analyze' : [{'source' : 'A' , 'next' : ['B' ]}, {'source' : 'B' , 'next' : ['C' ]}, {'source' : 'C' , 'next' : ['D' ]}],
7
+ 'analyze' : [{'source' : 'A' , 'next' : ['B' ]}, {'source' : 'B' , 'next' : ['C' ]}, {'source' : 'C' , 'next' : ['D' ]}],
8
8
'master' : [{'source' : 'B' , 'next' : ['A' , 'C' ]}, {'source' : 'C' , 'next' : ['D' ]}, {'source' : 'D' , 'next' : ['E' ]}]
9
9
}
10
10
11
+
11
12
def test_streaming_flow ():
12
13
messages = []
13
14
@@ -21,7 +22,7 @@ def statsInvoked(args):
21
22
nodeName = 'B'
22
23
parents = [{'nodeName' : 'A' , 'address' : {'host' : '127.0.0.1' , 'port' : port }, 'type' : 'Add' }]
23
24
producer_config = {'port' : port , 'messagesMemoryBuff' : 5000 , 'encoding' : 'msgpack' , 'statisticsInterval' : 1 }
24
- listen_config = {'encoding' : 'msgpack' }
25
+ listen_config = {'encoding' : 'msgpack' , 'delay' : 10 }
25
26
streamingManager = StreamingManager ()
26
27
streamingManager .setParsedFlows (parsedFlows , 'analyze' )
27
28
streamingManager .setupStreamingProducer (statsInvoked , producer_config , [nodeName ], 'A' )
@@ -43,6 +44,7 @@ def statsInvoked(args):
43
44
assert messages [3 ] == {'msg' : '4' }
44
45
streamingManager .stopStreaming ()
45
46
47
+
46
48
def test_streaming_manager ():
47
49
resultsAtB = {}
48
50
resultsAtC = {}
@@ -65,7 +67,7 @@ def statsInvoked(args):
65
67
parents2 = [{'nodeName' : 'B' , 'address' : {'host' : '127.0.0.1' , 'port' : port2 }, 'type' : 'Add' }]
66
68
producer_configA = {'port' : port1 , 'messagesMemoryBuff' : 5000 , 'encoding' : 'msgpack' , 'statisticsInterval' : 1 }
67
69
producer_configB = {'port' : port2 , 'messagesMemoryBuff' : 5000 , 'encoding' : 'msgpack' , 'statisticsInterval' : 1 }
68
- listen_config = {'encoding' : 'msgpack' }
70
+ listen_config = {'encoding' : 'msgpack' , 'delay' : 10 }
69
71
70
72
streamingManagerA = StreamingManager ()
71
73
streamingManagerA .setParsedFlows (parsedFlows , 'analyze' )
@@ -92,8 +94,8 @@ def statsInvoked(args):
92
94
streamingManagerA .stopStreaming ()
93
95
streamingManagerB .stopStreaming ()
94
96
95
- def xtest_messaging ():
96
97
98
+ def xtest_messaging ():
97
99
producer_config = {'port' : 9536 , 'messagesMemoryBuff' : 5000 , 'encoding' : 'msgpack' , 'statisticsInterval' : 1 }
98
100
listenr_config = {'remoteAddress' : 'tcp://localhost:9536' , 'encoding' : 'msgpack' , 'messageOriginNodeName' : 'b' }
99
101
asserts = {}
@@ -132,7 +134,7 @@ def onMessage(envelope, msg, origin):
132
134
assert asserts ['stats' ][0 ]['sent' ] == 0
133
135
messageListener = MessageListener (listenr_config , receiverNode = 'a' )
134
136
messageListener .registerMessageListener (onMessage )
135
- for _ in range (1 ,5 ):
137
+ for _ in range (1 , 5 ):
136
138
if (asserts .get ('field1' )):
137
139
break
138
140
time .sleep (1 )
@@ -154,12 +156,13 @@ def getHello(envelope, msg, origin):
154
156
assert len (asserts ['envelope' ]) == 1
155
157
assert asserts ['envelope' ][0 ]['source' ] == 'a'
156
158
159
+
157
160
def xtest_messaging_split ():
158
161
producer_config = {'port' : 9536 , 'messagesMemoryBuff' : 5000 , 'encoding' : 'msgpack' , 'statisticsInterval' : 1 }
159
162
listenr_config_c = {'remoteAddress' : 'tcp://localhost:9536' , 'encoding' : 'msgpack' , 'messageOriginNodeName' : 'c' }
160
163
asserts = {}
161
164
asserts ['responses' ] = 0
162
- messageProducer = MessageProducer (producer_config , ['a' ,'b' ], 'c' )
165
+ messageProducer = MessageProducer (producer_config , ['a' , 'b' ], 'c' )
163
166
164
167
def onStatistics (statistics ):
165
168
asserts ['stats' ] = statistics
@@ -199,8 +202,8 @@ def onMessage(envelope, msg, origin):
199
202
200
203
messageListener = MessageListener (listenr_config_c , receiverNode = 'a' )
201
204
messageListener .registerMessageListener (onMessage )
202
- for _ in range (1 ,5 ):
203
- if ( asserts .get ('field1' )):
205
+ for _ in range (1 , 5 ):
206
+ if (asserts .get ('field1' )):
204
207
break
205
208
time .sleep (1 )
206
209
time .sleep (2 )
0 commit comments