Skip to content

Commit 6664021

Browse files
committed
Lots of tests, bugfixing concurrency, large-scale outputs
1 parent 8a0a159 commit 6664021

File tree

6 files changed

+206
-48
lines changed

6 files changed

+206
-48
lines changed

code/datasets.scm

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
;;; User-facing operations
22
(define (mrs:create-data-set) (create-mrq-data-set))
33
(define (mrs:create-sink-data-set) (create-sink-data-set))
4-
(define (mrs:create-file-writer-data-set) (create-file-writer-data-set))
4+
(define (mrs:create-file-writer-data-set filename)
5+
(create-file-writer-data-set filename))
56
(define (mrs:create-output-data-set) (create-output-data-set))
67

78
;;; General data-set operations
89
(define *data-sets* '())
910

11+
(define (clear-data-sets)
12+
(set! *data-sets* '()))
13+
1014
(define (register-data-set data-set)
1115
(set! *data-sets* (cons data-set *data-sets*)))
1216

@@ -175,8 +179,9 @@
175179
(lambda ()
176180
(if (file-writer-data-set-done data-set)
177181
(begin
178-
(create-ds-elt-done)
179-
(set-file-writer-data-set-done! data-set #f))
182+
(pp 'file-writer-get-done)
183+
(set-file-writer-data-set-done! data-set #f)
184+
(create-ds-elt-done))
180185
*empty-ds-elt*)))
181186
file-writer-data-set?)
182187

@@ -238,7 +243,10 @@
238243

239244
(defhandler ds-add-elt
240245
(lambda (data-set val)
241-
((output-data-set-output-callback data-set) val))
246+
(let ((lock (ds-get-lock data-set)))
247+
(conspire:acquire-lock lock)
248+
((output-data-set-output-callback data-set) val)
249+
(conspire:unlock lock)))
242250
output-data-set?)
243251

244252
(defhandler ds-get-reader

code/load.scm

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@
66
(load "data-loader")
77
(load "multi-reader-queue")
88
(load "workers")
9-
(load "pipes")
9+
(load "pipes")
10+
(load "main")
11+
(load "util")

code/main.scm

+138-11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
(callback
1414
(ds-elt-key ds-elt)
1515
(ds-elt-value ds-elt)))))))
16+
(clear-data-sets)
1617
(thunk)
1718
(flush-input-data-sets)
1819
(let lp ()
@@ -32,19 +33,19 @@
3233
output-value)))
3334

