@@ -6,10 +6,6 @@ title: Managed Process Tutorial
6
6
7
7
### Introduction
8
8
9
- In order to explore the ` ManagedProcess ` API, we will present a simple
10
- example taken from the test suite, which exercises some of the more
11
- interesting features.
12
-
13
9
The main idea behind ` ManagedProcess ` is to separate the functional
14
10
and non-functional aspects of a process. By functional, we mean whatever
15
11
application specific task the process performs, and by non-functional
@@ -33,16 +29,19 @@ from the backlog and executed.
33
29
34
30
` ManagedProcess ` provides a basic protocol for * server-like* processes
35
31
such as this, based on the synchronous ` call ` and asynchronous ` cast `
36
- functions. Although ` call ` is synchronous, communication with the
37
- * server process* is out of band, both from the client and the server's
38
- point of view. The server implementation chooses whether to reply to
39
- a call request immediately, or defer its reply until a later stage
40
- and go back to receiving messages in the meanwhile.
32
+ functions used by code we provide to client clients and matching
33
+ * handler* functions in the process itself, for which there is a similar
34
+ API on the * server* . Although ` call ` is a synchronous protocol,
35
+ communication with the * server process* is out of band, both from the
36
+ client and the server's point of view. The server implementation chooses
37
+ whether to reply to a call request immediately, or defer its reply until
38
+ a later stage and go back to receiving other messages in the meanwhile.
41
39
42
- ### Implementation Sketch
40
+ ### Implementing the client
43
41
44
- We start out with some types: the tasks we perform and the maximum
45
- pool size:
42
+ Before we figure out the shape of our state, let's think about the types
43
+ we'll need to consume in the server process: the tasks we perform and the
44
+ maximum pool size.
46
45
47
46
{% highlight haskell %}
48
47
type PoolSize = Int
@@ -52,4 +51,305 @@ type SimpleTask a = Closure (Process a)
52
51
To submit a task, our clients will submit an action in the process
53
52
monad, wrapped in a ` Closure ` environment. We will use the ` Addressable `
54
53
typeclass to allow clients to specify the server's location in whatever
55
- manner suits them.
54
+ manner suits them:
55
+
56
+ {% highlight haskell %}
57
+ -- enqueues the task in the pool and blocks
58
+ -- the caller until the task is complete
59
+ executeTask :: forall s a . (Addressable s, Serializable a)
60
+ => s
61
+ -> Closure (Process a)
62
+ -> Process (Either String a)
63
+ executeTask sid t = call sid t
64
+ {% endhighlight %}
65
+
66
+ That's it for the client! Note that the type signature we expose to
67
+ our consumers is specific, and that we do not expose them to either
68
+ arbitrary messages arriving in their mailbox or to exceptions being
69
+ thrown in their thread. Instead we return an ` Either ` .
70
+
71
+ There are several varieties of the ` call ` API that deal with error
72
+ handling in different ways. Consult the haddocks for more info about
73
+ these.
74
+
75
+ ### Implementing the server
76
+
77
+ Back on the server, we write a function that takes our state and an
78
+ input message - in this case, the ` Closure ` we've been sent - and
79
+ have that update the process' state and possibility launch the task
80
+ if we have enough spare capacity.
81
+
82
+ {% highlight haskell %}
83
+ data Pool a = Pool a
84
+ {% endhighlight %}
85
+
86
+ I've called the state type ` Pool ` as we're providing a fixed size resource
87
+ pool from the consumer's perspective. We could think of this as a bounded
88
+ size latch or barrier of sorts, but that conflates the example a bit too
89
+ much. We parameterise the state by the type of data that can be returned
90
+ by submitted tasks.
91
+
92
+ The updated pool must store the task ** and** the caller (so we can reply
93
+ once the task is complete). The ` ManagedProcess.Server ` API will provide us
94
+ with a ` Recipient ` value which can be used to reply to the caller at a later
95
+ time, so we'll make use of that here.
96
+
97
+ {% highlight haskell %}
98
+ acceptTask :: Serializable a
99
+ => Pool a
100
+ -> Recipient
101
+ -> Closure (Process a)
102
+ -> Process (Pool a)
103
+ {% endhighlight %}
104
+
105
+ For our example we will avoid using even vaguely exotic types to manage our
106
+ process' internal state, and stick to simple property lists. This is hardly
107
+ efficient, but that's fine for a test/demo.
108
+
109
+ {% highlight haskell %}
110
+ data Pool a = Pool {
111
+ poolSize :: PoolSize
112
+ , accepted :: [ (Recipient, Closure (Process a))]
113
+ } deriving (Typeable)
114
+ {% endhighlight %}
115
+
116
+ ### Making use of Async
117
+
118
+ So ** how** can we execute this ` Closure (Process a) ` without blocking the server
119
+ process itself? We will use the ` Control.Distributed.Process.Platform.Async ` API
120
+ to execute the task asynchronously and provide a means for waiting on the result.
121
+
122
+ In order to use the ` Async ` handle to get the result of the computation once it's
123
+ complete, we'll have to hang on to a reference. We also need a way to associate the
124
+ submitter with the handle, so we end up with one field for the active (running)
125
+ tasks and another for the queue of accepted (but inactive) ones, like so...
126
+
127
+ {% highlight haskell %}
128
+ data Pool a = Pool {
129
+ poolSize :: PoolSize
130
+ , active :: [ (Recipient, Async a)]
131
+ , accepted :: [ (Recipient, Closure (Process a))]
132
+ } deriving (Typeable)
133
+ {% endhighlight %}
134
+
135
+ To turn that ` Closure ` environment into a thunk we can evaluate, we'll use the
136
+ built in ` unClosure ` function, and we'll pass the thunk to ` async ` and get back
137
+ a handle to the async task.
138
+
139
+ {% highlight haskell %}
140
+ proc <- unClosure task'
141
+ asyncHandle <- async proc
142
+ {% endhighlight %}
143
+
144
+ Of course, we decided that we wouldn't block on each ` Async ` handle, and we're not
145
+ able to sit in a * loop* polling all the handles representing tasks we're running,
146
+ because no submissions would be handled whilst spinning and waiting for results.
147
+ We're relying on monitors instead, so we need to store the ` MonitorRef ` so we know
148
+ which monitor signal relates to which async task (and recipient).
149
+
150
+ {% highlight haskell %}
151
+ data Pool a = Pool {
152
+ poolSize :: PoolSize
153
+ , active :: [ (MonitorRef, Recipient, Async a)]
154
+ , accepted :: [ (Recipient, Closure (Process a))]
155
+ } deriving (Typeable)
156
+ {% endhighlight %}
157
+
158
+ Finally we can implement the ` acceptTask ` function.
159
+
160
+ {% highlight haskell %}
161
+ acceptTask :: Serializable a
162
+ => Pool a
163
+ -> Recipient
164
+ -> Closure (Process a)
165
+ -> Process (Pool a)
166
+ acceptTask s@(Pool sz' runQueue taskQueue) from task' =
167
+ let currentSz = length runQueue
168
+ in case currentSz >= sz' of
169
+ True -> do
170
+ return $ s { accepted = ((from, task'): taskQueue ) }
171
+ False -> do
172
+ proc <- unClosure task'
173
+ asyncHandle <- async proc
174
+ ref <- monitorAsync asyncHandle
175
+ taskEntry <- return (ref, from, asyncHandle)
176
+ return s { active = (taskEntry: runQueue ) }
177
+ {% endhighlight %}
178
+
179
+ If we're at capacity, we add the task (and caller) to the ` accepted ` queue,
180
+ otherwise we launch and monitor the task using ` async ` and stash the monitor
181
+ ref, caller ref and the async handle together in the ` active ` field. Prepending
182
+ to the list of active/running tasks is a somewhat arbitrary choice. One might
183
+ argue that heuristically, the younger a task is the less likely it is that it
184
+ will run for a long time. Either way, I've done this to avoid cluttering the
185
+ example other data structures, so we can focus on the ` ManagedProcess ` APIs
186
+ only.
187
+
188
+ Now we will write a function that handles the results. When the monitor signal
189
+ arrives, we use the async handle to obtain the result and send it back to the caller.
190
+ Because, even if we were running at capacity, we've now seen a task complete (and
191
+ therefore reduce the number of active tasks by one), we will also pull off a pending
192
+ task from the backlog (i.e., accepted), if any exists, and execute it. As with the
193
+ active task list, we're going to take from the backlog in FIFO order, which is
194
+ almost certainly not what you'd want in a real application, but that's not the
195
+ point of the example either.
196
+
197
+ The steps then, are
198
+
199
+ 1 . find the async handle for the monitor ref
200
+ 2 . pull the result out of it
201
+ 3 . send the result to the client
202
+ 4 . bump another task from the backlog (if there is one)
203
+ 5 . carry on
204
+
205
+ This chain then, looks like ` wait h >>= respond c >> bump s t >>= continue ` .
206
+
207
+ Item (3) requires special API support from ` ManagedProcess ` , because we're not
208
+ just sending * any* message back to the caller. We're replying to a ` call `
209
+ that has already taken place and is, in fact, still running. The API call for
210
+ this is ` replyTo ` .
211
+
212
+ {% highlight haskell %}
213
+ taskComplete :: forall a . Serializable a
214
+ => Pool a
215
+ -> ProcessMonitorNotification
216
+ -> Process (ProcessAction (Pool a))
217
+ taskComplete s@(Pool _ runQ _ )
218
+ (ProcessMonitorNotification ref _ _ ) =
219
+ let worker = findWorker ref runQ in
220
+ case worker of
221
+ Just t@(_ , c, h) -> wait h >>= respond c >> bump s t >>= continue
222
+ Nothing -> continue s
223
+ where
224
+ respond :: Recipient
225
+ -> AsyncResult a
226
+ -> Process ()
227
+ respond c (AsyncDone r) = replyTo c ((Right r) :: (Either String a))
228
+ respond c (AsyncFailed d) = replyTo c ((Left (show d)) :: (Either String a))
229
+ respond c (AsyncLinkFailed d) = replyTo c ((Left (show d)) :: (Either String a))
230
+ respond _ _ = die $ TerminateOther "IllegalState"
231
+
232
+ bump :: Pool a -> (MonitorRef, Recipient, Async a) -> Process (Pool a)
233
+ bump st@(Pool _ runQueue acc) worker =
234
+ let runQ2 = deleteFromRunQueue worker runQueue in
235
+ case acc of
236
+ [] -> return st { active = runQ2 }
237
+ ((tr,tc):ts) -> acceptTask (st { accepted = ts, active = runQ2 }) tr tc
238
+
239
+ findWorker :: MonitorRef
240
+ -> [ (MonitorRef, Recipient, Async a)]
241
+ -> Maybe (MonitorRef, Recipient, Async a)
242
+ findWorker key = find (\( ref,_ ,_ ) -> ref == key)
243
+
244
+ deleteFromRunQueue :: (MonitorRef, Recipient, Async a)
245
+ -> [ (MonitorRef, Recipient, Async a)]
246
+ -> [ (MonitorRef, Recipient, Async a)]
247
+ deleteFromRunQueue c@(p, _ , _ ) runQ = deleteBy (\_ (b, _ , _ ) -> b == p) c runQ
248
+ {% endhighlight %}
249
+
250
+ That was pretty simple. We've deal with mapping the ` AsyncResult ` to ` Either ` values,
251
+ which we * could* have left to the caller, but this makes the client facing API much
252
+ simpler to work with.
253
+
254
+ ### Wiring up handlers
255
+
256
+ The ` ProcessDefinition ` takes a number of different kinds of handler. The only ones
257
+ we care about are the call handler for submission handling, and the handler that
258
+ deals with monitor signals.
259
+
260
+ Call and cast handlers live in the ` apiHandlers ` list of a ` ProcessDefinition ` and
261
+ must have the type ` Dispatcher s ` where ` s ` is the state type for the process. We
262
+ cannot construct a ` Dispatcher ` ourselves, but a range of functions in the
263
+ ` ManagedProcess.Server ` module exist to lift functions like the ones we've just
264
+ defined. The particular function we need is ` handleCallFrom ` , which works with
265
+ functions over the state, ` Recipient ` and the call data/message. All the varieties
266
+ of ` handleCall ` need to return a ` ProcessReply ` , which has the following type
267
+
268
+ {% highlight haskell %}
269
+ data ProcessReply s a =
270
+ ProcessReply a (ProcessAction s)
271
+ | NoReply (ProcessAction s)
272
+ {% endhighlight %}
273
+
274
+ There are also various utility function in the API to construct a ` ProcessAction `
275
+ and we will make use of ` noReply_ ` here, which constructs ` NoReply ` for us and
276
+ presets the ` ProcessAction ` to ` ProcessContinue ` , which goes back to receiving
277
+ messages without further action. We already have a function over the right input
278
+ domain which evaluates to a new state so we end up with:
279
+
280
+ {% highlight haskell %}
281
+ storeTask :: Serializable a
282
+ => Pool a
283
+ -> Recipient
284
+ -> Closure (Process a)
285
+ -> Process (ProcessReply (Pool a) ())
286
+ storeTask s r c = acceptTask s r c >>= noReply_
287
+ {% endhighlight %}
288
+
289
+ In order to spell things out for the compiler, we need to put a type signature
290
+ in place at the call site too, so our final construct is
291
+
292
+ {% highlight haskell %}
293
+ handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
294
+ {% endhighlight %}
295
+
296
+ No such thing is required for ` taskComplete ` , as there's no ambiguity about its
297
+ type. Our process definition is finished, and here it is:
298
+
299
+ {% highlight haskell %}
300
+ poolServer :: forall a . (Serializable a) => ProcessDefinition (Pool a)
301
+ poolServer =
302
+ defaultProcess {
303
+ apiHandlers = [
304
+ handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
305
+ ]
306
+ , infoHandlers = [
307
+ handleInfo taskComplete
308
+ ]
309
+ } :: ProcessDefinition (Pool a)
310
+ {% endhighlight %}
311
+
312
+ Starting the pool is fairly simple and ` ManagedProcess ` has some utilities to help.
313
+
314
+ {% highlight haskell %}
315
+ simplePool :: forall a . (Serializable a)
316
+ => PoolSize
317
+ -> ProcessDefinition (Pool a)
318
+ -> Process (Either (InitResult (Pool a)) TerminateReason)
319
+ simplePool sz server = start sz init' server
320
+ where init' :: PoolSize -> Process (InitResult (Pool a))
321
+ init' sz' = return $ InitOk (Pool sz' [ ] [ ] ) Infinity
322
+ {% endhighlight %}
323
+
324
+ ### Putting it all together
325
+
326
+ Starting up a pool locally or on a remote node is just a matter of using ` spawn `
327
+ or ` spawnLocal ` with ` simplePool ` . The second argument should specify the type of
328
+ results, e.g.,
329
+
330
+ {% highlight haskell %}
331
+ let s' = poolServer :: ProcessDefinition (Pool String)
332
+ in simplePool s s'
333
+ {% endhighlight %}
334
+
335
+ Defining tasks is as simple as making them remote-worthy:
336
+
337
+ {% highlight haskell %}
338
+ sampleTask :: (TimeInterval, String) -> Process String
339
+ sampleTask (t, s) = sleep t >> return s
340
+
341
+ $(remotable [ 'sampleTask] )
342
+ {% endhighlight %}
343
+
344
+ And executing them is just as simple too. Given a pool which has been registered
345
+ locally as "mypool", we can simply call it directly:
346
+
347
+ {% highlight haskell %}
348
+ job <- return $ ($(mkClosure 'sampleTask) (seconds 2, "foobar"))
349
+ call "mypool" job >>= wait >>= stash result
350
+ {% endhighlight %}
351
+
352
+ Hopefully this has demonstrated a few benefits of the ` ManagedProcess ` API, although
353
+ it's really just scratching the surface. We have focussed on the code that matters -
354
+ state transitions and decision making, without getting bogged down (much) with receiving
355
+ or sending messages, apart from using some simple APIs when we needed to.
0 commit comments