Skip to content

Commit 4409544

Browse files
committed
fix broken gzip file produced sometimes
1 parent 0c34403 commit 4409544

File tree

2 files changed

+87
-19
lines changed

2 files changed

+87
-19
lines changed

Diff for: lib/logstash/outputs/file.rb

+73-17
Original file line numberDiff line numberDiff line change
@@ -121,22 +121,53 @@ def multi_receive_encoded(events_and_encoded)
121121
# append to the file
122122
chunks.each {|chunk| fd.write(chunk) }
123123
end
124-
fd.flush unless @flusher && @flusher.alive?
124+
on_flush(fd, path) unless @flusher && @flusher.alive?
125125
end
126126

127127
close_stale_files
128128
end
129129
end
130130

131+
def on_flush(fd, path)
132+
fd.flush
133+
if @files[path][:isTempFile]
134+
copy_to_gzip(fd, path)
135+
end
136+
end
137+
138+
def copy_to_gzip(fd, path)
139+
zipfd = get_file(path)
140+
zipfd = Zlib::GzipWriter.new(zipfd)
141+
fd.seek(0, IO::SEEK_SET)
142+
data = fd.read
143+
fd.truncate(0)
144+
fd.seek(0, IO::SEEK_SET)
145+
if @write_behavior == "overwrite"
146+
zipfd.truncate(0)
147+
zipfd.seek(0, IO::SEEK_SET)
148+
end
149+
zipfd.write(data)
150+
zipfd.flush
151+
zipfd.to_io.flush
152+
zipfd.close
153+
end
154+
131155
def close
132156
@flusher.stop unless @flusher.nil?
133157
@io_mutex.synchronize do
134158
@logger.debug("Close: closing files")
135159

136-
@files.each do |path, fd|
160+
@files.each do |path, fileObj|
137161
begin
138-
fd.close
139-
@logger.debug("Closed file #{path}", :fd => fd)
162+
if fileObj[:isTempFile]
163+
copy_to_gzip(fileObj[:fd], path)
164+
end
165+
fileObj[:fd].close
166+
if fileObj[:isTempFile] && File.exist?(fileObj[:fd].path)
167+
File.delete(fileObj[:fd].path)
168+
@logger.debug("Deleted temp file ", :path => fileObj[:fd].path)
169+
end
170+
@logger.debug("Closed file #{fileObj[:fd]}", :fd => fileObj[:fd])
140171
rescue Exception => e
141172
@logger.error("Exception while flushing and closing files.", :exception => e)
142173
end
@@ -199,9 +230,12 @@ def flush_pending_files
199230
@io_mutex.synchronize do
200231
@logger.debug("Starting flush cycle")
201232

202-
@files.each do |path, fd|
203-
@logger.debug("Flushing file", :path => path, :fd => fd)
204-
fd.flush
233+
@files.each do |path, fileObj|
234+
@logger.debug("Flushing file", :path => path, :fd => fileObj[:fd])
235+
fileObj[:fd].flush
236+
if fileObj[:isTempFile]
237+
copy_to_gzip(fileObj[:fd], path)
238+
end
205239
end
206240
end
207241
rescue => e
@@ -215,15 +249,18 @@ def close_stale_files
215249
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
216250

217251
@logger.debug("Starting stale files cleanup cycle", :files => @files)
218-
inactive_files = @files.select { |path, fd| not fd.active }
252+
inactive_files = @files.select { |path, fileObj| not fileObj[:fd].active }
219253
@logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
220-
inactive_files.each do |path, fd|
254+
inactive_files.each do |path, fileObj|
221255
@logger.info("Closing file %s" % path)
222-
fd.close
256+
fileObj[:fd].close
257+
if fileObj[:isTempFile] && File.exist?(fileObj[:fd].path)
258+
File.delete(fileObj[:fd].path)
259+
end
223260
@files.delete(path)
224261
end
225262
# mark all files as inactive, a call to write will mark them as active again
226-
@files.each { |path, fd| fd.active = false }
263+
@files.each { |path, fileObj| fileObj[:fd].active = false }
227264
@last_stale_cleanup_cycle = now
228265
end
229266

@@ -236,21 +273,41 @@ def deleted?(path)
236273
end
237274

238275
def open(path)
276+
originalPath = path
239277
if !deleted?(path) && cached?(path)
240-
return @files[path]
278+
return @files[path][:fd]
241279
end
242280

243281
if deleted?(path)
244282
if @create_if_deleted
245283
@logger.debug("Required path was deleted, creating the file again", :path => path)
246284
@files.delete(path)
247285
else
248-
return @files[path] if cached?(path)
286+
return @files[path][:fd] if cached?(path)
287+
end
288+
end
289+
290+
#Fix for broken gzip issue.
291+
if gzip
292+
tmpfile = java.io.File.createTempFile("outfile-", "-temp");
293+
path = tmpfile.path
294+
#create file at original path also, so that temp file is not created again
295+
make_dir(originalPath)
296+
gzFile = get_file(originalPath)
297+
#if gzFile is fifo type, file writer object is returned that needs to closed.
298+
if gzFile.class == Java::JavaIo::FileWriter
299+
gzFile.close
249300
end
250301
end
251302

252303
@logger.info("Opening file", :path => path)
304+
make_dir(path)
305+
fd = get_file(path)
306+
@files[originalPath] = {:fd => IOWriter.new(fd), :isTempFile => gzip}
307+
return @files[originalPath][:fd]
308+
end
253309

310+
def make_dir(path)
254311
dir = File.dirname(path)
255312
if !Dir.exist?(dir)
256313
@logger.info("Creating directory", :directory => dir)
@@ -260,7 +317,9 @@ def open(path)
260317
FileUtils.mkdir_p(dir)
261318
end
262319
end
320+
end
263321

322+
def get_file(path)
264323
# work around a bug opening fifos (bug JRUBY-6280)
265324
stat = File.stat(path) rescue nil
266325
if stat && stat.ftype == "fifo"
@@ -272,10 +331,7 @@ def open(path)
272331
fd = File.new(path, "a+")
273332
end
274333
end
275-
if gzip
276-
fd = Zlib::GzipWriter.new(fd)
277-
end
278-
@files[path] = IOWriter.new(fd)
334+
return fd
279335
end
280336

281337
##

Diff for: spec/outputs/file_spec.rb

+14-2
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,21 @@
6969

7070
agent do
7171
line_num = 0
72+
global_events = []
7273
# Now check all events for order and correctness.
73-
events = Zlib::GzipReader.open(tmp_file.path).map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
74-
sorted = events.sort_by {|e| e.get("sequence")}
74+
File.open(tmp_file.path) do |file|
75+
zio = file
76+
loop do
77+
io = Zlib::GzipReader.new(zio)
78+
events = io.map {|line| LogStash::Event.new(LogStash::Json.load(line)) }
79+
global_events = global_events + events
80+
unused = io.unused
81+
io.finish
82+
break if unused.nil?
83+
zio.pos -= unused.length
84+
end
85+
end
86+
sorted = global_events.sort_by {|e| e.get("sequence")}
7587
sorted.each do |event|
7688
insist {event.get("message")} == "hello world"
7789
insist {event.get("sequence")} == line_num

0 commit comments

Comments
 (0)