-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmsg.h
204 lines (154 loc) · 5.86 KB
/
msg.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/* Primitives for supporting dataflow execution */
/* Constants */
/* Default port for controller.
   NOTE(review): where this default is applied is not visible in this header. */
#define CPORT 6616
/*
 * Message type codes.
 * Each enumerator's value is written out explicitly; the numbers are the
 * same ones implicit numbering (0, 1, 2, ...) would assign.
 */
enum {
    /* Dataflow traffic */
    MSG_OPERATION       = 0,    /* Dataflow operation */
    MSG_OPERAND         = 1,    /* Dataflow operand */

    /* Sent to the controller by a client, router, or worker */
    MSG_REGISTER_ROUTER = 2,
    MSG_REGISTER_CLIENT = 3,
    MSG_REGISTER_WORKER = 4,

    /* Sent by the controller to a client or worker */
    MSG_ACK_AGENT       = 5,

    /* Sent to a router by a client or worker */
    MSG_REGISTER_AGENT  = 6,

    /* Sent by a worker back to the controller */
    MSG_READY_WORKER    = 7,
    MSG_STAT            = 8,

    /* Sent by the controller to a router, worker, or client */
    MSG_DO_FLUSH        = 9,
    MSG_KILL            = 10,

    /* Negative acknowledgement */
    MSG_NACK            = 11,

    /* Global operations initiated by a client */
    MSG_CLIOP_DATA      = 12,
    MSG_CLIOP_ACK       = 13,

    /* Global operations initiated by the controller */
    MSG_GC_REQUEST      = 14,
    MSG_GC_START        = 15,
    MSG_GC_FINISH       = 16
};
/**********************************************************
Fields and formats
Basic units (all sizes given in bytes)
Agent: 2
Message sequence number: 2+
Operation opcode: 1
Message code: 2
Port: 2
IP Address: 4
** Format of all messages **
Most messages have single-word headers.
Only operation and operands have two-word headers.
Single-word header formats
Word count: 2
Worker count: 2
Generation count: 4
Composite quantities. Components are listed from most significant to least significant:
Operator ID: 4. Combines Agent + Sequence Number
Operand ID: 5. Operator ID (4) + Offset (1)
Node ID: 6. Port (2) + IP Address (4)
Agent Map: 8. Agent (2) + Node ID (6)
*** Double word header formats ***
**********************************************************/
/** Constants **/
/* Number of words in an operator header:
   two for control information and one for the valid mask */
#define OP_HEADER_CNT 3
/* The valid mask is a bit vector, so the maximum operator length is
   limited to WORD_BITS (WORD_BITS is not defined in this header) */
#define OP_MAX_LENGTH WORD_BITS
/* Number of words in an operand header; both hold control information */
#define OPER_HEADER_CNT 2
/* Size of operand.
   NOTE(review): units are not stated here -- presumably words; confirm. */
#define OPER_SIZE 2
/** Constructors **/
/* Create an operand destination (returned as a double word)
   from an agent, an operator ID, and an offset */
dword_t msg_build_destination(unsigned agent, word_t operator_id,
unsigned offset);
/* Create a node ID from port and IP address.
   (The format notes in this header describe a node ID as
   Port (2 bytes) + IP Address (4 bytes).) */
word_t msg_build_node_id(unsigned port, unsigned ip);
/** Extractors **/
/* Presumably reports whether the given agent ID belongs to a client.
   NOTE(review): the criterion lives in the implementation, which is not
   visible from this header -- confirm. */
bool msg_is_client_agent(unsigned agent);
/* For single-word headers */
/* Each function takes a single-word header and returns the field named by
   the function.
   NOTE(review): which fields are meaningful presumably depends on the
   message type -- confirm against the implementation. */
unsigned msg_get_header_code(word_t header);
unsigned msg_get_header_agent(word_t header);
unsigned msg_get_header_opcode(word_t header);
unsigned msg_get_header_offset(word_t header);
unsigned msg_get_header_port(word_t header);
unsigned msg_get_header_ip(word_t header);
unsigned msg_get_header_wordcount(word_t header);
unsigned msg_get_header_workercount(word_t header);
unsigned msg_get_header_generation(word_t header);
/* For double-word headers */
/* Each function takes a double-word header and returns the field named by
   the function; the operator ID is returned as a word_t, the other fields
   as unsigned. */
unsigned msg_get_dheader_code(dword_t header);
unsigned msg_get_dheader_agent(dword_t header);
word_t msg_get_dheader_op_id(dword_t header);
unsigned msg_get_dheader_opcode(dword_t header);
unsigned msg_get_dheader_offset(dword_t header);
/* Extracting information from destination (double word) */
/* Each function takes a destination value and returns one component of it:
   the agent, the operator ID (as a word_t), or the offset. */
