Skip to content

Commit dccc5cb

Browse files
author
Richard Pijnenburg
committed
This came from elasticsearch/logstash at cf2242170011fbf2d264ae87660192754697671a
0 parents  commit dccc5cb

File tree

7 files changed

+366
-0
lines changed

7 files changed

+366
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
*.gem
2+
Gemfile.lock
3+
.bundle
4+
vendor

Gemfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
source 'https://rubygems.org'
2+
gem 'rake'
3+
gem 'gem_publisher'
4+
gem 'archive-tar-minitar'

Rakefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
@files=[]
2+
3+
task :default do
4+
system("rake -T")
5+
end
6+

lib/logstash/outputs/tcp.rb

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
# encoding: utf-8
2+
require "logstash/outputs/base"
3+
require "logstash/namespace"
4+
require "thread"
5+
6+
# Write events over a TCP socket.
7+
#
8+
# Each event json is separated by a newline.
9+
#
10+
# Can either accept connections from clients or connect to a server,
11+
# depending on `mode`.
12+
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
13+
14+
config_name "tcp"
15+
milestone 2
16+
17+
default :codec, "json"
18+
19+
# When mode is `server`, the address to listen on.
20+
# When mode is `client`, the address to connect to.
21+
config :host, :validate => :string, :required => true
22+
23+
# When mode is `server`, the port to listen on.
24+
# When mode is `client`, the port to connect to.
25+
config :port, :validate => :number, :required => true
26+
27+
# When connect failed,retry interval in sec.
28+
config :reconnect_interval, :validate => :number, :default => 10
29+
30+
# Mode to operate in. `server` listens for client connections,
31+
# `client` connects to a server.
32+
config :mode, :validate => ["server", "client"], :default => "client"
33+
34+
# The format to use when writing events to the file. This value
35+
# supports any string and can include %{name} and other dynamic
36+
# strings.
37+
#
38+
# If this setting is omitted, the full json representation of the
39+
# event will be written as a single line.
40+
config :message_format, :validate => :string, :deprecated => true
41+
42+
class Client
43+
public
44+
def initialize(socket, logger)
45+
@socket = socket
46+
@logger = logger
47+
@queue = Queue.new
48+
end
49+
50+
public
51+
def run
52+
loop do
53+
begin
54+
@socket.write(@queue.pop)
55+
rescue => e
56+
@logger.warn("tcp output exception", :socket => @socket,
57+
:exception => e)
58+
break
59+
end
60+
end
61+
end # def run
62+
63+
public
64+
def write(msg)
65+
@queue.push(msg)
66+
end # def write
67+
end # class Client
68+
69+
public
70+
def register
71+
require "stud/try"
72+
if server?
73+
workers_not_supported
74+
75+
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
76+
@server_socket = TCPServer.new(@host, @port)
77+
@client_threads = []
78+
79+
@accept_thread = Thread.new(@server_socket) do |server_socket|
80+
loop do
81+
client_thread = Thread.start(server_socket.accept) do |client_socket|
82+
client = Client.new(client_socket, @logger)
83+
Thread.current[:client] = client
84+
client.run
85+
end
86+
@client_threads << client_thread
87+
end
88+
end
89+
90+
@codec.on_event do |payload|
91+
@client_threads.each do |client_thread|
92+
client_thread[:client].write(payload)
93+
end
94+
@client_threads.reject! {|t| !t.alive? }
95+
end
96+
else
97+
client_socket = nil
98+
@codec.on_event do |payload|
99+
begin
100+
client_socket = connect unless client_socket
101+
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
102+
# don't expect any reads, but a readable socket might
103+
# mean the remote end closed, so read it and throw it away.
104+
# we'll get an EOFError if it happens.
105+
client_socket.sysread(16384) if r.any?
106+
107+
# Now send the payload
108+
client_socket.syswrite(payload) if w.any?
109+
rescue => e
110+
@logger.warn("tcp output exception", :host => @host, :port => @port,
111+
:exception => e, :backtrace => e.backtrace)
112+
client_socket.close rescue nil
113+
client_socket = nil
114+
sleep @reconnect_interval
115+
retry
116+
end
117+
end
118+
end
119+
end # def register
120+
121+
private
122+
def connect
123+
Stud::try do
124+
return TCPSocket.new(@host, @port)
125+
end
126+
end # def connect
127+
128+
private
129+
def server?
130+
@mode == "server"
131+
end # def server?
132+
133+
public
134+
def receive(event)
135+
return unless output?(event)
136+
137+
#if @message_format
138+
#output = event.sprintf(@message_format) + "\n"
139+
#else
140+
#output = event.to_hash.to_json + "\n"
141+
#end
142+
143+
@codec.encode(event)
144+
end # def receive
145+
end # class LogStash::Outputs::Tcp

