Skip to content

Commit 6ecc194

Browse files
authored
Merge pull request #147 from getanteon/develop
RESP (Redis serialization protocol)
2 parents 2dfca6a + fc9b5d8 commit 6ecc194

File tree

10 files changed

+336
-6
lines changed

10 files changed

+336
-6
lines changed

aggregator/data.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,10 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
10461046
_, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
10471047
}
10481048

1049+
if d.Protocol == l7_req.L7_PROTOCOL_REDIS {
1050+
path = string(d.Payload[0:d.PayloadSize])
1051+
}
1052+
10491053
err := a.setFromTo(skInfo, d, &reqDto, reqHostHeader)
10501054
if err != nil {
10511055
return
@@ -1054,11 +1058,12 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
10541058
reqDto.Path = path
10551059
reqDto.Completed = !d.Failed
10561060

1057-
// In AMQP-DELIVER event, we are capturing from read syscall,
1061+
// In AMQP-DELIVER or REDIS-PUSHED_EVENT event, we are capturing from read syscall,
10581062
// exchange sockets
10591063
// In Alaz context, From is always the one that makes the write
10601064
// and To is the one that makes the read
1061-
if d.Protocol == l7_req.L7_PROTOCOL_AMQP && d.Method == l7_req.DELIVER {
1065+
if (d.Protocol == l7_req.L7_PROTOCOL_AMQP && d.Method == l7_req.DELIVER) ||
1066+
(d.Protocol == l7_req.L7_PROTOCOL_REDIS && d.Method == l7_req.REDIS_PUSHED_EVENT) {
10621067
reqDto.FromIP, reqDto.ToIP = reqDto.ToIP, reqDto.FromIP
10631068
reqDto.FromPort, reqDto.ToPort = reqDto.ToPort, reqDto.FromPort
10641069
reqDto.FromUID, reqDto.ToUID = reqDto.ToUID, reqDto.FromUID

ebpf/c/bpf.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "http.c"
3434
#include "amqp.c"
3535
#include "postgres.c"
36+
#include "redis.c"
3637
#include "openssl.c"
3738
#include "http2.c"
3839
#include "tcp_sock.c"

ebpf/c/bpf_bpfeb.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ebpf/c/bpf_bpfeb.o

70.3 KB
Binary file not shown.

ebpf/c/bpf_bpfel.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ebpf/c/bpf_bpfel.o

70.4 KB
Binary file not shown.

ebpf/c/l7.c

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#define PROTOCOL_AMQP 2
55
#define PROTOCOL_POSTGRES 3
66
#define PROTOCOL_HTTP2 4
7+
#define PROTOCOL_REDIS 5
78

89
#define MAX_PAYLOAD_SIZE 1024
910
#define PAYLOAD_PREFIX_SIZE 16
@@ -228,6 +229,12 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
228229
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
229230
}
230231
req->protocol = PROTOCOL_POSTGRES;
232+
}else if (is_redis_ping(buf, count)){
233+
req->protocol = PROTOCOL_REDIS;
234+
req->method = METHOD_REDIS_PING;
235+
}else if (!is_redis_pong(buf,count) && is_redis_command(buf,count)){
236+
req->protocol = PROTOCOL_REDIS;
237+
req->method = METHOD_UNKNOWN;
231238
}else if (is_rabbitmq_publish(buf,count)){
232239
req->protocol = PROTOCOL_AMQP;
233240
req->method = METHOD_PUBLISH;
@@ -561,6 +568,37 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
561568
}
562569
bpf_map_delete_elem(&active_reads, &id);
563570
return 0;
571+
}else if (is_redis_pushed_event(read_info->buf, ret)){
572+
// reset payload
573+
for (int i = 0; i < MAX_PAYLOAD_SIZE; i++) {
574+
e->payload[i] = 0;
575+
}
576+
e->protocol = PROTOCOL_REDIS;
577+
e->method = METHOD_REDIS_PUSHED_EVENT;
578+
e->duration = timestamp - read_info->read_start_ns;
579+
e->write_time_ns = read_info->read_start_ns; // TODO: it is not write time, but start of read time
580+
581+
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, read_info->buf);
582+
if (ret > MAX_PAYLOAD_SIZE){
583+
e->payload_size = MAX_PAYLOAD_SIZE;
584+
e->payload_read_complete = 0;
585+
}else{
586+
e->payload_size = ret;
587+
e->payload_read_complete = 1;
588+
}
589+
e->failed = 0; // success
590+
e->status = 0;
591+
e->fd = k.fd;
592+
e->pid = k.pid;
593+
594+
// for distributed tracing
595+
e->seq = 0; // default value
596+
e->tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF;
597+
598+
bpf_map_delete_elem(&active_reads, &id);
599+
600+
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
601+
return 0;
564602
}
565603

