Skip to content

Commit e5e069d

Browse files
committed
make sure put! returns a boolean value, mark 0.1.3
1 parent 35be2fe commit e5e069d

File tree

3 files changed

+26
-20
lines changed

3 files changed

+26
-20
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ This library implements a disk-backed task queue, allowing for queues that can s
55
### usage
66

77
```clj
8-
[factual/durable-queue "0.1.2"]
8+
[factual/durable-queue "0.1.3"]
99
```
1010

1111
To interact with queues, first create a `queues` object by specifying a directory in the filesystem and an options map:

project.clj

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
(defproject factual/durable-queue "0.1.2"
1+
(defproject factual/durable-queue "0.1.3"
22
:description "a in-process task-queue that is backed by disk."
33
:license {:name "Eclipse Public License"
44
:url "http://www.eclipse.org/legal/epl-v10.html"}
55
:dependencies [[com.taoensso/nippy "2.5.2"]
66
[primitive-math "0.1.3"]
7-
[byte-streams "0.1.9"]]
7+
[byte-streams "0.1.9"]
8+
[manifold "0.1.0-alpha3"]]
89
:profiles {:dev {:dependencies [[org.clojure/clojure "1.5.1"]
910
[criterium "0.4.3"]
1011
[codox-md "0.2.0" :exclusions [org.clojure/clojure]]]}}

src/durable_queue.clj

+22-17
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
[byte-streams :as bs]
55
[clojure.string :as str]
66
[primitive-math :as p]
7+
[manifold.stream :as s]
78
[taoensso.nippy :as nippy])
89
(:import
910
[java.lang.reflect
@@ -726,9 +727,7 @@
726727
(let [^AtomicLong counter (get-in @queue-name->stats [q-name :enqueued])]
727728
(.incrementAndGet counter))
728729
true)
729-
false)
730-
731-
nil))
730+
false)))
732731

733732
(put! [this q-name task-descriptor]
734733
(put! this q-name task-descriptor Long/MAX_VALUE))))
@@ -774,21 +773,27 @@
774773
(defn complete!
775774
"Marks a task as complete."
776775
[task]
777-
(status! task :complete)
778-
(when (-> task meta ::fsync?)
779-
(sync! (:slab task)))
780-
(mark-complete! @(-> task meta ::this) (-> task meta ::queue-name))
781-
true)
776+
(if (identical? :complete (status task))
777+
false
778+
(do
779+
(status! task :complete)
780+
(when (-> task meta ::fsync?)
781+
(sync! (:slab task)))
782+
(mark-complete! @(-> task meta ::this) (-> task meta ::queue-name))
783+
true)))
782784

783785
(defn retry!
784786
"Marks a task as available for retry."
785787
[task]
786-
(status! task :incomplete)
787-
(when (-> task meta ::fsync?)
788-
(sync! (:slab task)))
789-
(mark-retry! @(-> task meta ::this) (-> task meta ::queue-name))
790-
(let [^LinkedBlockingQueue q (-> task meta ::queue)]
791-
(.offer q
792-
task
793-
Long/MAX_VALUE
794-
TimeUnit/MILLISECONDS)))
788+
(if (or
789+
(identical? :complete (status task))
790+
(identical? :incomplete (status task)))
791+
false
792+
(do
793+
(status! task :incomplete)
794+
(when (-> task meta ::fsync?)
795+
(sync! (:slab task)))
796+
(mark-retry! @(-> task meta ::this) (-> task meta ::queue-name))
797+
(let [^LinkedBlockingQueue q (-> task meta ::queue)]
798+
(.put q task))
799+
true)))

0 commit comments

Comments
 (0)