logstash-output-tcp.gemspec

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Gem::Specification.new do |s|
2+
3+
s.name = 'logstash-output-tcp'
4+
s.version = '0.1.0'
5+
s.licenses = ['Apache License (2.0)']
6+
s.summary = "Write events over a TCP socket."
7+
s.description = "Write events over a TCP socket."
8+
s.authors = ["Elasticsearch"]
9+
s.email = 'richard.pijnenburg@elasticsearch.com'
10+
s.homepage = "http://logstash.net/"
11+
s.require_paths = ["lib"]
12+
13+
# Files
14+
s.files = `git ls-files`.split($\)+::Dir.glob('vendor/*')
15+
16+
# Tests
17+
s.test_files = s.files.grep(%r{^(test|spec|features)/})
18+
19+
# Special flag to let us know this is actually a logstash plugin
20+
s.metadata = { "logstash_plugin" => "true", "group" => "output" }
21+
22+
# Gem dependencies
23+
s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
24+
25+
s.add_runtime_dependency 'logstash-codec-json'
26+
s.add_runtime_dependency 'stud'
27+
28+
end
29+

rakelib/publish.rake

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
require "gem_publisher"
2+
3+
desc "Publish gem to RubyGems.org"
4+
task :publish_gem do |t|
5+
gem_file = Dir.glob(File.expand_path('../*.gemspec',File.dirname(__FILE__))).first
6+
gem = GemPublisher.publish_if_updated(gem_file, :rubygems)
7+
puts "Published #{gem}" if gem
8+
end
9+

