-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdispatcher.js
195 lines (162 loc) · 5.34 KB
/
dispatcher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import Scuttlebutt, { filter } from 'scuttlebutt'
import * as orderedHistory from './orderedHistory'
import {
// action constants
UPDATE_ACTION,
UPDATE_TIMESTAMP,
UPDATE_SOURCE,
META_TIMESTAMP,
META_SOURCE
} from './constants'
/**
 * Default predicate deciding whether an action type takes part in gossip.
 * Types whose first character is '@' are ignored; everything else,
 * including a missing type (which defaults to ''), is passed through and
 * left for redux to complain about later.
 *
 * @param {string} [type=''] - redux action type
 * @returns {boolean} false when the type begins with '@', true otherwise
 */
export function isGossipType(type = '') {
  const isIgnored = type.startsWith('@')
  return !isIgnored
}
/**
 * Builds a dispatch function that queues actions and drains the queue on
 * animation frames, at most 100 actions per frame.
 * Configurable, but requires use of the dispatcher's private methods at the
 * moment. A reference to the dispatcher (rather than to its methods) is kept
 * because those methods are replaced over time.
 *
 * @param {object} dispatcher - object exposing _reduxGetState,
 *   _historyReducer and _reduxDispatch
 * @returns {Function|false} delayedDispatch(action), or false when
 *   window.requestAnimationFrame is not available
 */
function getDelayedDispatch(dispatcher) {
  if (typeof window === 'undefined'
    || typeof window.requestAnimationFrame !== 'function') {
    return false
  }

  // upper bound on the number of queued actions handled per animation frame
  const MAX_ACTIONS_PER_FRAME = 100
  const queue = []

  function drainQueue() {
    let state = dispatcher._reduxGetState()
    let processed = 0

    // queue.length is re-read on every pass, so actions pushed while
    // draining are picked up as well
    while (processed < MAX_ACTIONS_PER_FRAME && processed < queue.length) {
      const action = queue[processed]

      // for-real dispatch only the last queued action, triggering redux's
      // subscribe (and thus UI re-renders). This prioritises crunching data
      // over feedback, but potentially we should dispatch periodically, even
      // with items in the queue
      if (processed < queue.length - 1) {
        // NOTE(review): the reduced `state` is never handed back to the
        // store here, so this presumably relies on _historyReducer updating
        // the history in place - confirm against orderedHistory
        state = dispatcher._historyReducer(state, action)
      } else {
        dispatcher._reduxDispatch(action)
      }

      processed++
    }

    // drop exactly the actions handled above; removing one more would
    // discard an unprocessed action whenever the per-frame cap was reached
    queue.splice(0, processed)

    if (queue.length) {
      window.requestAnimationFrame(drainQueue)
    }
  }

  return function delayedDispatch(action) {
    queue.push(action)

    // on first action, queue dispatching the action queue
    if (queue.length === 1) {
      window.requestAnimationFrame(drainQueue)
    }
  }
}
// Option defaults for Dispatcher: the animation-frame based delayed dispatch
// factory and the '@'-prefix gossip type predicate.
const defaultOptions = {
  customDispatch: getDelayedDispatch,
  isGossipType,
}
/**
 * Scuttlebutt model that sits between a redux store and its peers.
 * Its wrap* methods are meant to wrap the store's dispatch, getState,
 * initial state and root reducer; applyUpdate, history and localUpdate are
 * the methods implemented for / over the Scuttlebutt base class.
 */
export default class Dispatcher extends Scuttlebutt {
  /**
   * @param {object} [options] - overrides for defaultOptions
   *   (customDispatch, isGossipType)
   */
  constructor(options) {
    super()

    this.options = { ...defaultOptions, ...options }

    // customDispatch is a factory: it receives this dispatcher and returns
    // the function used to dispatch, or a falsy value to fall back to the
    // plain redux dispatch
    this._customDispatch =
      this.options.customDispatch && this.options.customDispatch(this)

    this._isGossipType = this.options.isGossipType

    // redux methods to wrap
    this._reduxDispatch = () => {
      throw new Error('Are you sure you called wrapDispatch?')
    }

    this._reduxGetState = () => {
      // this deliberately does not throw: it must return a default state for
      // the very first history call, before .wrapGetState has been applied
      // in the store enhancer.
      return []
    }
  }

