Skip to content

Commit 285371b

Browse files
committed
fix broken gzip file produced sometimes
1 parent 0c34403 commit 285371b

File tree

2 files changed

+82
-9
lines changed

2 files changed

+82
-9
lines changed

Diff for: lib/logstash/outputs/file.rb

+68-7
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,53 @@ def multi_receive_encoded(events_and_encoded)
121121
# append to the file
122122
chunks.each {|chunk| fd.write(chunk) }
123123
end
124-
fd.flush unless @flusher && @flusher.alive?
124+
on_flush(fd, path) unless @flusher && @flusher.alive?
125125
end
126126

127127
close_stale_files
128128
end
129129
end
130130

131+
def on_flush(fd, path)
132+
fd.flush
133+
if fd.is_temp
134+
copy_to_gzip(fd, path)
135+
end
136+
end
137+
138+
def copy_to_gzip(fd, path)
139+
zipfd = get_file(path)
140+
zipfd = Zlib::GzipWriter.new(zipfd)
141+
fd.seek(0, IO::SEEK_SET)
142+
data = fd.read
143+
fd.truncate(0)
144+
fd.seek(0, IO::SEEK_SET)
145+
if @write_behavior == "overwrite"
146+
zipfd.to_io.truncate(0)
147+
zipfd.to_io.seek(0, IO::SEEK_SET)
148+
end
149+
zipfd.write(data)
150+
zipfd.flush
151+
zipfd.to_io.flush
152+
zipfd.close
153+
end
154+
131155
def close
132156
@flusher.stop unless @flusher.nil?
133157
@io_mutex.synchronize do
134158
@logger.debug("Close: closing files")
135159

136160
@files.each do |path, fd|
137161
begin
162+
if fd.is_temp
163+
copy_to_gzip(fd, path)
164+
end
138165
fd.close
139-
@logger.debug("Closed file #{path}", :fd => fd)
166+
if fd.is_temp && File.exist?(fd.path)
167+
File.delete(fd.path)
168+
@logger.debug("Deleted temp file ", :path => fd.path)
169+
end
170+
@logger.debug("Closed file #{fd}", :fd => fd)
140171
rescue Exception => e
141172
@logger.error("Exception while flushing and closing files.", :exception => e)
142173
end
@@ -202,6 +233,9 @@ def flush_pending_files
202233
@files.each do |path, fd|
203234
@logger.debug("Flushing file", :path => path, :fd => fd)
204235
fd.flush
236+
if fd.is_temp
237+
copy_to_gzip(fd, path)
238+
end
205239
end
206240
end
207241
rescue => e
@@ -220,6 +254,9 @@ def close_stale_files
220254
inactive_files.each do |path, fd|
221255
@logger.info("Closing file %s" % path)
222256
fd.close
257+
if fd.is_temp && File.exist?(fd.path)
258+
File.delete(fd.path)
259+
end
223260
@files.delete(path)
224261
end
225262
# mark all files as inactive, a call to write will mark them as active again
@@ -236,6 +273,7 @@ def deleted?(path)
236273
end
237274

238275
def open(path)
276+
originalPath = path
239277
if !deleted?(path) && cached?(path)
240278
return @files[path]
241279
end
@@ -249,8 +287,27 @@ def open(path)
249287
end
250288
end
251289

290+
#Fix for broken gzip issue.
291+
if gzip
292+
tmpfile = java.io.File.createTempFile("outfile-", "-temp");
293+
path = tmpfile.path
294+
#create file at original path also, so that temp file is not created again
295+
make_dir(originalPath)
296+
gzFile = get_file(originalPath)
297+
#if gzFile is fifo type, file writer object is returned that needs to closed.
298+
if gzFile.class == Java::JavaIo::FileWriter
299+
gzFile.close
300+
end
301+
end
302+
252303
@logger.info("Opening file", :path => path)
304+
make_dir(path)
305+
fd = get_file(path)
306+
@files[originalPath] = IOWriter.new(fd, gzip)
307+
return @files[originalPath]
308+
end
253309

310+
def make_dir(path)
254311
dir = File.dirname(path)
255312
if !Dir.exist?(dir)
256313
@logger.info("Creating directory", :directory => dir)
@@ -260,7 +317,9 @@ def open(path)
260317
FileUtils.mkdir_p(dir)
261318
end
262319
end
320+
end
263321

322+
def get_file(path)
264323
# work around a bug opening fifos (bug JRUBY-6280)
265324
stat = File.stat(path) rescue nil
266325
if stat && stat.ftype == "fifo"
@@ -272,10 +331,7 @@ def open(path)
272331
fd = File.new(path, "a+")
273332
end
274333
end
275-
if gzip
276-
fd = Zlib::GzipWriter.new(fd)
277-
end
278-
@files[path] = IOWriter.new(fd)
334+
return fd
279335
end
280336

281337
##
@@ -356,8 +412,9 @@ def run
356412
class IOWriter
357413
attr_accessor :active
358414

359-
def initialize(io)
415+
def initialize(io, is_temp)
360416
@io = io
417+
@is_temp = is_temp
361418
end
362419

363420
def write(*args)
@@ -372,6 +429,10 @@ def flush
372429
end
373430
end
374431

432+
def is_temp
433+
return @is_temp
434+
end
435+
375436
def method_missing(method_name, *args, &block)
376437
if @io.respond_to?(method_name)
377438

Diff for: spec/outputs/file_spec.rb

+14-2
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,21 @@
6969

7070
agent do
7171
line_num = 0
72+
global_events = []
7273
# Now check all events for order and correctness.
73-
events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
74-
sorted = events.sort_by {|e| e.get("sequence")}
74+
File.open(tmp_file.path) do |file|
75+
zio = file
76+
loop do
77+
io = Zlib::GzipReader.new(zio)
78+
events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
79+
global_events = global_events + events
80+
unused = io.unused
81+
io.finish
82+
break if unused.nil?
83+
zio.pos -= unused.length
84+
end
85+
end
86+
sorted = global_events.sort_by {|e| e.get("sequence")}
7587
sorted.each do |event|
7688
insist {event.get("message")} == "hello world"
7789
insist {event.get("sequence")} == line_num

0 commit comments

Comments
 (0)