Skip to content

Commit 2f7cf56

Browse files
committed
fix: updates remaining static impl
add: - all the remaining fixes into redis static functions on order to fit the tests. - tests it all
1 parent 36af507 commit 2f7cf56

File tree

3 files changed

+120
-52
lines changed

3 files changed

+120
-52
lines changed

src/com/moclojer/rq/queue.clj

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -88,32 +88,18 @@
8888
[client queue-name tmot & {:keys [direction pattern]
8989
:or {direction :l pattern :rq}}]
9090
(let [packed-queue-name (utils/pack-pattern pattern queue-name)
91-
result (if (= direction :l)
91+
return (if (= direction :l)
9292
(.blpop @client tmot packed-queue-name)
9393
(.brpop @client tmot packed-queue-name))]
94-
(when result
95-
(let [message (second result)]
94+
(when return
95+
(let [message (second return)]
9696
(log/debug "popped from queue"
9797
{:client client
9898
:queue-name packed-queue-name
9999
:options {:direction direction :pattern pattern}
100100
:message message})
101-
(edn/read-string message)))))
101+
(edn/read-string message)))))
102102

103-
(defn lrange
104-
"Return an entire range given min and max indexes
105-
106-
Parameters:
107-
- client: Redis client
108-
- queue-name: Name of the queue
109-
- floor: floor index
110-
- ceil: ceiling index"
111-
[client queue-name floor ceil & options]
112-
(let [{:keys [pattern]
113-
:or {pattern :rq}} options
114-
packed-queue (utils/pack-pattern pattern queue-name)
115-
result (.lrange @client packed-queue floor ceil)]
116-
result))
117103

118104
(defn lindex
119105
"Return a element in a specified index
@@ -126,9 +112,9 @@
126112
(let [{:keys [pattern]
127113
:or {pattern :rq}} options
128114
packed-queue-name (utils/pack-pattern pattern queue-name)
129-
result (.lindex @client packed-queue-name index)]
115+
return (.lindex @client packed-queue-name index)]
130116

131-
(let [message (clojure.edn/read-string result)]
117+
(let [message (clojure.edn/read-string return)]
132118
(log/debug "message found"
133119
{:client client
134120
:queue-name packed-queue-name
@@ -148,8 +134,8 @@
148134
(let [{:keys [pattern]
149135
:or {pattern :rq} :as opts} options
150136
packed-queue-name (utils/pack-pattern pattern queue-name)
151-
encoded-message (clojure.edn/read-string (str message))
152-
return (.lset @client packed-queue-name index (str encoded-message))]
137+
encoded-message (str (clojure.edn/read-string (str message)))
138+
return (.lset @client packed-queue-name index encoded-message)]
153139

154140
(log/debug "set in queue"
155141
{:client client
@@ -186,33 +172,57 @@
186172
return))
187173

188174
(defn linsert
189-
"Insert a message before the first occurance of a pivot given.
175+
"insert a message before the first occurance of a pivot given.
190176
191-
Parameters:
192-
- client: Redis client
193-
- queue-name: Name of the queue
177+
parameters:
178+
- client: redis client
179+
- queue-name: name of the queue
194180
- msg: new msg to be added
195181
- pivot: pivot message to be added before or after
196182
- options:
197183
- pos (keywords):
198184
- before: insert the message before the pivot
199185
- after: insert the message after the pivot"
200-
[client queue-name msg pivot & options]
186+
[client queue-name pivot msg & options]
201187
(let [{:keys [pos pattern]
202188
:or {pos :before
203189
pattern :rq} :as opts} options
204190
packed-queue-name (utils/pack-pattern pattern queue-name)
205191
encoded-message (str (clojure.edn/read-string (str msg)))
206192
encoded-pivot (str (clojure.edn/read-string (str pivot)))
207-
encoded-pos (str (s/capitalize (str pivot)))
193+
encoded-pos (if (= pos :before)
194+
redis.clients.jedis.args.ListPosition/BEFORE
195+
redis.clients.jedis.args.ListPosition/AFTER)
208196
return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)]
209-
(log/debug "inserted in queue"
197+
(log/debug "inserted in queue"
198+
{:client client
199+
:queue-name queue-name
200+
:msg encoded-message
201+
:opts opts
202+
:return return})
203+
return))
204+
205+
206+
(defn lrange
207+
"Return an entire range given min and max indexes
208+
209+
Parameters:
210+
- client: Redis client
211+
- queue-name: Name of the queue
212+
- floor: floor index
213+
- ceil: ceiling index"
214+
[client queue-name floor ceil & options]
215+
(let [{:keys [pattern]
216+
:or {pattern :rq} :as opts} options
217+
packed-queue-name (utils/pack-pattern pattern queue-name)
218+
return (.lrange @client packed-queue-name floor ceil)]
219+
(log/debug "queue specified range"
210220
{:client client
211-
:queue-name queue-name
212-
:msg (clojure.edn/read-string (str msg))
221+
:queue-name packed-queue-name
213222
:opts opts
214-
:return return})
215-
return))
223+
:result return})
224+
(mapv clojure.edn/read-string return)))
225+
216226

217227
(defn ltrim
218228
"Trim a list to the specified range.
@@ -226,9 +236,17 @@
226236
- pattern: pattern to pack the queue name"
227237
[client queue-name start stop & options]
228238
(let [{:keys [pattern]
229-
:or {pattern :rq}} options
239+
:or {pattern :rq} :as opts} options
230240
packed-queue-name (utils/pack-pattern pattern queue-name)]
231-
(.ltrim @client packed-queue-name start stop)))
241+
(let [return (.ltrim @client packed-queue-name start stop)]
242+
(log/debug "queue trimmed"
243+
{:client client
244+
:queue-name queue-name
245+
:opts opts
246+
:result return})
247+
return)))
248+
249+
232250

