Skip to content

Commit f12f0f8

Browse files
committed
Handle broker-initiated channel.close
Also, back-fill integration test for client-initiated channel.close. Issue #21 [#119621443]
1 parent 1ffd2eb commit f12f0f8

File tree

3 files changed

+118
-69
lines changed

3 files changed

+118
-69
lines changed

RMQClient/RMQSuspendResumeDispatcher.m

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
#import "RMQSuspendResumeDispatcher.h"
22
#import "RMQErrors.h"
33

4+
typedef NS_ENUM(NSUInteger, DispatcherState) {
5+
DispatcherStateOpen = 1,
6+
DispatcherStateClosedByClient,
7+
DispatcherStateClosedByServer,
8+
};
9+
410
@interface RMQSuspendResumeDispatcher ()
511
@property (nonatomic, readwrite) id<RMQChannel> channel;
612
@property (nonatomic, readwrite) id<RMQSender> sender;
713
@property (nonatomic, readwrite) RMQFramesetValidator *validator;
814
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> commandQueue;
915
@property (nonatomic, readwrite) id<RMQConnectionDelegate> delegate;
10-
@property (nonatomic, readwrite) BOOL channelCloseRequested;
16+
@property (nonatomic, readwrite) DispatcherState state;
1117
@end
1218

1319
@implementation RMQSuspendResumeDispatcher
@@ -20,7 +26,7 @@ - (instancetype)initWithSender:(id<RMQSender>)sender
2026
self.sender = sender;
2127
self.validator = [RMQFramesetValidator new];
2228
self.commandQueue = commandQueue;
23-
self.channelCloseRequested = NO;
29+
self.state = DispatcherStateOpen;
2430
}
2531
return self;
2632
}
@@ -34,7 +40,7 @@ - (void)activateWithChannel:(id<RMQChannel>)channel
3440

