Skip to content

Commit 894a619

Browse files
in_splunk: add config option for mapping records from specific tokens to specific tags
Signed-off-by: Stewart Webb <[email protected]>
1 parent 14ca011 commit 894a619

File tree

4 files changed

+103
-15
lines changed

4 files changed

+103
-15
lines changed

plugins/in_splunk/splunk.c

+6
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,12 @@ static struct flb_config_map config_map[] = {
254254
""
255255
},
256256

257+
{
258+
FLB_CONFIG_MAP_SLIST_2, "map_token_to_tag", NULL,
259+
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_splunk, token_to_tag_mappings),
260+
"Map input records from given Splunk HEC token to given tag. Multiple of these can be set to map different tokens to different tags."
261+
},
262+
257263

258264
/* EOF */
259265
{0}

plugins/in_splunk/splunk.h

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
struct flb_splunk_tokens {
3636
flb_sds_t header;
3737
size_t length;
38+
/* Fluentbit routing tag that records sent in with
39+
this particular token should be given */
40+
flb_sds_t map_to_tag;
3841
struct mk_list _head;
3942
};
4043

@@ -52,6 +55,7 @@ struct flb_splunk {
5255
size_t ingested_auth_header_len;
5356
int store_token_in_metadata;
5457
flb_sds_t store_token_key;
58+
struct mk_list *token_to_tag_mappings;
5559

5660
struct flb_log_event_encoder log_encoder;
5761

plugins/in_splunk/splunk_config.c

+43
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,19 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
4444
const char *raw_token;
4545
struct mk_list *head = NULL;
4646
struct mk_list *kvs = NULL;
47+
int token_idx = 0;
4748
struct flb_split_entry *cur = NULL;
4849
flb_sds_t auth_header = NULL;
4950
struct flb_splunk_tokens *splunk_token;
5051
flb_sds_t credential = NULL;
5152

53+
struct flb_config_map_val *matching_token_mapping;
54+
/* iterators for token to tag mappings */
55+
struct mk_list *ttm_list_i;
56+
struct flb_config_map_val *ttm_cmval_i;
57+
struct flb_slist_entry *ttm_i_token;
58+
struct flb_slist_entry *ttm_tag;
59+
5260
raw_token = flb_input_get_property("splunk_token", ctx->ins);
5361
if (raw_token) {
5462
kvs = flb_utils_split(raw_token, ',', -1 );
@@ -93,10 +101,45 @@ static int setup_hec_tokens(struct flb_splunk *ctx)
93101
splunk_token->header = auth_header;
94102
splunk_token->length = flb_sds_len(auth_header);
95103

104+
if (ctx->token_to_tag_mappings != NULL) {
105+
int mapping_idx = 0;
106+
matching_token_mapping = NULL;
107+
/* search all the configured token_to_tag_mappings to see if the current
108+
token is one that a mapping was specified for */
109+
flb_config_map_foreach(ttm_list_i, ttm_cmval_i, ctx->token_to_tag_mappings) {
110+
ttm_i_token = mk_list_entry_first(ttm_cmval_i->val.list,
111+
struct flb_slist_entry,
112+
_head);
113+
114+
if (flb_sds_cmp(ttm_i_token->str, credential, flb_sds_len(credential)) == 0) {
115+
matching_token_mapping = ttm_cmval_i;
116+
break;
117+
}
118+
mapping_idx += 1;
119+
}
120+
if (matching_token_mapping != NULL) {
121+
/* Token is the first arg (list->next),
122+
Tag is the second arg (list->next->next) */
123+
ttm_tag = container_of(matching_token_mapping->val.list->next->next,
124+
struct flb_slist_entry,
125+
_head);
126+
flb_plg_debug(ctx->ins, "token #%d will map to tag %s", token_idx + 1, ttm_tag->str);
127+
splunk_token->map_to_tag = flb_sds_create(ttm_tag->str);
128+
}
129+
else {
130+
flb_plg_warn(ctx->ins, "token #%d has no tag mapping, records from this token will not re-map to specific tag", token_idx + 1);
131+
splunk_token->map_to_tag = NULL;
132+
}
133+
}
134+
else {
135+
splunk_token->map_to_tag = NULL;
136+
}
137+
96138
flb_sds_destroy(credential);
97139

98140
/* Link to parent list */
99141
mk_list_add(&splunk_token->_head, &ctx->auth_tokens);
142+
token_idx++;
100143
}
101144
}
102145

plugins/in_splunk/splunk_prot.c

+50-15
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
483483
return 0;
484484
}
485485

486-
static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request)
486+
static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request, struct flb_splunk_tokens **matched_token_out)
487487
{
488488
int ret = 0;
489489
struct mk_list *head;
@@ -517,6 +517,7 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
517517
authorization,
518518
splunk_token->length) == 0) {
519519
flb_sds_destroy(authorization);
520+
*matched_token_out = splunk_token;
520521

521522
return SPLUNK_AUTH_SUCCESS;
522523
}
@@ -716,6 +717,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
716717
off_t diff;
717718
flb_sds_t tag;
718719
struct mk_http_header *header;
720+
struct flb_splunk_tokens *matched_token = NULL;
721+
flb_sds_t tag_from_token = NULL;
719722

