Skip to content
This repository has been archived by the owner on Jun 15, 2024. It is now read-only.

Commit

Permalink
debugging #135
Browse files Browse the repository at this point in the history
  • Loading branch information
flosell committed Oct 16, 2016
1 parent b9cad00 commit 68a103e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 34 deletions.
8 changes: 3 additions & 5 deletions example/clj/todopipeline/steps.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[lambdacd.steps.manualtrigger :as manualtrigger]
[lambdacd.steps.support :as support]
[clojure.core.async :as async]
[clojure.tools.logging :as log]))
[clojure.tools.logging :as log]
[lambdacd.debug.core-async :as async-debug]))

;; Let's define some constants
(def backend-repo "[email protected]:flosell/todo-backend-compojure.git")
Expand Down Expand Up @@ -49,10 +50,7 @@
(doall (for [i (range 800)]
(support/if-not-killed ctx
(do
(log/info (str "trying write to " (:result-channel ctx) " " i))
(async/>!! (:result-channel ctx) [:xyz i])
(log/info (str "tried write to " (:result-channel ctx) " " i)))
)))
(async-debug/>!! (:result-channel ctx) [:xyz i])))))
{:status :success}))

(defn log-lots-of-output-in-chaining [args ctx]
Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
[org.clojure/tools.logging "0.3.1"]
[org.slf4j/slf4j-api "1.7.21"]
[ring/ring-json "0.3.1"]
[throttler "1.0.0"]
[cheshire "5.4.0"]
[cljsjs/moment "2.10.6-4"]
[clj-time "0.9.0"]
Expand Down
48 changes: 48 additions & 0 deletions src/clj/lambdacd/debug/core_async.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
(ns lambdacd.debug.core-async
(:require [clojure.core.async :as async])
(:import (java.util UUID)
(org.apache.commons.lang.exception ExceptionUtils)))

(defonce chans (atom {}))
(defonce open-writes (atom {}))

(defn reset-state []
(reset! chans {})
(reset! open-writes {}))

(defn channels-with-open-writes []
(let [mapper (fn [[k v]]
(assoc v :name (get @chans (:port v))))]
(map mapper @open-writes)))

(defn open-writes-infos[]
(doall (map #(println (:name %) (:trace %)) (channels-with-open-writes)))
(doall (map #(println (:name %)) (channels-with-open-writes))))

(defn chan [name & args]
(let [ch (apply async/chan args)]
(swap! chans #(assoc % ch name))
ch))

(defn stacktrace []
(ExceptionUtils/getStackTrace (Exception.)))

(defn before-write [write-id port val]
(let [payload {:port port
:trace (stacktrace)}]
(swap! open-writes #(assoc % write-id payload))))

(defn after-write [write-id port val]
(swap! open-writes #(dissoc % write-id)))

(defmacro >! [port val]
`(let [write-id# (UUID/randomUUID)]
(before-write write-id# ~port ~val)
(async/>! ~port ~val)
(after-write write-id# ~port ~val)))

(defn >!! [port val]
(let [write-id (UUID/randomUUID)]
(before-write write-id port val)
(async/>!! port val)
(after-write write-id port val)))
14 changes: 8 additions & 6 deletions src/clj/lambdacd/event_bus.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
(ns lambdacd.event-bus
(:require [clojure.core.async :as async]))
(:require [clojure.core.async :as async]
[lambdacd.debug.core-async :as async-debug])
(:import (java.util UUID)))

(defn initialize-event-bus [ctx]
(let [publisher-ch (async/chan)
(let [publisher-ch (async-debug/chan "publisher")
publication (async/pub publisher-ch :topic)]
(assoc ctx :event-publisher publisher-ch
:event-publication publication)))


(defmacro publish! [ctx topic payload]
`(async/>! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))
`(async-debug/>! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))

(defmacro publish!! [ctx topic payload]
`(async/>!! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))
`(async-debug/>!! (:event-publisher ~ctx) {:topic ~topic :payload ~payload}))

(defn publish [ctx topic payload]
"DEPRECATED, will be removed in subsequent versions. use publish!! or publish! instead"
Expand All @@ -24,11 +26,11 @@
result-ch))

(defn only-payload [subscription]
(let [result-ch (async/chan)]
(let [result-ch (async-debug/chan (str "only-payload-" (UUID/randomUUID)))]
(async/go-loop []
(if-let [result (async/<! subscription)]
(do
(async/>! result-ch (:payload result))
(async-debug/>! result-ch (:payload result))
(recur))))
result-ch))

Expand Down
38 changes: 21 additions & 17 deletions src/clj/lambdacd/internal/execution.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
(ns lambdacd.internal.execution
"low level functions for job-execution"
(:require [clojure.core.async :as async]
[lambdacd.internal.pipeline-state :as legacy-pipeline-state]
[lambdacd.debug.core-async :as async-debug]
[lambdacd.state.core :as state]
[clojure.tools.logging :as log]
[lambdacd.step-id :as step-id]
[lambdacd.steps.status :as status]
[clojure.repl :as repl]
[lambdacd.event-bus :as event-bus]
[lambdacd.steps.result :as step-results]
[throttler.core :as throttler]
[lambdacd.presentation.pipeline-structure :as pipeline-structure])
(:import (java.io StringWriter)
(java.util UUID)))
Expand Down Expand Up @@ -37,18 +38,19 @@
(attach-wait-indicator-if-necessary key value)))

(defn- process-channel-result-async [c {step-id :step-id build-number :build-number :as ctx}]
(async/go-loop [cur-result {:status :running}]
(let [ev (async/<! c)
(let [throttled-chan c]
(async/go-loop [cur-result {:status :running}]
(let [ev (async/<! throttled-chan)
new-result (if (map? ev)
ev
(append-result cur-result ev))]
(if (nil? ev)
cur-result
(do
(event-bus/publish! ctx :step-result-updated {:build-number build-number
:step-id step-id
:step-result new-result})
(recur new-result))))))
:step-id step-id
:step-result new-result})
(recur new-result)))))))

(defmacro with-err-str
[& body]
Expand Down Expand Up @@ -124,7 +126,7 @@

(defn execute-step [args [ctx step]]
(let [step-id (:step-id ctx)
result-ch (async/chan)
result-ch (async-debug/chan (str "execute-step-result-ch_" (:build-number ctx) "_" step-id))
child-kill-switch (atom false)
parent-kill-switch (:is-killed ctx)
watch-key (UUID/randomUUID)
Expand Down Expand Up @@ -161,16 +163,18 @@

(defn- process-inheritance [out-ch step-results-channel unify-results-fn]
(async/go
(loop [results {}]
(if-let [{step-id :step-id
step-result :step-result} (async/<! step-results-channel)]
(let [new-results (assoc results step-id step-result)
old-unified (unify-results-fn results)
new-unified (unify-results-fn new-results)]
(if (not= old-unified new-unified)
(async/>! out-ch new-unified))
(recur new-results))
(async/close! out-ch)))))
(let [dropping-out-ch (async-debug/chan "dropping-out-ch" (async/sliding-buffer 1))]
(async/pipe dropping-out-ch out-ch)
(loop [results {}]
(if-let [{step-id :step-id
step-result :step-result} (async/<! step-results-channel)]
(let [new-results (assoc results step-id step-result)
old-unified (unify-results-fn results)
new-unified (unify-results-fn new-results)]
(if (not= old-unified new-unified)
(async-debug/>! dropping-out-ch new-unified))
(recur new-results))
(async/close! dropping-out-ch))))))

(defn contexts-for-steps
"creates contexts for steps"
Expand Down
16 changes: 10 additions & 6 deletions test/clj/lambdacd/example/steps_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
[lambdacd.testsupport.data :as data]
[lambdacd.steps.git :as git]
[lambdacd.testsupport.noop-pipeline-state :as noop-pipeline-state]
[todopipeline.steps :as steps]))
[todopipeline.steps :as steps]
[lambdacd.debug.core-async :as async-debug]))

; testing a simple build step
(deftest wait-for-frontend-repo-test
Expand All @@ -31,9 +32,12 @@
(data/some-ctx-with :pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state)))

(deftest output-load-test
;(testing "that we don't fail if we come across lots of output for just in general"
; (is (not= "timeout" (with-timeout 10000
; (execution/execute-step {} [(output-load-test-ctx) steps/log-lots-of-output])))))
(testing "that we don't fail if we come across lots of output for just in general"
(async-debug/reset-state)
(is (not= "timeout" (with-timeout 10000
(execution/execute-step {} [(output-load-test-ctx) steps/log-lots-of-output])))))
(testing "that we don't fail if we come across lots of output for chaining"
(is (not= "timeout" (with-timeout 5000
(execution/execute-step {} [(output-load-test-ctx) steps/log-lots-of-output-in-chaining]))))))
(async-debug/reset-state)
(is (not= "timeout" (with-timeout 10000
(execution/execute-step {} [(output-load-test-ctx) steps/log-lots-of-output-in-chaining]))))
(async-debug/open-writes-infos)))

0 comments on commit 68a103e

Please sign in to comment.