forked from EFEducationFirstMobile/librabbitmq-objc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAMQPExchange+Additions.m
132 lines (114 loc) · 4.94 KB
/
AMQPExchange+Additions.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//
// AMQPExchange+Additions.m
// Objective-C wrapper for librabbitmq-c
//
// Created by Pedro Gomes on 27/11/2012.
// Copyright (c) 2012 EF Education First. All rights reserved.
//
#import "AMQPExchange+Additions.h"
#import "AMQPChannel.h"
#import "AMQPQueue.h"
#import "AMQPConnection.h"
// -----------------------------------------------------------------------------
// File-local helpers
// -----------------------------------------------------------------------------

// Returns an amqp_bytes_t view over the UTF-8 representation of `string`.
// A nil string yields an empty value (len 0, bytes NULL) rather than handing
// NULL to amqp_cstring_bytes(), which measures its argument with strlen().
// The bytes are borrowed from -UTF8String, so the result must only be used
// while `string` is still alive in the caller's scope.
static amqp_bytes_t AMQPAdditionsBytesFromString(NSString *string)
{
    const char *utf8 = [string UTF8String];
    if (utf8 == NULL) {
        return (amqp_bytes_t){ .len = 0, .bytes = NULL };
    }
    return amqp_cstring_bytes(utf8);
}

// Returns an amqp_bytes_t view over the contents of `data` without copying.
// The view borrows the NSData storage; it is only handed to
// amqp_basic_publish(), which is done with the buffer once it returns
// (the previous implementation freed its copy right after the call).
static amqp_bytes_t AMQPAdditionsBytesFromData(NSData *data)
{
    return (amqp_bytes_t){ .len = data.length, .bytes = (void *)data.bytes };
}

// Stores `value` into `*field` and raises `flag` in `properties->_flags`.
// Does nothing when `value` is nil, so absent optional properties are simply
// left out of the message instead of crashing or being sent half-initialised.
static void AMQPAdditionsSetStringProperty(amqp_basic_properties_t *properties,
                                           amqp_flags_t flag,
                                           amqp_bytes_t *field,
                                           NSString *value)
{
    if (value == nil) {
        return;
    }
    *field = AMQPAdditionsBytesFromString(value);
    properties->_flags |= flag;
}

// Publishes `body` on `exchange` with the given routing key and properties
// (mandatory = NO, immediate = NO), then asks the connection to check the last
// operation, passing `failureMessage` as the context string.
// A nil routing key is sent as an empty routing key.
// NOTE(review): the status returned by amqp_basic_publish() is not inspected
// here (it never was); error reporting relies entirely on -checkLastOperation:.
// Confirm that this is sufficient for publish failures.
static void AMQPAdditionsPublish(AMQPExchange *exchange,
                                 NSString *routingKey,
                                 const amqp_basic_properties_t *properties,
                                 amqp_bytes_t body,
                                 NSString *failureMessage)
{
    amqp_basic_publish(exchange.channel.connection.internalConnection,
                       exchange.channel.internalChannel,
                       exchange.internalExchange,
                       AMQPAdditionsBytesFromString(routingKey),
                       NO,
                       NO,
                       properties,
                       body);
    [exchange.channel.connection checkLastOperation:failureMessage];
}

@implementation AMQPExchange(Additions)

/// Publishes a text message carrying only a message ID.
/// @param body          The message payload, sent as UTF-8. nil is sent as an empty body.
/// @param messageID     Value for the AMQP `message-id` property; omitted when nil.
/// @param theRoutingKey Routing key used for the publish; nil is sent as an empty key.
- (void)publishMessage:(NSString *)body
             messageID:(NSString *)messageID
       usingRoutingKey:(NSString *)theRoutingKey
{
    amqp_basic_properties_t properties = (amqp_basic_properties_t){ ._flags = 0 };
    // The flag must be raised alongside the field: properties whose bit is not
    // set in _flags are not encoded into the content header.
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_MESSAGE_ID_FLAG,
                                   &properties.message_id,
                                   messageID);

    AMQPAdditionsPublish(self,
                         theRoutingKey,
                         &properties,
                         AMQPAdditionsBytesFromString(body),
                         @"Failed to publish message");
}

