-
Notifications
You must be signed in to change notification settings - Fork 5k
[Filebeat] Filestream running as Log input under Elastic Agent or feature flag #46587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 69 commits
78fbce1
5df3a38
d84f890
937cc57
1dbbe4a
d5fd62a
44ba92f
2d4d74e
8aab9a7
04225e3
fb963d4
65f2dae
bb3008c
1fbe05d
c9a3264
e74987d
5c85111
1c0b381
fc22095
6ff022a
fd25b68
22581aa
5030e8b
0711490
95d0095
68975b2
56a0775
5e91c58
eed3286
68846f8
2a803ed
4386ba1
b0ff5ed
4299e8b
f27ac51
3db414f
f51096d
6fbc623
2042946
6f35111
0f0956c
3303f85
3c6ff48
a3afeea
5117dd6
a9818cc
60cfbbf
3d2b833
1e615f6
75444b5
072633d
ac2c6eb
2dd4cb0
9842567
2e9f6cc
15e3422
d76c4ad
d2e4059
c1f363c
09d0683
48529c7
efc6654
a9e539d
b19a2aa
76bf082
3ddd356
d979d76
bbf93c7
64385c0
b819a97
41824e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,20 @@ | |||||||||
| description: > | ||||||||||
| The file offset the reported line starts at. | ||||||||||
| - name: log.file.device_id | ||||||||||
| type: keyword | ||||||||||
| required: false | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this targeting 9.3.0? Will it be in GA? Feel free to adjust as needed.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those fields have existed since a long time, some of the tests introduced by this PR check the fields generated by the input against I could also modify the test to ignore those fields. Let me think a bit more about it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If they've existed since before 9.0.0, I'm ok with just leaving out versioning information. |
||||||||||
| description: >- | ||||||||||
| The device ID used for the log file, this is used by the | ||||||||||
| 'native' file identity. | ||||||||||
| - name: log.file.inode | ||||||||||
| type: long | ||||||||||
| required: false | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this targeting 9.3.0? Will it be in GA? Feel free to adjust as needed.
Suggested change
|
||||||||||
| description: >- | ||||||||||
| The inode of the log file, this is used by the 'native' file | ||||||||||
| identity. | ||||||||||
| - name: stream | ||||||||||
| type: keyword | ||||||||||
| required: false | ||||||||||
|
|
||||||||||
Large diffs are not rendered by default.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package logv2 | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
|
|
||
| "github.com/elastic/beats/v7/filebeat/channel" | ||
| v1 "github.com/elastic/beats/v7/filebeat/input" | ||
| "github.com/elastic/beats/v7/filebeat/input/filestream" | ||
| loginput "github.com/elastic/beats/v7/filebeat/input/log" | ||
| v2 "github.com/elastic/beats/v7/filebeat/input/v2" | ||
| "github.com/elastic/beats/v7/libbeat/feature" | ||
| "github.com/elastic/beats/v7/libbeat/features" | ||
| "github.com/elastic/beats/v7/libbeat/management" | ||
| "github.com/elastic/beats/v7/libbeat/statestore" | ||
| "github.com/elastic/elastic-agent-libs/config" | ||
| "github.com/elastic/elastic-agent-libs/logp" | ||
| "github.com/elastic/go-concert/unison" | ||
| ) | ||
|
|
||
| const pluginName = "log" | ||
|
|
||
| func init() { | ||
| // Register an input V1, to replace the Log input one. | ||
| if err := v1.Register(pluginName, newV1Input); err != nil { | ||
| panic(err) | ||
| } | ||
| } | ||
|
|
||
| // runAsFilestream validates cfg as a Log input configuration, if it is | ||
| // valid, then checks whether the configuration should be run as | ||
| // Filestream input. On any error the boolean value must be ignore and | ||
| // no input started. runAsFilestream also sets the input type accordingly. | ||
| func runAsFilestream(cfg *config.C) (bool, error) { | ||
| // First of all, ensure the Log input configuration is valid. | ||
| // This ensures we return configuration errors compatible | ||
| // with the log input. | ||
| if err := loginput.IsConfigValid(cfg); err != nil { | ||
| return false, err | ||
| } | ||
|
|
||
| // Global feature flag that forces all Log input instances | ||
| // to run as Filestream. | ||
| if features.LogInputRunFilestream() { | ||
| return true, nil | ||
| } | ||
|
|
||
| // Only allow to run the Log input as Filestream if Filebeat | ||
| // is running under Elastic Agent. | ||
| if !management.UnderAgent() { | ||
| return false, nil | ||
| } | ||
|
|
||
| if ok := cfg.HasField("run_as_filestream"); ok { | ||
| runAsFilestream, err := cfg.Bool("run_as_filestream", -1) | ||
| if err != nil { | ||
| return false, fmt.Errorf("cannot parse 'run_as_filestream': %w", err) | ||
| } | ||
|
|
||
| if runAsFilestream { | ||
| // ID is required to run as Filestream input | ||
| if !cfg.HasField("id") { | ||
| return false, errors.New("'id' is required to run 'log' input as 'filestream'") | ||
| } | ||
|
|
||
| // This should never fail because the Log input configuration | ||
| // already reads 'type' and validates it is a string. Overriding | ||
| // the field should never fail. | ||
| if err := cfg.SetString("type", -1, "filestream"); err != nil { | ||
| return false, fmt.Errorf("cannot set 'type': %w", err) | ||
| } | ||
|
|
||
| return true, nil | ||
| } | ||
| } | ||
|
|
||
| return false, nil | ||
| } | ||
|
|
||
| // newV1Input instantiates the Log input. If 'run_as_filestream' is | ||
| // true, then v2.ErrUnknownInput is returned so the Filestream input | ||
| // can be instantiated. | ||
| func newV1Input( | ||
| cfg *config.C, | ||
| outlet channel.Connector, | ||
| context v1.Context, | ||
| logger *logp.Logger, | ||
| ) (v1.Input, error) { | ||
| // Inputs V1 should be tried last, so if this function is run we are | ||
| // supposed to be running as the Log input. However do not rely on the | ||
| // factory implementation, also check whether to run as Log or Filestream | ||
| // inputs. | ||
| asFilestream, err := runAsFilestream(cfg) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if asFilestream { | ||
| return nil, v2.ErrUnknownInput | ||
| } | ||
|
|
||
| inp, err := loginput.NewInput(cfg, outlet, context, logger) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("cannot create log input: %w", err) | ||
| } | ||
|
|
||
| logger.Debug("Log input running as Log input") | ||
| return inp, err | ||
| } | ||
|
|
||
| // PluginV2 returns a v2.Plugin with a manager that checks whether | ||
| // the config is from a Log input that should run as Filestream. | ||
| // If that is the case the Log input configuration is converted to | ||
| // Filestream and the Filestream input returned. | ||
| // Otherwise v2.ErrUnknownInput is returned. | ||
| func PluginV2(logger *logp.Logger, store statestore.States) v2.Plugin { | ||
| // The InputManager for Filestream input is from an internal package, so we | ||
| // cannot instantiate it directly here. To circumvent that, we instantiate | ||
| // the whole Filestream Plugin | ||
| filestreamPlugin := filestream.Plugin(logger, store) | ||
|
|
||
| m := manager{ | ||
| next: filestreamPlugin.Manager, | ||
| logger: logger, | ||
| } | ||
|
|
||
| p := v2.Plugin{ | ||
| Name: pluginName, | ||
| Stability: feature.Stable, | ||
| Info: "log input running filestream", | ||
| Doc: "Log input running Filestream input", | ||
| Manager: m, | ||
| } | ||
| return p | ||
| } | ||
|
|
||
| type manager struct { | ||
| next v2.InputManager | ||
| logger *logp.Logger | ||
| } | ||
|
|
||
| func (m manager) Init(grp unison.Group) error { | ||
| return m.next.Init(grp) | ||
| } | ||
|
|
||
| func (m manager) Create(cfg *config.C) (v2.Input, error) { | ||
| // When inputs are created, inputs V2 are tried first, so if we | ||
| // are supposed to run as the Log input, return v2.ErrUnknownInput | ||
| asFilestream, err := runAsFilestream(cfg) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if !asFilestream { | ||
| return nil, v2.ErrUnknownInput | ||
| } | ||
|
|
||
| newCfg, err := convertConfig(m.logger, cfg) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("cannot translate log config to filestream: %w", err) | ||
| } | ||
|
|
||
| m.logger.Debug("Log input running as Filestream input") | ||
| return m.next.Create(newCfg) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package logv2 | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/elastic/beats/v7/libbeat/management" | ||
| "github.com/elastic/elastic-agent-libs/config" | ||
| ) | ||
|
|
||
| func TestRunAsFilestream(t *testing.T) { | ||
| testCases := map[string]struct { | ||
| cfg *config.C | ||
| expectErr bool | ||
| expected bool | ||
| underAgent bool | ||
| }{ | ||
| "simplest log input config": { | ||
| underAgent: true, | ||
| expected: false, | ||
| cfg: config.MustNewConfigFrom(map[string]any{ | ||
| "paths": []string{"/var/log.log"}, | ||
| }), | ||
| }, | ||
| "log input invalid config": { | ||
| // empty config is always invalid | ||
| cfg: config.NewConfig(), | ||
| expectErr: true, | ||
| }, | ||
| "invalid 'run_as_filestream'": { | ||
| underAgent: true, | ||
| cfg: config.MustNewConfigFrom(map[string]any{ | ||
| "paths": []string{"/var/log.log"}, | ||
| "run_as_filestream": 42, | ||
| }), | ||
| expectErr: true, | ||
| }, | ||
| "no filestream id": { | ||
| underAgent: true, | ||
| cfg: config.MustNewConfigFrom(map[string]any{ | ||
| "paths": []string{"/var/log.log"}, | ||
| "run_as_filestream": true, | ||
| }), | ||
| expectErr: true, | ||
| }, | ||
| "not under Elastic Agent": { | ||
| underAgent: false, | ||
| cfg: config.MustNewConfigFrom(map[string]any{ | ||
| "paths": []string{"/var/log.log"}, | ||
| }), | ||
| expectErr: false, | ||
| expected: false, | ||
| }, | ||
| } | ||
|
|
||
| for name, tc := range testCases { | ||
| t.Run(name, func(t *testing.T) { | ||
| underAgent := management.UnderAgent() | ||
| t.Cleanup(func() { | ||
| management.SetUnderAgent(underAgent) | ||
| }) | ||
| management.SetUnderAgent(tc.underAgent) | ||
|
|
||
| got, err := runAsFilestream(tc.cfg) | ||
| if err != nil && !tc.expectErr { | ||
| t.Errorf("did not expect an error: %s", err) | ||
| } | ||
|
|
||
| if got != tc.expected { | ||
| t.Errorf("expecting 'runAsFilestream' to return %t, got %t", tc.expected, got) | ||
| } | ||
| }) | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.