Skip to content

Commit 3942f8f

Browse files
committed
Add long-polling loop use case example
1 parent 95701b5 commit 3942f8f

File tree

2 files changed

+206
-9
lines changed

2 files changed

+206
-9
lines changed

packages/action-listener-middleware/src/tests/job.test.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,15 @@ describe('Job', () => {
231231

232232
test('Multiple Job Runs', async () => {
233233
const job = new Job(async (job) => {
234-
await job.delay(500)
234+
await job.delay(50)
235235
return Outcome.ok(1)
236236
})
237237

238238
const result1 = await job.run()
239239
const result2 = await job.run()
240240
const result3 = await job.run()
241241

242-
if (result1.isError() || result1.value != 1)
242+
if (result1.isError() || result1.value !== 1)
243243
throw new Error('Job did not return value correctly')
244244
if (result2.isOk() || !(result2.error instanceof JobCancellationException))
245245
throw new Error('Job did not return JobCompletionException')
@@ -336,7 +336,7 @@ describe('Job', () => {
336336
return Outcome.ok(null)
337337
}).run()
338338

339-
expect(counter).toBe(3)
339+
expect(counter < 5).toBe(true)
340340
})
341341

342342
test('Job Immediate Cancellation', async () => {
@@ -404,14 +404,17 @@ describe('Job', () => {
404404

405405
test('Job Delay', async () => {
406406
const start = performance.now()
407+
const expectedTime = 100
407408
const result = await new Job(async (job) => {
408-
await job.delay(100)
409+
await job.delay(expectedTime)
409410
return Outcome.ok(1)
410411
}).run()
411412

412-
if (result.isError() || result.value != 1) {
413+
const elapsed = performance.now() - start
414+
415+
if (result.isError() || result.value !== 1) {
413416
throw new Error('Invalid Result')
414-
} else if (performance.now() - start < 100) {
417+
} else if (elapsed < expectedTime) {
415418
throw new Error('Delay did not work')
416419
}
417420
})
@@ -456,9 +459,13 @@ describe('Job', () => {
456459
test('SupervisorJob - Await', async () => {
457460
const start = performance.now()
458461
const supervisor = new SupervisorJob()
459-
await supervisor.runWithTimeout(200)
460-
if (performance.now() - start < 200)
461-
throw new Error('Supervisor Job finished before it was supposed to')
462+
const expectedTime = 200
463+
await supervisor.runWithTimeout(expectedTime)
464+
const elapsed = performance.now() - start
465+
if (elapsed < expectedTime)
466+
throw new Error(
467+
`Supervisor Job finished before it was supposed to (${elapsed}, ${expectedTime})`
468+
)
462469
})
463470

464471
test('Child Count', async () => {
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import {
2+
configureStore,
3+
createAction,
4+
createSlice,
5+
isAnyOf,
6+
} from '@reduxjs/toolkit'
7+
8+
import type { AnyAction, PayloadAction, Action } from '@reduxjs/toolkit'
9+
10+
import {
11+
createActionListenerMiddleware,
12+
createListenerEntry,
13+
addListenerAction,
14+
removeListenerAction,
15+
} from '../index'
16+
17+
import type {
18+
When,
19+
ActionListenerMiddlewareAPI,
20+
TypedAddListenerAction,
21+
TypedAddListener,
22+
Unsubscribe,
23+
} from '../index'
24+
import { JobCancellationException } from '../job'
25+
import { Outcome } from '../outcome'
26+
27+
interface CounterState {
28+
value: number
29+
}
30+
31+
const counterSlice = createSlice({
32+
name: 'counter',
33+
initialState: { value: 0 } as CounterState,
34+
reducers: {
35+
increment(state) {
36+
state.value += 1
37+
},
38+
decrement(state) {
39+
state.value -= 1
40+
},
41+
// Use the PayloadAction type to declare the contents of `action.payload`
42+
incrementByAmount: (state, action: PayloadAction<number>) => {
43+
state.value += action.payload
44+
},
45+
},
46+
})
47+
const { increment, decrement, incrementByAmount } = counterSlice.actions
48+
49+
describe('Saga-style Effects Scenarios', () => {
50+
let middleware: ReturnType<typeof createActionListenerMiddleware>
51+
52+
let store = configureStore({
53+
reducer: counterSlice.reducer,
54+
middleware: (gDM) => gDM().prepend(createActionListenerMiddleware()),
55+
})
56+
57+
const testAction1 = createAction<string>('testAction1')
58+
type TestAction1 = ReturnType<typeof testAction1>
59+
const testAction2 = createAction<string>('testAction2')
60+
type TestAction2 = ReturnType<typeof testAction2>
61+
const testAction3 = createAction<string>('testAction3')
62+
type TestAction3 = ReturnType<typeof testAction3>
63+
64+
type RootState = ReturnType<typeof store.getState>
65+
66+
let addListener: TypedAddListener<RootState>
67+
68+
function delay(ms: number) {
69+
return new Promise((resolve) => setTimeout(resolve, ms))
70+
}
71+
72+
beforeEach(() => {
73+
middleware = createActionListenerMiddleware()
74+
addListener = middleware.addListener as TypedAddListener<RootState>
75+
store = configureStore({
76+
reducer: counterSlice.reducer,
77+
middleware: (gDM) => gDM().prepend(middleware),
78+
})
79+
})
80+
81+
test('Long polling loop', async () => {
82+
// Reimplementation of a saga-based long-polling loop that is controlled
83+
// by "start/stop" actions. The infinite loop waits for a message from the
84+
// server, processes it somehow, and waits for the next message.
85+
// Ref: https://gist.github.com/markerikson/5203e71a69fa9dff203c9e27c3d84154
86+
const eventPollingStarted = createAction('serverPolling/started')
87+
const eventPollingStopped = createAction('serverPolling/stopped')
88+
89+
// For this example, we're going to fake up a "server event poll" async
90+
// function by wrapping an event emitter so that every call returns a
91+
// promise that is resolved the next time an event is emitted.
92+
// This is the tiniest event emitter I could find to copy-paste in here.
93+
let createNanoEvents = () => ({
94+
events: {} as Record<string, any>,
95+
emit(event: string, ...args: any[]) {
96+
;(this.events[event] || []).forEach((i: any) => i(...args))
97+
},
98+
on(event: string, cb: (...args: any[]) => void) {
99+
;(this.events[event] = this.events[event] || []).push(cb)
100+
return () =>
101+
(this.events[event] = (this.events[event] || []).filter(
102+
(l: any) => l !== cb
103+
))
104+
},
105+
})
106+
const emitter = createNanoEvents()
107+
108+
// Rig up a dummy "receive a message from the server" API we can trigger manually
109+
function pollForEvent() {
110+
return new Promise<{ type: string }>((resolve, reject) => {
111+
const unsubscribe = emitter.on('serverEvent', (arg1: string) => {
112+
unsubscribe()
113+
resolve({ type: arg1 })
114+
})
115+
})
116+
}
117+
118+
// Track how many times each message was processed by the loop
119+
const receivedMessages = {
120+
a: 0,
121+
b: 0,
122+
c: 0,
123+
}
124+
125+
let pollingJobStarted = false
126+
let pollingJobCanceled = false
127+
128+
addListener({
129+
actionCreator: eventPollingStarted,
130+
listener: async (action, listenerApi) => {
131+
listenerApi.unsubscribe()
132+
133+
// Start a child job that will infinitely loop receiving messages
134+
const pollingJob = listenerApi.job.launch(async (handle) => {
135+
pollingJobStarted = true
136+
try {
137+
while (true) {
138+
const eventPromise = pollForEvent()
139+
// Cancelation-aware pause for a new server message
140+
const serverEvent = await handle.pause(eventPromise)
141+
142+
// Process the message. In this case, just count the times we've seen this message.
143+
if (serverEvent.type in receivedMessages) {
144+
receivedMessages[
145+
serverEvent.type as keyof typeof receivedMessages
146+
]++
147+
}
148+
}
149+
} catch (err) {
150+
if (err instanceof JobCancellationException) {
151+
pollingJobCanceled = true
152+
}
153+
}
154+
return Outcome.ok(0)
155+
})
156+
pollingJob.run()
157+
158+
// Wait for the "stop polling" action
159+
await listenerApi.condition(eventPollingStopped.match)
160+
pollingJob.cancel()
161+
},
162+
})
163+
164+
store.dispatch(eventPollingStarted())
165+
await delay(5)
166+
expect(pollingJobStarted).toBe(true)
167+
168+
await delay(5)
169+
emitter.emit('serverEvent', 'a')
170+
// Promise resolution
171+
await delay(1)
172+
emitter.emit('serverEvent', 'b')
173+
// Promise resolution
174+
await delay(1)
175+
176+
store.dispatch(eventPollingStopped())
177+
178+
// Have to break out of the event loop to let the cancelation promise
179+
// kick in - emitting before this would still resolve pollForEvent()
180+
await delay(1)
181+
emitter.emit('serverEvent', 'c')
182+
183+
// A and B were processed earlier. The first C was processed because the
184+
// emitter synchronously resolved the `pollForEvents` promise before
185+
// the cancelation took effect, but after another pause, the
186+
// cancelation kicked in and the second C is ignored.
187+
expect(receivedMessages).toEqual({ a: 1, b: 1, c: 0 })
188+
expect(pollingJobCanceled).toBe(true)
189+
})
190+
})

0 commit comments

Comments
 (0)