233251
(defn rpoplpush
234252
"Remove the last element in a list and append it to another list.
@@ -243,8 +261,15 @@
243261
(let [{:keys [pattern]
244262
:or {pattern :rq}} options
245263
packed-source-queue (utils/pack-pattern pattern source-queue)
246-
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
247-
(.rpoplpush @client packed-source-queue packed-destination-queue)))
264+
packed-destination-queue (utils/pack-pattern pattern destination-queue)
265+
return (.rpoplpush @client packed-source-queue packed-destination-queue)]
266+
(log/debug "rpoplpush operation"
267+
{:client client
268+
:source-queue packed-source-queue
269+
:destination-queue packed-destination-queue
270+
:result return})
271+
return))
272+
248273

249274
(defn brpoplpush
250275
"Remove the last element in a list and append it to another list, blocking if necessary.
@@ -260,11 +285,20 @@
260285
(let [{:keys [pattern]
261286
:or {pattern :rq}} options
262287
packed-source-queue (utils/pack-pattern pattern source-queue)
263-
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
264-
(.brpoplpush @client packed-source-queue packed-destination-queue timeout)))
288+
packed-destination-queue (utils/pack-pattern pattern destination-queue)
289+
result (.brpoplpush @client packed-source-queue packed-destination-queue timeout)]
290+
(log/debug "brpoplpush operation"
291+
{:client client
292+
:source-queue packed-source-queue
293+
:destination-queue packed-destination-queue
294+
:timeout timeout
295+
:result result})
296+
result))
297+
298+
265299