rakelib/vendor.rake

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
require "net/http"
2+
require "uri"
3+
require "digest/sha1"
4+
5+
def vendor(*args)
6+
return File.join("vendor", *args)
7+
end
8+
9+
directory "vendor/" => ["vendor"] do |task, args|
10+
mkdir task.name
11+
end
12+
13+
def fetch(url, sha1, output)
14+
15+
puts "Downloading #{url}"
16+
actual_sha1 = download(url, output)
17+
18+
if actual_sha1 != sha1
19+
fail "SHA1 does not match (expected '#{sha1}' but got '#{actual_sha1}')"
20+
end
21+
end # def fetch
22+
23+
def file_fetch(url, sha1)
24+
filename = File.basename( URI(url).path )
25+
output = "vendor/#{filename}"
26+
task output => [ "vendor/" ] do
27+
begin
28+
actual_sha1 = file_sha1(output)
29+
if actual_sha1 != sha1
30+
fetch(url, sha1, output)
31+
end
32+
rescue Errno::ENOENT
33+
fetch(url, sha1, output)
34+
end
35+
end.invoke
36+
37+
return output
38+
end
39+
40+
def file_sha1(path)
41+
digest = Digest::SHA1.new
42+
fd = File.new(path, "r")
43+
while true
44+
begin
45+
digest << fd.sysread(16384)
46+
rescue EOFError
47+
break
48+
end
49+
end
50+
return digest.hexdigest
51+
ensure
52+
fd.close if fd
53+
end
54+
55+
def download(url, output)
56+
uri = URI(url)
57+
digest = Digest::SHA1.new
58+
tmp = "#{output}.tmp"
59+
Net::HTTP.start(uri.host, uri.port, :use_ssl => (uri.scheme == "https")) do |http|
60+
request = Net::HTTP::Get.new(uri.path)
61+
http.request(request) do |response|
62+
fail "HTTP fetch failed for #{url}. #{response}" if [200, 301].include?(response.code)
63+
size = (response["content-length"].to_i || -1).to_f
64+
count = 0
65+
File.open(tmp, "w") do |fd|
66+
response.read_body do |chunk|
67+
fd.write(chunk)
68+
digest << chunk
69+
if size > 0 && $stdout.tty?
70+
count += chunk.bytesize
71+
$stdout.write(sprintf("\r%0.2f%%", count/size * 100))
72+
end
73+
end
74+
end
75+
$stdout.write("\r \r") if $stdout.tty?
76+
end
77+
end
78+
79+
File.rename(tmp, output)
80+
81+
return digest.hexdigest
82+
rescue SocketError => e
83+
puts "Failure while downloading #{url}: #{e}"
84+
raise
85+
ensure
86+
File.unlink(tmp) if File.exist?(tmp)
87+
end # def download
88+
89+
def untar(tarball, &block)
90+
require "archive/tar/minitar"
91+
tgz = Zlib::GzipReader.new(File.open(tarball))
92+
# Pull out typesdb
93+
tar = Archive::Tar::Minitar::Input.open(tgz)
94+
tar.each do |entry|
95+
path = block.call(entry)
96+
next if path.nil?
97+
parent = File.dirname(path)
98+
99+
mkdir_p parent unless File.directory?(parent)
100+
101+
# Skip this file if the output file is the same size
102+
if entry.directory?
103+
mkdir path unless File.directory?(path)
104+
else
105+
entry_mode = entry.instance_eval { @mode } & 0777
106+
if File.exists?(path)
107+
stat = File.stat(path)
108+
# TODO(sissel): Submit a patch to archive-tar-minitar upstream to
109+
# expose headers in the entry.
110+
entry_size = entry.instance_eval { @size }
111+
# If file sizes are same, skip writing.
112+
next if stat.size == entry_size && (stat.mode & 0777) == entry_mode
113+
end
114+
puts "Extracting #{entry.full_name} from #{tarball} #{entry_mode.to_s(8)}"
115+
File.open(path, "w") do |fd|
116+
# eof? check lets us skip empty files. Necessary because the API provided by
117+
# Archive::Tar::Minitar::Reader::EntryStream only mostly acts like an
118+
# IO object. Something about empty files in this EntryStream causes
119+
# IO.copy_stream to throw "can't convert nil into String" on JRuby
120+
# TODO(sissel): File a bug about this.
121+
while !entry.eof?
122+
chunk = entry.read(16384)
123+
fd.write(chunk)
124+
end
125+
#IO.copy_stream(entry, fd)
126+
end
127+
File.chmod(entry_mode, path)
128+
end
129+
end
130+
tar.close
131+
File.unlink(tarball) if File.file?(tarball)
132+
end # def untar
133+
134+
def ungz(file)
135+
136+
outpath = file.gsub('.gz', '')
137+
tgz = Zlib::GzipReader.new(File.open(file))
138+
begin
139+
File.open(outpath, "w") do |out|
140+
IO::copy_stream(tgz, out)
141+
end
142+
File.unlink(file)
143+
rescue
144+
File.unlink(outpath) if File.file?(outpath)
145+
raise
146+
end
147+
tgz.close
148+
end
149+
150+
desc "Process any vendor files required for this plugin"
151+
task "vendor" do |task, args|
152+
153+
@files.each do |file|
154+
download = file_fetch(file['url'], file['sha1'])
155+
if download =~ /.tar.gz/
156+
prefix = download.gsub('.tar.gz', '').gsub('vendor/', '')
157+
untar(download) do |entry|
158+
if !file['files'].nil?
159+
next unless file['files'].include?(entry.full_name.gsub(prefix, ''))
160+
out = entry.full_name.split("/").last
161+
end
162+
File.join('vendor', out)
163+
end
164+
elsif download =~ /.gz/
165+
ungz(download)
166+
end
167+
end
168+
169+
end

0 commit comments

Comments
 (0)