unsigned msg_get_dest_agent(dword_t dest);
word_t msg_get_dest_op_id(dword_t dest);
unsigned msg_get_dest_offset(dword_t dest);
/** Message builders **/
/* NOTE(review): all builders return a chunk_ptr; who is responsible for
   releasing the returned chunk is not documented in this header. */
/* Create an empty operator */
/* len specifies total message size, including header */
chunk_ptr msg_new_operator(unsigned opcode, unsigned agent,
word_t operator_id, unsigned len);
/* Create destination from operator. Offset includes header size */
dword_t msg_new_destination(chunk_ptr operator, unsigned offset);
/* Create empty operand. len specifies total message size, including header */
chunk_ptr msg_new_operand(dword_t dest, unsigned len);
/* Create message to register client, router, worker, or agent */
/* Each function returns the newly created message as a chunk_ptr.
   Functions that take no arguments are declared with (void) so that the
   declaration is a true prototype and the compiler rejects calls that
   pass arguments. */
chunk_ptr msg_new_register_router(unsigned port);
chunk_ptr msg_new_register_client(void);
chunk_ptr msg_new_register_worker(void);
chunk_ptr msg_new_register_agent(unsigned agent);
/* Create negative acknowledgement message */
chunk_ptr msg_new_nack(void);
/* Create message to notify controller that worker is ready */
chunk_ptr msg_new_worker_ready(unsigned agent);
/* Create message to notify any node that it should terminate.
   Declared with (void) so the compiler rejects calls that pass arguments. */
chunk_ptr msg_new_kill(void);
/* Create message to notify any node that it should flush its state.
   Declared with (void) so the compiler rejects calls that pass arguments. */
chunk_ptr msg_new_flush(void);
/*
Create a message containing worker statistics.
nworker: number of workers
nstat:   number of values
vals:    pointer to array of values
NOTE(review): presumably vals holds nstat entries and is only read --
confirm against the implementation.
*/
chunk_ptr msg_new_stat(unsigned nworker, unsigned nstat, size_t *vals);
/*** Unary operations ***/
/* Create message containing global operation data */
/* nword specifies number of data words (not including header);
   data points to those words */
chunk_ptr msg_new_cliop_data(unsigned agent, unsigned opcode, unsigned nword,
                             word_t *data);
/* Create message acknowledging a client-initiated global operation
   (message type MSG_CLIOP_ACK, judging by the name -- confirm) */
chunk_ptr msg_new_cliop_ack(unsigned agent);
/* Create garbage-collection messages; gen is the generation count.
   The argument-free builders are declared with (void) so the compiler
   rejects calls that pass arguments. */
chunk_ptr msg_new_gc_request(unsigned gen);
chunk_ptr msg_new_gc_start(void);
chunk_ptr msg_new_gc_finish(void);
/** Useful functions **/
/* Create listening socket.
Port value of 0 indicates that port can be chosen arbitrarily.
If successful, set fdp to fd for listening socket and portp to port.
NOTE(review): the bool result presumably reports success -- confirm.
*/
/* Parameters that define range of ports tried */
/* NOTE(review): presumably ports MINPORT .. MINPORT+PORTCOUNT-1 are tried
   when port is 0 -- confirm against the implementation. */
#define MINPORT 6000
#define PORTCOUNT 1000
bool new_server(unsigned port, int *fdp, unsigned *portp);
/* Open connection to the server named by hostname, on the given port.
 * Return socket file descriptor.
 * Returns -1 and sets errno on Unix error.
 * Returns -2 and sets h_errno on DNS (gethostbyname) error.
 */
int open_clientfd(const char *hostname, unsigned port);
/* Open connection to server given IPv4 host address and port.
 * Return socket file descriptor
 * Returns -1 and sets errno on Unix error.
 * Returns -2 and sets h_errno on DNS (gethostbyname) error.
 * NOTE(review): the byte order expected for ip is not stated here, and it
 * is unclear whether the -2 (DNS) case can occur when a numeric address is
 * supplied -- confirm against the implementation.
 */
int open_clientfd_ip(unsigned ip, unsigned port);
/* Accept a connection request from a client on listening socket listenfd */
/* Return connection socket descriptor.
(Optionally) update pointer to IP address: the client's address is stored
through ipp.
NOTE(review): "optionally" presumably means ipp may be NULL, and the
failure return value is not documented here -- confirm both. */
int accept_connection(int listenfd, unsigned *ipp);