Skip to content

Commit 46cd574

Browse files
lrsjohnsongkanwar
authored andcommitted
Output Data Sets, run-computation returns
1 parent 8127ec2 commit 46cd574

File tree

3 files changed

+239
-92
lines changed

3 files changed

+239
-92
lines changed

code/data-loader.scm

+156-88
Original file line numberDiff line numberDiff line change
@@ -21,100 +21,168 @@
2121
(map (lambda (kv)
2222
(create-ds-elt (car kv) (cadr kv))) key-value-list)))
2323

24-
(define (mrs:run-computation thunk)
24+
(define *output-callback* #f)
25+
26+
(define (mrs:run-computation-with-callback thunk callback)
2527
(with-time-sharing-conspiracy
2628
(lambda ()
27-
(thunk)
28-
(flush-input-data-sets)
29-
(conspire:null-job))))
29+
(let ((output-done #f))
30+
(fluid-let
31+
((*output-callback*
32+
(lambda (ds-elt)
33+
(cond ((ds-elt-done? ds-elt)
34+
(set! output-done #t))
35+
(else
36+
(callback
37+
(ds-elt-key ds-elt)
38+
(ds-elt-value ds-elt)))))))
39+
(thunk)
40+
(flush-input-data-sets)
41+
(let lp ()
42+
(conspire:thread-yield)
43+
(if output-done
44+
'done
45+
(lp))))))))
46+
47+
(define (mrs:run-computation thunk)
48+
(let ((output-value '()))
49+
(let ((output-callback
50+
(lambda (k v)
51+
(set! output-value
52+
(cons (list k v)
53+
output-value)))))
54+
(mrs:run-computation-with-callback thunk output-callback)
55+
output-value)))
3056

3157
#|
32-
(define (test1)
33-
(define ds-input (create-data-set))
34-
(define ds1 (create-data-set))
35-
(mrs:print-streaming ds-input 'ds1)
36-
(mrs:feed-value-list ds-input '(1 2 3 4)))
37-
(mrs:run-computation test1)
58+
(define (test1)
59+
(define ds-input (mrs:create-data-set))
60+
(define ds1 (mrs:create-data-set))
61+
(mrs:print-streaming ds-input 'ds1)
62+
(mrs:feed-value-list ds-input '(1 2 3 4)))
63+
(mrs:run-computation test1)
64+
;-> (ds1 0 1)
65+
; (ds1 1 2)
66+
; (ds1 2 3)
67+
; (ds1 3 4)
68+
; (ds1 done)
69+
70+
(define (test2)
71+
(define ds-input (mrs:create-data-set))
72+
(define ds1 (mrs:create-data-set))
73+
(mrs:map
74+
(lambda (key value)
75+
(mrs:emit key (* 10 value)))
76+
ds-input
77+
ds1)
78+
(mrs:print-streaming ds1 'ds1)
79+
(mrs:feed-value-list ds-input '(1 2 3 4)))
80+
(mrs:run-computation test2)
81+
; (ds1 3 40)
82+
; (ds1 0 10)
83+
; (ds1 1 20)
84+
; (ds1 2 30)
85+
; (ds1 done)
86+
87+
(define (test3)
88+
(define ds-input (mrs:create-data-set))
89+
(define ds1 (mrs:create-data-set))
90+
(mrs:map
91+
(lambda (key value)
92+
(mrs:emit key (* 10 value)))
93+
ds-input
94+
ds1)
95+
(mrs:map
96+
(lambda (key value)
97+
(mrs:emit key (* 11 value)))
98+
ds-input
99+
ds1)
100+
(mrs:print-streaming ds1 'ds1)
101+
(mrs:feed-value-list ds-input '(1 2 3 4)))
102+
(mrs:run-computation test3)
103+
; (ds1 1 20)
104+
; (ds1 2 30)
105+
; (ds1 3 40)
106+
; (ds1 0 10)
107+
; (ds1 1 22)
108+
; (ds1 2 33)
109+
; (ds1 3 44)
110+
; (ds1 0 11)
111+
; (ds1 done)
112+
113+
(define (test4)
114+
(define ds-input (mrs:create-data-set))
115+
(define ds1 (mrs:create-data-set))
116+
(mrs:aggregate
117+
ds-input
118+
ds1)
119+
(mrs:print-streaming ds1 'ds1)
120+
(mrs:feed-value-list ds-input '(1 2 3 4))
121+
(mrs:feed-value-list ds-input '(3 3 3 3))
122+
(mrs:feed-value-list ds-input '(1 4 3 4)))
123+
(mrs:run-computation test4)
124+
; (ds1 3 (4 3 4))
125+
; (ds1 2 (3 3 3))
126+
; (ds1 1 (4 3 2))
127+
; (ds1 0 (1 3 1))
128+
; (ds1 done)
38129

39-
(define (test2)
40-
(define ds-input (create-data-set))
41-
(define ds1 (create-data-set))
42-
(mrs:map
43-
(lambda (key value)
44-
(pp value)
45-
(mrs:emit key (* 10 value)))
46-
ds-input
47-
ds1)
48-
(mrs:print-streaming ds1 'ds1)
49-
(mrs:feed-value-list ds-input '(1 2 3 4)))
50-
(mrs:run-computation test2)
51-
; (ds1 3 40)
52-
; (ds1 0 10)
53-
; (ds1 1 20)
54-
; (ds1 2 30)
55-
; (ds1 done)
130+
(define (test5)
131+
(define ds-input (mrs:create-data-set))
132+
(define ds1 (mrs:create-data-set))
133+
(define ds2 (mrs:create-data-set))
134+
(mrs:map
135+
(lambda (key value)
136+
(mrs:emit key (* 10 value)))
137+
ds-input
138+
ds1)
139+
(mrs:reduce
140+
(lambda (key values)
141+
(mrs:emit key (apply + values)))
142+
ds1
143+
ds2)
144+
(mrs:print-streaming ds1 'ds1)
145+
(mrs:print-streaming ds2 'ds2)
146+
(mrs:feed-value-list ds-input '(1 1 3 5))
147+
(mrs:feed-value-list ds-input '(2 2 1 4))
148+
(mrs:feed-value-list ds-input '(3 1 0 3)))
149+
(mrs:run-computation test5)
150+
; (ds1 3 50)
151+
; (ds1 0 20)
152+
; (ds1 0 10)
153+
; (ds1 1 10)
154+
; (ds1 2 30)
155+
; (ds1 0 30)
156+
; (ds1 1 10)
157+
; (ds1 1 20)
158+
; (ds1 2 10)
159+
; (ds1 3 40)
160+
; (ds1 2 0)
161+
; (ds1 3 30)
162+
; (ds1 done)
163+
; (ds2 3 120)
164+
; (ds2 2 40)
165+
; (ds2 1 40)
166+
; (ds2 0 60)
167+
; (ds2 done)
56168

57-
(define (test3)
58-
(define ds-input (create-data-set))
59-
(define ds1 (create-data-set))
60-
(mrs:map
61-
(lambda (key value)
62-
(mrs:emit key (* 10 value)))
63-
ds-input
64-
ds1)
65-
(mrs:map
66-
(lambda (key value)
67-
(mrs:emit key (* 11 value)))
68-
ds-input
69-
ds1)
70-
(mrs:print-streaming ds1 'ds1)
71-
(mrs:feed-value-list ds-input '(1 2 3 4)))
72-
(mrs:run-computation test3)
73-
; (ds1 1 20)
74-
; (ds1 2 30)
75-
; (ds1 3 40)
76-
; (ds1 0 10)
77-
; (ds1 1 22)
78-
; (ds1 2 33)
79-
; (ds1 3 44)
80-
; (ds1 0 11)
81-
; (ds1 done)
82169

83-
(define (test4)
84-
(define ds-input (create-data-set))
85-
(define ds1 (create-data-set))
86-
(mrs:aggregate
87-
ds-input
88-
ds1)
89-
(mrs:print-streaming ds1 'ds1)
90-
(mrs:feed-value-list ds-input '(1 2 3 4))
91-
(mrs:feed-value-list ds-input '(3 3 3 3))
92-
(mrs:feed-value-list ds-input '(1 4 3 4)))
93-
(mrs:run-computation test4)
94-
; (ds1 3 (4 3 4))
95-
; (ds1 2 (3 3 3))
96-
; (ds1 1 (4 3 2))
97-
; (ds1 0 (1 3 1))
98-
; (ds1 done)
170+
; (ds1 3 (4 3 4))
171+
; (ds1 2 (3 3 3))
172+
; (ds1 1 (4 3 2))
173+
; (ds1 0 (1 3 1))
174+
; (ds1 done)
99175

100-
(define (test5)
101-
(define ds-input (create-data-set))
102-
(define ds1 (create-data-set))
103-
(define ds2 (create-data-set))
104-
(mrs:map
105-
(lambda (key value)
106-
(mrs:emit key (* 10 value)))
107-
ds-input
108-
ds1)
109-
(mrs:reduce
110-
(lambda (key values)
111-
(mrs:emit key (apply + values)))
112-
ds1
113-
ds2)
114-
(mrs:print-streaming ds1 'ds1)
115-
(mrs:print-streaming ds2 'ds2)
116-
(mrs:feed-value-list ds-input '(1 1 3 5))
117-
(mrs:feed-value-list ds-input '(2 2 1 4))
118-
(mrs:feed-value-list ds-input '(3 1 0 3)))
119-
(mrs:run-computation test5)
176+
(define (test-with-output-data-set)
177+
(define ds-input (mrs:create-data-set))
178+
(define ds-output (mrs:create-output-data-set))
179+
(mrs:map
180+
(lambda (key value)
181+
(mrs:emit key (* 10 value)))
182+
ds-input
183+
ds-output)
184+
(mrs:feed-value-list ds-input '(1 2 3 4)))
185+
(mrs:run-computation test-with-output-data-set)
186+
;;; Note that this returns directly!
187+
;-> ((1 20) (0 10) (3 40) (2 30))
120188
|#

code/datasets.scm

+81-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
;;; User-facing operations
22
(define (mrs:create-data-set) (create-mrq-data-set))
3+
(define (mrs:create-sink-data-set) (create-sink-data-set))
34
(define (mrs:create-file-writer-data-set) (create-file-writer-data-set))
5+
(define (mrs:create-output-data-set) (create-output-data-set))
46

57
;;; General data-set operations
68
(define *data-sets* '())
@@ -76,7 +78,7 @@
7678

7779
(defhandler ds-add-elt
7880
(lambda (mrq-data-set val)
79-
(let ((lock (mrq-data-set-lock mrq-data-set)))
81+
(let ((lock (ds-get-lock mrq-data-set)))
8082
(conspire:acquire-lock lock)
8183
(mr-queue-add-elt (mrq-data-set-queue mrq-data-set) val)
8284
(conspire:unlock lock)))
@@ -227,6 +229,57 @@
227229
|#
228230

229231

232+
;;; Output data set
233+
(define-structure output-data-set output-callback lock done-count writer-count)
234+
(define (create-output-data-set)
235+
(make-output-data-set
236+
*output-callback*
237+
(conspire:make-lock) 0 0))
238+
239+
(defhandler ds-add-elt
240+
(lambda (data-set val)
241+
((output-data-set-output-callback data-set) val))
242+
output-data-set?)
243+
244+
(defhandler ds-get-reader
245+
(lambda (data-set)
246+
(error "Cannot read from output data set"))
247+
output-data-set?)
248+
249+
(defhandler ds-get-writer-count
250+
output-data-set-writer-count
251+
output-data-set?)
252+
253+
(defhandler ds-set-writer-count!
254+
set-output-data-set-writer-count!
255+
output-data-set?)
256+
257+
(defhandler ds-get-done-count
258+
output-data-set-done-count
259+
output-data-set?)
260+
261+
(defhandler ds-set-done-count!
262+
set-output-data-set-done-count!
263+
output-data-set?)
264+
265+
(defhandler ds-get-lock
266+
output-data-set-lock
267+
output-data-set?)
268+
269+
#|
270+
(define *output-callback* #f)
271+
(fluid-let ((*output-callback* pp))
272+
(define (test-output)
273+
(define ds (create-output-data-set))
274+
(define writer (ds-get-writer ds))
275+
(writer 1)
276+
(writer 2))
277+
(with-time-sharing-conspiracy test-output))
278+
;-> 1
279+
; 2
280+
|#
281+
282+
230283
;;; Sink data set
231284
(define *sink-data-set* (list 'sink-data-set))
232285
(define (sink-data-set? data-set) (eq? data-set *sink-data-set*))
@@ -236,7 +289,7 @@
236289

237290
(defhandler ds-get-reader
238291
(lambda (sink-data-set)
239-
(lambda () (error "Cannot read from sink data set")))
292+
(error "Cannot read from sink data set"))
240293
sink-data-set?)
241294

242295
(defhandler ds-get-writer
@@ -254,6 +307,32 @@
254307
(error "Writer count does not apply to /dev/null"))
255308
sink-data-set?)
256309

310+
(defhandler ds-get-done-count
311+
(lambda (data-set)
312+
(error "Done count does not apply to /dev/null"))
313+
sink-data-set?)
314+
315+
(defhandler ds-set-done-count!
316+
(lambda (data-set count)
317+
(error "Done count does not apply to /dev/null"))
318+
sink-data-set?)
319+
320+
(defhandler ds-get-lock
321+
(lambda (data-set)
322+
(error "Lock does not apply to /dev/null"))
323+
sink-data-set?)
324+
325+
#|
326+
(define (test-sink)
327+
(define ds (create-sink-data-set))
328+
(define writer (ds-get-writer ds))
329+
(writer 1)
330+
(writer 2))
331+
(with-time-sharing-conspiracy test-sink)
332+
;; Should do nothing with writes (just returns 'pass)
333+
;-> pass
334+
|#
335+
257336

258337
;;; Data-set elements
259338
(define-structure ds-elt key value done)

code/operations.scm

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
;;; Reduce
5454
(define (mrs:reduce reduce-func ds-in ds-out)
55-
(define ds-intermediate (create-data-set))
55+
(define ds-intermediate (mrs:create-data-set))
5656
(mrs:aggregate ds-in ds-intermediate)
5757
(mrs:map reduce-func ds-intermediate ds-out))
5858

@@ -63,7 +63,7 @@
6363
(if (ds-elt-done? ds-elt)
6464
(pp `(,tag done))
6565
(pp `(,tag ,(ds-elt-key ds-elt) ,(ds-elt-value ds-elt))))))
66-
(make-distributor mm-func ds-in (create-sink-data-set) 1))
66+
(make-distributor mm-func ds-in (mrs:create-sink-data-set) 1))
6767

6868
;;; ds-out only receives a done once ds-in is done
6969
;;; Example use case: User specifies return output data

0 commit comments

Comments
 (0)