Skip to content

Commit af9fcab

Browse files
authored
[FSSDK-10642] Refactor batch event processor (#960)
1 parent 9e37f00 commit af9fcab

File tree

74 files changed

+4650
-4521
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+4650
-4521
lines changed

lib/core/event_builder/build_event_v1.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
EventTags,
1818
ConversionEvent,
1919
ImpressionEvent,
20-
} from '../../event_processor';
20+
} from '../../event_processor/events';
2121

2222
import { Event } from '../../shared_types';
2323

lib/core/event_builder/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616
import { LoggerFacade } from '../../modules/logging';
17-
import { EventV1 as CommonEventParams } from '../../event_processor';
17+
import { EventV1 as CommonEventParams } from '../../event_processor/v1/buildEventV1';
1818

1919
import fns from '../../utils/fns';
2020
import { CONTROL_ATTRIBUTES, RESERVED_EVENT_KEYWORDS } from '../../utils/enums';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/**
2+
* Copyright 2024, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { vi, describe, it, expect, beforeEach } from 'vitest';
18+
19+
const mockNetInfo = vi.hoisted(() => {
20+
const netInfo = {
21+
listeners: [],
22+
unsubs: [],
23+
addEventListener(fn: any) {
24+
this.listeners.push(fn);
25+
const unsub = vi.fn();
26+
this.unsubs.push(unsub);
27+
return unsub;
28+
},
29+
pushState(state: boolean) {
30+
for (const listener of this.listeners) {
31+
listener({ isInternetReachable: state });
32+
}
33+
},
34+
clear() {
35+
this.listeners = [];
36+
this.unsubs = [];
37+
}
38+
};
39+
return netInfo;
40+
});
41+
42+
vi.mock('../utils/import.react_native/@react-native-community/netinfo', () => {
43+
return {
44+
addEventListener: mockNetInfo.addEventListener.bind(mockNetInfo),
45+
};
46+
});
47+
48+
import { ReactNativeNetInfoEventProcessor } from './batch_event_processor.react_native';
49+
import { getMockLogger } from '../tests/mock/mock_logger';
50+
import { getMockRepeater } from '../tests/mock/mock_repeater';
51+
import { getMockAsyncCache } from '../tests/mock/mock_cache';
52+
53+
import { EventWithId } from './batch_event_processor';
54+
import { EventDispatcher } from './eventDispatcher';
55+
import { formatEvents } from './v1/buildEventV1';
56+
import { createImpressionEvent } from '../tests/mock/create_event';
57+
import { ProcessableEvent } from './eventProcessor';
58+
59+
const getMockDispatcher = () => {
60+
return {
61+
dispatchEvent: vi.fn(),
62+
};
63+
};
64+
65+
const exhaustMicrotasks = async (loop = 100) => {
66+
for(let i = 0; i < loop; i++) {
67+
await Promise.resolve();
68+
}
69+
}
70+
71+
72+
describe('ReactNativeNetInfoEventProcessor', () => {
73+
beforeEach(() => {
74+
mockNetInfo.clear();
75+
});
76+
77+
it('should not retry failed events when reachable state does not change', async () => {
78+
const eventDispatcher = getMockDispatcher();
79+
const dispatchRepeater = getMockRepeater();
80+
const failedEventRepeater = getMockRepeater();
81+
82+
const cache = getMockAsyncCache<EventWithId>();
83+
const events: ProcessableEvent[] = [];
84+
85+
for(let i = 0; i < 5; i++) {
86+
const id = `id-${i}`;
87+
const event = createImpressionEvent(id);
88+
events.push(event);
89+
await cache.set(id, { id, event });
90+
}
91+
92+
const processor = new ReactNativeNetInfoEventProcessor({
93+
eventDispatcher,
94+
dispatchRepeater,
95+
failedEventRepeater,
96+
batchSize: 1000,
97+
eventStore: cache,
98+
});
99+
100+
processor.start();
101+
await processor.onRunning();
102+
103+
mockNetInfo.pushState(true);
104+
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
105+
106+
mockNetInfo.pushState(true);
107+
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
108+
});
109+
110+
it('should retry failed events when network becomes reachable', async () => {
111+
const eventDispatcher = getMockDispatcher();
112+
const dispatchRepeater = getMockRepeater();
113+
const failedEventRepeater = getMockRepeater();
114+
115+
const cache = getMockAsyncCache<EventWithId>();
116+
const events: ProcessableEvent[] = [];
117+
118+
for(let i = 0; i < 5; i++) {
119+
const id = `id-${i}`;
120+
const event = createImpressionEvent(id);
121+
events.push(event);
122+
await cache.set(id, { id, event });
123+
}
124+
125+
const processor = new ReactNativeNetInfoEventProcessor({
126+
eventDispatcher,
127+
dispatchRepeater,
128+
failedEventRepeater,
129+
batchSize: 1000,
130+
eventStore: cache,
131+
});
132+
133+
processor.start();
134+
await processor.onRunning();
135+
136+
mockNetInfo.pushState(false);
137+
expect(eventDispatcher.dispatchEvent).not.toHaveBeenCalled();
138+
139+
mockNetInfo.pushState(true);
140+
141+
await exhaustMicrotasks();
142+
143+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(formatEvents(events));
144+
});
145+
146+
it('should unsubscribe from netinfo listener when stopped', async () => {
147+
const eventDispatcher = getMockDispatcher();
148+
const dispatchRepeater = getMockRepeater();
149+
const failedEventRepeater = getMockRepeater();
150+
151+
const cache = getMockAsyncCache<EventWithId>();
152+
153+
const processor = new ReactNativeNetInfoEventProcessor({
154+
eventDispatcher,
155+
dispatchRepeater,
156+
failedEventRepeater,
157+
batchSize: 1000,
158+
eventStore: cache,
159+
});
160+
161+
processor.start();
162+
await processor.onRunning();
163+
164+
mockNetInfo.pushState(false);
165+
166+
processor.stop();
167+
await processor.onTerminated();
168+
169+
expect(mockNetInfo.unsubs[0]).toHaveBeenCalled();
170+
});
171+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2024, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { NetInfoState, addEventListener } from '../utils/import.react_native/@react-native-community/netinfo';
18+
19+
import { BatchEventProcessor, BatchEventProcessorConfig } from './batch_event_processor';
20+
import { Fn } from '../utils/type';
21+
22+
export class ReactNativeNetInfoEventProcessor extends BatchEventProcessor {
23+
private isInternetReachable = true;
24+
private unsubscribeNetInfo?: Fn;
25+
26+
constructor(config: BatchEventProcessorConfig) {
27+
super(config);
28+
}
29+
30+
private async connectionListener(state: NetInfoState) {
31+
if (this.isInternetReachable && !state.isInternetReachable) {
32+
this.isInternetReachable = false;
33+
return;
34+
}
35+
36+
if (!this.isInternetReachable && state.isInternetReachable) {
37+
this.isInternetReachable = true;
38+
this.retryFailedEvents();
39+
}
40+
}
41+
42+
start(): void {
43+
super.start();
44+
if (addEventListener) {
45+
this.unsubscribeNetInfo = addEventListener(this.connectionListener.bind(this));
46+
}
47+
}
48+
49+
stop(): void {
50+
if (this.unsubscribeNetInfo) {
51+
this.unsubscribeNetInfo();
52+
}
53+
super.stop();
54+
}
55+
}

0 commit comments

Comments
 (0)