Skip to content

Commit

Permalink
Merge pull request #7 from yoheimuta/fix-thread-safety/issue6
Browse files Browse the repository at this point in the history
Fix thread safety/issue6
  • Loading branch information
yoheimuta authored Nov 3, 2018
2 parents f9d434f + b799ceb commit d518db1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 18 deletions.
56 changes: 42 additions & 14 deletions BufferedLogger/BufferedOutput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ final class BufferedOutput {

/// setUpTimer must be called by the queue worker.
private func setUpTimer() {
if #available(iOS 10.0, *) {
dispatchPrecondition(condition: .onQueue(queue))
}

self.timer?.invalidate()

let timer = Timer(timeInterval: 1.0,
Expand Down Expand Up @@ -129,6 +133,10 @@ final class BufferedOutput {

/// reloadEntriesFromStorage must be called by the queue worker.
private func reloadEntriesFromStorage() {
if #available(iOS 10.0, *) {
dispatchPrecondition(condition: .onQueue(queue))
}

buffer.removeAll()

do {
Expand All @@ -141,6 +149,10 @@ final class BufferedOutput {

/// dropEntriesFromStorage must be called by the queue worker.
private func dropEntriesFromStorage() throws {
if #available(iOS 10.0, *) {
dispatchPrecondition(condition: .onQueue(queue))
}

let dropCountAtOneTime = config.flushEntryCount * 3
let newBuffer = Set(sortedBuffer.dropFirst(dropCountAtOneTime))
let dropped = buffer.subtracting(newBuffer)
Expand All @@ -152,6 +164,10 @@ final class BufferedOutput {

/// flush must be called by the queue worker.
private func flush() {
if #available(iOS 10.0, *) {
dispatchPrecondition(condition: .onQueue(queue))
}

lastFlushDate = now

if buffer.isEmpty {
Expand All @@ -166,25 +182,37 @@ final class BufferedOutput {
callWriteChunk(chunk)
}

/// callWriteChunk must be called by the queue worker
private func callWriteChunk(_ chunk: Chunk) {
if #available(iOS 10.0, *) {
dispatchPrecondition(condition: .onQueue(queue))
}

writer.write(chunk) { success in
if success {
do {
try self.entryStorage.remove(chunk.entries,
from: self.config.storagePath)
} catch {
self.internalErrorLogger.log("failed to remove logs from the storage: \(error)")
self.queue.async {
if #available(iOS 10.0, *) {
// Leave this check for a certain period to attest that the thread-safety bug is fixed.
dispatchPrecondition(condition: .onQueue(self.queue))
}

if success {
do {
try self.entryStorage.remove(chunk.entries,
from: self.config.storagePath)
} catch {
self.internalErrorLogger.log("failed to remove logs from the storage: \(error)")
}
return
}
return
}

var chunk = chunk
chunk.incrementRetryCount()
var chunk = chunk
chunk.incrementRetryCount()

if chunk.retryCount <= self.config.retryRule.retryLimit {
let delay = self.config.retryRule.delay(try: chunk.retryCount)
self.queue.asyncAfter(deadline: .now() + delay) {
self.callWriteChunk(chunk)
if chunk.retryCount <= self.config.retryRule.retryLimit {
let delay = self.config.retryRule.delay(try: chunk.retryCount)
self.queue.asyncAfter(deadline: .now() + delay) {
self.callWriteChunk(chunk)
}
}
}
}
Expand Down
52 changes: 49 additions & 3 deletions BufferedLoggerTests/BufferedOutputTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ final class MockWriter: Writer {
givenPayloads.append($0.payload)
}

completion(shouldSuccess)
writeCallback?(calledWriteCount-1)
let count = calledWriteCount

DispatchQueue.global().async {
completion(self.shouldSuccess)
self.writeCallback?(count-1)
}
}
}

Expand All @@ -50,8 +54,14 @@ final class MockRetryRule: RetryRule {
}

final class MockEntryStorage: EntryStorage {
private let removeCallback: ((Int) -> Void)?
private(set) var calledRemoveCount: Int = 0
private var buffer: [String: Set<Entry>] = [:]

init(removeCallback: ((Int) -> Void)? = nil) {
self.removeCallback = removeCallback
}

func retrieveAll(from path: String) throws -> Set<Entry> {
guard let logs = buffer[path] else {
return []
Expand All @@ -68,6 +78,11 @@ final class MockEntryStorage: EntryStorage {

func remove(_ logs: Set<Entry>, from path: String) throws {
buffer[path]?.subtract(logs)

calledRemoveCount += 1
if let callback = removeCallback {
callback(calledRemoveCount - 1)
}
}
}

Expand All @@ -89,6 +104,7 @@ class BufferedOutputTests: XCTestCase {
config: Config,
inputPayloads: [Data],
expectations: [XCTestExpectation],
storageRemovalExpectations: [XCTestExpectation],
waitTime: TimeInterval,
wantPayloads: [Data],
wantLeftEntryCount: Int
Expand All @@ -105,6 +121,9 @@ class BufferedOutputTests: XCTestCase {
expectations: [
self.expectation(description: "flush 1")
],
storageRemovalExpectations: [
self.expectation(description: "remove 1")
],
waitTime: 1,
wantPayloads: [
"1".data(using: .utf8)!
Expand All @@ -125,6 +144,10 @@ class BufferedOutputTests: XCTestCase {
self.expectation(description: "flush 1"),
self.expectation(description: "flush 2")
],
storageRemovalExpectations: [
self.expectation(description: "remove 1"),
self.expectation(description: "remove 2")
],
waitTime: 1,
wantPayloads: [
"1".data(using: .utf8)!,
Expand All @@ -145,6 +168,9 @@ class BufferedOutputTests: XCTestCase {
expectations: [
self.expectation(description: "flush 1")
],
storageRemovalExpectations: [
self.expectation(description: "remove 1")
],
waitTime: 1,
wantPayloads: [
"1".data(using: .utf8)!,
Expand All @@ -166,6 +192,9 @@ class BufferedOutputTests: XCTestCase {
expectations: [
self.expectation(description: "flush 1")
],
storageRemovalExpectations: [
self.expectation(description: "remove 1")
],
waitTime: 2,
wantPayloads: [
"1".data(using: .utf8)!,
Expand All @@ -190,6 +219,10 @@ class BufferedOutputTests: XCTestCase {
self.expectation(description: "flush 1"),
self.expectation(description: "flush 2")
],
storageRemovalExpectations: [
self.expectation(description: "remove 1"),
self.expectation(description: "remove 2")
],
waitTime: 4,
wantPayloads: [
"1".data(using: .utf8)!,
Expand All @@ -214,6 +247,8 @@ class BufferedOutputTests: XCTestCase {
self.expectation(description: "retry 2"),
self.expectation(description: "retry 3")
],
storageRemovalExpectations: [
],
waitTime: 4,
wantPayloads: [
"1".data(using: .utf8)!,
Expand All @@ -230,7 +265,10 @@ class BufferedOutputTests: XCTestCase {
test.expectations[count].fulfill()
}

let mStorage = MockEntryStorage()
let mStorage = MockEntryStorage(removeCallback: { count in
test.storageRemovalExpectations[count].fulfill()
})

let output = BufferedOutput(writer: test.writer,
config: test.config,
entryStorage: mStorage,
Expand Down Expand Up @@ -258,9 +296,17 @@ class BufferedOutputTests: XCTestCase {
PayloadDecorder.decode(test.wantPayloads).sorted(),
test.name)

// Wait for a while until the entries are deleted from entryStorage
// because the callback block of Writer.write is run on any thread.
// ref. https://github.com/yoheimuta/BufferedLogger/pull/7 {
wait(for: test.storageRemovalExpectations, timeout: 2.0)
XCTAssertEqual(mStorage.calledRemoveCount,
test.storageRemovalExpectations.count,
test.name)
XCTAssertEqual(try mStorage.retrieveAll(from: defaultStoragePath).count,
test.wantLeftEntryCount,
test.name)
// }
}
}

Expand Down
4 changes: 3 additions & 1 deletion Demo/Demo/MyWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class MyWriter: Writer {
print("entry is \($0)")
}

completion(true)
DispatchQueue.global().async {
completion(true)
}
}
}

0 comments on commit d518db1

Please sign in to comment.