Skip to content

Commit 21a18eb

Browse files
Vladislav AlekseevAvito iOSBot
Vladislav Alekseev
authored and
Avito iOSBot
committed
MBS-13416: rewrite statsd client
GitOrigin-RevId: 4d90c515a7d10df397da64464b3d1cac4782b247
1 parent fb0e802 commit 21a18eb

19 files changed

+341
-301
lines changed

Package.resolved

+9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@
1010
"version": "1.0.21"
1111
}
1212
},
13+
{
14+
"package": "Socket",
15+
"repositoryURL": "https://github.com/Kitura/BlueSocket.git",
16+
"state": {
17+
"branch": null,
18+
"revision": "c46a3d41f5b2401d18bcb46d0101cdc5cd13e307",
19+
"version": "1.0.52"
20+
}
21+
},
1322
{
1423
"package": "Glob",
1524
"repositoryURL": "https://github.com/Bouke/Glob",

Package.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ let package = Package(
4949
dependencies: [
5050
.package(name: "Glob", url: "https://github.com/Bouke/Glob", .exact("1.0.5")),
5151
.package(name: "Signals", url: "https://github.com/IBM-Swift/BlueSignals.git", .exact("1.0.21")),
52+
.package(name: "Socket", url: "https://github.com/Kitura/BlueSocket.git", .exact("1.0.52")),
5253
.package(name: "swift-argument-parser", url: "https://github.com/apple/swift-argument-parser", from: "1.0.3"),
5354
],
5455
targets: [
@@ -374,16 +375,19 @@ let package = Package(
374375
"AtomicModels",
375376
"IO",
376377
"MetricsUtils",
378+
.product(name: "Socket", package: "Socket"),
377379
"SocketModels",
378-
"Waitable",
379380
],
380381
path: "Sources/Statsd"
381382
),
382383
.testTarget(
383384
name: "StatsdTests",
384385
dependencies: [
385386
"MetricsRecording",
387+
.product(name: "Socket", package: "Socket"),
388+
"SocketModels",
386389
"Statsd",
390+
"TestHelpers",
387391
],
388392
path: "Tests/StatsdTests"
389393
),
@@ -427,6 +431,7 @@ let package = Package(
427431
.target(
428432
name: "TestHelpers",
429433
dependencies: [
434+
"AtomicModels",
430435
],
431436
path: "Tests/TestHelpers"
432437
),

Sources/Statsd/AppleStatsdClient.swift

-78
This file was deleted.

Sources/Statsd/StatsdClient.swift

+11-6
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import Dispatch
12
import Foundation
23

34
public protocol StatsdClient: AnyObject {
4-
var stateUpdateHandler: ((StatsdClientState) -> ())? { get set }
5-
var state: StatsdClientState { get }
5+
func send(
6+
content: Data,
7+
queue: DispatchQueue,
8+
completion: @escaping (Error?) -> ()
9+
)
610

7-
func start(queue: DispatchQueue)
8-
func cancel()
9-
10-
func send(content: Data, completion: @escaping (Error?) -> ())
11+
func tearDown(
12+
queue: DispatchQueue,
13+
timeout: TimeInterval,
14+
completion: @escaping () -> ()
15+
)
1116
}

Sources/Statsd/StatsdClientImpl.swift

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import AtomicModels
2+
import Dispatch
3+
import Foundation
4+
import Socket
5+
import SocketModels
6+
7+
public final class StatsdClientImpl: StatsdClient {
8+
private let statsdSocketAddress: SocketAddress
9+
private let group = DispatchGroup()
10+
private let tornDown = AtomicValue(false)
11+
12+
public struct InvalidSocketAddressError: Error, CustomStringConvertible {
13+
public let address: SocketAddress
14+
15+
public var description: String {
16+
"Invalid socket address: \(address)"
17+
}
18+
}
19+
20+
public init(
21+
statsdSocketAddress: SocketAddress
22+
) throws {
23+
self.statsdSocketAddress = statsdSocketAddress
24+
}
25+
26+
public func send(
27+
content: Data,
28+
queue: DispatchQueue,
29+
completion: @escaping (Error?) -> ()
30+
) {
31+
guard tornDown.currentValue() == false else {
32+
return
33+
}
34+
35+
guard let address = Socket.createAddress(
36+
for: statsdSocketAddress.host,
37+
on: Int32(statsdSocketAddress.port.value)
38+
) else {
39+
queue.async { [statsdSocketAddress] in
40+
completion(InvalidSocketAddressError(address: statsdSocketAddress))
41+
}
42+
return
43+
}
44+
45+
queue.async { [group] in
46+
group.enter()
47+
defer {
48+
group.leave()
49+
}
50+
51+
do {
52+
let socket = try Socket.create(
53+
type: .datagram,
54+
proto: .udp
55+
)
56+
defer {
57+
socket.close()
58+
}
59+
60+
try socket.write(
61+
from: content,
62+
to: address
63+
)
64+
completion(nil)
65+
} catch {
66+
completion(error)
67+
}
68+
}
69+
}
70+
71+
public func tearDown(
72+
queue: DispatchQueue,
73+
timeout: TimeInterval,
74+
completion: @escaping () -> ()
75+
) {
76+
self.tornDown.set(true)
77+
78+
let tearDownCompleted = AtomicValue(false)
79+
80+
let complete = {
81+
tearDownCompleted.withExclusiveAccess {
82+
if $0 == false {
83+
$0 = true
84+
completion()
85+
}
86+
}
87+
}
88+
89+
group.notify(queue: queue) {
90+
complete()
91+
}
92+
queue.asyncAfter(deadline: .now() + timeout) {
93+
complete()
94+
}
95+
}
96+
}

Sources/Statsd/StatsdClientState.swift

-7
This file was deleted.

Sources/Statsd/StatsdMetricHandlerImpl.swift

+19-52
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ public final class StatsdMetricHandlerImpl: StatsdMetricHandler {
77
private let statsdDomain: [String]
88
private let statsdClient: StatsdClient
99
private let serialQueue: DispatchQueue
10-
11-
private let metricsBuffer = AtomicCollection([StatsdMetric]())
12-
private let metricsBeingSent = AtomicCollection([UUID]())
10+
private let group = DispatchGroup()
1311

1412
public init(
1513
statsdDomain: [String],
@@ -19,63 +17,32 @@ public final class StatsdMetricHandlerImpl: StatsdMetricHandler {
1917
self.statsdDomain = statsdDomain
2018
self.statsdClient = statsdClient
2119
self.serialQueue = serialQueue
22-
23-
self.statsdClient.stateUpdateHandler = { [weak self] state in
24-
guard let strongSelf = self else { return }
25-
26-
switch state {
27-
case .notReady:
28-
break
29-
case .ready:
30-
strongSelf.metricsBuffer.withExclusiveAccess {
31-
$0.forEach(strongSelf.send)
32-
$0.removeAll()
33-
}
34-
case .failed:
35-
strongSelf.statsdClient.cancel()
36-
}
37-
}
38-
statsdClient.start(queue: serialQueue)
3920
}
4021

4122
public func handle(metric: StatsdMetric) {
42-
// swiftlint:disable:next async
43-
serialQueue.async { [weak self] in
44-
guard let strongSelf = self else { return }
45-
let state = strongSelf.statsdClient.state
46-
47-
switch state {
48-
case .notReady:
49-
strongSelf.metricsBuffer.withExclusiveAccess { $0.append(metric) }
50-
case .ready:
51-
strongSelf.send(metric: metric)
52-
case .failed:
53-
break
54-
}
23+
let metricData = Data(metric.build(domain: self.statsdDomain).utf8)
24+
25+
group.enter()
26+
27+
statsdClient.send(
28+
content: metricData,
29+
queue: serialQueue
30+
) { [group] _ in
31+
group.leave()
5532
}
5633
}
5734

5835
public func tearDown(timeout: TimeInterval) {
59-
let tearDownRequestTimestamp = Date()
60-
_ = metricsBuffer.waitWhen(count: 0, before: tearDownRequestTimestamp.addingTimeInterval(timeout))
36+
group.enter()
6137

62-
let timeoutRemainder = timeout - Date().timeIntervalSince(tearDownRequestTimestamp)
63-
if timeoutRemainder > 0 {
64-
_ = metricsBeingSent.waitWhen(count: 0, before: Date(timeIntervalSinceNow: timeoutRemainder))
65-
}
66-
67-
statsdClient.cancel()
68-
}
69-
70-
private func send(metric: StatsdMetric) {
71-
let seed = UUID()
72-
metricsBeingSent.withExclusiveAccess { $0.append(seed) }
73-
self.statsdClient.send(
74-
content: Data(metric.build(domain: self.statsdDomain).utf8)
75-
) { _ in
76-
self.metricsBeingSent.withExclusiveAccess {
77-
$0.removeAll(where: { $0 == seed })
38+
statsdClient.tearDown(
39+
queue: serialQueue,
40+
timeout: timeout,
41+
completion: { [group] in
42+
group.leave()
7843
}
79-
}
44+
)
45+
46+
_ = group.wait(timeout: .now() + timeout)
8047
}
8148
}

Tests/IOTests/AppleEasyOutputStreamTests.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ final class AppleEasyOutputStreamTests: XCTestCase {
8787

8888
closeReadingEndAfterReadingSomeData(outputStreamProvider)
8989

90-
wait(for: [streamEndHandlerInvoked], timeout: 5)
90+
wait(for: streamEndHandlerInvoked, timeout: 5)
9191
stream.close()
9292
}
9393

@@ -110,7 +110,7 @@ final class AppleEasyOutputStreamTests: XCTestCase {
110110
try stream.enqueueWrite(data: data)
111111
)
112112

113-
wait(for: [streamErrorHandlerInvoked], timeout: 5)
113+
wait(for: streamErrorHandlerInvoked, timeout: 5)
114114
stream.close()
115115
}
116116

0 commit comments

Comments
 (0)