Skip to content

Commit d8dce4c

Browse files
committed
filter_parser: Add parameter to nest parsed fields under
Signed-off-by: Ra'Jiska <[email protected]>
1 parent 417d129 commit d8dce4c

File tree

3 files changed

+134
-0
lines changed

3 files changed

+134
-0
lines changed

Diff for: plugins/filter_parser/filter_parser.c

+55
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,53 @@ static int delete_parsers(struct filter_parser_ctx *ctx)
9393
return c;
9494
}
9595

96+
static int nest_raw_map(struct filter_parser_ctx *ctx,
97+
char **buf,
98+
size_t *size,
99+
const flb_sds_t key)
100+
{
101+
msgpack_sbuffer sbuf;
102+
msgpack_packer pk;
103+
msgpack_unpacked outbuf_result;
104+
msgpack_object obj;
105+
msgpack_object_kv *kv;
106+
const size_t key_len = flb_sds_len(key);
107+
int ret = 0;
108+
109+
msgpack_sbuffer_init(&sbuf);
110+
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
111+
112+
msgpack_unpacked_init(&outbuf_result);
113+
ret = msgpack_unpack_next(&outbuf_result, *buf, *size, NULL);
114+
if (ret != MSGPACK_UNPACK_SUCCESS) {
115+
flb_plg_error(ctx->ins,
116+
"Nest: failed to unpack msgpack data with error code %d",
117+
ret);
118+
msgpack_unpacked_destroy(&outbuf_result);
119+
return -1;
120+
}
121+
122+
/* Create a new map, unpacking map `buf` under the new `key` root key */
123+
obj = outbuf_result.data;
124+
if (obj.type == MSGPACK_OBJECT_MAP) {
125+
msgpack_pack_map(&pk, 1);
126+
msgpack_pack_str(&pk, key_len);
127+
msgpack_pack_str_body(&pk, key, key_len);
128+
msgpack_pack_map(&pk, obj.via.map.size);
129+
for (unsigned x = 0; x < obj.via.map.size; ++x) {
130+
kv = &obj.via.map.ptr[x];
131+
msgpack_pack_object(&pk, kv->key);
132+
msgpack_pack_object(&pk, kv->val);
133+
}
134+
flb_free(*buf);
135+
*buf = sbuf.data;
136+
*size = sbuf.size;
137+
}
138+
139+
msgpack_unpacked_destroy(&outbuf_result);
140+
return 0;
141+
}
142+
96143
static int configure(struct filter_parser_ctx *ctx,
97144
struct flb_filter_instance *f_ins,
98145
struct flb_config *config)
@@ -301,6 +348,9 @@ static int cb_parser_filter(const void *data, size_t bytes,
301348
}
302349