266300
(defn lmove
267-
"Atomically return and remove the first/last element (head/tail depending on the wherefrom argument) of the source list, and push the element as the first/last element (head/tail depending on the whereto argument) of the destination list.
301+
"Atomically return and remove the first/last element of the source list, and push the element as the first/last element of the destination list.
268302
269303
Parameters:
270304
- client: Redis client
@@ -278,5 +312,19 @@
278312
(let [{:keys [pattern]
279313
:or {pattern :rq}} options
280314
packed-source-queue (utils/pack-pattern pattern source-queue)
281-
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
282-
(.lmove @client packed-source-queue packed-destination-queue wherefrom whereto)))
315+
packed-destination-queue (utils/pack-pattern pattern destination-queue)
316+
from-direction (if (= wherefrom "LEFT")
317+
redis.clients.jedis.args.ListDirection/LEFT
318+
redis.clients.jedis.args.ListDirection/RIGHT)
319+
to-direction (if (= whereto "LEFT")
320+
redis.clients.jedis.args.ListDirection/LEFT
321+
redis.clients.jedis.args.ListDirection/RIGHT)
322+
result (.lmove @client packed-source-queue packed-destination-queue from-direction to-direction)]
323+
(log/debug "lmove operation"
324+
{:client client
325+
:source-queue packed-source-queue
326+
:destination-queue packed-destination-queue
327+
:from-direction from-direction
328+
:to-direction to-direction
329+
:result result})
330+
result))

src/com/moclojer/rq/utils.clj

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
(ns com.moclojer.rq.utils)
1+
(ns com.moclojer.rq.utils
2+
(:require
3+
[clojure.string :as s]))
24

35
(defn- pattern->str
46
"Adapts given pattern keyword to a know internal pattern. Raises
@@ -21,3 +23,6 @@
2123
(defn unpack-pattern
2224
[pattern queue-name]
2325
(subs queue-name (count (pattern->str pattern))))
26+
27+
28+

test/com/moclojer/rq/queue_test.clj

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,42 @@
5454
(t/is (= message (rq-queue/lindex client queue-name 1)))
5555
(rq-queue/pop! client queue-name :direction :l)
5656
(rq-queue/pop! client queue-name :direction :l))
57+
5758
(t/testing "lrem"
5859
(rq-queue/push! client queue-name message)
5960
(rq-queue/lrem client queue-name 1 message)
6061
(t/is (= 0 (rq-queue/llen client queue-name))))
6162

6263
(t/testing "linsert"
6364
(rq-queue/push! client queue-name message)
64-
(rq-queue/linsert client queue-name another-message message :pos :before)
65-
(t/is (= another-message (rq-queue/lindex client queue-name 0))))
65+
(rq-queue/linsert client queue-name message another-message :pos :before)
66+
(t/is (= another-message (rq-queue/lindex client queue-name 0)))
67+
(rq-queue/pop! client queue-name :direction :l)
68+
(rq-queue/pop! client queue-name :direction :l))
6669

67-
(t/testing "ltrim"
68-
(rq-queue/push! client queue-name message)
69-
(rq-queue/push! client queue-name another-message)
70-
(rq-queue/ltrim client queue-name 1 -1)
71-
(t/is (= another-message (rq-queue/lindex client queue-name 0))))
7270

7371
(t/testing "lrange"
74-
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
7572
(rq-queue/push! client queue-name message)
7673
(rq-queue/push! client queue-name another-message)
77-
(t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1))))
74+
(let [result (rq-queue/lrange client queue-name 0 1)]
75+
(t/is (= [message another-message] (reverse result))))
76+
(rq-queue/pop! client queue-name :direction :l)
77+
(rq-queue/pop! client queue-name :direction :l))
78+
79+
80+
(t/testing "ltrim"
81+
(let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]}
82+
message (assoc base-message :uuid (java.util.UUID/randomUUID))
83+
another-message (assoc base-message :uuid (java.util.UUID/randomUUID))]
84+
(rq-queue/push! client queue-name message)
85+
(rq-queue/push! client queue-name another-message)
86+
(t/is (= "OK" (rq-queue/ltrim client queue-name 1 -1)))
87+
(let [result (rq-queue/lrange client queue-name 0 -1)]
88+
(t/is (= [(dissoc another-message :uuid)]
89+
(map #(dissoc % :uuid) result)))))
90+
(rq-queue/pop! client queue-name :direction :l)
91+
(rq-queue/pop! client queue-name :direction :l))
92+
7893

7994
(t/testing "rpoplpush"
8095
(rq-queue/push! client queue-name message)

0 commit comments

Comments
 (0)