4
4
[clojure.string :as str])
5
5
(:import
6
6
[java.io BufferedReader Writer]
7
- [java.net ServerSocket])
7
+ [java.net ServerSocket]
8
+ [java.util.concurrent Executors ScheduledExecutorService TimeUnit])
8
9
(:gen-class ))
9
10
10
11
13
14
(atom {}))
14
15
15
16
17
+ (def keys-to-expire
18
+ (atom []))
19
+
20
+
21
+ (defn expire-key [key]
22
+ (fn []
23
+ (swap! database assoc key nil )
24
+ (swap! keys-to-expire
25
+ (fn [expire-keys]
26
+ (remove #(= (second %) key) expire-keys)))))
27
+
28
+
29
+ (defn set-key-to-expire [key timestamp]
30
+ (swap! keys-to-expire
31
+ (fn [expire-keys]
32
+ (->> (conj expire-keys [timestamp key])
33
+ (sort-by first)
34
+ (vec )))))
35
+
36
+
37
+ (comment
38
+ @keys-to-expire
39
+ (set-key-to-expire " name" 123123123 )
40
+ (expire-key " name" ))
41
+
42
+
43
+ (def ^ScheduledExecutorService cleanup-pool
44
+ (Executors/newScheduledThreadPool 10 ))
45
+
46
+
47
+ (defn schedule-cleanup-task []
48
+ (let [current-time (System/currentTimeMillis )
49
+ five-seconds-later (+ current-time 5000 )
50
+ keys-to-schedule (->> @keys-to-expire
51
+ (take-while #(< (first %) five-seconds-later)))]
52
+ (doseq [[timestamp key] keys-to-schedule]
53
+ (.schedule cleanup-pool
54
+ ^Runnable (expire-key key)
55
+ ^Long (- timestamp current-time)
56
+ TimeUnit/MILLISECONDS))))
57
+
58
+
59
+ (defn start-cleanup-worker []
60
+ (.scheduleAtFixedRate cleanup-pool schedule-cleanup-task 0 5 TimeUnit/SECONDS))
61
+
62
+
16
63
17
64
; ; обрабатываем команды от клиентов
18
65
(defmulti handle-command
36
83
(swap! database assoc key val)
37
84
38
85
(when (and opt (= (.toUpperCase opt) " PX" ))
39
- (let [timeout (Integer/parseInt optarg)]
40
- (future
41
- (Thread/sleep timeout)
42
- (swap! database dissoc key))))
86
+ (let [delay (Integer/parseInt optarg)
87
+ current-time (System/currentTimeMillis )
88
+ timestamp (+ current-time delay)]
89
+ (if (< timestamp (+ current-time 5000 ))
90
+ (.schedule cleanup-pool ^Runnable (expire-key key) delay TimeUnit/MILLISECONDS)
91
+ (set-key-to-expire key timestamp))))
92
+
43
93
; ; ответ клиенту
44
94
" OK" )
45
95
46
96
47
97
(defmethod handle-command :get
48
98
[[_ [key-len key]]]
49
- (if-some [entry (find @database key)]
50
- (val entry)
51
- " (nil)" ))
99
+ (let [entry (find @database key)]
100
+ (if (some? (val entry))
101
+ (val entry)
102
+ " (nil)" )))
52
103
53
104
54
105
; ; needed for redis-cli
129
180
(doseq [msg-in (repeatedly #(read-message reader))
130
181
:while (not (empty? msg-in))
131
182
:let [msg-out (handler msg-in)]]
132
- (println " msg-in" msg-in " msg-out " msg-out " \n " )
183
+ (println " msg-in" msg-in)
133
184
; ; отправляем ответ
134
185
(send-message writer msg-out)))))
135
186
154
205
155
206
156
207
; ; graceful shutdown
157
- (defn shutdown-hook [server]
208
+ (defn shutdown-hook [server worker ]
158
209
(.addShutdownHook (Runtime/getRuntime )
159
210
(Thread. ^Runnable
160
211
(fn []
161
212
(.close server)
213
+ (future-cancel worker)
214
+ (.shutdown cleanup-pool)
162
215
(shutdown-agents )))))
163
216
164
217
165
218
; ; точка входа
166
219
(defn -main
167
220
[& args]
168
- (let [server (run-server 6379 handle-message)]
169
- (shutdown-hook server)
221
+ (let [server (run-server 6379 handle-message)
222
+ worker (start-cleanup-worker )]
223
+ (shutdown-hook server worker)
170
224
server))
171
225
172
226
176
230
(-main ))
177
231
178
232
@database
233
+ @keys-to-expire
179
234
180
235
(.close server)
181
236
nil )
0 commit comments