|
2 | 2 | require "logstash/outputs/base"
|
3 | 3 | require "logstash/namespace"
|
4 | 4 | require "thread"
|
| 5 | +require "logstash/util/socket_peer" |
5 | 6 |
|
6 | 7 | # Write events over a TCP socket.
|
7 | 8 | #
|
@@ -65,12 +66,22 @@ def register
|
65 | 66 | workers_not_supported
|
66 | 67 |
|
67 | 68 | @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
|
68 |
| - @server_socket = TCPServer.new(@host, @port) |
| 69 | + begin |
| 70 | + @server_socket = TCPServer.new(@host, @port) |
| 71 | + rescue Errno::EADDRINUSE |
| 72 | + @logger.error("Could not start TCP server: Address in use", |
| 73 | + :host => @host, :port => @port) |
| 74 | + raise |
| 75 | + end |
69 | 76 | @client_threads = []
|
70 | 77 |
|
71 | 78 | @accept_thread = Thread.new(@server_socket) do |server_socket|
|
72 | 79 | loop do
|
73 | 80 | Thread.start(server_socket.accept) do |client_socket|
|
| 81 | + # monkeypatch a 'peer' method onto the socket. |
| 82 | + client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } |
| 83 | + @logger.debug("Accepted connection", :client => client_socket.peer, |
| 84 | + :server => "#{@host}:#{@port}") |
74 | 85 | client = Client.new(client_socket, @logger)
|
75 | 86 | Thread.current[:client] = client
|
76 | 87 | @client_threads << Thread.current
|
@@ -113,7 +124,10 @@ def register
|
113 | 124 | private
|
114 | 125 | def connect
|
115 | 126 | Stud::try do
|
116 |
| - return TCPSocket.new(@host, @port) |
| 127 | + client_socket = TCPSocket.new(@host, @port) |
| 128 | + client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } |
| 129 | + @logger.debug("Opened connection", :client => "#{client_socket.peer}") |
| 130 | + return client_socket |
117 | 131 | end
|
118 | 132 | end # def connect
|
119 | 133 |
|
|
0 commit comments