-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathKafkaError.swift
280 lines (242 loc) · 8.51 KB
/
KafkaError.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
//===----------------------------------------------------------------------===//
//
// This source file is part of the swift-kafka-client open source project
//
// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Crdkafka
/// An error that can occur on `Kafka` operations.
///
/// Every error carries a high-level ``KafkaError/code``, a human-readable reason and the
/// source location (`file`/`line`) handed to the factory method that created it.
///
/// - Note: `Hashable` conformance only considers the ``KafkaError/code``.
public struct KafkaError: Error, CustomStringConvertible, @unchecked Sendable {
    // Note: @unchecked because we use a backing class for storage (copy-on-write).
    private var backing: Backing

    /// Represents the kind of error that was encountered.
    public var code: ErrorCode {
        get {
            self.backing.code
        }
        set {
            // Detach from storage that another `KafkaError` value may still reference
            // before mutating it.
            self.makeUnique()
            self.backing.code = newValue
        }
    }

    /// Whether this error was flagged as fatal when it was created.
    ///
    /// Only `rdKafkaError(wrapping:isFatal:file:line:)` can set this flag;
    /// every other factory leaves it at the storage default of `false`.
    public var isFatal: Bool {
        self.backing.isFatal
    }

    /// Human-readable explanation supplied by the factory that built the error.
    private var reason: String {
        self.backing.reason
    }

    /// File identifier recorded when the error was created.
    private var file: String {
        self.backing.file
    }

    /// Line number recorded when the error was created.
    private var line: UInt {
        self.backing.line
    }

    /// Textual form: error code, reason and the recorded source location.
    public var description: String {
        "KafkaError.\(self.code): \(self.reason) \(self.file):\(self.line)"
    }

    /// Replaces the backing storage with a private copy if it is shared with another value.
    private mutating func makeUnique() {
        if !isKnownUniquelyReferenced(&self.backing) {
            self.backing = self.backing.copy()
        }
    }

    /// Wraps a `librdkafka` response error code.
    ///
    /// - Parameters:
    ///   - error: The `librdkafka` error code; its message is looked up via `rd_kafka_err2str`.
    ///   - isFatal: Whether the caller considers the error fatal. Stored on the error and
    ///     readable through ``KafkaError/isFatal``.
    ///   - file: File identifier recorded on the error (defaults to the call site).
    ///   - line: Line number recorded on the error (defaults to the call site).
    /// - Returns: An error with code ``ErrorCode/underlying``.
    static func rdKafkaError(
        wrapping error: rd_kafka_resp_err_t, isFatal: Bool = false, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        let errorMessage = String(cString: rd_kafka_err2str(error))
        return KafkaError(
            backing: .init(
                // `isFatal` must be forwarded explicitly; the storage initializer defaults it to `false`.
                code: .underlying, reason: errorMessage, file: file, line: line, isFatal: isFatal
            )
        )
    }

