1
+ (ns mini-redis.core
2
+ (:require
3
+ [clojure.java.io :as io]
4
+ [clojure.string :as str])
5
+ (:import
6
+ [java.io BufferedReader Writer]
7
+ [java.net ServerSocket])
8
+ (:gen-class ))
9
+
10
+
11
+ ; ; используем атом в качестве in-memory хранилища
12
+ (def database
13
+ (atom {}))
14
+
15
+
16
+
17
+ ; ; обрабатываем команды от клиентов
18
+ (defmulti handle-command
19
+ (fn [[command & _]]
20
+ (keyword (str/lower-case command))))
21
+
22
+
23
+ (defmethod handle-command :ping
24
+ [_]
25
+ " PONG" )
26
+
27
+
28
+ (defmethod handle-command :echo
29
+ [[_ [arg-len arg]]]
30
+ arg )
31
+
32
+
33
+ (defmethod handle-command :set
34
+ [[_ [key-len key val-len val opt-len opt optarg-len optarg]]]
35
+ ; ; сохраняем значение
36
+ (swap! database assoc key val)
37
+
38
+ (when (and opt (= (.toUpperCase opt) " PX" ))
39
+ (let [timeout (Integer/parseInt optarg)]
40
+ (future
41
+ (Thread/sleep timeout)
42
+ (swap! database dissoc key))))
43
+ ; ; ответ клиенту
44
+ " OK" )
45
+
46
+
47
+ (defmethod handle-command :get
48
+ [[_ [key-len key]]]
49
+ (if-some [entry (find @database key)]
50
+ (val entry)
51
+ " (nil)" ))
52
+
53
+
54
+ ; ; needed for redis-cli
55
+ (defmethod handle-command :command
56
+ [_]
57
+ " O hai" )
58
+
59
+
60
+
61
+ ; ; обрабатываем сообщения от клиента
62
+ (defn error? [data]
63
+ (-> (class data)
64
+ (supers )
65
+ (contains? Throwable)))
66
+
67
+
68
+ (defn reply [data]
69
+ (let [data-type (cond
70
+ (string? data) " +"
71
+ (integer? data) " :"
72
+ (error? data) " -" )]
73
+ (str/join [data-type data " \r\n " ])))
74
+
75
+
76
+ (defn handle-message
77
+ " Pass parsed message to the dispatch function and format result for client"
78
+ [message]
79
+ (let [[number-of-arguments command-string-len command & args] message]
80
+ (-> (handle-command [command args])
81
+ (reply ))))
82
+
83
+
84
+
85
+ (comment
86
+ (handle-command [" ECHO" [5 " HELLO" ]])
87
+ (handle-message [nil nil " ECHO" 5 " HELLO" ]))
88
+
89
+
90
+
91
+ (defn read-message
92
+ " Read all lines of textual data from the given socket"
93
+ [^BufferedReader socket-reader]
94
+ (loop [line (.readLine socket-reader)
95
+ res []]
96
+ (cond
97
+ ; ; сокет закрылся
98
+ (nil? line) res
99
+ ; ; клиент ничего не ввёл, но соединение еще открыто
100
+ (not (.ready socket-reader)) (conj res line)
101
+ ; ; читаем следующую строку
102
+ :otherwise (recur (.readLine socket-reader)
103
+ (conj res line)))))
104
+
105
+
106
+
107
+ (comment
108
+ (read-message (io/reader (char-array " *2\r\n $4\r\n echo\r\n $5\r\n hello" )))
109
+ (read-message (io/reader (char-array " *5\r\n $3\r\n set\r\n $4\r\n name\r\n $6\r\n Sergey\r\n $2\r\n RX\r\n $5\r\n 10000" ))))
110
+
111
+
112
+
113
+ (defn send-message
114
+ " Send the given string message out over the given socket"
115
+ [^Writer socket-writer ^String msg]
116
+ (.write socket-writer msg)
117
+ (.flush socket-writer))
118
+
119
+
120
+
121
+ (defn handle-client
122
+ " Create a separate thread for each client to execute commands"
123
+ [socket handler]
124
+ ; ; запускаем отдельный поток для каждого клиента
125
+ (future
126
+ (with-open [reader (io/reader socket)
127
+ writer (io/writer socket)]
128
+ ; ; обрабатываем команды от клиента
129
+ (doseq [msg-in (repeatedly #(read-message reader))
130
+ :while (not (empty? msg-in))
131
+ :let [msg-out (handler msg-in)]]
132
+ (println " msg-in" msg-in " msg-out" msg-out " \n " )
133
+ ; ; отправляем ответ
134
+ (send-message writer msg-out)))))
135
+
136
+
137
+
138
+ (defn run-server
139
+ " Run socket server on the specified port"
140
+ [port handler]
141
+ ; ; создаём сокет сервер
142
+ (let [server-sock (ServerSocket. port)]
143
+ (.setReuseAddress server-sock true )
144
+
145
+ ; ; обрабатываем подключения в отдельном потоке
146
+ (future
147
+ (while true
148
+ (let [socket (.accept server-sock)]
149
+ (handle-client socket handler))))
150
+
151
+ ; ; возвращаем объект сервера
152
+ server-sock))
153
+
154
+
155
+
156
+ ; ; graceful shutdown
157
+ (defn shutdown-hook [server]
158
+ (.addShutdownHook (Runtime/getRuntime )
159
+ (Thread. ^Runnable
160
+ (fn []
161
+ (.close server)
162
+ (shutdown-agents )))))
163
+
164
+
165
+ ; ; точка входа
166
+ (defn -main
167
+ [& args]
168
+ (let [server (run-server 6379 handle-message)]
169
+ (shutdown-hook server)
170
+ server))
171
+
172
+
173
+
174
+ (comment
175
+ (def server
176
+ (-main ))
177
+
178
+ @database
179
+
180
+ (.close server)
181
+ nil )
0 commit comments