19
19
20
20
#import < unistd.h>
21
21
#import < netinet/tcp.h>
22
-
23
22
#import < sys/socket.h>
24
23
24
+ #import " amqp.h"
25
+ #import " amqp_tcp_socket.h"
26
+ #import " amqp_socket.h"
27
+
25
28
#import " AMQPConnection.h"
29
+
26
30
#import " AMQPChannel.h"
27
31
28
32
NSString *const kAMQPConnectionException = @" AMQPConnectionException" ;
29
33
NSString *const kAMQPLoginException = @" AMQPLoginException" ;
30
34
NSString *const kAMQPOperationException = @" AMQPException" ;
31
35
32
- @interface AMQPConnection ()
33
-
34
- @property (assign , readwrite ) amqp_connection_state_t internalConnection;
35
-
36
- @end
37
-
38
36
@implementation AMQPConnection
39
37
{
40
- int _socketFD;
41
- unsigned int _nextChannel;
38
+ amqp_connection_state_t _internalConnection;
39
+ amqp_socket_t *_socket;
40
+
41
+ amqp_channel_t _nextChannel;
42
42
}
43
43
44
- - (id )init
44
+ - (instancetype )init
45
45
{
46
46
if ((self = [super init ])) {
47
47
_internalConnection = amqp_new_connection ();
48
- _socketFD = 0 ;
48
+ if (!_internalConnection) {
49
+ [NSException raise :kAMQPConnectionException format: @" Unable to create a new AMQP connection" ];
50
+ }
51
+ _socket = NULL ;
52
+
49
53
_nextChannel = 1 ;
50
54
}
51
55
@@ -54,25 +58,50 @@ - (id)init
54
58
55
59
- (void )dealloc
56
60
{
57
- // this was commented by pdcgomes on 23 January 2013 in [bab486a], to verify
58
- // [self disconnect];
59
-
61
+ if (_socket) {
62
+ [self disconnect ];
63
+ }
64
+
60
65
amqp_destroy_connection (_internalConnection);
61
66
}
62
67
63
68
- (void )connectToHost : (NSString *)host onPort : (int )port
64
69
{
65
- _socketFD = amqp_open_socket ([host UTF8String ], port);
66
- fcntl (_socketFD, F_SETFL, O_NONBLOCK);
67
- fcntl (_socketFD, F_SETFL, O_ASYNC);
68
- fcntl (_socketFD, F_SETNOSIGPIPE, 1 );
69
-
70
- if (_socketFD < 0 ) {
71
- _socketFD = 0 ;
72
- [NSException raise :kAMQPConnectionException format: @" Unable to open socket to host %@ on port %d " , host, port];
70
+ const __darwin_time_t kSocketOpenTimeout = 30 ;
71
+
72
+ struct timeval *timeout = malloc (sizeof (struct timeval ));
73
+ if (!timeout) {
74
+ [NSException raise :kAMQPConnectionException format: @" Out of memory" ];
75
+ }
76
+ timeout->tv_sec = kSocketOpenTimeout ;
77
+
78
+ _socket = amqp_tcp_socket_new (_internalConnection);
79
+ if (!_socket) {
80
+ _socket = NULL ;
81
+ [NSException raise :kAMQPConnectionException format: @" Unable to create a TCP socket" ];
73
82
}
74
83
75
- amqp_set_sockfd (_internalConnection, _socketFD);
84
+ // If necessary: set socket properties here (_socketFD should be a property in that case) (dmakarenko 14.08.2014)
85
+ // This function must not be used in conjunction with amqp_socket_open(), i.e.
86
+ // the socket connection should already be open(2) when this function is
87
+ // called.
88
+
89
+ // int _socketFD = open([host UTF8String], port);
90
+ // fcntl(_socketFD, F_SETFL, O_NONBLOCK);
91
+ // fcntl(_socketFD, F_SETFL, O_ASYNC);
92
+ // fcntl(_socketFD, F_SETNOSIGPIPE, 1);
93
+ //
94
+ // if (_socketFD < 0) {
95
+ // _socketFD = 0;
96
+ // [NSException raise:kAMQPConnectionException format:@"Unable to open socket to host %@ on port %d", host, port];
97
+ // }
98
+ // amqp_tcp_socket_set_sockfd(_socket, _socketFD);
99
+
100
+
101
+ int status = amqp_socket_open_noblock (_socket, [host UTF8String ], port, timeout);
102
+ if (status != AMQP_STATUS_OK) {
103
+ [NSException raise :kAMQPConnectionException format: @" Unable to open a TCP socket to host %@ on port %d . Error: %@ (%d )" , host, port, [NSString stringWithUTF8String: amqp_error_string2 (status)], status];
104
+ }
76
105
}
77
106
78
107
- (void )loginAsUser : (NSString *)username withPassword : (NSString *)password onVHost : (NSString *)vhost
@@ -86,16 +115,17 @@ - (void)loginAsUser:(NSString *)username withPassword:(NSString *)password onVHo
86
115
87
116
- (void )disconnect
88
117
{
89
- if (_socketFD <= 0 ) {
118
+ if (!_socket ) {
90
119
[NSException raise :kAMQPConnectionException format: @" Unable to disconnect from host: this instance of AMQPConnection has not been connected yet or the connection previously failed." ];
91
120
}
92
121
93
122
amqp_rpc_reply_t reply = amqp_connection_close (_internalConnection, AMQP_REPLY_SUCCESS);
94
- close (_socketFD);
95
-
123
+
96
124
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
97
125
[NSException raise :kAMQPConnectionException format: @" Unable to disconnect from host: %@ " , [self errorDescriptionForReply: reply]];
98
126
}
127
+
128
+ _socket = NULL ;
99
129
}
100
130
101
131
- (void )checkLastOperation : (NSString *)context
@@ -111,7 +141,7 @@ - (AMQPChannel *)openChannel
111
141
{
112
142
AMQPChannel *channel = [[AMQPChannel alloc ] init ];
113
143
[channel openChannel: _nextChannel onConnection: self ];
114
-
144
+
115
145
_nextChannel++;
116
146
117
147
return channel;
0 commit comments