303350
if (out_buf != NULL && parse_ret >= 0) {
351+
if (ctx->nest_under) {
352+
nest_raw_map(ctx, &out_buf, &out_size, ctx->nest_under);
353+
}
304354
if (append_arr != NULL && append_arr_len > 0) {
305355
char *new_buf = NULL;
306356
int new_size;
@@ -440,6 +490,11 @@ static struct flb_config_map config_map[] = {
440490
"Keep all other original fields in the parsed result. "
441491
"If false, all other original fields will be removed."
442492
},
493+
{
494+
FLB_CONFIG_MAP_STR, "Nest_Under", NULL,
495+
0, FLB_TRUE, offsetof(struct filter_parser_ctx, nest_under),
496+
"Specify field name to nest parsed records under."
497+
},
443498
{
444499
FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL,
445500
0, FLB_FALSE, 0,

Diff for: plugins/filter_parser/filter_parser.h

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct filter_parser_ctx {
3535
int key_name_len;
3636
int reserve_data;
3737
int preserve_key;
38+
flb_sds_t nest_under;
3839
struct mk_list parsers;
3940
struct flb_filter_instance *ins;
4041
};

Diff for: tests/runtime/filter_parser.c

+78
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,83 @@ void flb_test_filter_parser_reserve_on_preserve_on()
12991299
test_ctx_destroy(ctx);
13001300
}
13011301

1302+
void flb_test_filter_parser_nest_under_on()
1303+
{
1304+
int ret;
1305+
int bytes;
1306+
char *p, *output, *expected;
1307+
flb_ctx_t *ctx;
1308+
int in_ffd;
1309+
int out_ffd;
1310+
int filter_ffd;
1311+
struct flb_parser *parser;
1312+
1313+
struct flb_lib_out_cb cb;
1314+
cb.cb = callback_test;
1315+
cb.data = NULL;
1316+
1317+
clear_output();
1318+
1319+
ctx = flb_create();
1320+
1321+
/* Configure service */
1322+
flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace" "1", "Log_Level", "debug", NULL);
1323+
1324+
/* Input */
1325+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
1326+
TEST_CHECK(in_ffd >= 0);
1327+
flb_input_set(ctx, in_ffd,
1328+
"Tag", "test",
1329+
NULL);
1330+
1331+
/* Parser */
1332+
parser = flb_parser_create("json", "json", NULL,
1333+
FLB_FALSE,
1334+
NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE,
1335+
NULL, 0, NULL, ctx->config);
1336+
TEST_CHECK(parser != NULL);
1337+
1338+
/* Filter */
1339+
filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
1340+
TEST_CHECK(filter_ffd >= 0);
1341+
ret = flb_filter_set(ctx, filter_ffd,
1342+
"Match", "test",
1343+
"Key_Name", "to_parse",
1344+
"Nest_Under", "nest_key",
1345+
"Parser", "json",
1346+
NULL);
1347+
TEST_CHECK(ret == 0);
1348+
1349+
/* Output */
1350+
out_ffd = flb_output(ctx, (char *) "lib", &cb);
1351+
TEST_CHECK(out_ffd >= 0);
1352+
flb_output_set(ctx, out_ffd,
1353+
"Match", "*",
1354+
"format", "json",
1355+
NULL);
1356+
1357+
/* Start the engine */
1358+
ret = flb_start(ctx);
1359+
TEST_CHECK(ret == 0);
1360+
1361+
/* Ingest data */
1362+
p = "[1,{\"hello\":\"world\",\"some_object\":{\"foo\":\"bar\"},\"to_parse\":\"{\\\"key\\\":\\\"value\\\",\\\"object\\\":{\\\"a\\\":\\\"b\\\"}}\"}]";
1363+
bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
1364+
TEST_CHECK(bytes == strlen(p));
1365+
1366+
wait_with_timeout(1500, &output); /* waiting flush and ensuring data flush */
1367+
TEST_CHECK_(output != NULL, "Expected output to not be NULL");
1368+
if (output != NULL) {
1369+
/* check extra data was not preserved */
1370+
expected = "{\"nest_key\":{\"key\":\"value\",\"object\":{\"a\":\"b\"}}}";
1371+
TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output);
1372+
free(output);
1373+
}
1374+
1375+
flb_stop(ctx);
1376+
flb_destroy(ctx);
1377+
}
1378+
13021379
TEST_LIST = {
13031380
{"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
13041381
{"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
@@ -1313,6 +1390,7 @@ TEST_LIST = {
13131390
{"filter_parser_reserve_off_preserve_on", flb_test_filter_parser_reserve_off_preserve_on},
13141391
{"filter_parser_reserve_on_preserve_off", flb_test_filter_parser_reserve_on_preserve_off},
13151392
{"filter_parser_reserve_on_preserve_on", flb_test_filter_parser_reserve_on_preserve_on},
1393+
{"filter_parser_nest_under_on", flb_test_filter_parser_nest_under_on},
13161394
{NULL, NULL}
13171395
};
13181396

0 commit comments

Comments
 (0)