|
424 | 424 | (^:private mark-complete! [_ q-name]) |
425 | 425 | (^:private mark-retry! [_ q-name]) |
426 | 426 | (delete! [_] |
427 | | - "Deletes all files associated with the queues.") |
| 427 | + "DEPRECATED: Deletes all files associated with all queues. Dangerous!") |
| 428 | + (delete-queue! [_ q-name] |
| 429 | + "Deletes the specific queue, including all associated files.") |
| 430 | + (delete-all! [_] |
| 431 | + "Deletes all queues, including all associated files.") |
428 | 432 | (stats [_] |
429 | 433 | "Returns a map of queue names onto information about the immediate state of the queue.") |
430 | 434 | (fsync [_] |
431 | 435 | "Forces an fsync on all modified files.") |
432 | 436 | (take! |
433 | 437 | [_ q-name] |
434 | 438 | [_ q-name timeout timeout-val] |
435 | | - "A blocking dequeue from `name`. If `timeout` is specified, returns `timeout-val` if |
| 439 | + "A blocking dequeue from `q-name`. If `timeout` is specified, returns `timeout-val` if |
436 | 440 | no task is available within `timeout` milliseconds.") |
437 | 441 | (put! |
438 | 442 | [_ q-name task-descriptor] |
439 | 443 | [_ q-name task-descriptor timeout] |
440 | | - "A blocking enqueue to `name`. If `timeout` is specified, returns `false` if unable to |
| 444 | + "A blocking enqueue to `q-name`. If `timeout` is specified, returns `false` if unable to |
441 | 445 | enqueue within `timeout` milliseconds, `true` otherwise.")) |
442 | 446 |
|
443 | 447 | (defn queues |
|
613 | 617 |
|
614 | 618 | IQueues |
615 | 619 |
|
616 | | - (delete! [_] |
| 620 | + (delete! [_] ; DEPRECATED and dangerous since it breaks the queues |
617 | 621 | (doseq [s (->> @queue-name->slabs vals (apply concat))] |
618 | 622 | (unmap s) |
619 | 623 | (delete-slab s))) |
620 | 624 |
|
| 625 | + (delete-queue! [_ q-name] |
| 626 | + (let [q-name (munge (name q-name))] |
| 627 | + (doseq [s (get @queue-name->slabs q-name)] |
| 628 | + (unmap s) |
| 629 | + (delete-slab s)) |
| 630 | + (.clear (queue q-name)) |
| 631 | + (swap! queue-name->stats dissoc q-name) |
| 632 | + (swap! queue-name->slabs dissoc q-name) |
| 633 | + (swap! queue-name->current-slab dissoc q-name))) |
| 634 | + |
| 635 | + (delete-all! [this] |
| 636 | + (doseq [q (keys @queue-name->stats )] |
| 637 | + (delete-queue! this q))) |
| 638 | + |
621 | 639 | (fsync [_] |
622 | 640 | (doseq [slab (->> @queue-name->slabs vals (apply concat))] |
623 | 641 | (sync! slab))) |
|
637 | 655 | (stats [_] |
638 | 656 | (let [ks (keys @queue-name->stats)] |
639 | 657 | (zipmap ks |
640 | | - (map |
641 | | - (fn [q-name] |
642 | | - (merge |
643 | | - {:num-slabs (-> @queue-name->slabs (get q-name) count) |
644 | | - :num-active-slabs (->> (get @queue-name->slabs q-name) |
645 | | - (filter mapped?) |
646 | | - count)} |
647 | | - (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) |
648 | | - ks)))) |
| 658 | + (map |
| 659 | + (fn [q-name] |
| 660 | + (merge |
| 661 | + {:num-slabs (-> @queue-name->slabs (get q-name) count) |
| 662 | + :num-active-slabs (->> (get @queue-name->slabs q-name) |
| 663 | + (filter mapped?) |
| 664 | + count)} |
| 665 | + (immediate-stats (queue q-name) (get @queue-name->stats q-name)))) |
| 666 | + ks)))) |
649 | 667 |
|
650 | 668 | (take! [_ q-name timeout timeout-val] |
651 | 669 | (let [q-name (munge (name q-name)) |
|
663 | 681 | (when-not (= slab old-slab) |
664 | 682 | (swap! queue-name->current-slab assoc q-name slab) |
665 | 683 | (doseq [s (->> (get @queue-name->slabs q-name) |
666 | | - butlast |
667 | | - (remove #(= slab %)))] |
| 684 | + butlast |
| 685 | + (remove #(= slab %)))] |
668 | 686 | (unmap s)))) |
669 | 687 |
|
670 | 688 | (status! t :in-progress) |
|
0 commit comments