-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathasync.clj
110 lines (77 loc) · 2.73 KB
/
async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
(ns otus-18.async
(:require [clojure.core.async :as a :refer [<! <!! >! >!!
alts! chan close!
go go-loop
pipeline-async pipeline
thread timeout]]))
;; Всем ли хорошо видно?
;; * Каналы (Channels)
(def echo-chan (chan))
(go (println (<! echo-chan)))
(>!! echo-chan "bottle")
; => true
; => bottle
;; * Буферизация (Buffers)
(def echo-buffer (chan 2))
(>!! echo-buffer :hello)
; => true
(>!! echo-buffer :world)
; => true
(>!! echo-buffer :!)
; Третий вызов будет блокирующим, т.к. буфер переполнен
;; Note: `sliding-buffer` - при переполнении старые данные будут удаляться из буфера
;; `dropping-buffer` - новые данные не будут попадать в переполненный буфер
;; при их использовании `>!!` никогда не заблокирует поток выполнения
;; * Блокирование и Паркинг (Blocking and Parking)
;; `>!`, `<!` - могут быть использованы только внутри `go`-блока
;; `>!!`, `<!!` - могут быть использованы как в `go`-блоках, так и в основном коде
(def n-ch (chan))
(doseq [n (range 1000)]
(go (>! n-ch n)))
;; `>!`, `<!` - park thread
;; `>!!`, `<!!` - block thread
;; * thread
(def ch (chan))
(time (do
(doseq [n (range 9)]
(go
(Thread/sleep 1000)
(>! ch n)))
(doseq [_ (range 9)]
(<!! ch))))
; => "Elapsed time: 2016.62955 msecs"
(def another-echo-chan (chan))
(thread (println (<!! another-echo-chan)))
(>!! another-echo-chan "hi")
; => true
; => hi
;; * alts!
(def ch-a (chan))
(def ch-b (chan))
(go (println (alts! [ch-a ch-b])))
(>!! ch-b "bottle")
(go (println (alts! [ch-a ch-b (timeout 1000)])))
;; * pipeline
(def capitalizer (map clojure.string/capitalize))
(def input (chan))
(def output (chan))
(go-loop []
(when-let [x (<! output)]
(println x)
(recur)))
(pipeline 1 output capitalizer input)
(>!! input "hello")
(close! input)
(<!! output)
;; * pipeline-async
(def ca> (chan 1))
(def cb> (chan 1))
(defn c-af [val result] ; notice the signature is different for `pipeline-async`, it includes a channel
(go (<! (timeout 1000))
(>! result (str val "!!!"))
(>! result (str val "!!!"))
(>! result (str val "!!!"))
(close! result)))
(pipeline-async 1 cb> c-af ca>)
(go (println (<! cb>)))
(>!! ca> "hello")