3541
- (void)blockingWaitOn:(Class)method {
3642
[self.commandQueue blockingEnqueue:^{
37-
[self checkNotAlreadyClosed:^{
43+
[self handleClosure:^{
3844
[self.commandQueue suspend];
3945
}];
4046
}];
@@ -50,9 +56,9 @@ - (void)blockingWaitOn:(Class)method {
5056
- (void)sendSyncMethod:(id<RMQMethod>)method
5157
completionHandler:(void (^)(RMQFramesetValidationResult *result))completionHandler {
5258
[self.commandQueue enqueue:^{
53-
[self checkNotAlreadyClosed:^{
59+
[self handleClosure:^{
5460
if ([method isKindOfClass:[RMQChannelClose class]]) {
55-
self.channelCloseRequested = YES;
61+
self.state = DispatcherStateClosedByClient;
5662
}
5763

5864
RMQFrameset *outgoingFrameset = [[RMQFrameset alloc] initWithChannelNumber:self.channelNumber
@@ -63,13 +69,11 @@ - (void)sendSyncMethod:(id<RMQMethod>)method
6369
}];
6470

6571
[self.commandQueue enqueue:^{
66-
if (!self.channelCloseRequested) {
67-
RMQFramesetValidationResult *result = [self.validator expect:method.syncResponse];
68-
if (result.error) {
69-
[self.delegate channel:self.channel error:result.error];
70-
} else {
71-
completionHandler(result);
72-
}
72+
RMQFramesetValidationResult *result = [self.validator expect:method.syncResponse];
73+
if (self.state == DispatcherStateOpen && result.error) {
74+
[self.delegate channel:self.channel error:result.error];
75+
} else if (self.state == DispatcherStateOpen) {
76+
completionHandler(result);
7377
}
7478
}];
7579
}
@@ -81,7 +85,7 @@ - (void)sendSyncMethod:(id<RMQMethod>)method {
8185

8286
- (void)sendSyncMethodBlocking:(id<RMQMethod>)method {
8387
[self.commandQueue blockingEnqueue:^{
84-
[self checkNotAlreadyClosed:^{
88+
[self handleClosure:^{
8589
RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:self.channelNumber method:method];
8690
[self.commandQueue suspend];
8791
[self.sender sendFrameset:frameset];
@@ -98,7 +102,7 @@ - (void)sendSyncMethodBlocking:(id<RMQMethod>)method {
98102

99103
- (void)sendAsyncFrameset:(RMQFrameset *)frameset {
100104
[self.commandQueue enqueue:^{
101-
[self checkNotAlreadyClosed:^{
105+
[self handleClosure:^{
102106
[self.sender sendFrameset:frameset];
103107
}];
104108
}];
@@ -109,7 +113,13 @@ - (void)sendAsyncMethod:(id<RMQMethod>)method {
109113
}
110114

111115
- (void)handleFrameset:(RMQFrameset *)frameset {
112-
[self.validator fulfill:frameset];
116+
if (self.state != DispatcherStateClosedByServer && [frameset.method isKindOfClass:[RMQChannelClose class]]) {
117+
self.state = DispatcherStateClosedByServer;
118+
[self.sender sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:self.channelNumber
119+
method:[RMQChannelCloseOk new]]];
120+
} else if (self.state != DispatcherStateClosedByServer) {
121+
[self.validator fulfill:frameset];
122+
}
113123
[self.commandQueue resume];
114124
}
115125

@@ -119,8 +129,8 @@ - (NSNumber *)channelNumber {
119129
return self.channel.channelNumber;
120130
}
121131

122-
- (void)checkNotAlreadyClosed:(void (^)())operation {
123-
if (self.channelCloseRequested) {
132+
- (void)handleClosure:(void (^)())operation {
133+
if (self.state != DispatcherStateOpen) {
124134
[self sendChannelClosedError];
125135
} else {
126136
operation();

RMQClientIntegrationTests/IntegrationTests.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,39 @@ class IntegrationTests: XCTestCase {
200200
)
201201
}
202202

203+
func testClientChannelCloseCausesFutureOperationsToFail() {
204+
let delegate = ConnectionDelegateSpy()
205+
let conn = RMQConnection(delegate: delegate)
206+
conn.start()
207+
let ch = conn.createChannel()
208+
209+
ch.close()
210+
211+
XCTAssert(
212+
TestHelper.pollUntil {
213+
ch.basicQos(1, global: false)
214+
return delegate.lastChannelError?.code == RMQError.ChannelClosed.rawValue
215+
}
216+
)
217+
}
218+
219+
func testServerChannelCloseCausesFutureOperationsToFail() {
220+
let delegate = ConnectionDelegateSpy()
221+
let conn = RMQConnection(delegate: delegate)
222+
conn.start()
223+
let ch = conn.createChannel()
224+
225+
causeServerChannelClose(ch)
226+
227+
XCTAssert(
228+
TestHelper.pollUntil {
229+
ch.basicQos(1, global: false)
230+
return delegate.lastChannelError?.code == RMQError.ChannelClosed.rawValue
231+
}
232+
)
233+
}
234+
235+
private func causeServerChannelClose(ch: RMQChannel) {
236+
ch.basicPublish("", routingKey: "a route that can't be found", exchange: "a non-existent exchange", persistent: false)
237+
}
203238
}

RMQClientTests/RMQSuspendResumeDispatcherTest.swift

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
1111
}
1212

1313
func testSyncMethodsSentToSender() {
14-
let q = FakeSerialQueue()
15-
let sender = SenderSpy()
16-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
17-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
18-
dispatcher.activateWithChannel(ch, delegate: nil)
14+
let (dispatcher, q, sender, _, _) = setupActivated()
1915

2016
dispatcher.sendSyncMethod(MethodFixtures.basicGet())
2117

@@ -26,36 +22,22 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
2622
}
2723

2824
func testSyncMethodFailureSendsErrorToDelegate() {
29-
let q = FakeSerialQueue()
30-
let sender = SenderSpy()
31-
let delegate = ConnectionDelegateSpy()
32-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
33-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
34-
dispatcher.activateWithChannel(ch, delegate: delegate)
25+
let (dispatcher, q, _, delegate, _) = setupActivated()
3526

3627
dispatcher.sendSyncMethod(MethodFixtures.basicGet())
37-
3828
try! q.step()
3929

4030
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.basicQosOk()))
41-
4231
try! q.step()
4332

4433
XCTAssertEqual(RMQError.ChannelIncorrectSyncMethod.rawValue, delegate.lastChannelError!.code)
4534
}
4635

4736
func testBlockingSyncMethodsSentToSender() {
48-
let q = FakeSerialQueue()
49-
let sender = SenderSpy()
50-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
51-
let delegate = ConnectionDelegateSpy()
52-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
53-
dispatcher.activateWithChannel(ch, delegate: delegate)
37+
let (dispatcher, q, sender, delegate, ch) = setupActivated()
5438

5539
dispatcher.sendSyncMethodBlocking(MethodFixtures.basicGet())
56-
5740
XCTAssertEqual(2, q.blockingItems.count)
58-
5941
try! q.step()
6042

6143
let expectedFrameset = RMQFrameset(channelNumber: 123, method: MethodFixtures.basicGet())
@@ -68,12 +50,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
6850
}
6951

7052
func testBlockingErrorsSentToDelegate() {
71-
let q = FakeSerialQueue()
72-
let sender = SenderSpy()
73-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
74-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
75-
let delegate = ConnectionDelegateSpy()
76-
dispatcher.activateWithChannel(ch, delegate: delegate)
53+
let (dispatcher, q, _, delegate, _) = setupActivated()
7754

7855
dispatcher.sendSyncMethodBlocking(MethodFixtures.basicGet())
7956

@@ -86,12 +63,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
8663
}
8764

8865
func testAsyncMethodSendsFrameset() {
89-
let q = FakeSerialQueue()
90-
let sender = SenderSpy()
91-
let delegate = ConnectionDelegateSpy()
92-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
93-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
94-
dispatcher.activateWithChannel(ch, delegate: delegate)
66+
let (dispatcher, q, sender, _, _) = setupActivated()
9567

9668
dispatcher.sendAsyncMethod(MethodFixtures.channelOpen())
9769

@@ -102,12 +74,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
10274
}
10375

10476
func testAsyncFramesetSendsFrameset() {
105-
let q = FakeSerialQueue()
106-
let sender = SenderSpy()
107-
let delegate = ConnectionDelegateSpy()
108-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
109-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
110-
dispatcher.activateWithChannel(ch, delegate: delegate)
77+
let (dispatcher, q, sender, _, _) = setupActivated()
11178

11279
let frameset = RMQFrameset(channelNumber: 123, method: MethodFixtures.channelClose())
11380
dispatcher.sendAsyncFrameset(frameset)
@@ -117,9 +84,9 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
11784
XCTAssertEqual(frameset, sender.sentFramesets.last!)
11885
}
11986

120-
// MARK: After-close tests
87+
// MARK: Client close tests
12188

122-
func testFutureBlockingWaitOnProducesErrorAfterClose() {
89+
func testFutureBlockingWaitOnProducesErrorAfterClientClose() {
12390
let (dispatcher, q, delegate) = setUpAfterCloseTest()
12491
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelCloseOk()))
12592
try! q.step()
@@ -133,7 +100,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
133100
"Didn't receive correct error\nGot: \(delegate.lastChannelError)")
134101
}
135102

136-
func testFutureSyncMethodBlockingProducesErrorAfterClose() {
103+
func testFutureSyncMethodBlockingProducesErrorAfterClientClose() {
137104
let (dispatcher, q, delegate) = setUpAfterCloseTest()
138105
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelCloseOk()))
139106
try! q.step()
@@ -147,7 +114,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
147114
"Didn't receive correct error\nGot: \(delegate.lastChannelError)")
148115
}
149116

150-
func testFutureSyncMethodProducesErrorAfterClose() {
117+
func testFutureSyncMethodProducesErrorAfterClientClose() {
151118
let (dispatcher, q, delegate) = setUpAfterCloseTest()
152119
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelCloseOk()))
153120
try! q.step()
@@ -161,7 +128,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
161128
"Didn't receive correct error\nGot: \(delegate.lastChannelError)")
162129
}
163130

164-
func testSendAsyncFramesetProducesErrorAfterClose() {
131+
func testSendAsyncFramesetProducesErrorAfterClientClose() {
165132
let (dispatcher, q, delegate) = setUpAfterCloseTest()
166133
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelCloseOk()))
167134
try! q.step()
@@ -201,12 +168,7 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
201168
}
202169

203170
func testCloseDoesNotCauseErrorIfNotTheFirstOperation() {
204-
let q = FakeSerialQueue()
205-
let sender = SenderSpy()
206-
let delegate = ConnectionDelegateSpy()
207-
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
208-
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
209-
dispatcher.activateWithChannel(ch, delegate: delegate)
171+
let (dispatcher, q, _, delegate, _) = setupActivated()
210172

211173
dispatcher.sendSyncMethod(MethodFixtures.channelOpen())
212174
dispatcher.sendSyncMethod(MethodFixtures.channelClose())
@@ -222,19 +184,61 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
222184
XCTAssertNil(delegate.lastChannelError)
223185
}
224186

187+
// MARK: Server close tests
188+
189+
func testServerCloseCausesCloseOkToBeSentInResponse() {
190+
let (dispatcher, _, sender, _, _) = setupActivated()
191+
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelClose()))
192+
XCTAssertEqual(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelCloseOk()),
193+
sender.sentFramesets.last)
194+
}
195+
196+
func testServerCloseStopsFutureConsumersFromTriggering() {
197+
let (dispatcher, q, _, _, _) = setupActivated()
198+
var called = false
199+
dispatcher.sendSyncMethod(MethodFixtures.basicConsume("", consumerTag: "", options: [])) { result in
200+
called = true
201+
}
202+
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelClose()))
203+
204+
try! q.step() // send basic.consume
205+
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.basicConsumeOk("")))
206+
try! q.step() // run basic.consume-ok response block
207+
XCTAssertFalse(called)
208+
}
209+
210+
func testServerCloseTriggersErrorsForFutureOperations() {
211+
let (dispatcher, q, _, delegate, _) = setupActivated()
212+
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelClose()))
213+
dispatcher.sendSyncMethod(MethodFixtures.basicGet())
214+
XCTAssertNil(delegate.lastChannelError)
215+
try! q.step()
216+
XCTAssertEqual(RMQError.ChannelClosed.rawValue, delegate.lastChannelError?.code)
217+
}
218+
219+
func testServerCloseResumesCommandQueueToAllowErrorsToPropagate() {
220+
let (dispatcher, q, _, _, _) = setupActivated()
221+
q.suspend()
222+
dispatcher.handleFrameset(RMQFrameset(channelNumber: 123, method: MethodFixtures.channelClose()))
223+
XCTAssertFalse(q.suspended)
224+
}
225+
225226
// MARK: Helpers
226227

227-
func setUpAfterCloseTest() -> (dispatcher: RMQSuspendResumeDispatcher, q: FakeSerialQueue, delegate: ConnectionDelegateSpy) {
228+
func setupActivated() -> (dispatcher: RMQSuspendResumeDispatcher, q: FakeSerialQueue, sender: SenderSpy, delegate: ConnectionDelegateSpy, ch: RMQAllocatedChannel) {
228229
let q = FakeSerialQueue()
229230
let sender = SenderSpy()
230231
let delegate = ConnectionDelegateSpy()
231232
let dispatcher = RMQSuspendResumeDispatcher(sender: sender, commandQueue: q)
232233
let ch = RMQAllocatedChannel(123, contentBodySize: 1, dispatcher: dispatcher, commandQueue: q)
233234
dispatcher.activateWithChannel(ch, delegate: delegate)
235+
return (dispatcher, q, sender, delegate, ch)
236+
}
234237

238+
func setUpAfterCloseTest() -> (dispatcher: RMQSuspendResumeDispatcher, q: FakeSerialQueue, delegate: ConnectionDelegateSpy) {
239+
let (dispatcher, q, _, delegate, _) = setupActivated()
235240
dispatcher.sendSyncMethod(MethodFixtures.channelClose())
236241
try! q.step()
237-
238242
return (dispatcher, q, delegate)
239243
}
240244

0 commit comments

Comments
 (0)