  /**
   * Wraps the redux dispatch.
   *
   * @param {Function} dispatch - the store's original dispatch
   * @returns {Function} dispatch replacement; gossip-type actions go through
   *   localUpdate (and return undefined), all others go straight to
   *   `dispatch` and return its result
   */
  wrapDispatch(dispatch) {
    this._reduxDispatch = dispatch

    return (action) => {
      // apply this action to our scuttlebutt model (and send to peers). It
      // will dispatch, taking care of the appropriate time ordering
      if (this._isGossipType(action.type)) {
        this.localUpdate(action)
      } else {
        return dispatch(action)
      }
    }
  }

  /**
   * Wraps getState to the state within orderedHistory.
   *
   * @param {Function} getState - the store's original getState
   * @returns {Function} getState replacement returning
   *   orderedHistory.getState(getState())
   */
  wrapGetState(getState) {
    this._reduxGetState = getState

    return () => orderedHistory.getState(getState())
  }

  /**
   * Wraps the initial state, if any, into the first snapshot.
   *
   * @param {*} initialState
   * @returns {*} a falsy initialState unchanged, otherwise a sparse array
   *   holding initialState at index 2
   */
  wrapInitialState(initialState) {
    // NOTE(review): history() treats the store state as a list of updates,
    // each an array itself - confirm that this flat [,,initialState] shape
    // is what orderedHistory expects for the first snapshot
    return initialState && [,,initialState]
  }

  /**
   * Wraps the root reducer to track history and rewind when it changes.
   *
   * @param {Function} reducer - the application's root reducer
   * @returns {Function} reducer that delegates to the orderedHistory reducer
   */
  wrapReducer(reducer) {
    this._historyReducer = orderedHistory.reducer(reducer)

    // look the reducer up on every call rather than returning it directly,
    // so a later wrapReducer call also takes effect for this wrapper
    return (currentState, action) => {
      return this._historyReducer(currentState, action)
    }
  }

  /**
   * Apply update (action) to our store.
   * Implemented for the scuttlebutt class.
   *
   * @param {Array} update - [action, timestamp, source]
   * @returns {boolean} always true: the message was received successfully
   *   (if false, peers may retry the message)
   */
  applyUpdate(update) {
    const [action, timestamp, source] = update

    // copy the action AND its meta object, so the metadata below is defined
    // on our own copy. Sharing the caller's meta would mutate their action,
    // and applying the same action object again would throw when
    // re-defining the non-configurable properties.
    const localAction = { ...action, meta: { ...action.meta } }

    // add our metadata to the action as non-enumerable properties
    Object.defineProperty(localAction.meta, META_TIMESTAMP, {
      enumerable: false,
      value: timestamp
    })
    Object.defineProperty(localAction.meta, META_SOURCE, {
      enumerable: false,
      value: source
    })

    if (this._customDispatch) {
      this._customDispatch(localAction)
    } else {
      this._reduxDispatch(localAction)
    }

    return true
  }

  /**
   * Reply to gossip with the updates for the sources we've seen.
   * Implemented for the scuttlebutt class.
   *
   * @param {object} sources - passed through to scuttlebutt's `filter`
   * @returns {Array} matching updates, trimmed to their first three entries
   */
  history(sources) {
    // our state (updates[]) has a similar shape to scuttlebutt's own updates.
    return this._reduxGetState()
      .filter((update) => (
        update[UPDATE_ACTION]
        && this._isGossipType(update[UPDATE_ACTION].type)
        && filter(update, sources)
      ))
      // scuttlebutt only wants ACTION, TIMESTAMP, SOURCE, and not: SNAPSHOT
      .map((update) => update.slice(0, 3))
  }

  /**
   * Apply an update locally.
   * We should ensure we don't send objects which will explode JSON.parse
   * here. Implemented over the scuttlebutt class.
   *
   * @param {object} action - redux action to apply and gossip
   * @throws {Error} in development, a wrapping error whose `cause` is the
   *   error thrown by the underlying localUpdate; otherwise that error itself
   */
  localUpdate(action) {
    if (process.env.NODE_ENV === 'development') {
      try {
        super.localUpdate(action)
      } catch (error) {
        // keep the original error reachable; a bare second argument to
        // Error is ignored and would lose it
        throw new Error('Scuttlebutt couldn\'t dispatch', { cause: error })
      }
    } else {
      // try our luck
      super.localUpdate(action)
    }
  }
}