# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "mongo"
require_relative "bson/big_decimal"
require_relative "bson/logstash_timestamp"

# This output writes events to MongoDB.
class LogStash::Outputs::Mongodb < LogStash::Outputs::Base

  config_name "mongodb"

  # A MongoDB URI to connect to.
  # See http://docs.mongodb.org/manual/reference/connection-string/.
  config :uri, :validate => :string, :required => true
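  # For example, `uri => "mongodb://user:password@localhost:27017"` (a
  # hypothetical standalone instance).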

  # The database to use.
  config :database, :validate => :string, :required => true

  # The collection to use. This value can use `%{foo}` values to dynamically
  # select a collection based on data in the event.
  config :collection, :validate => :string, :required => true
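  # For example, `collection => "logs-%{type}"` routes each event to a
  # collection named after its `type` field (field name hypothetical).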

  # If true, store the @timestamp field in MongoDB as an ISODate type instead
  # of an ISO8601 string. For more information about this, see
  # http://www.mongodb.org/display/DOCS/Dates.
  config :isodate, :validate => :boolean, :default => false
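  # For example, with the default setting a timestamp is stored as the string
  # "2015-01-01T00:00:00.000Z"; with `isodate => true` it is stored as a BSON
  # date, shown as ISODate("2015-01-01T00:00:00Z") in the mongo shell
  # (illustrative values).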

  # The number of seconds to wait after a failure before retrying.
  config :retry_delay, :validate => :number, :default => 3, :required => false

  # If true, an "_id" field will be added to the document before insertion.
  # The "_id" field will be a newly generated BSON ObjectId, overwriting any
  # existing "_id" field in the event.
  config :generateId, :validate => :boolean, :default => false

  # If true, update the document matching its "_id" (inserting it if absent)
  # instead of performing a plain insert.
  config :upsert, :validate => :boolean, :default => false, :required => false

  # A sprintf-style value (for example `%{myfield}`) used as the document
  # "_id". If unset, MongoDB generates the "_id" itself.
  config :document_id, :validate => :string, :default => nil, :required => false

  # Bulk insert flag: set to true to buffer events and insert them in bulk;
  # otherwise events are inserted one by one.
  config :bulk, :validate => :boolean, :default => false

  # The interval, in seconds, at which buffered events are flushed when the
  # "bulk" flag is enabled.
  config :bulk_interval, :validate => :number, :default => 2

  # The maximum number of events buffered per collection. When a buffer
  # reaches this size it is flushed immediately, regardless of the bulk
  # interval (MongoDB's hard limit is 1000 operations per bulk).
  config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2
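  # A minimal pipeline configuration sketch for this output (host, database
  # name, and the `type` field used for routing are hypothetical):
  #
  #   output {
  #     mongodb {
  #       uri => "mongodb://localhost:27017"
  #       database => "logstash"
  #       collection => "logs-%{type}"
  #       bulk => true
  #       bulk_interval => 2
  #       bulk_size => 900
  #     }
  #   }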

  # Mutex used to synchronize access to 'documents'
  @@mutex = Mutex.new

  def register
    if @bulk_size > 1000
      raise LogStash::ConfigurationError, "Bulk size must be 1000 or less, currently '#{@bulk_size}'"
    end

    Mongo::Logger.logger = @logger
    conn = Mongo::Client.new(@uri)
    @db = conn.use(@database)

    @closed = Concurrent::AtomicBoolean.new(false)
    @documents = {}

    # Background thread that flushes every collection's buffered documents
    # each `bulk_interval` seconds while the output is open.
    @bulk_thread = Thread.new(@bulk_interval) do |bulk_interval|
      while @closed.false? do
        sleep(bulk_interval)

        @@mutex.synchronize do
          @documents.each do |collection, values|
            if values.length > 0
              @db[collection].insert_many(values)
              @documents.delete(collection)
            end
          end
        end
      end
    end
  end

  def receive(event)
    begin
      # Our timestamp object now has a to_bson method (via the
      # bson/logstash_timestamp require above), so it serializes correctly
      # when `isodate` is enabled.
      # {}.merge(other) so we don't taint the event hash innards
      document = {}.merge(event.to_hash)

      if !@isodate
        timestamp = event.timestamp
        if timestamp
          # Deliberately not using timestamp.to_bson; store an ISO8601 string.
          document["@timestamp"] = timestamp.to_json
        else
          @logger.warn("Cannot set MongoDB document `@timestamp` field because it does not exist in the event", :event => event)
        end
      end

      if @document_id
        document["_id"] = event.sprintf(@document_id)
      elsif @generateId
        document["_id"] = BSON::ObjectId.new
      end

      if @bulk
        collection = event.sprintf(@collection)
        @@mutex.synchronize do
          @documents[collection] ||= []
          @documents[collection].push(document)

          # Flush immediately once this collection's buffer reaches bulk_size.
          if @documents[collection].length >= @bulk_size
            @db[collection].insert_many(@documents[collection])
            @documents.delete(collection)
          end
        end
      elsif @upsert
        @db[event.sprintf(@collection)].update_one(
          {_id: document['_id']},
          {'$set' => document.reject { |k, v| k == '_id' }},
          {:upsert => true})
      else
        @db[event.sprintf(@collection)].insert_one(document)
      end
    rescue => e
      if e.message =~ /^E11000/
        # On a duplicate key error, skip the insert.
        # We could check whether the duplicate key error is on the _id key
        # and generate a new primary key; if it is on another field, we have
        # no way to fix the issue.
        @logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e)
      else
        @logger.warn("Failed to send event to MongoDB, retrying in #{@retry_delay} seconds", :event => event, :exception => e)
        sleep(@retry_delay)
        retry
      end
    end
  end

  def close
    @closed.make_true
    @bulk_thread.wakeup
    @bulk_thread.join
  end
end
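
# A minimal exercise sketch (hypothetical; assumes logstash-core is on the
# load path and a MongoDB instance is reachable at the given URI):
#
#   out = LogStash::Outputs::Mongodb.new(
#     "uri" => "mongodb://localhost:27017",
#     "database" => "logstash",
#     "collection" => "logs")
#   out.register
#   out.receive(LogStash::Event.new("message" => "hello"))
#   out.close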