720723
if (request->uri.data[0] != '/') {
721724
send_response(conn, 400, "error: invalid request\n");
@@ -818,7 +821,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
818821

819822
/* Under services/collector endpoints are required for
820823
* authentication if provided splunk_token */
821-
ret = validate_auth_header(ctx, request);
824+
ret = validate_auth_header(ctx, request, &matched_token);
822825
if (ret < 0){
823826
send_response(conn, 401, "error: unauthorized\n");
824827
if (ret == SPLUNK_AUTH_MISSING_CRED) {
@@ -834,6 +837,12 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
834837
return -1;
835838
}
836839

840+
/* Tokens can be configured to map to a particular tag */
841+
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
842+
tag_from_token = matched_token->map_to_tag;
843+
tag = tag_from_token;
844+
}
845+
837846
/* If the request contains chunked transfer encoded data, decode it */\
838847
if (mk_http_parser_is_content_chunked(&session->parser)) {
839848
ret = mk_http_parser_chunked_decode(&session->parser,
@@ -845,7 +854,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
845854
flb_plg_error(ctx->ins, "failed to decode chunked data");
846855
send_response(conn, 400, "error: invalid chunked data\n");
847856

848-
flb_sds_destroy(tag);
857+
/* Free the tag only if it was a temporarily-allocated/calculated one */
858+
if (tag_from_token == NULL) {
859+
flb_sds_destroy(tag);
860+
}
849861
mk_mem_free(uri);
850862

851863
return -1;
@@ -879,7 +891,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
879891

880892
ret = process_hec_payload(ctx, conn, tag, session, request);
881893
if (ret == -2) {
882-
flb_sds_destroy(tag);
894+
/* Free the tag only if it was a temporarily-allocated/calculated one */
895+
if (tag_from_token == NULL) {
896+
flb_sds_destroy(tag);
897+
}
883898
mk_mem_free(uri);
884899

885900
if (out_chunked) {
@@ -899,7 +914,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
899914
else {
900915
send_response(conn, 400, "error: invalid HTTP endpoint\n");
901916

902-
flb_sds_destroy(tag);
917+
/* Free the tag only if it was a temporarily-allocated/calculated one */
918+
if (tag_from_token == NULL) {
919+
flb_sds_destroy(tag);
920+
}
903921
mk_mem_free(uri);
904922

905923
if (out_chunked) {
@@ -914,7 +932,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
914932
else {
915933
/* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/
916934

917-
flb_sds_destroy(tag);
935+
/* Free the tag only if it was a temporarily-allocated/calculated one */
936+
if (tag_from_token == NULL) {
937+
flb_sds_destroy(tag);
938+
}
918939
mk_mem_free(uri);
919940

920941
if (out_chunked) {
@@ -927,7 +948,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
927948
return -1;
928949
}
929950

930-
flb_sds_destroy(tag);
951+
/* Free the tag only if it was a temporarily-allocated/calculated one */
952+
if (tag_from_token == NULL) {
953+
flb_sds_destroy(tag);
954+
}
931955
mk_mem_free(uri);
932956

933957
if (out_chunked) {
@@ -1022,7 +1046,7 @@ static int send_json_message_response_ng(struct flb_http_response *response,
10221046
return 0;
10231047
}
10241048

1025-
static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request)
1049+
static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request, struct flb_splunk_tokens **matched_token_out)
10261050
{
10271051
struct mk_list *tmp;
10281052
struct mk_list *head;
@@ -1049,6 +1073,7 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque
10491073
if (strncasecmp(splunk_token->header,
10501074
auth_header,
10511075
splunk_token->length) == 0) {
1076+
*matched_token_out = splunk_token;
10521077
return SPLUNK_AUTH_SUCCESS;
10531078
}
10541079
}
@@ -1147,6 +1172,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
11471172
struct flb_splunk *context;
11481173
int ret = -1;
11491174
flb_sds_t tag;
1175+
struct flb_splunk_tokens *matched_token = NULL;
1176+
flb_sds_t tag_from_token = NULL;
11501177

11511178
context = (struct flb_splunk *) response->stream->user_data;
11521179

@@ -1176,7 +1203,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
11761203

11771204
/* Under services/collector endpoints are required for
11781205
* authentication if provided splunk_token */
1179-
ret = validate_auth_header_ng(context, request);
1206+
ret = validate_auth_header_ng(context, request, &matched_token);
11801207

11811208
if (ret < 0) {
11821209
send_response_ng(response, 401, "error: unauthorized\n");
@@ -1201,10 +1228,16 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
12011228
return -1;
12021229
}
12031230

1204-
tag = flb_sds_create(context->ins->tag);
1205-
1206-
if (tag == NULL) {
1207-
return -1;
1231+
/* Tokens can be configured to map to a particular tag */
1232+
if (matched_token != NULL && matched_token->map_to_tag != NULL) {
1233+
tag_from_token = matched_token->map_to_tag;
1234+
tag = tag_from_token;
1235+
}
1236+
else {
1237+
tag = flb_sds_create(context->ins->tag);
1238+
if (tag == NULL) {
1239+
return -1;
1240+
}
12081241
}
12091242

12101243
if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
@@ -1236,7 +1269,9 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
12361269
send_response_ng(response, 400, "error: invalid HTTP endpoint\n");
12371270
ret = -1;
12381271
}
1239-
1240-
flb_sds_destroy(tag);
1272+
/* Free the tag only if it was a temporarily-allocated/calculated one */
1273+
if (tag_from_token == NULL) {
1274+
flb_sds_destroy(tag);
1275+
}
12411276
return ret;
12421277
}

0 commit comments

Comments
 (0)