Refactor Fluent Logger for Improved Thread Safety and Error Handling#130
Merged
cosmo0920 merged 9 commits intofluent:masterfrom May 19, 2025
Merged
Refactor Fluent Logger for Improved Thread Safety and Error Handling#130cosmo0920 merged 9 commits intofluent:masterfrom
cosmo0920 merged 9 commits intofluent:masterfrom
Conversation
The channel and its usage is redundant since we already wire Fluent object with a cancellable context. Removed stopRunning to use the context instead. Instead of waiting for stopRunning in run(), we can just wait for context to be done as well. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
Added a new test to ensure that pending channel is accessed in a thread safe manner. The code can be simplified by removing the pendingMutex lock. There's a risk in this codebase wrt how muconn and pendingMutex are acquired and released in different orders in different methods. This test is a precursor to the change to remove the pendingMutex to ensure that nothing is broken. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
Changed the "closed" variable to be an atomic int. There is a race condition b/w
appendBuffer and Close() method wrt how this variable is accessed. This could be
an atomic.Bool, but the CI/CD config uses golang version 1.17. Since atomic.Bool
is supported 1.19 onwards, using that would fail tests and build.
Data race trace without this fix (using `go test -race`):
==================
WARNING: DATA RACE
Write at 0x00c0003324e0 by goroutine 95:
github.com/fluent/fluent-logger-golang/fluent.(*Fluent).close()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:426 +0x13c
github.com/fluent/fluent-logger-golang/fluent.(*Fluent).Close()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:393 +0x108
github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse.func1()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:737 +0x2c
github.com/fluent/fluent-logger-golang/fluent.timeout.func1()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:557 +0x38
Previous read at 0x00c0003324e0 by goroutine 94:
github.com/fluent/fluent-logger-golang/fluent.(*Fluent).write()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:619 +0x1ac
github.com/fluent/fluent-logger-golang/fluent.(*Fluent).writeWithRetry()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:553 +0x78
github.com/fluent/fluent-logger-golang/fluent.(*Fluent).run()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:527 +0x1d0
github.com/fluent/fluent-logger-golang/fluent.newWithDialer.gowrap1()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x4c
Goroutine 95 (running) created at:
github.com/fluent/fluent-logger-golang/fluent.timeout()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:556 +0xe8
github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:736 +0x444
testing.tRunner()
/opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180
testing.(*T).Run.gowrap1()
/opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40
Goroutine 94 (running) created at:
github.com/fluent/fluent-logger-golang/fluent.newWithDialer()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent.go:184 +0x368
github.com/fluent/fluent-logger-golang/fluent.TestCloseWhileWaitingForAckResponse()
/Users/aithal/workspace/src/github.com/aaithal/fluent-logger-golang/fluent/fluent_test.go:722 +0xd8
testing.tRunner()
/opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1792 +0x180
testing.(*T).Run.gowrap1()
/opt/homebrew/Cellar/go/1.24.2/libexec/src/testing/testing.go:1851 +0x40
==================
[2025-05-06T08:42:08-07:00] Discarding queued events...
testing.go:1490: race detected during execution of test
Signed-off-by: Anirudh Aithal <aithal@amazon.com>
Refactored the code to follow the pattern of releasing the mutex lock in a defer block as much as possible for the muconn lock. This should make the code more maintainable, avoiding issues with not accidentally releasing the lock. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
…nnection There are a number of places in the code where we do not check if the context is cancelled before reading from or writing to the connection. This commit adds those checks. Signed-off-by: Anirudh Aithal <aithal@amazon.com> cr: https://code.amazon.com/reviews/CR-194284782 Signed-off-by: Anirudh Aithal <aithal@amazon.com>
This can be used by clients to exit early when the connection is unresponsive while reading acks from the server. Signed-off-by: Anirudh Aithal <aithal@amazon.com>
64e3b2b to
1f759a4
Compare
cosmo0920
reviewed
May 19, 2025
Contributor
cosmo0920
left a comment
There was a problem hiding this comment.
Looks good to me.
Plus, I tested with race detector like as:
go test -v -race -cover -covermode=atomic ./fluent
There is no error from race detector.
The current master is still failing with that.
Contributor
|
Could you rebase off current master? I modified to run CI tasks correctly. So, we wanted to check CI results before merging. |
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
cosmo0920
approved these changes
May 19, 2025
Contributor
Author
|
Thank you for merging this PR and apologies for not getting back on this sooner. I taken a break for a few weeks. Appreciate you merging these changes! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR refactors the Fluent logger implementation to improve certain aspects of thread safety and error handling.
Key Changes
1. Thread Safety
2. Error Handling
3. Connection Management
4. Read Timeout Support
5. Context Propagation
Testing
All existing tests pass with these changes. The refactoring maintains backward compatibility while improving the robustness of the code.