Skip to content

Commit 70b2d6e

Browse files
committed
Better handle ECONNRESET on connected datagram sockets. (#2979)
Motivation: When a connected datagram socket sends a datagram to a port or host that is not listening, an ICMP Destination Unreachable message may be returned. That message triggers an ECONNRESET to be produced at the socket layer. On Darwin we handled this well, but on Linux it turned out that this would push us into connection teardown and, eventually, into a crash. Not so good. To better handle this, we need to distinguish EPOLLERR from EPOLLHUP on datagram sockets. In these cases, we should check whether the socket error was fatal and, if it was not, we should continue our execution having fired the error down the pipeline. Modifications: Modify the selector code to distinguish reset and error. Add support for our channels to handle errors. Have most channels handle errors as resets. Override the logic for datagram channels to duplicate the logic in readable. Add a unit test. Result: Better datagrams for all. (cherry picked from commit 2a3a333)
1 parent 4015e33 commit 70b2d6e

12 files changed

+175
-59
lines changed

Sources/NIOPosix/BaseSocketChannel.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,8 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
10911091
let result: Int32 = try self.socket.getOption(level: .socket, name: .so_error)
10921092
if result != 0 {
10931093
// we have a socket error, let's forward
1094-
// this path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0)
1094+
// this path will be executed on Linux (EPOLLERR) & Darwin (ev.fflags != 0) for
1095+
// stream sockets, and most (but not all) errors on datagram sockets
10951096
error = IOError(errnoCode: result, reason: "connection reset (error set)")
10961097
} else {
10971098
// we don't have a socket error, this must be connection reset without an error then
@@ -1209,6 +1210,14 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
12091210
true
12101211
}
12111212

1213+
/// Handles an error reported by the selector.
1214+
///
1215+
/// Default behaviour is to treat this as if it were a reset.
1216+
func error() -> ErrorResult {
1217+
self.reset()
1218+
return .fatal
1219+
}
1220+
12121221
internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
12131222
self.eventLoop.assertInEventLoop()
12141223
assert(updateLocal || updateRemote)
@@ -1331,7 +1340,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
13311340
// The initial set of interested events must not contain `.readEOF` because when connect doesn't return
13321341
// synchronously, kevent might send us a `readEOF` because the `writable` event that marks the connect as completed.
13331342
// See SocketChannelTest.testServerClosesTheConnectionImmediately for a regression test.
1334-
try self.safeRegister(interested: [.reset])
1343+
try self.safeRegister(interested: [.reset, .error])
13351344
self.lifecycleManager.finishRegistration()(nil, self.pipeline)
13361345
}
13371346

