diff --git a/lib/logstash/outputs/file.rb b/lib/logstash/outputs/file.rb index c644ad3..0a2a68e 100644 --- a/lib/logstash/outputs/file.rb +++ b/lib/logstash/outputs/file.rb @@ -121,13 +121,37 @@ def multi_receive_encoded(events_and_encoded) # append to the file chunks.each {|chunk| fd.write(chunk) } end - fd.flush unless @flusher && @flusher.alive? + on_flush(fd, path) unless @flusher && @flusher.alive? end close_stale_files end end + def on_flush(fd, path) + fd.flush + if fd.is_temp + copy_to_gzip(fd, path) + end + end + + def copy_to_gzip(fd, path) + zipfd = get_file(path) + zipfd = Zlib::GzipWriter.new(zipfd) + fd.seek(0, IO::SEEK_SET) + data = fd.read + fd.truncate(0) + fd.seek(0, IO::SEEK_SET) + if @write_behavior == "overwrite" + zipfd.to_io.truncate(0) + zipfd.to_io.seek(0, IO::SEEK_SET) + end + zipfd.write(data) + zipfd.flush + zipfd.to_io.flush + zipfd.close + end + def close @flusher.stop unless @flusher.nil? @io_mutex.synchronize do @@ -135,8 +159,15 @@ def close @files.each do |path, fd| begin + if fd.is_temp + copy_to_gzip(fd, path) + end fd.close - @logger.debug("Closed file #{path}", :fd => fd) + if fd.is_temp && File.exist?(fd.path) + File.delete(fd.path) + @logger.debug("Deleted temp file ", :path => fd.path) + end + @logger.debug("Closed file #{fd}", :fd => fd) rescue Exception => e @logger.error("Exception while flushing and closing files.", :exception => e) end @@ -202,6 +233,9 @@ def flush_pending_files @files.each do |path, fd| @logger.debug("Flushing file", :path => path, :fd => fd) fd.flush + if fd.is_temp + copy_to_gzip(fd, path) + end end end rescue => e @@ -220,6 +254,9 @@ def close_stale_files inactive_files.each do |path, fd| @logger.info("Closing file %s" % path) fd.close + if fd.is_temp && File.exist?(fd.path) + File.delete(fd.path) + end @files.delete(path) end # mark all files as inactive, a call to write will mark them as active again @@ -236,6 +273,7 @@ def deleted?(path) end def open(path) + originalPath = path if !deleted?(path) && cached?(path) return @files[path] end @@ -249,8 +287,27 @@ def open(path) end end + #Fix for broken gzip issue. + if gzip + tmpfile = java.io.File.createTempFile("outfile-", "-temp"); + path = tmpfile.path + #create file at original path also, so that temp file is not created again + make_dir(originalPath) + gzFile = get_file(originalPath) + #if gzFile is fifo type, file writer object is returned that needs to closed. + if gzFile.class == Java::JavaIo::FileWriter + gzFile.close + end + end + @logger.info("Opening file", :path => path) + make_dir(path) + fd = get_file(path) + @files[originalPath] = IOWriter.new(fd, gzip) + return @files[originalPath] + end + def make_dir(path) dir = File.dirname(path) if !Dir.exist?(dir) @logger.info("Creating directory", :directory => dir) @@ -260,7 +317,9 @@ def open(path) FileUtils.mkdir_p(dir) end end + end + def get_file(path) # work around a bug opening fifos (bug JRUBY-6280) stat = File.stat(path) rescue nil if stat && stat.ftype == "fifo" @@ -272,10 +331,7 @@ def open(path) fd = File.new(path, "a+") end end - if gzip - fd = Zlib::GzipWriter.new(fd) - end - @files[path] = IOWriter.new(fd) + return fd end ## @@ -356,8 +412,9 @@ def run class IOWriter attr_accessor :active - def initialize(io) + def initialize(io, is_temp) @io = io + @is_temp = is_temp end def write(*args) @@ -372,6 +429,10 @@ def flush end end + def is_temp + return @is_temp + end + def method_missing(method_name, *args, &block) if @io.respond_to?(method_name) diff --git a/spec/outputs/file_spec.rb b/spec/outputs/file_spec.rb index e1342b1..72fcf92 100644 --- a/spec/outputs/file_spec.rb +++ b/spec/outputs/file_spec.rb @@ -69,9 +69,21 @@ agent do line_num = 0 + global_events = [] # Now check all events for order and correctness. - events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) } - sorted = events.sort_by {|e| e.get("sequence")} + File.open(tmp_file.path) do |file| + zio = file + loop do + io = Zlib::GzipReader.new(zio) + events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) } + global_events = global_events + events + unused = io.unused + io.finish + break if unused.nil? + zio.pos -= unused.length + end + end + sorted = global_events.sort_by {|e| e.get("sequence")} sorted.each do |event| insist {event.get("message")} == "hello world" insist {event.get("sequence")} == line_num