forked from logstash-plugins/logstash-output-tcp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtcp.rb
132 lines (112 loc) · 3.47 KB
/
tcp.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "thread"
# Write events over a TCP socket.
#
# Each event json is separated by a newline.
#
# Can either accept connections from clients or connect to a server,
# depending on `mode`.
class LogStash::Outputs::Tcp < LogStash::Outputs::Base
config_name "tcp"
default :codec, "json"
# When mode is `server`, the address to listen on.
# When mode is `client`, the address to connect to.
config :host, :validate => :string, :required => true
# When mode is `server`, the port to listen on.
# When mode is `client`, the port to connect to.
config :port, :validate => :number, :required => true
# When connect failed,retry interval in sec.
config :reconnect_interval, :validate => :number, :default => 10
# Mode to operate in. `server` listens for client connections,
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "client"
declare_workers_not_supported!
class Client
public
def initialize(socket, logger)
@socket = socket
@logger = logger
@queue = Queue.new
end
public
def run
loop do
begin
@socket.write(@queue.pop)
rescue => e
@logger.warn("tcp output exception", :socket => @socket,
:exception => e)
break
end
end
end # def run
public
def write(msg)
@queue.push(msg)
end # def write
end # class Client
public
def register
require "socket"
require "stud/try"
workers_not_supported
if server?
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
@server_socket = TCPServer.new(@host, @port)
@client_threads = []
@accept_thread = Thread.new(@server_socket) do |server_socket|
loop do
client_thread = Thread.start(server_socket.accept) do |client_socket|
client = Client.new(client_socket, @logger)
Thread.current[:client] = client
client.run
end
@client_threads << client_thread
end
end
@codec.on_event do |event, payload|
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
@client_threads.reject! {|t| !t.alive? }
end
else
client_socket = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
client_socket.sysread(16384) if r.any?
# Now send the payload
client_socket.syswrite(payload) if w.any?
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
end
end
end
end # def register
private
def connect
Stud::try do
return TCPSocket.new(@host, @port)
end
end # def connect
private
def server?
@mode == "server"
end # def server?
public
def receive(event)
@codec.encode(event)
end # def receive
end # class LogStash::Outputs::Tcp