    /// Creates an error with code ``ErrorCode/config``.
    static func config(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .config, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/topicConfig``.
    static func topicConfig(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .topicConfig, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/connectionFailed``.
    static func client(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .connectionFailed, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/shutdown``.
    static func connectionClosed(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .shutdown, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/messageConsumptionFailed``.
    static func messageConsumption(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .messageConsumptionFailed, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/topicCreationFailed``.
    static func topicCreation(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .topicCreationFailed, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/topicDeletionFailed``.
    static func topicDeletion(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .topicDeletionFailed, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/transactionAborted``.
    static func transactionAborted(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .transactionAborted, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/transactionIncomplete``.
    static func transactionIncomplete(
        reason: String, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .transactionIncomplete, reason: reason, file: file, line: line
            )
        )
    }

    /// Creates an error with code ``ErrorCode/transactionOutOfAttempts``.
    ///
    /// - Parameter numOfAttempts: Attempt count that is embedded in the error's reason text.
    static func transactionOutOfAttempts(
        numOfAttempts: UInt64, file: String = #fileID, line: UInt = #line
    ) -> KafkaError {
        return KafkaError(
            backing: .init(
                code: .transactionOutOfAttempts, reason: "Out of \(numOfAttempts) attempts", file: file, line: line
            )
        )
    }
}
extension KafkaError {
    /// A coarse classification of a ``KafkaError``.
    ///
    /// One code can be produced at several call sites and for several reasons, so it only
    /// conveys the broad category of the failure. For the specific cause, inspect the
    /// string representation of the ``KafkaError`` itself.
    public struct ErrorCode: Hashable, Sendable, CustomStringConvertible {
        /// Private discriminator behind the public constants below.
        fileprivate enum BackingCode {
            case rdKafkaError
            case config, topicConfig
            case connectionClosed, client
            case messageConsumption
            case topicCreation, topicDeletion
            case transactionAborted, transactionIncomplete
            // FIXME: maybe add subcode ?
            case notInTransaction
            case transactionOutOfAttempts
        }

        /// The discriminator this code wraps; equality and hashing are derived from it.
        fileprivate var backingCode: BackingCode

        fileprivate init(_ code: BackingCode) {
            self.backingCode = code
        }

        /// An error reported by the underlying transport.
        public static let underlying = Self(.rdKafkaError)

        /// The Kafka client configuration is invalid.
        public static let config = Self(.config)

        /// The Kafka topic configuration is invalid.
        public static let topicConfig = Self(.topicConfig)

        /// The Kafka connection has already been shut down.
        public static let shutdown = Self(.connectionClosed)

        /// A connection to Kafka could not be established.
        public static let connectionFailed = Self(.client)

        /// A message could not be consumed.
        public static let messageConsumptionFailed = Self(.messageConsumption)

        /// A topic could not be created.
        public static let topicCreationFailed = Self(.topicCreation)

        /// A topic could not be deleted.
        public static let topicDeletionFailed = Self(.topicDeletion)

        /// The transaction was aborted (it can be retried from scratch).
        public static let transactionAborted = Self(.transactionAborted)

        /// The transaction could not be completed.
        public static let transactionIncomplete = Self(.transactionIncomplete)

        /// The provided number of attempts was exhausted.
        public static let transactionOutOfAttempts = Self(.transactionOutOfAttempts)

        /// The name of the wrapped discriminator case.
        public var description: String {
            "\(self.backingCode)"
        }
    }
}
// MARK: - KafkaError + Backing
extension KafkaError {
    /// Reference-type storage holding the fields of a ``KafkaError``.
    ///
    /// Equality and hashing deliberately look at `code` only.
    final class Backing: Hashable {
        /// The error classification; the only mutable field.
        var code: KafkaError.ErrorCode
        /// Human-readable explanation of the failure.
        let reason: String
        /// File identifier recorded when the error was created.
        let file: String
        /// Line number recorded when the error was created.
        let line: UInt
        /// Whether the error was flagged as fatal when it was created.
        let isFatal: Bool

        /// Creates storage for an error.
        ///
        /// - Parameters:
        ///   - code: The error classification.
        ///   - reason: Human-readable explanation of the failure.
        ///   - file: File identifier to record.
        ///   - line: Line number to record.
        ///   - isFatal: Whether the error is flagged as fatal. Defaults to `false`.
        fileprivate init(
            code: KafkaError.ErrorCode,
            reason: String,
            file: String,
            line: UInt,
            isFatal: Bool = false
        ) {
            self.code = code
            self.reason = reason
            self.file = file
            self.line = line
            self.isFatal = isFatal
        }

        // Only the error code matters for equality.
        static func == (lhs: Backing, rhs: Backing) -> Bool {
            return lhs.code == rhs.code
        }

        // Hash exactly what `==` compares so the two stay consistent.
        func hash(into hasher: inout Hasher) {
            hasher.combine(self.code)
        }

        /// Returns a new storage object carrying over every field of this one.
        fileprivate func copy() -> Backing {
            // `isFatal` must be passed explicitly: the initializer defaults it to `false`,
            // which would silently clear the flag on the copy.
            return Backing(
                code: self.code,
                reason: self.reason,
                file: self.file,
                line: self.line,
                isFatal: self.isFatal
            )
        }
    }
}
// MARK: - KafkaError + Hashable
extension KafkaError: Hashable {
    /// Two errors are equal when their backing storages compare equal.
    public static func == (lhs: KafkaError, rhs: KafkaError) -> Bool {
        lhs.backing == rhs.backing
    }

    /// Feeds the backing storage's hash contribution into `hasher`.
    public func hash(into hasher: inout Hasher) {
        self.backing.hash(into: &hasher)
    }
}