|
| 1 | +(ns puppetlabs.puppetdb.test-in-parallel |
| 2 | + "Supports \"lein test-in-parallel --concurrency N [NAMESPACE ...] [SELECTOR ...]\". |
| 3 | + By default, divides all the test namespaces (listed on the command |
| 4 | + line, or all in test-paths, into N roughly equal (count-wise) |
| 5 | + partitions and tests each partition in parallel in its own JVM via |
| 6 | + \"lein test PARTITION [SELECTOR ...]\". |
| 7 | +
|
| 8 | + This can be useful for crudely parallelizing a test suite that isn't |
| 9 | + safe to parallelize within a single JVM, perhaps due to reliance on |
| 10 | + with-redefs, etc. |
| 11 | +
|
| 12 | + Generally, you'll want to run a --calibration first (and at least |
| 13 | + once), which will collect test timing information in |
| 14 | + target/pdb-parallel-testing.edn and use it to partition (balance) |
| 15 | + the test namespaces more effectively. |
| 16 | +
|
| 17 | + It should be possible to collect the timing information from each |
| 18 | + jvm during a concurrent run, and make the calibration more |
| 19 | + automatic." |
| 20 | + (:require |
| 21 | + [bultitude.core :refer [namespaces-on-classpath]] |
| 22 | + [clojure.edn :as edn] |
| 23 | + [clojure.java.io :as io] |
| 24 | + [clojure.pprint :refer [pprint]] |
| 25 | + [clojure.string :as str] |
| 26 | + [clojure.test :as test :refer [*test-out* report]] |
| 27 | + [clojure.tools.cli :refer [parse-opts]]) |
| 28 | + (:import |
| 29 | + (java.io FileNotFoundException) |
| 30 | + (java.lang ProcessBuilder ProcessBuilder$Redirect) |
| 31 | + (java.nio.file Path))) |
| 32 | + |
| 33 | +;; Related to lein test selectors (which require eval): |
| 34 | +;; https://github.com/cognitect-labs/test-runner include/exclude tags |
| 35 | + |
| 36 | +(defn logn [& args] (binding [*out* *err*] (apply println args))) |
| 37 | +(defn logf [& args] (binding [*out* *err*] (print (apply format args)) (flush))) |
| 38 | + |
| 39 | +(defn as-file [x] (if (instance? Path x) (.toFile x) (io/as-file x))) |
| 40 | + |
| 41 | +(def options |
| 42 | + [[nil "--test-paths PATH_LIST" "lein project.clj :test-paths as an EDN collection" |
| 43 | + :parse-fn #(distinct (edn/read-string %)) |
| 44 | + :validate [#(every? string? %) "must be an EDN collection of strings"]] |
| 45 | + [nil "--state FILE" "persistent state (target/pdb-parallel-testing.edn)" |
| 46 | + :default "target/pdb-parallel-testing.edn" |
| 47 | + :default-desc ""] ;; too long for help |
| 48 | + ["-n" "--concurrency N" "number of batches to run in parallel (1)" |
| 49 | + :default 1 :default-desc "" |
| 50 | + :parse-fn #(Long/parseLong %) :validate [pos? "must be positive"]] |
| 51 | + [nil "--calibrate" :desc "collect test timing for future scheduling"] |
| 52 | + ;; Because "lein test-in-parallel --help" shows generic info |
| 53 | + [nil "--test-in-parallel-help"]]) |
| 54 | + |
| 55 | +(defn lein-test-ns-syms |
| 56 | + "Given leiningen project :test-paths, returns the same collection of |
| 57 | + test namespaces (as symbols) that \"lein test\" would select." |
| 58 | + [test-paths] |
| 59 | + (sort (namespaces-on-classpath :classpath (map io/file test-paths) |
| 60 | + :ignore-unreadable? false))) |
| 61 | + |
| 62 | +(defn create-batch-test-process [batch cmd] |
| 63 | + (let [adjust-env #(doto ^java.util.Map (.environment %) |
| 64 | + (.remove "CLASSPATH") |
| 65 | + (.put "PDB_TEST_ID" (str batch)))] |
| 66 | + (doto (ProcessBuilder. cmd) |
| 67 | + (.redirectOutput ProcessBuilder$Redirect/INHERIT) |
| 68 | + (.redirectError ProcessBuilder$Redirect/INHERIT) |
| 69 | + adjust-env))) |
| 70 | + |
| 71 | +(defn partition-tests |
| 72 | + "Partitions the namespaces into n partitions while attempting to |
| 73 | + minimize the longest expected test time of any partition. Currently |
| 74 | + intends to be LPT-ish: |
| 75 | + https://en.wikipedia.org/wiki/Longest-processing-time-first_scheduling" |
| 76 | + [n namespaces expected-times] |
| 77 | + (assert (pos? n)) |
| 78 | + (assert (every? symbol? namespaces)) |
| 79 | + (assert (or (nil? expected-times) (map? expected-times))) |
| 80 | + (doseq [[k v] expected-times] (assert (symbol? k)) (assert (number? v))) |
| 81 | + (let [avg-time (if (seq expected-times) |
| 82 | + (/ (apply + (vals expected-times)) |
| 83 | + (count expected-times)) |
| 84 | + 1) |
| 85 | + ns-and-times (->> (mapv #(vector %1 (get expected-times %1 avg-time)) namespaces) |
| 86 | + (sort #(compare (second %2) (second %1))))] |
| 87 | + (loop [result (mapv #(vector % 0 []) (range n)) |
| 88 | + [[ns time] & nsts] ns-and-times] |
| 89 | + (if-not ns |
| 90 | + (filterv seq (map #(nth % 2) result)) |
| 91 | + (let [[i total part-nsts] (apply min-key second result)] |
| 92 | + (recur (update result i #(assoc % |
| 93 | + 1 (+ total time) |
| 94 | + 2 (conj part-nsts ns))) |
| 95 | + nsts)))))) |
| 96 | + |
| 97 | +(defn run-concurrent-test-batches [namespaces selectors n expected-ns-times] |
| 98 | + (assert (every? symbol? namespaces)) |
| 99 | + (assert (every? keyword? selectors)) |
| 100 | + (let [batches (partition-tests n namespaces expected-ns-times) |
| 101 | + cmds (map (fn [namespaces] |
| 102 | + (-> (into ["lein" "trampoline" "test"] (map name namespaces)) |
| 103 | + (into (map str selectors)))) |
| 104 | + batches) |
| 105 | + |
| 106 | + procs (mapv (fn [i cmd] |
| 107 | + (logf "[%d] Starting %s\n" i (pr-str cmd)) |
| 108 | + (.start (create-batch-test-process i cmd))) |
| 109 | + (range) cmds) |
| 110 | + statuses (mapv #(.waitFor %) procs)] |
| 111 | + (if (every? zero? statuses) |
| 112 | + 0 |
| 113 | + (do |
| 114 | + (doseq [[cmd status] (map vector cmds statuses) |
| 115 | + :when (not= 0 status)] |
| 116 | + (apply logn "ERROR: exit" status "for" cmd)) |
| 117 | + 2)))) |
| 118 | + |
| 119 | +(defn read-state-file [path] |
| 120 | + (try |
| 121 | + (with-open [rdr (io/reader (as-file path)) |
| 122 | + rdr (java.io.PushbackReader. rdr)] |
| 123 | + (edn/read rdr)) |
| 124 | + (catch FileNotFoundException _))) |
| 125 | + |
| 126 | +(defn pdb-test-nss [dirs] |
| 127 | + ;; The integration tests launch their own vms... |
| 128 | + (remove #(str/starts-with? % "puppetlabs.puppetdb.integration") |
| 129 | + (lein-test-ns-syms dirs))) |
| 130 | + |
| 131 | +(defn test-in-parallel |
| 132 | + [dirs namespaces & {:keys [concurrency selectors state]}] |
| 133 | + ;; dirs only matter if no namespaces are given |
| 134 | + ;; FIXME: cleanup on C-c and other parent crashes. |
| 135 | + (let [namespaces (or (seq namespaces) (pdb-test-nss dirs))] |
| 136 | + (binding [*out* *err*] |
| 137 | + (if (empty? selectors) |
| 138 | + (println "Testing: ") |
| 139 | + (println (str "Testing (selection " (str/join " " selectors) "):"))) |
| 140 | + (doseq [n namespaces] |
| 141 | + (println " " n))) |
| 142 | + (run-concurrent-test-batches namespaces selectors concurrency |
| 143 | + (:time state)))) |
| 144 | + |
| 145 | +(def test-times (atom {})) |
| 146 | + |
| 147 | +(def calibration-methods-initialized? (atom false)) |
| 148 | + |
| 149 | +(defn initialize-calibration-methods [] |
| 150 | + ;; The begin/end defaults, e.g. via (get-method |
| 151 | + ;; report :begin-test-ns) before these defmethods, just print or do |
| 152 | + ;; nothing, so we don't want/need to call them. |
| 153 | + (locking initialize-calibration-methods |
| 154 | + (when-not @calibration-methods-initialized? |
| 155 | + (defmethod report :begin-test-ns [{:keys [ns]}] |
| 156 | + (binding [*out* *test-out*] (println "\nTesting" (ns-name ns))) |
| 157 | + (swap! test-times assoc-in [(ns-name ns) :start] (System/nanoTime))) |
| 158 | + (defmethod report :end-test-ns [{:keys [ns]}] |
| 159 | + (let [end (System/nanoTime) |
| 160 | + nn (ns-name ns) |
| 161 | + start (get-in @test-times [nn :start]) |
| 162 | + elapsed (- end start)] |
| 163 | + (swap! test-times update nn #(merge % {:end end :elapsed elapsed})) |
| 164 | + (binding [*out* *test-out*] |
| 165 | + (-> "Tested %s in %.3fs\n" |
| 166 | + (format (ns-name ns) (/ elapsed 1000000000.0)) |
| 167 | + print)))) |
| 168 | + (reset! calibration-methods-initialized? true)))) |
| 169 | + |
| 170 | +(defn calibrate [dirs namespaces state-file] |
| 171 | + ;; Currently ignores selectors - see |
| 172 | + ;; leiningen.test/form-for-suppressing-unselected-tests for the |
| 173 | + ;; details. |
| 174 | + (let [namespaces (map symbol (or (seq namespaces) (pdb-test-nss dirs)))] |
| 175 | + (initialize-calibration-methods) |
| 176 | + (apply require namespaces) |
| 177 | + (let [{:keys [fail error]} (apply test/run-tests namespaces)] |
| 178 | + (if-not (zero? (+ fail error)) |
| 179 | + 2 |
| 180 | + (let [info (reduce-kv (fn select-timing [result ns {:keys [elapsed]}] |
| 181 | + (assoc result ns elapsed)) |
| 182 | + {} |
| 183 | + @test-times)] |
| 184 | + (with-open [out (io/writer state-file)] |
| 185 | + (pprint {:time info} out) |
| 186 | + 0)))))) |
| 187 | + |
| 188 | +(defn read-selector [x] |
| 189 | + (let [v (edn/read-string x)] |
| 190 | + (when-not (keyword? v) |
| 191 | + ;; FIXME: friendlier error handling |
| 192 | + (throw (ex-info (str "Expected test selector, found " (pr-str x)) {:val x}))) |
| 193 | + v)) |
| 194 | + |
| 195 | +(defn read-ns-sym [x] |
| 196 | + (let [v (edn/read-string x)] |
| 197 | + (when-not (symbol? v) |
| 198 | + ;; FIXME: friendlier error handling |
| 199 | + (throw (ex-info (str "Expected namespace symbol, found " (pr-str x)) {:val x}))) |
| 200 | + v)) |
| 201 | + |
| 202 | +(defn validate-nss-and-selectors [args] |
| 203 | + (let [parts (partition-by #(str/starts-with? % ":") args)] |
| 204 | + (case (count parts) |
| 205 | + 0 nil |
| 206 | + 1 (if (str/starts-with? (ffirst parts) ":") |
| 207 | + [nil (->> parts first (mapv read-selector))] |
| 208 | + [(->> parts first (mapv read-ns-sym)) nil]) |
| 209 | + 2 (let [[namespaces selectors] parts] |
| 210 | + (if (str/starts-with? (ffirst namespaces) ":") |
| 211 | + (do |
| 212 | + (logn "error: test selectors must come after namespaces") |
| 213 | + 2) |
| 214 | + [(mapv read-ns-sym namespaces) (mapv read-selector selectors)])) |
| 215 | + (do |
| 216 | + (logn "error: all test selectors must come after namespaces") |
| 217 | + 2)))) |
| 218 | + |
| 219 | +(defn main [& args] |
| 220 | + (let [{:keys [arguments errors options summary]} (parse-opts args options)] |
| 221 | + (System/exit |
| 222 | + (cond |
| 223 | + errors (do (logn (str/join \newline errors)) 2) |
| 224 | + |
| 225 | + (:test-in-parallel-help options) |
| 226 | + (do |
| 227 | + (println "Usage: lein test-in-parallel OPT...") |
| 228 | + (println summary) |
| 229 | + 0) |
| 230 | + |
| 231 | + (:calibrate options) |
| 232 | + (if-not (= 1 (:concurrency options)) |
| 233 | + (do |
| 234 | + (logn "error: --calibration requires a ---concurrency of 1") |
| 235 | + 2) |
| 236 | + (let [ns-sel (validate-nss-and-selectors arguments)] |
| 237 | + (if (integer? ns-sel) |
| 238 | + ns-sel |
| 239 | + (let [[namespaces selectors] ns-sel] |
| 240 | + (if (seq selectors) |
| 241 | + (do |
| 242 | + (logn "error: --calibration doesn't currently allow or respect selectors") |
| 243 | + 2) |
| 244 | + (calibrate (:test-paths options) namespaces (:state options))))))) |
| 245 | + |
| 246 | + :else |
| 247 | + (let [ns-sel (validate-nss-and-selectors arguments)] |
| 248 | + (if (integer? ns-sel) |
| 249 | + ns-sel |
| 250 | + (let [[namespaces selectors] ns-sel] |
| 251 | + (test-in-parallel (:test-paths options) namespaces |
| 252 | + (assoc (select-keys options [:concurrency]) |
| 253 | + :selectors selectors |
| 254 | + :state (-> options :state read-state-file)))))))))) |
0 commit comments