Sources/NIOPosix/PipeChannel.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ final class PipeChannel: BaseStreamSocketChannel<PipePair> {
6868
if let inputFD = self.pipePair.inputFD {
6969
try selector.register(
7070
selectable: inputFD,
71-
interested: interested.intersection([.read, .reset]),
71+
interested: interested.intersection([.read, .reset, .error]),
7272
makeRegistration: self.registrationForInput
7373
)
7474
}
7575

7676
if let outputFD = self.pipePair.outputFD {
7777
try selector.register(
7878
selectable: outputFD,
79-
interested: interested.intersection([.write, .reset]),
79+
interested: interested.intersection([.write, .reset, .error]),
8080
makeRegistration: self.registrationForOutput
8181
)
8282
}
@@ -95,13 +95,13 @@ final class PipeChannel: BaseStreamSocketChannel<PipePair> {
9595
if let inputFD = self.pipePair.inputFD, inputFD.isOpen {
9696
try selector.reregister(
9797
selectable: inputFD,
98-
interested: interested.intersection([.read, .reset])
98+
interested: interested.intersection([.read, .reset, .error])
9999
)
100100
}
101101
if let outputFD = self.pipePair.outputFD, outputFD.isOpen {
102102
try selector.reregister(
103103
selectable: outputFD,
104-
interested: interested.intersection([.write, .reset])
104+
interested: interested.intersection([.write, .reset, .error])
105105
)
106106
}
107107
}

Sources/NIOPosix/SelectableChannel.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,17 @@ internal protocol SelectableChannel: Channel {
4444
/// Called when the `SelectableChannel` was reset (ie. is now unusable)
4545
func reset()
4646

47+
/// Called when the `SelectableChannel` had an error reported on the selector.
48+
func error() -> ErrorResult
49+
4750
func register(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws
4851

4952
func deregister(selector: Selector<NIORegistration>, mode: CloseMode) throws
5053

5154
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws
5255
}
56+
57+
internal enum ErrorResult {
58+
case fatal
59+
case nonFatal
60+
}

Sources/NIOPosix/SelectableEventLoop.swift

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,18 @@ internal final class SelectableEventLoop: EventLoop {
443443
if ev.contains(.reset) {
444444
channel.reset()
445445
} else {
446+
if ev.contains(.error) {
447+
switch channel.error() {
448+
case .fatal:
449+
return
450+
case .nonFatal:
451+
break
452+
}
453+
454+
guard channel.isOpen else {
455+
return
456+
}
457+
}
446458
if ev.contains(.writeEOF) {
447459
channel.writeEOF()
448460

@@ -746,10 +758,10 @@ internal final class SelectableEventLoop: EventLoop {
746758
self.handleEvent(ev.io, channel: chan)
747759
case .pipeChannel(let chan, let direction):
748760
var ev = ev
749-
if ev.io.contains(.reset) {
750-
// .reset needs special treatment here because we're dealing with two separate pipes instead
761+
if ev.io.contains(.reset) || ev.io.contains(.error) {
762+
// .reset and .error needs special treatment here because we're dealing with two separate pipes instead
751763
// of one socket. So we turn .reset input .readEOF/.writeEOF.
752-
ev.io.subtract([.reset])
764+
ev.io.subtract([.reset, .error])
753765
ev.io.formUnion([direction == .input ? .readEOF : .writeEOF])
754766
}
755767
self.handleEvent(ev.io, channel: chan)

Sources/NIOPosix/SelectorEpoll.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ extension SelectorEventSet {
9090
if epollEvent.events & Epoll.EPOLLRDHUP != 0 {
9191
selectorEventSet.formUnion(.readEOF)
9292
}
93-
if epollEvent.events & Epoll.EPOLLHUP != 0 || epollEvent.events & Epoll.EPOLLERR != 0 {
93+
if epollEvent.events & Epoll.EPOLLERR != 0 {
94+
selectorEventSet.formUnion(.error)
95+
}
96+
if epollEvent.events & Epoll.EPOLLHUP != 0 {
9497
selectorEventSet.formUnion(.reset)
9598
}
9699
self = selectorEventSet

Sources/NIOPosix/SelectorGeneric.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ struct SelectorEventSet: OptionSet, Equatable {
6262
/// of flags or to compare against spurious wakeups.
6363
static let _none = SelectorEventSet([])
6464

65-
/// Connection reset or other errors.
65+
/// Connection reset.
6666
static let reset = SelectorEventSet(rawValue: 1 << 0)
6767

6868
/// EOF at the read/input end of a `Selectable`.
@@ -79,6 +79,9 @@ struct SelectorEventSet: OptionSet, Equatable {
7979
/// - Note: This is rarely used because in many cases, there is no signal that this happened.
8080
static let writeEOF = SelectorEventSet(rawValue: 1 << 4)
8181

82+
/// Error encountered.
83+
static let error = SelectorEventSet(rawValue: 1 << 5)
84+
8285
init(rawValue: SelectorEventSet.RawValue) {
8386
self.rawValue = rawValue
8487
}
@@ -237,7 +240,7 @@ internal class Selector<R: Registration> {
237240
makeRegistration: (SelectorEventSet, SelectorRegistrationID) -> R
238241
) throws {
239242
assert(self.myThread == NIOThread.current)
240-
assert(interested.contains(.reset))
243+
assert(interested.contains([.reset, .error]))
241244
guard self.lifecycleState == .open else {
242245
throw IOError(errnoCode: EBADF, reason: "can't register on selector as it's \(self.lifecycleState).")
243246
}
@@ -265,7 +268,10 @@ internal class Selector<R: Registration> {
265268
guard self.lifecycleState == .open else {
266269
throw IOError(errnoCode: EBADF, reason: "can't re-register on selector as it's \(self.lifecycleState).")
267270
}
268-
assert(interested.contains(.reset), "must register for at least .reset but tried registering for \(interested)")
271+
assert(
272+
interested.contains([.reset, .error]),
273+
"must register for at least .reset & .error but tried registering for \(interested)"
274+
)
269275
try selectable.withUnsafeHandle { fd in
270276
var reg = registrations[Int(fd)]!
271277
try self.reregister0(

Sources/NIOPosix/SelectorKqueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ extension Selector: _SelectorBackendProtocol {
240240
) throws {
241241
try kqueueUpdateEventNotifications(
242242
selectable: selectable,
243-
interested: .reset,
243+
interested: [.reset, .error],
244244
oldInterested: oldInterested,
245245
registrationID: registrationID
246246
)

Sources/NIOPosix/SocketChannel.swift

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -840,10 +840,8 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
840840
#endif
841841
}
842842

843-
override func shouldCloseOnReadError(_ err: Error) -> Bool {
844-
guard let err = err as? IOError else { return true }
845-
846-
switch err.errnoCode {
843+
private func shouldCloseOnErrnoCode(_ errnoCode: CInt) -> Bool {
844+
switch errnoCode {
847845
// ECONNREFUSED can happen on linux if the previous sendto(...) failed.
848846
// See also:
849847
// - https://bugzilla.redhat.com/show_bug.cgi?id=1375
@@ -857,6 +855,31 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
857855
}
858856
}
859857

858+
override func shouldCloseOnReadError(_ err: Error) -> Bool {
859+
guard let err = err as? IOError else { return true }
860+
return self.shouldCloseOnErrnoCode(err.errnoCode)
861+
}
862+
863+
override func error() -> ErrorResult {
864+
// Assume we can get the error from the socket.
865+
do {
866+
let errnoCode: CInt = try self.socket.getOption(level: .socket, name: .so_error)
867+
if self.shouldCloseOnErrnoCode(errnoCode) {
868+
self.reset()
869+
return .fatal
870+
} else {
871+
self.pipeline.syncOperations.fireErrorCaught(
872+
IOError(errnoCode: errnoCode, reason: "so_error")
873+
)
874+
return .nonFatal
875+
}
876+
} catch {
877+
// Unknown error, fatal.
878+
self.reset()
879+
return .fatal
880+
}
881+
}
882+
860883
/// Buffer a write in preparation for a flush.
861884
///
862885
/// When the channel is unconnected, `data` _must_ be of type `AddressedEnvelope<ByteBuffer>`.

Tests/NIOPosixTests/DatagramChannelTests.swift

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,17 @@ extension Channel {
3434
}.wait()
3535
}
3636

37+
func waitForErrors(count: Int) throws -> [any Error] {
38+
try self.pipeline.context(name: "ByteReadRecorder").flatMap { context in
39+
if let future = (context.handler as? DatagramReadRecorder<ByteBuffer>)?.notifyForErrors(count) {
40+
return future
41+
}
42+
43+
XCTFail("Could not wait for errors")
44+
return self.eventLoop.makeSucceededFuture([])
45+
}.wait()
46+
}
47+
3748
func readCompleteCount() throws -> Int {
3849
try self.pipeline.context(name: "ByteReadRecorder").map { context in
3950
(context.handler as! DatagramReadRecorder<ByteBuffer>).readCompleteCount
@@ -66,10 +77,12 @@ final class DatagramReadRecorder<DataType>: ChannelInboundHandler {
6677
}
6778

6879
var reads: [AddressedEnvelope<DataType>] = []
80+
var errors: [any Error] = []
6981
var loop: EventLoop? = nil
7082
var state: State = .fresh
7183

7284
var readWaiters: [Int: EventLoopPromise<[AddressedEnvelope<DataType>]>] = [:]
85+
var errorWaiters: [Int: EventLoopPromise<[any Error]>] = [:]
7386
var readCompleteCount = 0
7487

7588
func channelRegistered(context: ChannelHandlerContext) {
@@ -95,6 +108,16 @@ final class DatagramReadRecorder<DataType>: ChannelInboundHandler {
95108
context.fireChannelRead(Self.wrapInboundOut(data))
96109
}
97110

111+
func errorCaught(context: ChannelHandlerContext, error: any Error) {
112+
self.errors.append(error)
113+
114+
if let promise = self.errorWaiters.removeValue(forKey: self.errors.count) {
115+
promise.succeed(self.errors)
116+
}
117+
118+
context.fireErrorCaught(error)
119+
}
120+
98121
func channelReadComplete(context: ChannelHandlerContext) {
99122
self.readCompleteCount += 1
100123
context.fireChannelReadComplete()
@@ -108,6 +131,15 @@ final class DatagramReadRecorder<DataType>: ChannelInboundHandler {
108131
readWaiters[count] = loop!.makePromise()
109132
return readWaiters[count]!.futureResult
110133
}
134+
135+
func notifyForErrors(_ count: Int) -> EventLoopFuture<[any Error]> {
136+
guard self.errors.count < count else {
137+
return self.loop!.makeSucceededFuture(.init(self.errors.prefix(count)))
138+
}
139+
140+
self.errorWaiters[count] = self.loop!.makePromise()
141+
return self.errorWaiters[count]!.futureResult
142+
}
111143
}
112144

113145
class DatagramChannelTests: XCTestCase {
@@ -1715,6 +1747,29 @@ class DatagramChannelTests: XCTestCase {
17151747
}
17161748
}
17171749

1750+
func testShutdownReadOnConnectedUDP() throws {
1751+
var buffer = self.firstChannel.allocator.buffer(capacity: 256)
1752+
buffer.writeStaticString("hello, world!")
1753+
1754+
// Connect and write
1755+
XCTAssertNoThrow(try self.firstChannel.connect(to: self.secondChannel.localAddress!).wait())
1756+
1757+
let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
1758+
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait())
1759+
_ = try self.secondChannel.waitForDatagrams(count: 1)
1760+
1761+
// Ok, close on the second channel.
1762+
XCTAssertNoThrow(try self.secondChannel.close(mode: .all).wait())
1763+
print("closed")
1764+
1765+
// Write again.
1766+
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait())
1767+
1768+
// This should trigger an error.
1769+
let errors = try self.firstChannel.waitForErrors(count: 1)
1770+
XCTAssertEqual((errors[0] as? IOError)?.errnoCode, ECONNREFUSED)
1771+
}
1772+
17181773
private func hasGoodGROSupport() throws -> Bool {
17191774
// Source code for UDP_GRO was added in Linux 5.0. However, this support is somewhat limited
17201775
// and some sources indicate support was actually added in 5.10 (perhaps more widely

0 commit comments

Comments
 (0)