@@ -77,6 +77,8 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
77
77
default :codec , "json_lines"
78
78
79
79
def register
80
+
81
+
80
82
require "fileutils" # For mkdir_p
81
83
82
84
@files = { }
@@ -119,24 +121,49 @@ def multi_receive_encoded(events_and_encoded)
119
121
fd . write ( chunks . last )
120
122
else
121
123
# append to the file
122
- chunks . each { |chunk | fd . write ( chunk ) }
124
+ chunks . each { |chunk |
125
+ fd . write ( chunk )
126
+ }
123
127
end
124
- fd . flush unless @flusher && @flusher . alive?
128
+ onflush ( fd , path ) unless @flusher && @flusher . alive?
125
129
end
126
-
127
130
close_stale_files
128
131
end
129
132
end
130
133
134
+ def onflush ( fd , path )
135
+ fd . flush
136
+ if @files [ path ] [ :isTempFile ]
137
+ copy_to_gzip ( fd , path )
138
+ end
139
+ end
140
+
141
+ def copy_to_gzip ( fd , path )
142
+ zipfd = getFile ( path )
143
+ zipfd = Zlib ::GzipWriter . new ( zipfd )
144
+ fd . seek ( 0 , IO ::SEEK_SET )
145
+ data = fd . read
146
+ fd . truncate ( 0 )
147
+ fd . seek ( 0 , IO ::SEEK_SET )
148
+ if @write_behavior == "overwrite"
149
+ zipfd . truncate ( 0 )
150
+ zipfd . seek ( 0 , IO ::SEEK_SET )
151
+ end
152
+ zipfd . write ( data )
153
+ zipfd . flush
154
+ zipfd . to_io . flush
155
+ zipfd . close
156
+ end
157
+
131
158
def close
132
159
@flusher . stop unless @flusher . nil?
133
160
@io_mutex . synchronize do
134
161
@logger . debug ( "Close: closing files" )
135
162
136
- @files . each do |path , fd |
163
+ @files . each do |path , fileObj |
137
164
begin
138
- fd . close
139
- @logger . debug ( "Closed file #{ path } " , :fd => fd )
165
+ fileObj [ :fd ] . close
166
+ @logger . debug ( "Closed file #{ fileObj [ :fd ] } " , :fd => fileObj [ :fd ] )
140
167
rescue Exception => e
141
168
@logger . error ( "Exception while flushing and closing files." , :exception => e )
142
169
end
@@ -199,9 +226,12 @@ def flush_pending_files
199
226
@io_mutex . synchronize do
200
227
@logger . debug ( "Starting flush cycle" )
201
228
202
- @files . each do |path , fd |
203
- @logger . debug ( "Flushing file" , :path => path , :fd => fd )
204
- fd . flush
229
+ @files . each do |path , fileObj |
230
+ @logger . debug ( "Flushing file" , :path => path , :fd => fileObj [ :fd ] )
231
+ fileObj [ :fd ] . flush
232
+ if fileObj [ :isTempFile ]
233
+ copy_to_gzip ( fileObj [ :fd ] , path )
234
+ end
205
235
end
206
236
end
207
237
rescue => e
@@ -215,15 +245,18 @@ def close_stale_files
215
245
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
216
246
217
247
@logger . debug ( "Starting stale files cleanup cycle" , :files => @files )
218
- inactive_files = @files . select { |path , fd | not fd . active }
248
+ inactive_files = @files . select { |path , fileObj | not fileObj [ :fd ] . active }
219
249
@logger . debug ( "%d stale files found" % inactive_files . count , :inactive_files => inactive_files )
220
- inactive_files . each do |path , fd |
250
+ inactive_files . each do |path , fileObj |
221
251
@logger . info ( "Closing file %s" % path )
222
- fd . close
252
+ fileObj [ :fd ] . close
253
+ if fileObj [ :isTempFile ] && File . exist? ( fileObj [ :fd ] . path )
254
+ File . delete ( fileObj [ :fd ] . path )
255
+ end
223
256
@files . delete ( path )
224
257
end
225
258
# mark all files as inactive, a call to write will mark them as active again
226
- @files . each { |path , fd | fd . active = false }
259
+ @files . each { |path , fileObj | fileObj [ :fd ] . active = false }
227
260
@last_stale_cleanup_cycle = now
228
261
end
229
262
@@ -236,21 +269,41 @@ def deleted?(path)
236
269
end
237
270
238
271
def open ( path )
272
+ originalPath = path
239
273
if !deleted? ( path ) && cached? ( path )
240
- return @files [ path ]
274
+ return @files [ path ] [ :fd ]
241
275
end
242
276
243
277
if deleted? ( path )
244
278
if @create_if_deleted
245
279
@logger . debug ( "Required path was deleted, creating the file again" , :path => path )
246
280
@files . delete ( path )
247
281
else
248
- return @files [ path ] if cached? ( path )
282
+ return @files [ path ] [ :fd ] if cached? ( path )
283
+ end
284
+ end
285
+
286
+ #Fix for broken gzip issue.
287
+ if gzip
288
+ tmpfile = java . io . File . createTempFile ( "outfile-" , "-temp" ) ;
289
+ path = tmpfile . path
290
+ #create file at original path also, so that temp file is not created again
291
+ makeDir ( originalPath )
292
+ gzFile = getFile ( originalPath )
293
+ #if gzFile is fifo type, file writer object is returned that needs to closed.
294
+ if gzFile . class == Java ::JavaIo ::FileWriter
295
+ gzFile . close
249
296
end
250
297
end
251
298
252
299
@logger . info ( "Opening file" , :path => path )
300
+ makeDir ( path )
301
+ fd = getFile ( path )
302
+ @files [ originalPath ] = { :fd => IOWriter . new ( fd ) , :isTempFile => gzip }
303
+ return @files [ originalPath ] [ :fd ]
304
+ end
253
305
306
+ def makeDir ( path )
254
307
dir = File . dirname ( path )
255
308
if !Dir . exist? ( dir )
256
309
@logger . info ( "Creating directory" , :directory => dir )
@@ -260,7 +313,9 @@ def open(path)
260
313
FileUtils . mkdir_p ( dir )
261
314
end
262
315
end
316
+ end
263
317
318
+ def getFile ( path )
264
319
# work around a bug opening fifos (bug JRUBY-6280)
265
320
stat = File . stat ( path ) rescue nil
266
321
if stat && stat . ftype == "fifo"
@@ -272,10 +327,7 @@ def open(path)
272
327
fd = File . new ( path , "a+" )
273
328
end
274
329
end
275
- if gzip
276
- fd = Zlib ::GzipWriter . new ( fd )
277
- end
278
- @files [ path ] = IOWriter . new ( fd )
330
+ return fd
279
331
end
280
332
281
333
##
@@ -380,4 +432,4 @@ def method_missing(method_name, *args, &block)
380
432
super
381
433
end
382
434
end
383
- end
435
+ end
0 commit comments