Skip to content

Commit

Permalink
Merge pull request #57 from ddosify/develop
Browse files Browse the repository at this point in the history
prioritize sql parsing
  • Loading branch information
fatihbaltaci authored Dec 12, 2023
2 parents 1d34fa0 + 6663404 commit 7e26cf9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 51 deletions.
99 changes: 50 additions & 49 deletions ebpf/l7_req/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,26 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
if (method != -1){
req->protocol = PROTOCOL_HTTP;
req-> method = method;
}else if (parse_client_postgres_data(buf, count, &req->request_type)){
// TODO: should wait for CloseComplete message in case of statement close
if (req->request_type == POSTGRES_MESSAGE_CLOSE || req->request_type == POSTGRES_MESSAGE_TERMINATE){
req->protocol = PROTOCOL_POSTGRES;
req->method = METHOD_STATEMENT_CLOSE_OR_CONN_TERMINATE;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}
unsigned char log_msg[] = "parse_client_postgres_data -- count||";
log_to_userspace(ctx, DEBUG, func_name, log_msg, count, 0, 0);
req->protocol = PROTOCOL_POSTGRES;
}else if (is_rabbitmq_publish(buf,count)){
req->protocol = PROTOCOL_AMQP;
req->method = METHOD_PUBLISH;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}else if (is_http2_frame(buf, count)){
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
Expand Down Expand Up @@ -253,25 +273,6 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size);
}
return 0;
}
else if (is_rabbitmq_publish(buf,count)){
req->protocol = PROTOCOL_AMQP;
req->method = METHOD_PUBLISH;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}else if (parse_client_postgres_data(buf, count, &req->request_type)){
// TODO: should wait for CloseComplete message in case of statement close
if (req->request_type == POSTGRES_MESSAGE_CLOSE || req->request_type == POSTGRES_MESSAGE_TERMINATE){
req->protocol = PROTOCOL_POSTGRES;
req->method = METHOD_STATEMENT_CLOSE_OR_CONN_TERMINATE;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}
req->protocol = PROTOCOL_POSTGRES;
}else{
req->protocol = PROTOCOL_UNKNOWN;
req->method = METHOD_UNKNOWN;
Expand Down Expand Up @@ -464,36 +465,6 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}
e->is_tls = is_tls;

// if http2, send directly to userspace
if(is_http2_frame(read_info->buf,ret)){
e->protocol = PROTOCOL_HTTP2;
e->write_time_ns = timestamp;
e->fd = read_info->fd;
e->pid = k.pid;
e->method = SERVER_FRAME;
e->status = 0;
e->failed = 0; // success
e->duration = 0; // total write time
e->is_tls = 1;
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, read_info->buf);
if(ret > MAX_PAYLOAD_SIZE){
// will not be able to copy all of it
e->payload_size = MAX_PAYLOAD_SIZE;
e->payload_read_complete = 0;
}else{
e->payload_size = ret;
e->payload_read_complete = 1;
}

long r = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
if (r < 0) {
unsigned char log_msg[] = "failed write to l7_events h2 -- res|fd|psize";
log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size);
}
bpf_map_delete_elem(&go_active_reads, &k);
return 0;
}

// For a amqp consume, there will be no write, so we will not have a request in active_l7_requests
// Process amqp consume first, if it is not amqp consume, look for a request in active_l7_requests

Expand Down Expand Up @@ -521,6 +492,36 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64

struct l7_request *active_req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (!active_req) {
// if http2 server frame, send directly to userspace
if(is_http2_frame(read_info->buf,ret)){
e->protocol = PROTOCOL_HTTP2;
e->write_time_ns = timestamp;
e->fd = read_info->fd;
e->pid = k.pid;
e->method = SERVER_FRAME;
e->status = 0;
e->failed = 0; // success
e->duration = 0; // total write time
e->is_tls = 1;
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, read_info->buf);
if(ret > MAX_PAYLOAD_SIZE){
// will not be able to copy all of it
e->payload_size = MAX_PAYLOAD_SIZE;
e->payload_read_complete = 0;
}else{
e->payload_size = ret;
e->payload_read_complete = 1;
}

long r = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
if (r < 0) {
unsigned char log_msg[] = "failed write to l7_events h2 -- res|fd|psize";
log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size);
}
bpf_map_delete_elem(&go_active_reads, &k);
return 0;
}

bpf_map_delete_elem(&active_reads, &id);
return 0;
}
Expand Down
8 changes: 6 additions & 2 deletions ebpf/l7_req/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ int parse_client_postgres_data(char *buf, int buf_size, __u8 *request_type) {
return 1;
}

// len + 1 byte of identifier == buf_size
if ((identifier == POSTGRES_MESSAGE_SIMPLE_QUERY || identifier == POSTGRES_MESSAGE_CLOSE) && len+1 == buf_size) {

// long queries can be split into multiple packets
// therefore specified length can exceed the buf_size
// normally (len + 1 byte of identifier == buf_size) should be true

if ((identifier == POSTGRES_MESSAGE_SIMPLE_QUERY || identifier == POSTGRES_MESSAGE_CLOSE)) {
*request_type = identifier;
return 1;
}
Expand Down

0 comments on commit 7e26cf9

Please sign in to comment.