diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 0fa418c..5c65a27 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -50,6 +50,8 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|Yes | <> |<>|No +| <> |<>|No +| <> |<>|No |======================================================================= Also see <> for a list of options supported by all diff --git a/lib/logstash/outputs/file.rb b/lib/logstash/outputs/file.rb index c644ad3..438ff0e 100644 --- a/lib/logstash/outputs/file.rb +++ b/lib/logstash/outputs/file.rb @@ -74,6 +74,40 @@ class LogStash::Outputs::File < LogStash::Outputs::Base # recent event will appear in the file. config :write_behavior, :validate => [ "overwrite", "append" ], :default => "append" + # Size based file rotation + # + # Set the filesize in `bytes` after which the file is automatically rotated. + # The rotation automatically appends a number ending .0, .1, .2, .3 ... to the + # file name. + # + # The current rotation number is evaluated dyamically by scanning the directory, + # use either `max_file_rotation` or a date based file name pattern to avoid + # performance issues due to large amount of files to be moved. + # Files ending with .0.gz .1.gz ... will be deteced automatially to intigrate with + # log compression performed by other tools + # + # If set to `file_rotation_size => 0` no rotation will be performed + config :file_rotation_size, :validate => :number, :default => 0 + + # Max number of rotations to keep + # + # Set the maximum number of rotation for each logfile to keep. The deletion + # of out-dated files is performed after each rotation. + # Example: `"max_file_rotations" => 3` will allow up to `4` files + # `/path/to/logfile`, `/path/to/logfile.0`,`/path/to/logfile.1`,`/path/to/logfile.2`, + # + # If set to `max_file_rotations => 0` no cleanup will be performed + # If `file_rotation_size => 0` this setting will be ignored + config :max_file_rotations, :validate => :number, :default => 0 + + # Keep file extension with log rotation + # + # Set whether the file extension, segment of the filename after the last `.`, should + # be preserved when rotating logfiles + # Example: `"keep_file_extension" => true` will preserve the extension + # `/path/to/logfile.log`, `/path/to/logfile.0.log`,`/path/to/logfile.1.log`,`/path/to/logfile.2.log ...`, + config :keep_file_extension, :validate => :boolean, :default => false + default :codec, "json_lines" def register @@ -85,6 +119,7 @@ def register @path = File.expand_path(path) validate_path + validate_file_rotation_settings if path_with_field_ref? @file_root = extract_file_root @@ -153,6 +188,17 @@ def validate_path end end + def validate_file_rotation_settings + if (file_rotation_size < 0) + @logger.error("File: The file_rotation_size must not be a negative number", :file_rotation_size => @file_rotation_size) + raise LogStash::ConfigurationError.new("The file_rotation_size must not be a negative number.") + end + if (max_file_rotations < 0) + @logger.error("File: The max_file_rotations must not be a negative number", :max_file_rotations => @max_file_rotations) + raise LogStash::ConfigurationError.new("Setting max_file_rotations must not be a negative number.") + end + end + def root_directory parts = @path.split(File::SEPARATOR).select { |item| !item.empty? } if Gem.win_platform? @@ -170,17 +216,95 @@ def inside_file_root?(log_path) def event_path(event) file_output_path = generate_filepath(event) + + rotate_log_file(file_output_path) + if path_with_field_ref? && !inside_file_root?(file_output_path) @logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path) file_output_path = @failure_path elsif !@create_if_deleted && deleted?(file_output_path) file_output_path = @failure_path end + @logger.debug("File, writing event to file.", :filename => file_output_path) file_output_path end + def cleanup_rotated_files(file_output_path) + return unless max_file_rotations > 0 + + fileName = get_rotated_output_file_name(file_output_path, max_file_rotations, false) + if File.exist?(fileName) + File.unlink(fileName) + @logger.info("Deleted rotated file: #{fileName}") + elsif + File.exist?("#{fileName}.gz") + File.unlink("#{fileName}.gz") + @logger.info("Deleted rotated file: #{fileName}.gz") + end + end + + def get_rotated_output_file_name(filename, rotation, compressed) + newname = filename + if (keep_file_extension) + newname = "#{File.dirname(filename)}/#{File.basename(filename, ".*")}.#{rotation}#{File.extname(filename)}" + else + newname = "#{filename}.#{rotation}" + end + if compressed + newname = "#{newname}.gz" + else + newname = "#{newname}" + end + return newname + end + + + def rotate_log_file(file_output_path) + return unless file_rotation_size > 0 + @io_mutex.synchronize do + # Check current size + return unless (File.exist?(file_output_path) && File.stat(file_output_path).size > file_rotation_size) + + cnt = 0 + while File.exist?(get_rotated_output_file_name(file_output_path, cnt, false)) or File.exist?(get_rotated_output_file_name(file_output_path, cnt, true)) + cnt += 1 + end + + # Flush file + if (@files.include?(file_output_path)) + @logger.debug("Flush and close file: #{file_output_path}") + @files[file_output_path].flush + @files[file_output_path].close + @files.delete(file_output_path) + end + + until cnt == 0 + if File.exist?(get_rotated_output_file_name(file_output_path, cnt - 1, false)) + @logger.debug("Move file: #{get_rotated_output_file_name(file_output_path, cnt - 1, false)} => #{get_rotated_output_file_name(file_output_path, cnt , false)}") + File.rename( + get_rotated_output_file_name(file_output_path, cnt - 1, false), + get_rotated_output_file_name(file_output_path, cnt, false)) + elsif File.exist?(get_rotated_output_file_name(file_output_path, cnt - 1, true)) + @logger.debug("Move file: #{get_rotated_output_file_name(file_output_path, cnt - 1, true)} => #{get_rotated_output_file_name(file_output_path, cnt, true)}") + File.rename( + get_rotated_output_file_name(file_output_path, cnt - 1, true), + get_rotated_output_file_name(file_output_path, cnt, true)) + end + cnt -= 1 + end + if (File.exist?(file_output_path)) + @logger.debug("Move file: #{file_output_path} => #{get_rotated_output_file_name(file_output_path, 0, false)}") + File.rename(file_output_path, get_rotated_output_file_name(file_output_path, 0, false)) + end + + cleanup_rotated_files(file_output_path) + @logger.info("Finished file rotation") + end + end + + def generate_filepath(event) event.sprintf(@path) end diff --git a/logstash-output-file.gemspec b/logstash-output-file.gemspec index 6e705a4..bf494a5 100644 --- a/logstash-output-file.gemspec +++ b/logstash-output-file.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-file' - s.version = '4.2.6' + s.version = '4.3.0' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events to files on disk" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/file_spec.rb b/spec/outputs/file_spec.rb index e3a2e77..3a18cca 100644 --- a/spec/outputs/file_spec.rb +++ b/spec/outputs/file_spec.rb @@ -83,6 +83,249 @@ end end + describe "Size based file rotation" do + describe "create files of configured maximum size (5% tolerance)" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 300000 + file_rotation_size = 1024*1024 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}" + file_rotation_size => #{file_rotation_size} + } + } + CONFIG + + agent do + line_num = 0 + # Check that files are within 5% size tolerance + max_tolarated_size = (file_rotation_size + (file_rotation_size * 0.05)) + cnt = 0 + + # puts "Check: #{File.stat("#{tmp_file.path}").size} < #{max_tolarated_size}" + insist { File.stat("#{tmp_file.path}").size } < max_tolarated_size + + while File.exists?("#{tmp_file.path}.#{cnt}") do + actual_size = File.stat("#{tmp_file.path}.#{cnt}").size + # puts "Actual size: #{actual_size} Max: #{max_tolarated_size} Configured: #{file_rotation_size} ratio: #{((((actual_size.to_f/file_rotation_size.to_f))*100)-100).round(3)}%" + insist { actual_size } < max_tolarated_size + cnt += 1 + end + + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + + describe "no events are lost during file rotation" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 300000 + file_rotation_size = 1024*1024 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}" + file_rotation_size => #{file_rotation_size} + } + } + CONFIG + + agent do + line_num = 0 + max_tolarated_size = (file_rotation_size + (file_rotation_size * 0.05)) + cnt = 0 + line_num = 0 + + # Check that all events are logged + events = File.open("#{tmp_file.path}").map {|line| LogStash::Event.new(LogStash::Json.load(line))} + sorted = events.sort_by {|e| e.get('sequence')} + sorted.each do |event| + line_num += 1 + end + while File.exists?("#{tmp_file.path}.#{cnt}") do + events = File.open("#{tmp_file.path}.#{cnt}").map {|line| LogStash::Event.new(LogStash::Json.load(line))} + sorted = events.sort_by {|e| e.get('sequence')} + sorted.each do |event| + line_num += 1 + end + cnt += 1 + end + + insist {line_num} == event_count + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + + describe "handles `max_file_rotations` correctly" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 50000 + file_rotation_size = 1024*1024 + max_file_rotations = 2 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}" + file_rotation_size => #{file_rotation_size} + max_file_rotations => #{max_file_rotations} + } + } + CONFIG + + agent do + insist { File.exists?("#{tmp_file.path}") } == true + insist { File.exists?("#{tmp_file.path}.0") } == true + insist { File.exists?("#{tmp_file.path}.1") } == true + insist { File.exists?("#{tmp_file.path}.2") } == false + insist { File.exists?("#{tmp_file.path}.3") } == false + + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + + describe "handles `keep_file_extension` correctly" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 50000 + max_file_rotations = 2 + file_rotation_size = 1024*1024 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}.log" + file_rotation_size => #{file_rotation_size} + max_file_rotations => #{max_file_rotations} + keep_file_extension => true + } + } + CONFIG + + agent do + puts "#{tmp_file.path}.log" + insist { File.exists?("#{tmp_file.path}.log") } == true + insist { File.exists?("#{tmp_file.path}.0.log") } == true + insist { File.exists?("#{tmp_file.path}.1.log") } == true + insist { File.exists?("#{tmp_file.path}.2.log") } == false + insist { File.exists?("#{tmp_file.path}.3.log") } == false + + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + + describe "handles `keep_file_extension` without extension correctly" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 50000 + max_file_rotations = 2 + file_rotation_size = 1024*1024 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}" + file_rotation_size => #{file_rotation_size} + max_file_rotations => #{max_file_rotations} + keep_file_extension => true + } + } + CONFIG + + agent do + puts "#{tmp_file.path}.log" + insist { File.exists?("#{tmp_file.path}") } == true + insist { File.exists?("#{tmp_file.path}.0") } == true + insist { File.exists?("#{tmp_file.path}.1") } == true + insist { File.exists?("#{tmp_file.path}.2") } == false + insist { File.exists?("#{tmp_file.path}.3") } == false + + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + + describe "handles `keep_file_extension` with `.` in filename correctly" do + tmp_file = Tempfile.new('logstash-spec-output-file') + event_count = 50000 + max_file_rotations = 2 + file_rotation_size = 1024*1024 + + config <<-CONFIG + input { + generator { + message => "hello world" + count => #{event_count} + type => "generator" + } + } + output { + file { + path => "#{tmp_file.path}.afterdot.log" + file_rotation_size => #{file_rotation_size} + max_file_rotations => #{max_file_rotations} + keep_file_extension => true + } + } + CONFIG + + agent do + puts "#{tmp_file.path}.log" + insist { File.exists?("#{tmp_file.path}.afterdot.log") } == true + insist { File.exists?("#{tmp_file.path}.afterdot.0.log") } == true + insist { File.exists?("#{tmp_file.path}.afterdot.1.log") } == true + insist { File.exists?("#{tmp_file.path}.afterdot.2.log") } == false + insist { File.exists?("#{tmp_file.path}.afterdot.3.log") } == false + + Dir.glob("#{tmp_file.path}.*").each do |file| + File.delete(file) + end + end # agent + end + end describe "#register" do let(:path) { '/%{name}' } let(:output) { LogStash::Outputs::File.new({ "path" => path }) } @@ -110,14 +353,23 @@ output = LogStash::Outputs::File.new({ "path" => path }) expect { output.register }.not_to raise_error end + + it 'does not allow negative "file_rotation_size"' do + output = LogStash::Outputs::File.new({ "path" => '/tmp/%{name}', "file_rotation_size" => -1 }) + expect { output.register }.to raise_error(LogStash::ConfigurationError) + end + + it 'does not allow negative "max_file_rotations"' do + output = LogStash::Outputs::File.new({ "path" => '/tmp/%{name}', "max_file_rotations" => -1 }) + expect { output.register }.to raise_error(LogStash::ConfigurationError) + end end describe "receiving events" do - context "when write_behavior => 'overwrite'" do let(:tmp) { Stud::Temporary.pathname } let(:config) { - { + { "write_behavior" => "overwrite", "path" => tmp, "codec" => LogStash::Codecs::JSONLines.new, @@ -127,7 +379,7 @@ let(:output) { LogStash::Outputs::File.new(config) } let(:count) { Flores::Random.integer(1..10) } - let(:events) do + let(:events) do Flores::Random.iterations(1..10).collect do |i| LogStash::Event.new("value" => i) end @@ -179,7 +431,7 @@ event = LogStash::Event.new("event_id" => i+10) output.multi_receive([event]) end - + expect(FileTest.size(temp_file.path)).to be > 0 end