Skip to content

Commit fa5daab

Browse files
committed
Merge branch 'forwarding_udp' into devel
2 parents 9580622 + 78cd2b6 commit fa5daab

File tree

8 files changed

+220
-23
lines changed

8 files changed

+220
-23
lines changed

base/src/storage/forwarding/configuration.c

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,14 @@
4949

5050
/** Default destination port */
5151
#define DEF_PORT "4739"
52-
/** Default retry interval (seconds) */
53-
#define DEF_RETRY_INT (5)
52+
/** Default transport protocol */
53+
#define DEF_PROTO IPPROTO_TCP
54+
/** Default re-connection period (in milliseconds)*/
55+
#define DEF_RECONN_PERIOD (1000)
5456
/** Default maximal packet size */
5557
#define DEF_PACKET_SIZE (4096)
58+
/** Default template refresh timeout */
59+
#define DEF_TEMPLATE_REFRESH (300U)
5660

5761
static const char *msg_module = "forwarding(config)";
5862

@@ -134,6 +138,26 @@ static enum DIST_MODE config_parse_distr(const char *str)
134138
}
135139
}
136140

141+
/**
142+
* \brief Convert string to transport protocol
143+
* \param[in] str String
144+
* \return Return either parsed value or default protocol
145+
*/
146+
static int config_parse_proto(const char *str)
147+
{
148+
if (!str) {
149+
return DEF_PROTO;
150+
}
151+
152+
if (!strcasecmp(str, "tcp")) {
153+
return IPPROTO_TCP;
154+
} else if (!strcasecmp(str, "udp")) {
155+
return IPPROTO_UDP;
156+
} else {
157+
return DEF_PROTO;
158+
}
159+
}
160+
137161
/**
138162
* \brief Parse only default values from the plugin configuration
139163
* \param[in,out] ctx Parser context
@@ -145,12 +169,21 @@ static int config_parse_def_values(struct parser_context *ctx)
145169
xmlNode *cur = ctx->node->children;
146170
struct plugin_config *cfg = ctx->cfg;
147171

172+
// Set default protocol
173+
cfg->def_proto = DEF_PROTO;
174+
148175
while (cur) {
149176
if (!xmlStrcasecmp(cur->name, (const xmlChar *) "defaultPort")) {
150177
// Default port
151178
free(cfg->def_port);
152179
cfg->def_port = (char *) xmlNodeListGetString(doc,
153180
cur->xmlChildrenNode, 1);
181+
} else if (!xmlStrcasecmp(cur->name, (const xmlChar *) "defaultProtocol")) {
182+
// Default protocol
183+
char *str_proto = (char *) xmlNodeListGetString(doc,
184+
cur->xmlChildrenNode, 1);
185+
cfg->def_proto = config_parse_proto(str_proto);
186+
free(str_proto);
154187
} else {
155188
// Other nodes -> skip
156189
}
@@ -195,6 +228,7 @@ static fwd_sender_t *config_parse_destination(struct parser_context *ctx)
195228
{
196229
xmlChar *str_ip = NULL;
197230
xmlChar *str_port = NULL;
231+
xmlChar *str_proto = NULL;
198232
xmlNode *cur = ctx->node;
199233

200234
// Find all related XML nodes & parse them
@@ -213,6 +247,12 @@ static fwd_sender_t *config_parse_destination(struct parser_context *ctx)
213247
xmlFree(str_port);
214248
}
215249
str_port = xmlNodeListGetString(ctx->doc, cur->xmlChildrenNode, 1);
250+
} else if (!xmlStrcasecmp(cur->name, (const xmlChar *) "protocol")) {
251+
// Destination protocol
252+
if (str_proto) {
253+
xmlFree(str_proto);
254+
}
255+
str_proto = xmlNodeListGetString(ctx->doc, cur->xmlChildrenNode, 1);
216256
} else {
217257
// Other unknown nodes
218258
MSG_WARNING(msg_module, "Unknown node '%s' in 'destination' node "
@@ -226,15 +266,21 @@ static fwd_sender_t *config_parse_destination(struct parser_context *ctx)
226266
const char *dst_ip = (const char *) str_ip;
227267
const char *dst_port = (str_port != NULL)
228268
? (const char *) str_port : config_def_port(ctx->cfg);
269+
int proto = ctx->cfg->def_proto;
270+
if (str_proto != NULL) {
271+
proto = config_parse_proto((const char *) str_proto);
272+
}
229273

230274
fwd_sender_t *new_sender;
231-
new_sender = sender_create(dst_ip, dst_port);
275+
new_sender = sender_create(dst_ip, dst_port, proto);
232276

233277
// Clean up
234278
if (str_ip)
235279
xmlFree(str_ip);
236280
if (str_port)
237281
xmlFree(str_port);
282+
if (str_proto)
283+
xmlFree(str_proto);
238284

239285
return new_sender;
240286
}
@@ -260,6 +306,8 @@ static int config_parse_xml(struct parser_context *ctx)
260306
while (cur && !failed) {
261307
if (!xmlStrcasecmp(cur->name, (const xmlChar *) "defaultPort")) {
262308
// Default values were already processed -> skip
309+
} else if (!xmlStrcasecmp(cur->name, (const xmlChar *) "defaultProtocol")) {
310+
// Default values were already processed -> skip
263311
} else if (!xmlStrcasecmp(cur->name, (const xmlChar *) "fileFormat")) {
264312
// Useless node -> skip
265313
} else if (cur->type == XML_COMMENT_NODE) {
@@ -302,7 +350,7 @@ static int config_parse_xml(struct parser_context *ctx)
302350
aux_str = xmlNodeListGetString(doc, cur->xmlChildrenNode, 1);
303351
if (config_parse_int((char *) aux_str, &result)) {
304352
// Conversion failed
305-
MSG_ERROR(msg_module, "Failed to parse 'reconnectionPeriod' "
353+
MSG_ERROR(msg_module, "Failed to parse the 'reconnectionPeriod' "
306354
"node.");
307355
failed = true;
308356
} else if (result <= 0) {
@@ -313,6 +361,24 @@ static int config_parse_xml(struct parser_context *ctx)
313361
} else {
314362
ctx->cfg->reconn_period = result;
315363
}
364+
} else if (!xmlStrcasecmp(cur->name, (const xmlChar *) "udpTemplateRefreshTimeout")) {
365+
// Refresh timeout for UDP (options) templates
366+
int result;
367+
aux_str = xmlNodeListGetString(doc, cur->xmlChildrenNode, 1);
368+
if (config_parse_int((char *) aux_str, &result)) {
369+
// Conversion failed
370+
MSG_ERROR(msg_module, "Failed to parse the "
371+
"'udpTemplateRefreshTimeout' node.");
372+
failed = true;
373+
} else if (result <= 0) {
374+
// Out of range
375+
MSG_ERROR(msg_module, "Template refresh timeout cannot be zero "
376+
"or negative.");
377+
failed = true;
378+
} else {
379+
// Success (now it is safe to convert int to unsigned int)
380+
ctx->cfg->udp_refresh_timeout = (unsigned int) result;
381+
}
316382
} else {
317383
// Other unknown nodes
318384
MSG_WARNING(msg_module, "Unknown node '%s' skipped.", cur->name);
@@ -374,7 +440,8 @@ struct plugin_config *config_parse(const char *cfg_string)
374440
// Set default values
375441
config->mode = DIST_ALL;
376442
config->packet_size = DEF_PACKET_SIZE;
377-
config->reconn_period = 1000; // milliseconds
443+
config->reconn_period = DEF_RECONN_PERIOD; // milliseconds
444+
config->udp_refresh_timeout = DEF_TEMPLATE_REFRESH; // seconds
378445

379446
config->builder_all = bldr_create();
380447
config->builder_tmplt = bldr_create();

base/src/storage/forwarding/configuration.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,12 @@
5858
*/
5959
struct plugin_config {
6060
char *def_port; /**< Default port */
61+
int def_proto; /**< Default protocol */
6162
enum DIST_MODE mode; /**< Distribution mode */
6263
uint16_t packet_size; /**< Maximal size per generated packet */
6364
int reconn_period; /**< Reconnection period (in milliseconds) */
65+
unsigned int udp_refresh_timeout; /**< UDP template refresh timeout
66+
* (in seconds) */
6467

6568
fwd_dest_t *dest_mgr; /**< Destination manager */
6669

base/src/storage/forwarding/destination.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,15 @@ struct tmplts_for_reconnected {
134134
uint32_t cnt;
135135
};
136136

137+
/**
138+
* \brief Auxiliary structure for checking timeout of UDP templates
139+
*/
140+
struct udp_template_timeout {
141+
time_t now; /**< Current time */
142+
unsigned int timeout; /**< Template timeout (in seconds) */
143+
};
144+
145+
137146
/**
138147
* \brief Callback function prototype
139148
* \param[in,out] sndr Destination
@@ -506,6 +515,28 @@ static bool aux_dummy_true(struct dst_client *client, void *data)
506515
return true;
507516
}
508517

518+
/**
519+
* \brief Test whether UDP client has expired templates
520+
* \param[in] client Destination
521+
* \param[in] data UDP template timeout structure
522+
* \return When the client expired returns true and updates expiration time.
523+
* Otherwise returns false.
524+
*/
525+
static bool aux_udp_expired(struct dst_client *client, void *data)
526+
{
527+
struct udp_template_timeout *params = (struct udp_template_timeout *) data;
528+
if (sender_get_proto(client->sender) != IPPROTO_UDP) {
529+
return false;
530+
}
531+
532+
if (sender_get_tmpl_time(client->sender) + params->timeout > params->now) {
533+
return false;
534+
}
535+
536+
sender_set_tmpl_time(client->sender, params->now);
537+
return true;
538+
}
539+
509540
/**
510541
* \brief Thread for reconnection of disconnected destinations
511542
* \param[in,out] arg Configuration of Destination Manager
@@ -811,6 +842,18 @@ static struct tmplts_per_odid *dest_templates_prepare(
811842
return result;
812843
}
813844

845+
// Check UDP connections for expired templates and move them to ready state
846+
void dest_check_expired_udp(fwd_dest_t *dst_mgr, unsigned int timeout)
847+
{
848+
struct udp_template_timeout params;
849+
params.now = time(NULL);
850+
params.timeout = timeout;
851+
852+
pthread_mutex_lock(&dst_mgr->group_mtx);
853+
group_move(dst_mgr->conn, dst_mgr->ready, &aux_udp_expired, &params);
854+
pthread_mutex_unlock(&dst_mgr->group_mtx);
855+
}
856+
814857
// Check and move reconnected destinations to connected destinations
815858
void dest_check_reconnected(fwd_dest_t *dst_mgr)
816859
{

base/src/storage/forwarding/destination.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ int dest_connector_start(fwd_dest_t *dst_mgr, int period);
113113
*/
114114
int dest_connector_stop(fwd_dest_t *dst_mgr);
115115

116+
/**
117+
* \brief Check UDP connections for expired templates and move them to ready
118+
* state
119+
* \param[in,out] dst_mgr Destination manager
120+
* \param[in] timeout Template refresh timeout
121+
*/
122+
void dest_check_expired_udp(fwd_dest_t *dst_mgr, unsigned int timeout);
123+
116124
/**
117125
* \brief Check and move reconnected destinations to connected destinations
118126
* \warning This functions must be called only by the thread that use

base/src/storage/forwarding/forwarding.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,9 @@ int store_packet (void *config, const struct ipfix_message *ipfix_msg,
488488
} else
489489
*/
490490

491+
// Check for UDP connections with expired templates
492+
dest_check_expired_udp(cfg->dest_mgr, cfg->udp_refresh_timeout);
493+
491494
// Add reconnected clients
492495
dest_check_reconnected(cfg->dest_mgr);
493496

base/src/storage/forwarding/ipfixcol-forwarding-output.dbk

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?xml version="1.0" encoding="utf-8"?>
2-
<refentry
3-
xmlns="http://docbook.org/ns/docbook"
4-
xmlns:xlink="http://www.w3.org/1999/xlink"
2+
<refentry
3+
xmlns="http://docbook.org/ns/docbook"
4+
xmlns:xlink="http://www.w3.org/1999/xlink"
55
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
66
xsi:schemaLocation="http://www.w3.org/1999/xlink http://docbook.org/xml/5.0/xsd/xlink.xsd
77
http://docbook.org/ns/docbook http://docbook.org/xml/5.0/xsd/docbook.xsd"
@@ -35,14 +35,14 @@
3535
<refname>ipfixcol-forwarding-output</refname>
3636
<refpurpose>Forwarding output plugin for IPFIXcol.</refpurpose>
3737
</refnamediv>
38-
38+
3939
<refsect1>
4040
<title>Description</title>
4141
<simpara>
4242
The <command>ipfixcol-forwarding-output.so</command> is output plugin for IPFIXcol (IPFIX collector).
4343
</simpara>
4444
<simpara>
45-
The plugin distributes IPFIX packets over the network to one or more destinations using TCP protocol and non-blocking sockets. As a destination can be used another instance of IPFIXcol or any other collector. Every packet can be distributed to all destinations or forwarded to one of destinations using Round Robin distribution model.
45+
The plugin distributes IPFIX packets over the network to one or more destinations using TCP protocol and non-blocking sockets. The plugins also supports UDP protocol transfer although this options is only experimental. When it is possible, always prefer TCP over UDP. As a destination can be used another instance of IPFIXcol or any other collector. Every packet can be distributed to all destinations or forwarded to one of destinations using Round Robin distribution model.
4646
</simpara>
4747
<simpara>
4848
The plugin preserves Observation Domain ID (ODID) of all packets. If more (independent) metering processes (i.e. sources of IPFIX packets) use the same ODID, the plugin remap identification numbers of templates of packets to prevent misinterpretation of IPFIX records. It is very <emphasis>important</emphasis> to avoid using different types and configurations of flow sampling by the metering processes as the packets are mixed. (Flow sampling is not recommended).
@@ -68,9 +68,9 @@
6868
]]>
6969
</programlisting>
7070
<para></para>
71-
72-
<simpara>The collector must be configured to use forwarding output plugin in startup.xml configuration (<filename>/etc/ipfixcol/startup.xml</filename>).
73-
The configuration specifies which plugins are used by the collector to process data and provides configuration for the plugins themselves.
71+
72+
<simpara>The collector must be configured to use forwarding output plugin in startup.xml configuration (<filename>/etc/ipfixcol/startup.xml</filename>).
73+
The configuration specifies which plugins are used by the collector to process data and provides configuration for the plugins themselves.
7474
</simpara>
7575
<simpara><filename>startup.xml</filename> forwarding example</simpara>
7676
<programlisting>
@@ -82,6 +82,7 @@
8282
8383
<distribution>RoundRobin</distribution>
8484
<defaultPort>4739</defaultPort>
85+
<defaultProtocol>tcp</defaultProtocol>
8586
8687
<destination>
8788
<ip>192.168.0.1</ip>
@@ -92,6 +93,7 @@
9293
<destination>
9394
<ip>192.168.0.3</ip>
9495
<port>4740</port>
96+
<protocol>udp</protocol>
9597
</destination>
9698
</fileWriter>
9799
</destination>
@@ -118,6 +120,25 @@
118120
</simpara>
119121
</listitem>
120122
</varlistentry>
123+
<varlistentry>
124+
<term>
125+
<command>defaultProtocol</command>
126+
</term>
127+
<listitem>
128+
<simpara>Default transport protocol. It can be overriden by <command>protocol</command> tag in <command>destination</command>. Allowed values are <emphasis>tcp</emphasis> and <emphasis>udp</emphasis>. TCP protocol is the default when this configuration is missing.
129+
</simpara>
130+
</listitem>
131+
</varlistentry>
132+
133+
<varlistentry>
134+
<term>
135+
<command>udpTemplateRefreshTimeout</command>
136+
</term>
137+
<listitem>
138+
<simpara>Template refresh timeout for UDP destinations (in seconds). This value defines how often the plugin retransmits all templates to the destinations. [default == 300]
139+
</simpara>
140+
</listitem>
141+
</varlistentry>
121142

122143
<varlistentry>
123144
<term>
@@ -140,6 +161,13 @@
140161
<simpara>Destination port number</simpara>
141162
</varlistentry>
142163

164+
<varlistentry>
165+
<term>
166+
<command>protocol</command>
167+
</term>
168+
<simpara>Transport protocol. Allowed values are same as for <command>defaultProtocol</command></simpara>
169+
</varlistentry>
170+
143171
</listitem>
144172
</varlistentry>
145173
<varlistentry>

0 commit comments

Comments
 (0)