Skip to content

Commit 8a0a159

Browse files
committed
Extracting data loader
1 parent 46cd574 commit 8a0a159

File tree

2 files changed

+166
-167
lines changed

2 files changed

+166
-167
lines changed

code/data-loader.scm

+1-167
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
;; TODO: Make generic? Only works on list right now
21
(define (mrs:feed-data data-set ds-elt-sequence)
32
(for-each (lambda (ds-elt)
43
(ds-add-elt data-set ds-elt))
@@ -20,169 +19,4 @@
2019
data-set
2120
(map (lambda (kv)
2221
(create-ds-elt (car kv) (cadr kv))) key-value-list)))
23-
24-
(define *output-callback* #f)
25-
26-
(define (mrs:run-computation-with-callback thunk callback)
27-
(with-time-sharing-conspiracy
28-
(lambda ()
29-
(let ((output-done #f))
30-
(fluid-let
31-
((*output-callback*
32-
(lambda (ds-elt)
33-
(cond ((ds-elt-done? ds-elt)
34-
(set! output-done #t))
35-
(else
36-
(callback
37-
(ds-elt-key ds-elt)
38-
(ds-elt-value ds-elt)))))))
39-
(thunk)
40-
(flush-input-data-sets)
41-
(let lp ()
42-
(conspire:thread-yield)
43-
(if output-done
44-
'done
45-
(lp))))))))
46-
47-
(define (mrs:run-computation thunk)
48-
(let ((output-value '()))
49-
(let ((output-callback
50-
(lambda (k v)
51-
(set! output-value
52-
(cons (list k v)
53-
output-value)))))
54-
(mrs:run-computation-with-callback thunk output-callback)
55-
output-value)))
56-
57-
#|
58-
(define (test1)
59-
(define ds-input (mrs:create-data-set))
60-
(define ds1 (mrs:create-data-set))
61-
(mrs:print-streaming ds-input 'ds1)
62-
(mrs:feed-value-list ds-input '(1 2 3 4)))
63-
(mrs:run-computation test1)
64-
;-> (ds1 0 1)
65-
; (ds1 1 2)
66-
; (ds1 2 3)
67-
; (ds1 3 4)
68-
; (ds1 done)
69-
70-
(define (test2)
71-
(define ds-input (mrs:create-data-set))
72-
(define ds1 (mrs:create-data-set))
73-
(mrs:map
74-
(lambda (key value)
75-
(mrs:emit key (* 10 value)))
76-
ds-input
77-
ds1)
78-
(mrs:print-streaming ds1 'ds1)
79-
(mrs:feed-value-list ds-input '(1 2 3 4)))
80-
(mrs:run-computation test2)
81-
; (ds1 3 40)
82-
; (ds1 0 10)
83-
; (ds1 1 20)
84-
; (ds1 2 30)
85-
; (ds1 done)
86-
87-
(define (test3)
88-
(define ds-input (mrs:create-data-set))
89-
(define ds1 (mrs:create-data-set))
90-
(mrs:map
91-
(lambda (key value)
92-
(mrs:emit key (* 10 value)))
93-
ds-input
94-
ds1)
95-
(mrs:map
96-
(lambda (key value)
97-
(mrs:emit key (* 11 value)))
98-
ds-input
99-
ds1)
100-
(mrs:print-streaming ds1 'ds1)
101-
(mrs:feed-value-list ds-input '(1 2 3 4)))
102-
(mrs:run-computation test3)
103-
; (ds1 1 20)
104-
; (ds1 2 30)
105-
; (ds1 3 40)
106-
; (ds1 0 10)
107-
; (ds1 1 22)
108-
; (ds1 2 33)
109-
; (ds1 3 44)
110-
; (ds1 0 11)
111-
; (ds1 done)
112-
113-
(define (test4)
114-
(define ds-input (mrs:create-data-set))
115-
(define ds1 (mrs:create-data-set))
116-
(mrs:aggregate
117-
ds-input
118-
ds1)
119-
(mrs:print-streaming ds1 'ds1)
120-
(mrs:feed-value-list ds-input '(1 2 3 4))
121-
(mrs:feed-value-list ds-input '(3 3 3 3))
122-
(mrs:feed-value-list ds-input '(1 4 3 4)))
123-
(mrs:run-computation test4)
124-
; (ds1 3 (4 3 4))
125-
; (ds1 2 (3 3 3))
126-
; (ds1 1 (4 3 2))
127-
; (ds1 0 (1 3 1))
128-
; (ds1 done)
129-
130-
(define (test5)
131-
(define ds-input (mrs:create-data-set))
132-
(define ds1 (mrs:create-data-set))
133-
(define ds2 (mrs:create-data-set))
134-
(mrs:map
135-
(lambda (key value)
136-
(mrs:emit key (* 10 value)))
137-
ds-input
138-
ds1)
139-
(mrs:reduce
140-
(lambda (key values)
141-
(mrs:emit key (apply + values)))
142-
ds1
143-
ds2)
144-
(mrs:print-streaming ds1 'ds1)
145-
(mrs:print-streaming ds2 'ds2)
146-
(mrs:feed-value-list ds-input '(1 1 3 5))
147-
(mrs:feed-value-list ds-input '(2 2 1 4))
148-
(mrs:feed-value-list ds-input '(3 1 0 3)))
149-
(mrs:run-computation test5)
150-
; (ds1 3 50)
151-
; (ds1 0 20)
152-
; (ds1 0 10)
153-
; (ds1 1 10)
154-
; (ds1 2 30)
155-
; (ds1 0 30)
156-
; (ds1 1 10)
157-
; (ds1 1 20)
158-
; (ds1 2 10)
159-
; (ds1 3 40)
160-
; (ds1 2 0)
161-
; (ds1 3 30)
162-
; (ds1 done)
163-
; (ds2 3 120)
164-
; (ds2 2 40)
165-
; (ds2 1 40)
166-
; (ds2 0 60)
167-
; (ds2 done)
168-
169-
170-
; (ds1 3 (4 3 4))
171-
; (ds1 2 (3 3 3))
172-
; (ds1 1 (4 3 2))
173-
; (ds1 0 (1 3 1))
174-
; (ds1 done)
175-
176-
(define (test-with-output-data-set)
177-
(define ds-input (mrs:create-data-set))
178-
(define ds-output (mrs:create-output-data-set))
179-
(mrs:map
180-
(lambda (key value)
181-
(mrs:emit key (* 10 value)))
182-
ds-input
183-
ds-output)
184-
(mrs:feed-value-list ds-input '(1 2 3 4)))
185-
(mrs:run-computation test-with-output-data-set)
186-
;;; Note that this returns directly!
187-
;-> ((1 20) (0 10) (3 40) (2 30))
188-
|#
22+

code/main.scm

+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
(define *output-callback* #f)
2+
3+
(define (mrs:run-computation-with-callback thunk callback)
4+
(with-time-sharing-conspiracy
5+
(lambda ()
6+
(let ((output-done #f))
7+
(fluid-let
8+
((*output-callback*
9+
(lambda (ds-elt)
10+
(cond ((ds-elt-done? ds-elt)
11+
(set! output-done #t))
12+
(else
13+
(callback
14+
(ds-elt-key ds-elt)
15+
(ds-elt-value ds-elt)))))))
16+
(thunk)
17+
(flush-input-data-sets)
18+
(let lp ()
19+
(conspire:thread-yield)
20+
(if output-done
21+
'done
22+
(lp))))))))
23+
24+
(define (mrs:run-computation thunk)
25+
(let ((output-value '()))
26+
(let ((output-callback
27+
(lambda (k v)
28+
(set! output-value
29+
(cons (list k v)
30+
output-value)))))
31+
(mrs:run-computation-with-callback thunk output-callback)
32+
output-value)))
33+
34+
#|
35+
(define (test1)
36+
(define ds-input (mrs:create-data-set))
37+
(define ds1 (mrs:create-data-set))
38+
(mrs:print-streaming ds-input 'ds1)
39+
(mrs:feed-value-list ds-input '(1 2 3 4)))
40+
(mrs:run-computation test1)
41+
;-> (ds1 0 1)
42+
; (ds1 1 2)
43+
; (ds1 2 3)
44+
; (ds1 3 4)
45+
; (ds1 done)
46+
47+
(define (test2)
48+
(define ds-input (mrs:create-data-set))
49+
(define ds1 (mrs:create-data-set))
50+
(mrs:map
51+
(lambda (key value)
52+
(mrs:emit key (* 10 value)))
53+
ds-input
54+
ds1)
55+
(mrs:print-streaming ds1 'ds1)
56+
(mrs:feed-value-list ds-input '(1 2 3 4)))
57+
(mrs:run-computation test2)
58+
; (ds1 3 40)
59+
; (ds1 0 10)
60+
; (ds1 1 20)
61+
; (ds1 2 30)
62+
; (ds1 done)
63+
64+
(define (test3)
65+
(define ds-input (mrs:create-data-set))
66+
(define ds1 (mrs:create-data-set))
67+
(mrs:map
68+
(lambda (key value)
69+
(mrs:emit key (* 10 value)))
70+
ds-input
71+
ds1)
72+
(mrs:map
73+
(lambda (key value)
74+
(mrs:emit key (* 11 value)))
75+
ds-input
76+
ds1)
77+
(mrs:print-streaming ds1 'ds1)
78+
(mrs:feed-value-list ds-input '(1 2 3 4)))
79+
(mrs:run-computation test3)
80+
; (ds1 1 20)
81+
; (ds1 2 30)
82+
; (ds1 3 40)
83+
; (ds1 0 10)
84+
; (ds1 1 22)
85+
; (ds1 2 33)
86+
; (ds1 3 44)
87+
; (ds1 0 11)
88+
; (ds1 done)
89+
90+
(define (test4)
91+
(define ds-input (mrs:create-data-set))
92+
(define ds1 (mrs:create-data-set))
93+
(mrs:aggregate
94+
ds-input
95+
ds1)
96+
(mrs:print-streaming ds1 'ds1)
97+
(mrs:feed-value-list ds-input '(1 2 3 4))
98+
(mrs:feed-value-list ds-input '(3 3 3 3))
99+
(mrs:feed-value-list ds-input '(1 4 3 4)))
100+
(mrs:run-computation test4)
101+
; (ds1 3 (4 3 4))
102+
; (ds1 2 (3 3 3))
103+
; (ds1 1 (4 3 2))
104+
; (ds1 0 (1 3 1))
105+
; (ds1 done)
106+
107+
(define (test5)
108+
(define ds-input (mrs:create-data-set))
109+
(define ds1 (mrs:create-data-set))
110+
(define ds2 (mrs:create-data-set))
111+
(mrs:map
112+
(lambda (key value)
113+
(mrs:emit key (* 10 value)))
114+
ds-input
115+
ds1)
116+
(mrs:reduce
117+
(lambda (key values)
118+
(mrs:emit key (apply + values)))
119+
ds1
120+
ds2)
121+
(mrs:print-streaming ds1 'ds1)
122+
(mrs:print-streaming ds2 'ds2)
123+
(mrs:feed-value-list ds-input '(1 1 3 5))
124+
(mrs:feed-value-list ds-input '(2 2 1 4))
125+
(mrs:feed-value-list ds-input '(3 1 0 3)))
126+
(mrs:run-computation test5)
127+
; (ds1 3 50)
128+
; (ds1 0 20)
129+
; (ds1 0 10)
130+
; (ds1 1 10)
131+
; (ds1 2 30)
132+
; (ds1 0 30)
133+
; (ds1 1 10)
134+
; (ds1 1 20)
135+
; (ds1 2 10)
136+
; (ds1 3 40)
137+
; (ds1 2 0)
138+
; (ds1 3 30)
139+
; (ds1 done)
140+
; (ds2 3 120)
141+
; (ds2 2 40)
142+
; (ds2 1 40)
143+
; (ds2 0 60)
144+
; (ds2 done)
145+
146+
147+
; (ds1 3 (4 3 4))
148+
; (ds1 2 (3 3 3))
149+
; (ds1 1 (4 3 2))
150+
; (ds1 0 (1 3 1))
151+
; (ds1 done)
152+
153+
(define (test-with-output-data-set)
154+
(define ds-input (mrs:create-data-set))
155+
(define ds-output (mrs:create-output-data-set))
156+
(mrs:map
157+
(lambda (key value)
158+
(mrs:emit key (* 10 value)))
159+
ds-input
160+
ds-output)
161+
(mrs:feed-value-list ds-input '(1 2 3 4)))
162+
(mrs:run-computation test-with-output-data-set)
163+
;;; Note that this returns directly!
164+
;-> ((1 20) (0 10) (3 40) (2 30))
165+
|#

0 commit comments

Comments
 (0)