3435
#|
35-
(define (test1)
36+
(define (test-print-streaming)
3637
(define ds-input (mrs:create-data-set))
3738
(define ds1 (mrs:create-data-set))
3839
(mrs:print-streaming ds-input 'ds1)
3940
(mrs:feed-value-list ds-input '(1 2 3 4)))
40-
(mrs:run-computation test1)
41+
(mrs:run-computation test-print-streaming)
4142
;-> (ds1 0 1)
4243
; (ds1 1 2)
4344
; (ds1 2 3)
4445
; (ds1 3 4)
4546
; (ds1 done)
4647

47-
(define (test2)
48+
(define (test-basic-map)
4849
(define ds-input (mrs:create-data-set))
4950
(define ds1 (mrs:create-data-set))
5051
(mrs:map
@@ -54,14 +55,14 @@
5455
ds1)
5556
(mrs:print-streaming ds1 'ds1)
5657
(mrs:feed-value-list ds-input '(1 2 3 4)))
57-
(mrs:run-computation test2)
58+
(mrs:run-computation test-basic-map)
5859
; (ds1 3 40)
5960
; (ds1 0 10)
6061
; (ds1 1 20)
6162
; (ds1 2 30)
6263
; (ds1 done)
6364

64-
(define (test3)
65+
(define (test-chain-of-maps)
6566
(define ds-input (mrs:create-data-set))
6667
(define ds1 (mrs:create-data-set))
6768
(mrs:map
@@ -76,7 +77,7 @@
7677
ds1)
7778
(mrs:print-streaming ds1 'ds1)
7879
(mrs:feed-value-list ds-input '(1 2 3 4)))
79-
(mrs:run-computation test3)
80+
(mrs:run-computation test-chain-of-maps)
8081
; (ds1 1 20)
8182
; (ds1 2 30)
8283
; (ds1 3 40)
@@ -87,7 +88,7 @@
8788
; (ds1 0 11)
8889
; (ds1 done)
8990

90-
(define (test4)
91+
(define (test-aggregate)
9192
(define ds-input (mrs:create-data-set))
9293
(define ds1 (mrs:create-data-set))
9394
(mrs:aggregate
@@ -97,14 +98,14 @@
9798
(mrs:feed-value-list ds-input '(1 2 3 4))
9899
(mrs:feed-value-list ds-input '(3 3 3 3))
99100
(mrs:feed-value-list ds-input '(1 4 3 4)))
100-
(mrs:run-computation test4)
101+
(mrs:run-computation test-aggregate)
101102
; (ds1 3 (4 3 4))
102103
; (ds1 2 (3 3 3))
103104
; (ds1 1 (4 3 2))
104105
; (ds1 0 (1 3 1))
105106
; (ds1 done)
106107

107-
(define (test5)
108+
(define (test-map-reduce)
108109
(define ds-input (mrs:create-data-set))
109110
(define ds1 (mrs:create-data-set))
110111
(define ds2 (mrs:create-data-set))
@@ -123,7 +124,7 @@
123124
(mrs:feed-value-list ds-input '(1 1 3 5))
124125
(mrs:feed-value-list ds-input '(2 2 1 4))
125126
(mrs:feed-value-list ds-input '(3 1 0 3)))
126-
(mrs:run-computation test5)
127+
(mrs:run-computation test-map-reduce)
127128
; (ds1 3 50)
128129
; (ds1 0 20)
129130
; (ds1 0 10)
@@ -161,5 +162,131 @@
161162
(mrs:feed-value-list ds-input '(1 2 3 4)))
162163
(mrs:run-computation test-with-output-data-set)
163164
;;; Note that this returns directly!
164-
;-> ((1 20) (0 10) (3 40) (2 30))
165+
;Value -> ((1 20) (0 10) (3 40) (2 30))
166+
167+
(define (test-with-user-specified-callback)
168+
(define ds-input (mrs:create-data-set))
169+
(define ds-output (mrs:create-output-data-set))
170+
(mrs:map
171+
(lambda (key value)
172+
(mrs:emit key (* 10 value)))
173+
ds-input
174+
ds-output)
175+
(mrs:feed-value-list ds-input '(1 2 3 4)))
176+
(mrs:run-computation-with-callback
177+
test-with-user-specified-callback
178+
(lambda (k v)
179+
(pp (list 'user k v))))
180+
;;; Printed:
181+
; (user 1 20)
182+
; (user 2 30)
183+
; (user 3 40)
184+
; (user 0 10)
185+
186+
(define (test-multiple-writers-to-output)
187+
(define ds-input (mrs:create-data-set))
188+
(define ds-output (mrs:create-output-data-set))
189+
(mrs:map
190+
(lambda (key value)
191+
(mrs:emit key (* 10 value)))
192+
ds-input
193+
ds-output)
194+
(mrs:map
195+
(lambda (key value)
196+
(mrs:emit key (* 11 value)))
197+
ds-input
198+
ds-output)
199+
(mrs:feed-value-list ds-input '(1 2 3 4)))
200+
(mrs:run-computation test-multiple-writers-to-output)
201+
;;; Note that this returns directly!
202+
;Value -> ((3 44) (2 33) (1 22) (0 11) (0 10) (3 40) (2 30) (1 20))
203+
204+
(define (test-file-writer-output-no-dependency)
205+
(define ds-file-input (mrs:create-data-set))
206+
(define ds-file-output (mrs:create-file-writer-data-set "output.txt"))
207+
(define ds-main-input (mrs:create-data-set))
208+
(define ds-main-output (mrs:create-output-data-set))
209+
(mrs:map
210+
(lambda (key value)
211+
(pp 'file-map)
212+
(conspire:thread-yield)
213+
(pp 'emit)
214+
(mrs:emit key (* 10 value)))
215+
ds-file-input
216+
ds-file-output)
217+
(mrs:map
218+
(lambda (key value)
219+
(pp 'main-map)
220+
(mrs:emit key (* 11 value)))
221+
ds-main-input
222+
ds-main-output)
223+
(mrs:feed-value-list ds-main-input '(1 2 3))
224+
(mrs:feed-value-list ds-file-input '(1 2 3 4 5 6 7 8 9 10 11 12)))
225+
(mrs:run-computation test-file-writer-output-no-dependency)
226+
227+
;;; Returned
228+
;Value 40: ((0 11) (2 33) (1 22))
229+
230+
;;; BUT the file output.txt is empty! This is because the main
231+
;;; dataflow finished and returned before the file writing was
232+
;;; finished. We can fix that by making the done of the main dataflow
233+
;;; depend on the completion of the others:
234+
235+
236+
(define (test-file-writer-output-with-dependency)
237+
(define ds-file-input (mrs:create-data-set))
238+
(define ds-file-output (mrs:create-file-writer-data-set "output.txt"))
239+
(define ds-main-input (mrs:create-data-set))
240+
(define ds-main-output (mrs:create-output-data-set))
241+
(mrs:map
242+
(lambda (key value)
243+
(conspire:thread-yield)
244+
(mrs:emit key (* 10 value)))
245+
ds-file-input
246+
ds-file-output)
247+
(mrs:map
248+
(lambda (key value)
249+
(mrs:emit key (* 11 value)))
250+
ds-main-input
251+
ds-main-output)
252+
(mrs:depends-on
253+
ds-file-output
254+
ds-main-output)
255+
(mrs:feed-value-list ds-main-input '(1 2 3))
256+
(mrs:feed-value-list ds-file-input '(1 2 3 4 5 6 7 8 9 10 11 12)))
257+
(mrs:run-computation test-file-writer-output-with-dependency)
258+
;Value 28: ((0 11) (2 33) (1 22))
259+
;; Value is returned to the REPL
260+
261+
;; And, unlike the test case above, the "output.txt" file contains all
262+
;; 12 key-value pairs:
263+
; (2 30)
264+
; (3 40)
265+
; (4 50)
266+
; (0 10)
267+
; (1 20)
268+
; (6 70)
269+
; (7 80)
270+
; (8 90)
271+
; (9 100)
272+
; (5 60)
273+
; (11 120)
274+
; (10 110)
275+
276+
277+
(define (test-large-data-fact)
278+
(define ds-fact-input (mrs:create-data-set))
279+
(define ds-fact-output (mrs:create-file-writer-data-set "output.txt"))
280+
(mrs:map
281+
(lambda (key value)
282+
(mrs:emit key (fact value)))
283+
ds-fact-input
284+
ds-fact-output)
285+
(mrs:feed-value-list ds-fact-input (make-range 500)))
286+
(mrs:run-computation test-large-data-fact)
287+
288+
;;; Works! The output file has the first 500 factorials. They are not
289+
;;; in strictly increasing order, demonstrating that they are
290+
;;; indeed being distributed across our workers.
291+
165292
|#

code/operations.scm

+3-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@
7373
(define (mm-func emit)
7474
(lambda (ds-elt)
7575
(if (ds-elt-done? ds-elt)
76-
(emit ds-elt))))
76+
(begin
77+
(pp 'depends-on-done)
78+
(emit ds-elt)))))
7779
(make-distributor mm-func ds-in ds-out 1))
7880

7981

code/util.scm

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
;;; Build the list of the first N non-negative integers in ascending
;;; order, e.g. (make-range 3) => (0 1 2).  When N is zero or negative
;;; the result is the empty list.
(define (make-range n)
  ;; Walk the index downward from N-1 so that each value can be consed
  ;; onto the front of the accumulator; the finished list is therefore
  ;; already in ascending order and needs no reversal.
  (do ((index (- n 1) (- index 1))
       (acc '() (cons index acc)))
      ((< index 0) acc)))
8+
9+
#|
10+
(make-range 5)
11+
;-> (0 1 2 3 4)
12+
|#
13+
14+
;;; Factorial of N: (fact 0) => 1, (fact n) => n * (fact (- n 1)).
;;; N must be a non-negative integer.  Any other argument signals an
;;; error up front; previously a negative or fractional N stepped past
;;; the (= n 0) base case and recursed without ever terminating.
(define (fact n)
  (if (not (and (integer? n) (>= n 0)))
      (error "fact: argument must be a non-negative integer:" n))
  ;; The argument is validated once above, so the recursion itself
  ;; only needs the zero base case.
  (let recur ((k n))
    (if (= k 0)
        1
        (* k (recur (- k 1))))))
17+
18+
#|
19+
(map fact (make-range 10))
20+
;Value: (1 1 2 6 24 120 720 5040 40320 362880)
21+
|#
22+
23+
;;; Nth Fibonacci number by the direct doubly-recursive definition:
;;; (fib 0) => 0, (fib 1) => 1, (fib n) => (fib n-1) + (fib n-2).
;;; Any argument smaller than 2 is returned unchanged.
(define (fib n)
  (if (< n 2)
      n
      (let ((previous (fib (- n 1)))
            (before-previous (fib (- n 2))))
        (+ previous before-previous))))
27+
28+
#|
29+
(map fib (make-range 10))
30+
;Value: (0 1 1 2 3 5 8 13 21 34)
31+
|#
32+
33+
;;; Add up the numbers in LST.  The empty list sums to 0.
(define sum
  (lambda (lst)
    (reduce + 0 lst)))
35+
36+
#|
37+
(sum '(1 3 2))
38+
;-> 6
39+
40+
(sum '(1 2 3 4 5))
41+
;-> 15
42+
43+
44+
(sum '())
45+
;-> 0
46+
47+
|#
48+
49+
50+

other-stuff/util.scm

-31
This file was deleted.

0 commit comments

Comments
 (0)