From 87921847f94763e8696d38c80bda8d659ea123ea Mon Sep 17 00:00:00 2001 From: Gaurav Aradhye Date: Fri, 19 Jun 2020 18:29:49 -0700 Subject: [PATCH] Adding support for :upsert functionality for single writes Change-Id: Ic6899e5bb8537f85c7f9231d7e55576fd4d4777b --- lib/logstash/outputs/mongodb.rb | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..a9c2c1f 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -34,6 +34,8 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # "_id" field in the event. config :generateId, :validate => :boolean, :default => false + config :upsert, :validate => :boolean, :default => false, :required => false + config :document_id, :validate => :string, :default => nil, :required => false # Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one. config :bulk, :validate => :boolean, :default => false @@ -90,7 +92,9 @@ def receive(event) end end - if @generateId + if @document_id != nil + document["_id"] = event.sprintf(@document_id) + elsif @generateId document["_id"] = BSON::ObjectId.new end @@ -108,7 +112,12 @@ def receive(event) end end else - @db[event.sprintf(@collection)].insert_one(document) + if @upsert + update_result = @db[event.sprintf(@collection)].update_one({_id: document['_id']}, {'$set' => document.reject {|k, v| k == '_id'}}, + {:upsert => true}) + else + @db[event.sprintf(@collection)].insert_one(document) + end end rescue => e if e.message =~ /^E11000/