Skip to content

Commit 512acb5

Browse files
committed
Merge branch 'master' of github.com:lrsjohnson/scheme-mapreduce
2 parents 6a1b5d8 + 114bde2 commit 512acb5

File tree

5 files changed

+87
-15
lines changed

5 files changed

+87
-15
lines changed

data-loader.scm

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
;; TODO: Make generic? Only works on list right now
22
(define (mrs:feed-data data-set ds-elt-sequence)
3-
(pp 'feed-data)
43
(for-each (lambda (ds-elt)
54
(ds-add-elt data-set ds-elt))
65
ds-elt-sequence))
@@ -16,11 +15,16 @@
1615
(lp (+ i 1)
1716
(cdr value-list)))))))
1817

18+
(define (mrs:feed-key-value-list data-set key-value-list)
19+
(mrs:feed-data
20+
data-set
21+
(map (lambda (kv)
22+
(create-ds-elt (car kv) (cadr kv))) key-value-list)))
23+
1924
(define (mrs:run-computation thunk)
2025
(with-time-sharing-conspiracy
2126
(lambda ()
2227
(thunk)
23-
(pp 'run-c)
2428
(flush-input-data-sets)
2529
(conspire:null-job))))
2630

examples/demo.scm

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
;;; Flow computation demo
2+
(define (demo)
3+
(define 2d-vectors (create-data-set))
4+
(define 3d-vectors (create-data-set))
5+
(define magnitudes (create-data-set))
6+
(define in-out-flow (create-data-set))
7+
(mrs:map
8+
(lambda (key 2d-vec)
9+
(let ((x (car 2d-vec)) (y (cadr 2d-vec)))
10+
(mrs:emit key (sqrt (+ (* x x) (* y y))))))
11+
2d-vectors
12+
magnitudes)
13+
(mrs:map
14+
(lambda (key 3d-vec)
15+
(let ((x (car 3d-vec)) (y (cadr 3d-vec)) (z (caddr 3d-vec)))
16+
(mrs:emit key (sqrt (+ (* x x) (* y y) (* z z))))))
17+
3d-vectors
18+
magnitudes)
19+
(mrs:reduce
20+
(lambda (key values)
21+
(mrs:emit key (apply + values)))
22+
magnitudes
23+
in-out-flow)
24+
(mrs:feed-key-value-list 2d-vectors '((in (1 2))
25+
(in (3 3))
26+
(out (5 0))))
27+
(mrs:feed-key-value-list 3d-vectors '((in (1 1 1))
28+
(out (5 0 0))))
29+
(mrs:print-streaming in-out-flow 'flows))
30+
(mrs:run-computation demo)

examples/word-count.scm

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
(define (word-count)
2+
(define documents (create-data-set))
3+
(define words (create-data-set))
4+
(define word-counts (create-data-set))
5+
(mrs:map
6+
(lambda (key document)
7+
(for-each (lambda (word) (mrs:emit word 1)) document))
8+
documents words)
9+
(mrs:reduce
10+
(lambda (word counts)
11+
(mrs:emit word (apply + counts)))
12+
words word-counts)
13+
(mrs:feed-value-list documents '((hello world this is a document)))
14+
(mrs:feed-value-list documents '((another document) (and another)))
15+
(mrs:feed-value-list documents '((this document is somewhat long) (a short document)))
16+
(mrs:print-streaming word-counts 'count))
17+
(mrs:run-computation word-count)

operations.scm

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,9 @@
3535
(cons value (hash-table/get hash-table key '()))))
3636

3737
(define (mrs:aggregate ds-in ds-out)
38-
(pp 'mrs-aggregate)
3938
(define (mm-func emit)
40-
(pp 'mm-func-aggregator)
4139
(let ((data-from-mapper (make-equal-hash-table)))
42-
(pp 'made-aggregator)
4340
(lambda (ds-elt)
44-
(pp (list 'aggregate-data ds-elt))
4541
(if (not (ds-elt-done? ds-elt))
4642
(append-data-in-hashtable
4743
data-from-mapper
@@ -50,9 +46,7 @@
5046
(begin (hash-table/for-each
5147
data-from-mapper
5248
(lambda (key values)
53-
(pp (list 'ht-kv key values))
54-
(emit (create-ds-elt key values))
55-
))
49+
(emit (create-ds-elt key values))))
5650
(emit ds-elt))))))
5751
(make-distributor mm-func ds-in ds-out 1))
5852

@@ -69,4 +63,36 @@
6963
(if (ds-elt-done? ds-elt)
7064
(pp `(,tag done))
7165
(pp `(,tag ,(ds-elt-key ds-elt) ,(ds-elt-value ds-elt))))))
72-
(make-distributor mm-func ds-in (create-sink-data-set) 1))
66+
(make-distributor mm-func ds-in (create-sink-data-set) 1))
67+
68+
69+
;;; Constructor-style operations
70+
(define (make-constructor-operation operation)
71+
(define (new-op . args)
72+
(define ds-out (create-data-set))
73+
(apply operation (append args (list ds-out)))
74+
ds-out)
75+
new-op)
76+
77+
(define mrs:c-map (make-constructor-operation mrs:map))
78+
(define mrs:c-filter (make-constructor-operation mrs:filter))
79+
(define mrs:c-aggregate (make-constructor-operation mrs:aggregate))
80+
(define mrs:c-reduce (make-constructor-operation mrs:reduce))
81+
82+
#|
83+
(define (test1)
84+
(define ds-input (create-data-set))
85+
(define ds-out
86+
(mrs:c-map
87+
(lambda (key value)
88+
(mrs:emit key (* 10 value)))
89+
ds-input))
90+
(mrs:print-streaming ds-out 'out)
91+
(mrs:feed-value-list ds-input '(1 2 3 4)))
92+
(mrs:run-computation test1)
93+
;-> (out 0 10)
94+
; (out 1 20)
95+
; (out 2 30)
96+
; (out 3 40)
97+
; (out done)
98+
|#

workers.scm

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
(input-pipe (make-conspire-pipe)))
99
(let ((output-writer (get-pipe-writer output-pipe))
1010
(input-reader (get-pipe-reader input-pipe)))
11-
(pp 'make-worker)
1211
(conspire:make-thread
1312
conspire:runnable
1413
(lambda ()
@@ -59,7 +58,6 @@
5958
(ds-in-reader (ds-get-reader ds-in))
6059
(ds-out-writer (ds-get-writer ds-out))
6160
(num-workers-outstanding 0))
62-
(pp 'make-distributor)
6361
(conspire:make-thread
6462
conspire:runnable
6563
(lambda ()
@@ -69,16 +67,13 @@
6967
(cond ((empty-ds-elt? ds-in-elt)
7068
'continue)
7169
((ds-elt-done? ds-in-elt)
72-
(pp (list 'ds-done-elt-into-distributor ds-in-elt))
7370
(set! num-workers-outstanding num-workers)
7471
(vector-for-each (lambda (worker)
7572
(write-to-worker
7673
worker
7774
ds-in-elt))
7875
workers))
7976
(else
80-
(pp (list 'ds-elt-into-distributor-1 ds-in-elt))
81-
(pp (list 'ds-elt-into-distributor-2 (ds-elt-value ds-in-elt)))
8277
(write-to-worker
8378
(choose-worker ds-in-elt workers)
8479
ds-in-elt))))

0 commit comments

Comments
 (0)