Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.


Reduce amount of step-update events to reduce resource-consumption, p…
Browse files Browse the repository at this point in the history
…erformance and resolve potential deadlocks #135, #140)
  • Loading branch information
flosell committed Oct 23, 2016
1 parent 8b690d0 commit dbf66f3
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 83 deletions.
6 changes: 4 additions & 2 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ The official release will have a defined and more stable API. If you are already
## 0.11.0

* Improvements:
* Keeps a history of pipeline structure if persistence component supports it (#131, #6); Implemented for default persistence
* Keeps a history of pipeline structure if persistence component supports it (#131, #6); Implemented for default persistence
* Improved performance and resource consumption by compressing and throttling step-result update events (#140).
Can be configured with the configuration parameter `:step-updates-per-sec`.
* Bug fixes:
* Fix deadlock occurring when steps write a lot of step-results in quick succession and step results are inherited by their parents (as in chaining) (#135)
* Fix deadlock occurring when steps write a lot of step-results in quick succession and step results are inherited by their parents (as in chaining) (#135, #140)
* API changes:
* New state handling (#131):
* Protocols in `lambdacd.state.protocols` replace `lambdacd.internal.pipeline-state/PipelineStateComponent` which is now deprecated. Custom persistence-mechanisms need to migrate.
Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
:test-paths ["test/clj" "example/clj"]
:jar-exclusions [#"logback.xml"]
:dependencies [[org.clojure/clojure "1.7.0"]
[throttler "1.0.0"]
[hiccup "1.0.5"]
[org.clojure/data.json "0.2.6"]
[me.raynes/conch "0.8.0"]
Expand Down
3 changes: 2 additions & 1 deletion src/clj/lambdacd/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

(def default-config
{:ms-to-wait-for-shutdown (* 10 1000)
:shutdown-sequence default-shutdown-sequence})
:shutdown-sequence default-shutdown-sequence
:step-updates-per-sec 10})

(defn- initialize-pipeline-state-updater [ctx]
(let [updater (pipeline-state-updater/start-pipeline-state-updater ctx)]
Expand Down
133 changes: 79 additions & 54 deletions src/clj/lambdacd/internal/execution.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
[clojure.repl :as repl]
[lambdacd.event-bus :as event-bus]
[lambdacd.steps.result :as step-results]
[lambdacd.presentation.pipeline-structure :as pipeline-structure])
[lambdacd.presentation.pipeline-structure :as pipeline-structure]
[throttler.core :as throttler])
(:import ( StringWriter)
(java.util UUID)))

(defn- step-output [step-id step-result]
{:outputs { step-id step-result}
:status (get step-result :status)})
{:outputs {step-id step-result}
:status (get step-result :status)})

(defn- is-finished [key value]
(and (= key :status) (not= value :waiting)))
Expand All @@ -36,19 +37,43 @@
(assoc key value)
(attach-wait-indicator-if-necessary key value)))

(defn- drop-and-throttle-ch [step-updates-per-sec in-ch]
(let [dropping-ch (async/chan (async/sliding-buffer 1))
slow-chan (throttler/throttle-chan dropping-ch step-updates-per-sec :second)]
(async/pipe in-ch dropping-ch)

(defn- publish-from-ch [ctx topic in-ch]
(let [updates-per-sec (get-in ctx [:config :step-updates-per-sec])
slow-chan (if updates-per-sec
(drop-and-throttle-ch updates-per-sec in-ch)
(async/go-loop []
(if-let [msg (async/<! slow-chan)]
(event-bus/publish! ctx topic msg)

(defn- process-channel-result-async [c {step-id :step-id build-number :build-number :as ctx}]
(async/go-loop [cur-result {:status :running}]
(let [ev (async/<! c)
new-result (if (map? ev)
(append-result cur-result ev))]
(if (nil? ev)
(event-bus/publish! ctx :step-result-updated {:build-number build-number
(let [publisher-ch (async/chan)
publisher-finished (publish-from-ch ctx :step-result-updated publisher-ch)
processed-result (async/go-loop [cur-result {:status :running}]
(let [ev (async/<! c)
new-result (if (map? ev)
(append-result cur-result ev))]
(if (nil? ev)
(async/close! publisher-ch)
(async/>! publisher-ch {:build-number build-number
:step-id step-id
:step-result new-result})
(recur new-result))))))
(recur new-result)))))]
(async/<! publisher-finished)
(async/<! processed-result))))

(defmacro with-err-str
[& body]
Expand Down Expand Up @@ -102,43 +127,43 @@
(event-bus/unsubscribe ctx :kill-step subscription))

(defn- report-step-finished [ctx complete-step-result]
(event-bus/publish ctx :step-finished {:step-id (:step-id ctx)
:build-number (:build-number ctx)
:final-result complete-step-result
(event-bus/publish ctx :step-finished {:step-id (:step-id ctx)
:build-number (:build-number ctx)
:final-result complete-step-result
:rerun-for-retrigger (boolean
(and (:retriggered-build-number ctx)
(:retriggered-step-id ctx)))}))

(defn- report-step-started [ctx]
(send-step-result!! ctx {:status :running})
(event-bus/publish ctx :step-started {:step-id (:step-id ctx)
:build-number (:build-number ctx)}))
(event-bus/publish ctx :step-started {:step-id (:step-id ctx)
:build-number (:build-number ctx)}))

(defn report-received-kill [ctx]
(async/>!! (:result-channel ctx) [:received-kill true]))

(defn add-kill-switch-reporter [ctx]
(add-watch (:is-killed ctx) (UUID/randomUUID) (fn [_ _ _ new-is-killed-val]
(if new-is-killed-val
(report-received-kill ctx)))))
(if new-is-killed-val
(report-received-kill ctx)))))

(defn execute-step [args [ctx step]]
(let [step-id (:step-id ctx)
result-ch (async/chan)
child-kill-switch (atom false)
parent-kill-switch (:is-killed ctx)
watch-key (UUID/randomUUID)
_ (add-watch parent-kill-switch watch-key (fn [key reference old new] (reset! child-kill-switch new)))
_ (reset! child-kill-switch @parent-kill-switch) ; make sure kill switch has the parents state in the beginning and is updated through the watch
ctx-for-child (assoc ctx :result-channel result-ch
:is-killed child-kill-switch)
_ (add-kill-switch-reporter ctx-for-child)
(let [step-id (:step-id ctx)
result-ch (async/chan)
child-kill-switch (atom false)
parent-kill-switch (:is-killed ctx)
watch-key (UUID/randomUUID)
_ (add-watch parent-kill-switch watch-key (fn [key reference old new] (reset! child-kill-switch new)))
_ (reset! child-kill-switch @parent-kill-switch) ; make sure kill switch has the parents state in the beginning and is updated through the watch
ctx-for-child (assoc ctx :result-channel result-ch
:is-killed child-kill-switch)
_ (add-kill-switch-reporter ctx-for-child)
processed-async-result-ch (process-channel-result-async result-ch ctx)
kill-subscription (kill-step-handling ctx-for-child)
_ (report-step-started ctx)
immediate-step-result (execute-or-catch step args ctx-for-child)
processed-async-result (async/<!! processed-async-result-ch)
complete-step-result (merge processed-async-result immediate-step-result)]
kill-subscription (kill-step-handling ctx-for-child)
_ (report-step-started ctx)
immediate-step-result (execute-or-catch step args ctx-for-child)
processed-async-result (async/<!! processed-async-result-ch)
complete-step-result (merge processed-async-result immediate-step-result)]
(log/debug (str "executed step " step-id complete-step-result))
(clean-up-kill-handling ctx-for-child kill-subscription)
(remove-watch parent-kill-switch watch-key)
Expand All @@ -155,8 +180,8 @@
(defn- to-context-and-step [ctx]
(fn [idx step]
(let [parent-step-id (:step-id ctx)
new-step-id (step-id/child-id parent-step-id (inc idx))
step-ctx (assoc ctx :step-id new-step-id)]
new-step-id (step-id/child-id parent-step-id (inc idx))
step-ctx (assoc ctx :step-id new-step-id)]
[step-ctx step])))

(defn- process-inheritance [out-ch step-results-channel unify-results-fn]
Expand All @@ -180,9 +205,9 @@
(map-indexed (to-context-and-step base-context) steps))

(defn keep-globals [old-args step-result]
(let [existing-globals (:global old-args)
new-globals (:global step-result)
merged-globals (merge existing-globals new-globals)
(let [existing-globals (:global old-args)
new-globals (:global step-result)
merged-globals (merge existing-globals new-globals)
args-with-old-and-new-globals (assoc step-result :global merged-globals)]

Expand Down Expand Up @@ -232,7 +257,7 @@
(defn retrigger-mock-step [retriggered-build-number]
(fn [args ctx]
(let [original-build-result (state/get-step-results ctx retriggered-build-number)
original-step-result (get original-build-result (:step-id ctx))]
original-step-result (get original-build-result (:step-id ctx))]
(publish-child-step-results!! ctx retriggered-build-number original-build-result)
(assoc original-step-result
:retrigger-mock-for-build-number retriggered-build-number))))
Expand All @@ -243,7 +268,7 @@
:retriggered-step-id nil))

(defn sequential-retrigger-predicate [ctx step]
(let [cur-step-id (:step-id ctx)
(let [cur-step-id (:step-id ctx)
retriggered-step-id (:retriggered-step-id ctx)]
Expand Down Expand Up @@ -278,18 +303,18 @@
unify-status-fn status/successful-when-all-successful
unify-results-fn nil ; dependent on unify-status-fn, can't have it here for now
retrigger-predicate sequential-retrigger-predicate}}]
(let [unify-results-fn (or unify-results-fn (unify-only-status unify-status-fn))
steps (filter not-nil? steps)
base-ctx-with-kill-switch (assoc ctx :is-killed is-killed)
subscription (event-bus/subscribe ctx :step-result-updated)
children-step-results-channel (->> subscription
(async/filter< (inherit-message-from-parent? ctx)))
step-contexts (contexts-for-steps steps base-ctx-with-kill-switch)
_ (process-inheritance (:result-channel ctx) children-step-results-channel unify-results-fn)
(let [unify-results-fn (or unify-results-fn (unify-only-status unify-status-fn))
steps (filter not-nil? steps)
base-ctx-with-kill-switch (assoc ctx :is-killed is-killed)
subscription (event-bus/subscribe ctx :step-result-updated)
children-step-results-channel (->> subscription
(async/filter< (inherit-message-from-parent? ctx)))
step-contexts (contexts-for-steps steps base-ctx-with-kill-switch)
_ (process-inheritance (:result-channel ctx) children-step-results-channel unify-results-fn)
step-contexts-with-retrigger-mocks (add-retrigger-mocks retrigger-predicate ctx step-contexts)
step-results (step-result-producer args step-contexts-with-retrigger-mocks)
result (reduce merge-two-step-results step-results)]
step-results (step-result-producer args step-contexts-with-retrigger-mocks)
result (reduce merge-two-step-results step-results)]
(event-bus/unsubscribe ctx :step-result-updated subscription)

Expand All @@ -313,7 +338,7 @@
(defn retrigger-async [pipeline context build-number step-id-to-run]
(let [next-build-number (state/next-build-number context)]
(retrigger pipeline context build-number step-id-to-run next-build-number ))
(retrigger pipeline context build-number step-id-to-run next-build-number))

(defn kill-step [ctx build-number step-id]
Expand Down
40 changes: 23 additions & 17 deletions test/clj/lambdacd/internal/execution_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@
(testing "that the final pipeline-state is properly set for a step returning a static and an async result"
(let [ctx (some-ctx-with :step-id [0 0]
:build-number 5
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state)
:config {:step-updates-per-sec nil})
step-results-channel (step-result-updates-for ctx)]
(execute-step {} [ctx some-step-that-sends-failure-on-ch-returns-success])
(is (= [{ :build-number 5 :step-id [0 0] :step-result {:status :running } }
Expand All @@ -181,26 +182,31 @@
(is (= {:outputs {[0 0] {:status :success } } :status :success }
(execute-step {} [(some-ctx-with :step-id [0 0]) some-step-that-sends-failure-on-ch-returns-success]))))
(testing "that the accumulated step-result is sent over the event-bus"
(let [ctx (some-ctx-with :step-id [0 0]
:build-number 5
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
(let [ctx (some-ctx-with :step-id [0 0]
:build-number 5
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state)
:config {:step-updates-per-sec nil})
step-results-channel (step-result-updates-for ctx)]
(execute-step {} [ctx some-step-building-up-result-state-incrementally])
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running } }
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"} }
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello world"} }
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello world"} }
{:build-number 5 :step-id [0 0] :step-result {:status :success :some-value 42 :out "hello world"} }] (slurp-chan step-results-channel)))))(testing "that the accumulated step-result is sent over the event-bus and can be resetted"
(let [ctx (some-ctx-with :step-id [0 0]
:build-number 5
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello world"}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello world"}}
{:build-number 5 :step-id [0 0] :step-result {:status :success :some-value 42 :out "hello world"}}]
(slurp-chan step-results-channel)))))
(testing "that the accumulated step-result is sent over the event-bus and can be resetted"
(let [ctx (some-ctx-with :step-id [0 0]
:build-number 5
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state)
:config {:step-updates-per-sec nil})
step-results-channel (step-result-updates-for ctx)]
(execute-step {} [ctx some-step-building-up-result-state-incrementally-and-resetting])
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running } }
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"} }
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello"} }
{:build-number 5 :step-id [0 0] :step-result {:status :running :other-value 21} }
{:build-number 5 :step-id [0 0] :step-result {:status :success :other-value 21} }] (slurp-chan step-results-channel)))))
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello"}}
{:build-number 5 :step-id [0 0] :step-result {:status :running :other-value 21}}
{:build-number 5 :step-id [0 0] :step-result {:status :success :other-value 21}}]
(slurp-chan step-results-channel)))))
(testing "that the event bus is notified when a step finishes"
(let [ctx (some-ctx-with :build-number 3
:step-id [1 2 3])
Expand Down

0 comments on commit dbf66f3

Please sign in to comment.