|
4 | 4 | require 'time'
|
5 | 5 | require 'optparse'
|
6 | 6 | require 'socket'
|
| 7 | +require 'net/http' |
| 8 | +require 'uri' |
7 | 9 |
|
8 |
| -class Nc |
9 |
| - def initialize(host) |
10 |
| - @host = host |
| 10 | +class NetworkOutput |
| 11 | + # TODO: Support HTTPS. |
| 12 | + def initialize(host_url) |
| 13 | + @url = URI.parse(host_url) unless host_url.is_a?(URI) |
| 14 | + open |
11 | 15 | end
|
12 | 16 |
|
13 |
| - def socket |
14 |
| - return @socket if @socket && !@socket.closed? |
15 |
| - @socket = TCPSocket.new(@host, 2003) |
| 17 | + def open |
| 18 | + return if @output |
| 19 | + |
| 20 | + @output = case @url.scheme |
| 21 | + when 'tcp' |
| 22 | + TCPSocket.new(@url.host, @url.port) |
| 23 | + when 'http' |
| 24 | + http = Net::HTTP.new(@url.hostname, @url.port) |
| 25 | + http.keep_alive_timeout = 20 |
| 26 | + http.start |
| 27 | + |
| 28 | + http |
| 29 | + end |
16 | 30 | end
|
17 | 31 |
|
18 | 32 | def write(str, timeout = 1)
|
19 |
| - begin |
20 |
| - socket.write("#{str}\r\n") |
21 |
| - rescue Errno::EPIPE, Errno::EHOSTUNREACH, Errno::ECONNREFUSED |
22 |
| - @socket = nil |
23 |
| - STDERR.puts "WARNING: write to #{@host} failed; sleeping for #{timeout} seconds and retrying..." |
24 |
| - sleep timeout |
25 |
| - write(str, timeout * 2) |
| 33 | + case @url.scheme |
| 34 | + when 'tcp' |
| 35 | + begin |
| 36 | + @output.write(str) |
| 37 | + rescue Errno::EPIPE, Errno::EHOSTUNREACH, Errno::ECONNREFUSED |
| 38 | + close |
| 39 | + STDERR.puts "WARNING: write to #{@host} failed; sleeping for #{timeout} seconds and retrying..." |
| 40 | + sleep timeout |
| 41 | + open |
| 42 | + write(str, timeout * 2) |
| 43 | + end |
| 44 | + when 'http' |
| 45 | + request = Net::HTTP::Post.new(@url) |
| 46 | + request['Connection'] = 'keep-alive' |
| 47 | + response = @output.request(request, str) |
| 48 | + |
| 49 | + STDERR.puts "POST: #{@url} #{response.code}" |
26 | 50 | end
|
27 | 51 | end
|
28 | 52 |
|
29 |
| - def close_socket |
30 |
| - @socket.close if @socket |
31 |
| - @socket = nil |
| 53 | + def close |
| 54 | + case @url.scheme |
| 55 | + when 'tcp' |
| 56 | + @output.close |
| 57 | + when 'http' |
| 58 | + @output.finish |
| 59 | + end |
| 60 | + ensure |
| 61 | + @output = nil |
32 | 62 | end
|
33 | 63 | end
|
34 | 64 |
|
35 | 65 | def parse_file(filename)
|
36 |
| - nc = nil |
37 |
| - if $options[:host] |
38 |
| - nc = Nc.new($options[:host]) |
39 |
| - end |
40 |
| - begin |
41 |
| - data = JSON.parse(File.read(filename)) |
| 66 | + data = JSON.parse(File.read(filename)) |
42 | 67 |
|
43 |
| - # Newer versions of the log tool insert a timestamp field into the JSON. |
44 |
| - if data['timestamp'] |
45 |
| - timestamp = Time.parse(data.delete('timestamp')) |
46 |
| - parent_key = nil |
47 |
| - else |
48 |
| - timestamp = get_timestamp(filename) |
49 |
| - # The only data supported in the older log tool comes from puppetserver. |
50 |
| - parent_key = 'servers.' + get_hoststr(filename) + '.puppetserver' |
51 |
| - end |
| 68 | + # Newer versions of the log tool insert a timestamp field into the JSON. |
| 69 | + if data['timestamp'] |
| 70 | + timestamp = Time.parse(data.delete('timestamp')) |
| 71 | + parent_key = nil |
| 72 | + else |
| 73 | + timestamp = get_timestamp(filename) |
| 74 | + # The only data supported in the older log tool comes from puppetserver. |
| 75 | + parent_key = 'servers.' + get_hoststr(filename) + '.puppetserver' |
| 76 | + end |
52 | 77 |
|
53 |
| - array = metrics(data, timestamp, parent_key) |
54 |
| - lines = array.map do |item| |
| 78 | + case $options[:output_format] |
| 79 | + when 'influxdb' |
| 80 | + influx_metrics(data, timestamp, parent_key).join("\n") |
| 81 | + else |
| 82 | + metrics(data, timestamp, parent_key).map do |item| |
55 | 83 | item.split('\n')
|
56 |
| - end.flatten |
57 |
| - |
58 |
| - lines.each do |line| |
59 |
| - if nc |
60 |
| - # IS THIS NECESSARY??? I HAVE NO IDEA!!! |
61 |
| - #sleep 0.0001 |
62 |
| - nc.write("#{line}\n") |
63 |
| - else |
64 |
| - puts(line) |
65 |
| - end |
66 |
| - end |
67 |
| - rescue => e |
68 |
| - STDERR.puts "ERROR: #{filename}: #{e.message}" |
| 84 | + end.flatten.join("\r\n") |
69 | 85 | end
|
70 | 86 | end
|
71 | 87 |
|
@@ -115,6 +131,17 @@ def error_name(str)
|
115 | 131 | end
|
116 | 132 | end
|
117 | 133 |
|
| 134 | +def return_tag(a, n) |
| 135 | + if a[n].is_a? String |
| 136 | + return a[n] |
| 137 | + else |
| 138 | + if n > -1 |
| 139 | + return_tag(a, n-1) |
| 140 | + else return "none" |
| 141 | + end |
| 142 | +end |
| 143 | +end |
| 144 | + |
118 | 145 | def metrics(data, timestamp, parent_key = nil)
|
119 | 146 | data.collect do |key, value|
|
120 | 147 | current_key = [parent_key, safe_name(key)].compact.join('.')
|
@@ -149,18 +176,158 @@ def metrics(data, timestamp, parent_key = nil)
|
149 | 176 | end.flatten.compact
|
150 | 177 | end
|
151 | 178 |
|
| 179 | +def remove_trailing_comma(str) |
| 180 | + str.nil? ? nil : str.chomp(",") |
| 181 | +end |
| 182 | + |
| 183 | +def influx_tag_parser(tag) |
| 184 | + delete_set = ["status", "metrics", "routes", "status-service", "experimental", "app", "max", "min", "used", "init", "committed", "aggregate", "mean", "std-dev", "count", "total", "1", "5", "15"] |
| 185 | + tag = tag - delete_set |
| 186 | + tag_set = nil |
| 187 | + |
| 188 | + if tag.include? "servers" |
| 189 | + n = tag.index "servers" |
| 190 | + server_name = $options[:server_tag] || tag[n.to_i + 1] |
| 191 | + tag_set = "server=#{server_name}," |
| 192 | + tag.delete_at(tag.index("servers")+1) |
| 193 | + tag.delete("servers") |
| 194 | + end |
| 195 | + |
| 196 | + if tag.include? "orchestrator" |
| 197 | + tag_set = "#{tag_set}service=orchestrator," |
| 198 | + tag.delete("orchestrator") |
| 199 | + end |
| 200 | + |
| 201 | + if tag.include? "puppet_server" |
| 202 | + tag_set = "#{tag_set}service=puppet_server," |
| 203 | + tag.delete("puppet_server") |
| 204 | + end |
| 205 | + |
| 206 | + if tag.include? "puppetdb" |
| 207 | + tag_set = "#{tag_set}service=puppetdb," |
| 208 | + tag.delete("puppetdb") |
| 209 | + end |
| 210 | + |
| 211 | + if tag.include? "gc-stats" |
| 212 | + n = tag.index "gc-stats" |
| 213 | + gcstats_name = tag[n.to_i + 1] |
| 214 | + tag_set = "#{tag_set}gc-stats=#{gcstats_name}," |
| 215 | + tag.delete_at(tag.index("gc-stats")+1) |
| 216 | + tag.delete("gc-stats") |
| 217 | + end |
| 218 | + |
| 219 | + if tag.include? "broker-service" |
| 220 | + n = tag.index "broker-service" |
| 221 | + brokerservice_name = tag[n.to_i + 1] |
| 222 | + tag_set = "#{tag_set}broker-service_name=#{brokerservice_name}," |
| 223 | + tag.delete_at(tag.index("broker-service")+1) |
| 224 | + tag.delete("broker-service") |
| 225 | + end |
| 226 | + |
| 227 | + if tag.length > 1 |
| 228 | + measurement = tag.compact.join('.') |
| 229 | + tag_set = "#{measurement},#{tag_set}" |
| 230 | + elsif tag.length == 1 |
| 231 | + measurement = tag[0] |
| 232 | + tag_set = "#{measurement},#{tag_set}" |
| 233 | + end |
| 234 | + |
| 235 | + tag_set = remove_trailing_comma(tag_set) |
| 236 | + return tag_set |
| 237 | + |
| 238 | +end |
| 239 | + |
| 240 | +def influx_metrics(data, timestamp, parent_key = nil) |
| 241 | + data.collect do |key, value| |
| 242 | + current_key = [parent_key, safe_name(key)].compact.join('.') |
| 243 | + case value |
| 244 | + when Hash |
| 245 | + influx_metrics(value, timestamp, current_key) |
| 246 | + when Numeric |
| 247 | + temp_key = current_key.split(".") |
| 248 | + field_key = return_tag(temp_key, temp_key.length) |
| 249 | + if field_key.eql? "none" |
| 250 | + break |
| 251 | + end |
| 252 | + field_value = value |
| 253 | + tag_set = influx_tag_parser(temp_key) |
| 254 | + "#{tag_set} #{field_key}=#{field_value} #{timestamp.to_i}" |
| 255 | + when Array |
| 256 | + # Puppet Profiler metric. |
| 257 | + pp_metric = case current_key |
| 258 | + when /resource-metrics\Z/ |
| 259 | + "resource" |
| 260 | + when /function-metrics\Z/ |
| 261 | + "function" |
| 262 | + when /catalog-metrics\Z/ |
| 263 | + "metric" |
| 264 | + when /http-metrics\Z/ |
| 265 | + "route-id" |
| 266 | + else |
| 267 | + # Skip all other array valued metrics. |
| 268 | + next |
| 269 | + end |
| 270 | + |
| 271 | + temp_key = current_key.split(".") |
| 272 | + tag_set = influx_tag_parser(temp_key) |
| 273 | + |
| 274 | + value.map do |metrics| |
| 275 | + working_set = metrics.dup |
| 276 | + entry_name = working_set.delete(pp_metric) |
| 277 | + next if entry_name.nil? |
| 278 | + |
| 279 | + # Strip characters reserved by InfluxDB. |
| 280 | + entry_name.gsub(/\s,=/, '') |
| 281 | + leader = "#{tag_set},name=#{entry_name}" |
| 282 | + |
| 283 | + measurements = working_set.map {|k,v| [k,v].join("=")}.join(',') |
| 284 | + |
| 285 | + "#{leader} #{measurements} #{timestamp.to_i}" |
| 286 | + end |
| 287 | + else |
| 288 | + nil |
| 289 | + end |
| 290 | + end.flatten.compact |
| 291 | +end |
| 292 | + |
152 | 293 | $options = {}
|
153 | 294 | OptionParser.new do |opt|
|
154 | 295 | opt.on('--pattern PATTERN') { |o| $options[:pattern] = o }
|
155 | 296 | opt.on('--netcat HOST') { |o| $options[:host] = o }
|
| 297 | + opt.on('--convert-to FORMAT') { |o| $options[:output_format] = o } |
| 298 | + opt.on('--server-tag SERVER_NAME') { |o| $options[:server_tag] = o } |
| 299 | + |
| 300 | + # InfluxDB options |
| 301 | + opt.on('--influx-db DATABASE_NAME') {|o| $options[:influx_db] = o } |
156 | 302 | end.parse!
|
157 | 303 |
|
158 |
| -if $options[:pattern] |
159 |
| - Dir.glob($options[:pattern]).each do |filename| |
160 |
| - parse_file(filename) |
161 |
| - end |
| 304 | +if $options[:host] |
| 305 | + url = case $options[:output_format] |
| 306 | + when 'influxdb' |
| 307 | + raise ArgumentError, "--influx-db must be passsed along with --netcat" unless $options[:influx_db] |
| 308 | + "http://#{$options[:host]}:8086/write?db=#{$options[:influx_db]}&precision=s" |
| 309 | + else |
| 310 | + "tcp://#{$options[:host]}:2003" |
| 311 | + end |
| 312 | + |
| 313 | + $net_output = NetworkOutput.new(url) |
162 | 314 | end
|
163 | 315 |
|
164 |
| -while filename = ARGV.shift |
165 |
| - parse_file(filename) |
| 316 | +data_files = ARGV |
| 317 | +data_files += Dir.glob($options[:pattern]) if $options[:pattern] |
| 318 | + |
| 319 | +data_files.each do |filename| |
| 320 | + begin |
| 321 | + converted_data = parse_file(filename) |
| 322 | + |
| 323 | + if $options[:host] |
| 324 | + $net_output.write(converted_data) |
| 325 | + else |
| 326 | + STDOUT.write(converted_data) |
| 327 | + end |
| 328 | + rescue => e |
| 329 | + STDERR.puts "ERROR: #{filename}: #{e.message}" |
| 330 | + end |
166 | 331 | end
|
| 332 | + |
| 333 | +$net_output.close if $options[:host] |
0 commit comments