1
1
(ns mini-redis.core
2
2
(:require
3
3
[clojure.java.io :as io]
4
- [clojure.string :as str])
4
+ [clojure.string :as str]
5
+ [clojure.core.async :as a])
5
6
(:import
6
7
[java.io BufferedReader Writer]
7
- [java.net ServerSocket]
8
- [java.util.concurrent Executors ScheduledExecutorService TimeUnit])
8
+ [java.net ServerSocket])
9
9
(:gen-class ))
10
10
11
11
34
34
(vec )))))
35
35
36
36
37
- (comment
38
- @keys-to-expire
39
- (set-key-to-expire " name" 123123123 )
40
- (expire-key " name" ))
37
+ (def keys-to-clean-up
38
+ (a/chan ))
41
39
42
40
43
- (def ^ScheduledExecutorService cleanup-pool
44
- (Executors/newScheduledThreadPool 10 ))
41
+ (defn start-cleanup-worker []
42
+ (a/go-loop []
43
+ ; ; принимаем сообщения для удаления ключей из базы
44
+ (let [{:keys [key delay timestamp]} (a/<! keys-to-clean-up)]
45
+ ; ; для каждого ключа создаём свой go блок
46
+ (a/go
47
+ (set-key-to-expire key timestamp)
48
+ ; ; паркуем блок до нужного момента
49
+ (a/<! (a/timeout delay))
50
+ ; ; удаляем ключ из базы
51
+ (expire-key key))
52
+ (recur ))))
45
53
46
54
47
- (defn schedule-cleanup-task []
48
- (let [current-time (System/currentTimeMillis )
49
- five-seconds-later (+ current-time 5000 )
50
- keys-to-schedule (->> @keys-to-expire
51
- (take-while #(< (first %) five-seconds-later)))]
52
- (doseq [[timestamp key] keys-to-schedule]
53
- (.schedule cleanup-pool
54
- ^Runnable (expire-key key)
55
- ^Long (- timestamp current-time)
56
- TimeUnit/MILLISECONDS))))
55
+ ; ; канал для рассылки публикаций
56
+ (def publications-channel
57
+ (a/chan 1 ))
57
58
58
-
59
- (defn start-cleanup-worker []
60
- (.scheduleAtFixedRate cleanup-pool schedule-cleanup-task 0 5 TimeUnit/SECONDS ))
59
+ ; ; специальный объект для подписки на сообщения
60
+ (def publications
61
+ (a/pub publications-channel :channel ))
61
62
62
63
63
64
64
65
; ; обрабатываем команды от клиентов
65
66
(defmulti handle-command
66
- (fn [[command & _]]
67
+ (fn [_ctx [command & _]]
67
68
(keyword (str/lower-case command))))
68
69
69
70
70
71
(defmethod handle-command :ping
71
- [_]
72
+ [_ctx _]
72
73
" PONG" )
73
74
74
75
75
76
(defmethod handle-command :echo
76
- [[_ [arg-len arg]]]
77
+ [_ctx [_ [arg-len arg]]]
77
78
arg )
78
79
79
80
80
81
(defmethod handle-command :set
81
- [[_ [key-len key val-len val opt-len opt optarg-len optarg]]]
82
+ [_ctx [_ [key-len key val-len val opt-len opt optarg-len optarg]]]
82
83
; ; сохраняем значение
83
84
(swap! database assoc key val)
84
85
85
86
(when (and opt (= (.toUpperCase opt) " PX" ))
86
87
(let [delay (Integer/parseInt optarg)
87
88
current-time (System/currentTimeMillis )
88
89
timestamp (+ current-time delay)]
89
- (if (< timestamp (+ current-time 5000 ))
90
- (.schedule cleanup-pool ^Runnable (expire-key key) delay TimeUnit/MILLISECONDS)
91
- (set-key-to-expire key timestamp))))
90
+ ; ; отправляем сообщение в канал для удаления ключей из базы
91
+ (a/put! keys-to-clean-up
92
+ {:key key
93
+ :delay delay
94
+ :timestamp timestamp})))
92
95
93
96
; ; ответ клиенту
94
97
" OK" )
95
98
96
99
97
100
(defmethod handle-command :get
98
- [[_ [key-len key]]]
101
+ [_ctx [_ [key-len key]]]
99
102
(let [entry (find @database key)]
100
- (if (some? ( val entry))
103
+ (if (and ( some? entry) ( some? ( val entry) ))
101
104
(val entry)
102
105
" (nil)" )))
103
106
104
107
108
+ (defmethod handle-command :subscribe
109
+ [{:keys [pub-listener client-subscriptions]}
110
+ [_ [_channel-name-len channel-name]]]
111
+ (let [channel-listener (a/chan 1 )]
112
+ (a/sub publications channel-name channel-listener) ; ; подписываем канал на публикации по топику
113
+ (a/admix pub-listener channel-listener) ; ; мерджим сообщения из канала для топика в общий канал сообщений клиента
114
+ (swap! client-subscriptions assoc channel-name channel-listener) ; ; сохраняем канал для отписки от топика
115
+ [" subscribe" channel-name (count @client-subscriptions)]))
116
+
117
+
118
+ (defmethod handle-command :unsubscribe
119
+ [{:keys [pub-listener client-subscriptions]}
120
+ [_ [_channel-name-len channel-name]]]
121
+ (let [channel-listener (get @client-subscriptions channel-name)]
122
+ (a/unmix pub-listener channel-listener) ; ; убираем сообщения канала для топика из общего канал сообщений клиента
123
+ (a/close! channel-listener) ; ; закрываем канал
124
+ (swap! client-subscriptions dissoc channel-name) ; ; убираем канал из атома
125
+ [" unsubscribe" channel-name (count @client-subscriptions)]))
126
+
127
+
128
+ (defmethod handle-command :publish
129
+ [_ctx [_ [_channel-name-len channel-name _message-len message]]]
130
+ ; ; отправляем все публикации в глобальный канал для рассылки всем подписавшимся клиентам
131
+ (a/put! publications-channel
132
+ {:channel channel-name ; ; по этому ключу выбираются каналы клиентов куда будет доставлено сообщение
133
+ :message [" *3" " $7" " MESSAGE" channel-name message]})
134
+ 1 )
135
+
136
+
137
+ (defmethod handle-command :message
138
+ [_ctx [_ [channel-name message]]]
139
+ [" message" channel-name message])
140
+
141
+
105
142
; ; needed for redis-cli
106
143
(defmethod handle-command :command
107
- [_]
144
+ [_ctx _]
108
145
" O hai" )
109
146
110
147
117
154
118
155
119
156
(defn reply [data]
157
+ ; ; форматируем ответ клиенту согласно протоколу Redis
120
158
(let [data-type (cond
121
159
(string? data) " +"
122
160
(integer? data) " :"
123
- (error? data) " -" )]
161
+ (error? data) " -"
162
+ (vector? data) (str " *" (count data) " \r\n " ))
163
+ data (if (vector? data)
164
+ (->> data
165
+ (mapcat #(cond (integer? %) [(str " :" %)]
166
+ (string? %) [(str " $" (count %)) %]))
167
+ (str/join " \r\n " ))
168
+ data)]
124
169
(str/join [data-type data " \r\n " ])))
125
170
126
171
127
172
(defn handle-message
128
173
" Pass parsed message to the dispatch function and format result for client"
129
- [message]
174
+ [ctx message]
130
175
(let [[number-of-arguments command-string-len command & args] message]
131
- (-> (handle-command [command args])
176
+ (println " handling command" command)
177
+ (-> (handle-command ctx [command args])
132
178
(reply ))))
133
179
134
180
135
-
136
- (comment
137
- (handle-command [" ECHO" [5 " HELLO" ]])
138
- (handle-message [nil nil " ECHO" 5 " HELLO" ]))
181
+ (defn read-message [^BufferedReader socket-reader]
182
+ ; ; .readLine блокирующий вызов поэтому выносим в отдельный поток
183
+ (a/thread
184
+ (loop [line (.readLine socket-reader)
185
+ res []]
186
+ (cond
187
+ ; ; сокет закрылся
188
+ (nil? line) res
189
+ ; ; клиент ничего не ввёл, но соединение еще открыто
190
+ (not (.ready socket-reader)) (conj res line)
191
+ ; ; читаем следующую строку
192
+ :otherwise (recur (.readLine socket-reader)
193
+ (conj res line))))))
139
194
140
195
141
-
142
- (defn read-message
196
+ (defn read-messages
143
197
" Read all lines of textual data from the given socket"
144
- [^BufferedReader socket-reader]
145
- (loop [line (.readLine socket-reader)
146
- res []]
147
- (cond
148
- ; ; сокет закрылся
149
- (nil? line) res
150
- ; ; клиент ничего не ввёл, но соединение еще открыто
151
- (not (.ready socket-reader)) (conj res line)
152
- ; ; читаем следующую строку
153
- :otherwise (recur (.readLine socket-reader)
154
- (conj res line)))))
155
-
156
-
157
-
158
- (comment
159
- (read-message (io/reader (char-array " *2\r\n $4\r\n echo\r\n $5\r\n hello" )))
160
- (read-message (io/reader (char-array " *5\r\n $3\r\n set\r\n $4\r\n name\r\n $6\r\n Sergey\r\n $2\r\n RX\r\n $5\r\n 10000" ))))
161
-
198
+ [^ServerSocket socket]
199
+ (let [messages-channel (a/chan )
200
+ socket-reader (io/reader socket)]
201
+ (a/go-loop []
202
+ ; ; получаем сообщение от клиента
203
+ (let [message (a/<! (read-message socket-reader))]
204
+ (if (empty? message)
205
+ ; ; клиент отключился, подчищаем ресурсы
206
+ (do (a/close! messages-channel)
207
+ (.close socket-reader))
208
+
209
+ (do (println " got message" message)
210
+ (a/>! messages-channel message)
211
+ (recur )))))
212
+ messages-channel))
162
213
163
214
164
215
(defn send-message
168
219
(.flush socket-writer))
169
220
170
221
171
-
172
222
(defn handle-client
173
223
" Create a separate thread for each client to execute commands"
174
224
[socket handler]
175
- ; ; запускаем отдельный поток для каждого клиента
176
- (future
177
- (with-open [reader (io/reader socket)
178
- writer (io/writer socket)]
179
- ; ; обрабатываем команды от клиента
180
- (doseq [msg-in (repeatedly #(read-message reader))
181
- :while (not (empty? msg-in))
182
- :let [msg-out (handler msg-in)]]
183
- (println " msg-in" msg-in)
184
- ; ; отправляем ответ
185
- (send-message writer msg-out)))))
186
-
225
+ ; ; запускаем отдельный go блок для каждого клиента
226
+ (let [messages (read-messages socket) ; ; канал в который попадают сообщения из сокета самого клиента
227
+ publications (a/chan 1 (map :message )) ; ; канал в который попадают сообщения из сокетов других клиентов
228
+ ctx {:pub-listener (a/mix publications) ; ; mix похож на a/merge, но позволяет динамически добавлять и удалять каналы
229
+ :client-subscriptions (atom {})}
230
+ all-messages (a/merge [publications messages]) ; ; объединяем все сообщения в один канал
231
+ responses (a/pipe all-messages (a/chan 1 (map (partial handler ctx)))) ; ; создаём пайплайн для трансформации сообщений в ответы клиентам
232
+ socket-writer (io/writer socket)]
233
+ (a/go-loop []
234
+ (let [response (a/<! responses)]
235
+ (println " got response" response)
236
+ (if (and (some? response)
237
+ (not (.isClosed socket)))
238
+ ; ; отправляем ответ
239
+ (do (send-message socket-writer response)
240
+ (recur ))
241
+ ; ; если приходит nil сокет закрылся со стороны клиента или клиент отключился
242
+ (do (println " closing socket" )
243
+ (.close socket-writer)
244
+ (a/close! responses)))))))
245
+
246
+
247
+ (defn accept-connection [^ServerSocket server-socket]
248
+ ; ; метод .accept блокирующий, поэтому выносим в отдельный поток
249
+ (a/thread
250
+ (.accept server-socket)))
187
251
188
252
189
253
(defn run-server
193
257
(let [server-sock (ServerSocket. port)]
194
258
(.setReuseAddress server-sock true )
195
259
196
- ; ; обрабатываем подключения в отдельном потоке
197
- (future
198
- (while true
199
- (let [socket (.accept server-sock)]
200
- (handle-client socket handler))))
260
+ ; ; обрабатываем подключения в пользователей в go блоке
261
+ (a/go-loop []
262
+ ; ; тут go блок не блокируется, а паркуется
263
+ (let [socket (a/<! (accept-connection server-sock))]
264
+ (println " got connection" )
265
+ (#'handle-client socket handler)
266
+ (recur )))
201
267
202
268
; ; возвращаем объект сервера
203
269
server-sock))
204
270
205
271
206
-
207
272
; ; graceful shutdown
208
- (defn shutdown-hook [server worker ]
273
+ (defn shutdown-hook [server]
209
274
(.addShutdownHook (Runtime/getRuntime )
210
275
(Thread. ^Runnable
211
276
(fn []
212
277
(.close server)
213
- (future-cancel worker)
214
- (.shutdown cleanup-pool)
215
278
(shutdown-agents )))))
216
279
217
280
218
281
; ; точка входа
219
282
(defn -main
220
283
[& args]
221
- (let [server (run-server 6379 handle-message)
222
- worker (start-cleanup-worker )]
223
- (shutdown-hook server worker )
284
+ (let [server (run-server 6379 handle-message)]
285
+ (start-cleanup-worker )
286
+ (shutdown-hook server)
224
287
server))
225
288
226
289
233
296
@keys-to-expire
234
297
235
298
(.close server)
236
- nil )
299
+
300
+ nil )
301
+
0 commit comments