// TODO: we need to add support for appID -- we can use this for versioning
/// Publishes a typed text message. The content type is set to "t".
/// @param messageType   Value for the AMQP `type` property; omitted when nil.
/// @param messageID     Value for the AMQP `message-id` property; omitted when nil.
/// @param body          The message payload, sent as UTF-8. nil is sent as an empty body.
/// @param theRoutingKey Routing key used for the publish; nil is sent as an empty key.
- (void)publishMessage:(NSString *)messageType
             messageID:(NSString *)messageID
               payload:(NSString *)body
       usingRoutingKey:(NSString *)theRoutingKey
{
    amqp_basic_properties_t properties = (amqp_basic_properties_t){
        ._flags       = AMQP_BASIC_CONTENT_TYPE_FLAG,
        .content_type = amqp_cstring_bytes("t"),
    };
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_TYPE_FLAG,
                                   &properties.type,
                                   messageType);
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_MESSAGE_ID_FLAG,
                                   &properties.message_id,
                                   messageID);

    AMQPAdditionsPublish(self,
                         theRoutingKey,
                         &properties,
                         AMQPAdditionsBytesFromString(body),
                         @"Failed to publish message");
}

/// Publishes a typed binary message. The content type is set to "b".
/// Empty (or nil) payloads are logged and not published.
/// @param messageType   Value for the AMQP `type` property; omitted when nil.
/// @param messageID     Value for the AMQP `message-id` property; omitted when nil.
/// @param body          The raw payload bytes.
/// @param theRoutingKey Routing key used for the publish; nil is sent as an empty key.
- (void)publishMessage:(NSString *)messageType
             messageID:(NSString *)messageID
           payloadData:(NSData *)body
       usingRoutingKey:(NSString *)theRoutingKey
{
    if (body.length == 0) {
        NSLog(@"payload is empty!!!");
        return;
    }

    amqp_basic_properties_t properties = (amqp_basic_properties_t){
        ._flags       = AMQP_BASIC_CONTENT_TYPE_FLAG,
        .content_type = amqp_cstring_bytes("b"),
    };
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_TYPE_FLAG,
                                   &properties.type,
                                   messageType);
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_MESSAGE_ID_FLAG,
                                   &properties.message_id,
                                   messageID);

    // The payload is passed as a borrowed view over the NSData storage; no
    // intermediate malloc/copy/free is needed for a synchronous publish.
    AMQPAdditionsPublish(self,
                         theRoutingKey,
                         &properties,
                         AMQPAdditionsBytesFromData(body),
                         @"Failed to publish message");
}

/// Publishes a typed binary message as an RPC-style request. The content type
/// is set to "b". Unlike the non-RPC variant, an empty payload is still sent.
/// @param messageType   Value for the AMQP `type` property; omitted when nil.
/// @param messageID     Value for the AMQP `message-id` property; omitted when nil.
/// @param body          The raw payload bytes; nil is sent as an empty body.
/// @param routingKey    Routing key used for the publish; nil is sent as an empty key.
/// @param correlationID Value for the AMQP `correlation-id` property; omitted when nil.
/// @param callbackQueue Value for the AMQP `reply-to` property; omitted when nil.
- (void)publishMessage:(NSString *)messageType
             messageID:(NSString *)messageID
           payloadData:(NSData *)body
       usingRoutingKey:(NSString *)routingKey
         correlationID:(NSString *)correlationID
         callbackQueue:(NSString *)callbackQueue
{
    amqp_basic_properties_t properties = (amqp_basic_properties_t){
        ._flags       = AMQP_BASIC_CONTENT_TYPE_FLAG,
        .content_type = amqp_cstring_bytes("b"),
    };
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_TYPE_FLAG,
                                   &properties.type,
                                   messageType);
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_MESSAGE_ID_FLAG,
                                   &properties.message_id,
                                   messageID);
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_CORRELATION_ID_FLAG,
                                   &properties.correlation_id,
                                   correlationID);
    AMQPAdditionsSetStringProperty(&properties,
                                   AMQP_BASIC_REPLY_TO_FLAG,
                                   &properties.reply_to,
                                   callbackQueue);

    AMQPAdditionsPublish(self,
                         routingKey,
                         &properties,
                         AMQPAdditionsBytesFromData(body),
                         @"RPC call Invocation failed.");
}

@end