566604
bpf_map_delete_elem(&active_reads, &id);
@@ -591,8 +629,8 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
591629
e->tid = active_req->tid;
592630

593631
e->status = 0;
594-
if(read_info->buf && ret > PAYLOAD_PREFIX_SIZE){
595-
if(e->protocol==PROTOCOL_HTTP){ // if http, try to parse status code
632+
if(read_info->buf){
633+
if(e->protocol==PROTOCOL_HTTP && ret > PAYLOAD_PREFIX_SIZE){ // if http, try to parse status code
596634
// read first 16 bytes of read buffer
597635
char buf_prefix[PAYLOAD_PREFIX_SIZE];
598636
long r = bpf_probe_read(&buf_prefix, sizeof(buf_prefix), (void *)(read_info->buf)) ;
@@ -626,6 +664,13 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
626664
}else if (active_req->request_type == POSTGRES_MESSAGE_PARSE || active_req->request_type == POSTGRES_MESSAGE_BIND){
627665
e->method = METHOD_EXTENDED_QUERY;
628666
}
667+
}else if (e->protocol == PROTOCOL_REDIS){
668+
if (e->method == METHOD_REDIS_PING){
669+
e->status = is_redis_pong(read_info->buf, ret);
670+
}else{
671+
e->status = parse_redis_response(read_info->buf, ret);
672+
e->method = METHOD_REDIS_COMMAND;
673+
}
629674
}
630675
}else{
631676
bpf_map_delete_elem(&active_reads, &id);
@@ -858,6 +903,25 @@ int sys_enter_write(struct trace_event_raw_sys_enter_write* ctx) {
858903
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, ctx->buf, ctx->count);
859904
}
860905

906+
// SEC("tracepoint/syscalls/sys_enter_writev")
907+
// int sys_enter_writev(struct trace_event_raw_sys_enter_write* ctx) {
908+
// return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, ctx->buf, ctx->count);
909+
// }
910+
911+
912+
struct iov {
913+
char* buf;
914+
__u64 size;
915+
};
916+
SEC("tracepoint/syscalls/sys_enter_writev")
917+
int sys_enter_writev(struct trace_event_raw_sys_enter_writev* ctx) {
918+
struct iov iov0 = {};
919+
if (bpf_probe_read(&iov0, sizeof(struct iov), (void *)ctx->vec) < 0) {
920+
return 0;
921+
}
922+
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0, iov0.buf, iov0.size);
923+
}
924+
861925
SEC("tracepoint/syscalls/sys_enter_sendto")
862926
int sys_enter_sendto(struct trace_event_raw_sys_enter_sendto* ctx) {
863927
return process_enter_of_syscalls_write_sendto(ctx, ctx->fd, 0 ,ctx->buff, ctx->len);
@@ -868,6 +932,11 @@ int sys_exit_write(struct trace_event_raw_sys_exit_write* ctx) {
868932
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);
869933
}
870934

935+
SEC("tracepoint/syscalls/sys_exit_writev")
936+
int sys_exit_writev(struct trace_event_raw_sys_exit_writev* ctx) {
937+
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);
938+
}
939+
871940
SEC("tracepoint/syscalls/sys_exit_sendto")
872941
int sys_exit_sendto(struct trace_event_raw_sys_exit_sendto* ctx) {
873942
return process_exit_of_syscalls_write_sendto(ctx, ctx->ret);

0 commit comments

Comments
 (0)