Skip to content

Commit 01775ff

Browse files
parse mysql prepared stmt
1 parent 2718e82 commit 01775ff

File tree

8 files changed

+55
-15
lines changed

8 files changed

+55
-15
lines changed

aggregator/data.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ type Aggregator struct {
7878
pgStmtsMu sync.RWMutex
7979
pgStmts map[string]string // pid-fd-stmtname -> query
8080

81+
// postgres prepared stmt
82+
mySqlStmtsMu sync.RWMutex
83+
mySqlStmts map[string]string // pid-fd-stmtId -> query
84+
8185
liveProcessesMu sync.RWMutex
8286
liveProcesses map[uint32]struct{} // pid -> struct{}
8387

@@ -153,6 +157,7 @@ func NewAggregator(parentCtx context.Context, ct *cri.CRITool, k8sChan chan inte
153157
liveProcesses: make(map[uint32]struct{}),
154158
rateLimiters: make(map[uint32]*rate.Limiter),
155159
pgStmts: make(map[string]string),
160+
mySqlStmts: make(map[string]string),
156161
}
157162

158163
var err error
@@ -381,6 +386,14 @@ func (a *Aggregator) processExit(pid uint32) {
381386
}
382387
}
383388
a.pgStmtsMu.Unlock()
389+
390+
a.mySqlStmtsMu.Lock()
391+
for key, _ := range a.pgStmts {
392+
if strings.HasPrefix(key, fmt.Sprint(pid)) {
393+
delete(a.mySqlStmts, key)
394+
}
395+
}
396+
a.mySqlStmtsMu.Unlock()
384397
}
385398

386399
func (a *Aggregator) signalTlsAttachment(pid uint32) {
@@ -1373,17 +1386,29 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) (
13731386
func (a *Aggregator) parseMySQLCommand(d *l7_req.L7Event) (string, error) {
13741387
r := d.Payload[:d.PayloadSize]
13751388
var sqlCommand string
1389+
// 3 bytes len, 1 byte package number, 1 byte command type
1390+
if len(r) < 5 {
1391+
return "", fmt.Errorf("too short for a sql query")
1392+
}
1393+
r = r[5:]
1394+
sqlCommand = string(r)
13761395
if d.Method == l7_req.MYSQL_TEXT_QUERY {
1377-
// 3 bytes len, 1 byte package number, 1 byte command type
1378-
if len(r) < 5 {
1379-
return "", fmt.Errorf("too short for a sql query")
1380-
}
1381-
r = r[5:]
1382-
sqlCommand = string(r)
1383-
13841396
if !containsSQLKeywords(sqlCommand) {
13851397
return "", fmt.Errorf("no sql command found")
13861398
}
1399+
} else if d.Method == l7_req.MYSQL_PREPARE_STMT {
1400+
a.mySqlStmtsMu.Lock()
1401+
a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, d.MySqlPrepStmtId)] = string(r)
1402+
a.mySqlStmtsMu.Unlock()
1403+
} else if d.Method == l7_req.MYSQL_EXEC_STMT {
1404+
a.mySqlStmtsMu.RLock()
1405+
query, ok := a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, d.MySqlPrepStmtId)]
1406+
a.mySqlStmtsMu.RUnlock()
1407+
if !ok || query == "" { // we don't have the query for the prepared statement
1408+
// Execute (name of prepared statement) [(parameter)]
1409+
return fmt.Sprintf("EXECUTE %d *values*", d.MySqlPrepStmtId), nil
1410+
}
1411+
sqlCommand = query
13871412
}
13881413
return sqlCommand, nil
13891414
}

ebpf/c/bpf_bpfeb.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ebpf/c/bpf_bpfeb.o

896 Bytes
Binary file not shown.

ebpf/c/bpf_bpfel.go

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ebpf/c/bpf_bpfel.o

496 Bytes
Binary file not shown.

ebpf/c/l7.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ struct l7_event {
3131
__u8 is_tls;
3232

3333
__u32 seq; // tcp sequence number
34-
__u32 tid;
34+
__u32 tid; // thread id
3535

3636
__s16 kafka_api_version; // used only for kafka
37+
__u32 prep_statement_id; // used only for mysql
3738

39+
// socket pair
3840
__u32 saddr;
3941
__u16 sport;
4042
__u32 daddr;
@@ -816,7 +818,7 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
816818
e->kafka_api_version = active_req->api_version;
817819
}
818820
}else if (e->protocol == PROTOCOL_MYSQL) {
819-
e->status = is_mysql_response(read_info->buf, ret);
821+
e->status = is_mysql_response(read_info->buf, ret, active_req->request_type, &(e->prep_statement_id));
820822
e->method = METHOD_UNKNOWN;
821823
if (active_req->request_type == MYSQL_COM_STMT_PREPARE) {
822824
e->method = METHOD_MYSQL_PREPARE_STMT;

ebpf/c/mysql.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
#define METHOD_MYSQL_PREPARE_STMT 2
2626
#define METHOD_MYSQL_EXEC_STMT 3
2727

28-
#define MYSQL_STATUS_FAILED 0
2928
#define MYSQL_STATUS_OK 1
29+
#define MYSQL_STATUS_FAILED 2
3030

3131
static __always_inline
3232
int is_mysql_query(char *buf, __u64 buf_size, __u8 *request_type) {
@@ -63,8 +63,8 @@ int is_mysql_query(char *buf, __u64 buf_size, __u8 *request_type) {
6363

6464
// __u32 *statement_id
6565
static __always_inline
66-
int is_mysql_response(char *buf, __u64 buf_size) {
67-
__u8 b[5]; // first 5 bytes, first 3 represents length, 4th is packet number, 5th is command type
66+
int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id) {
67+
__u8 b[5]; // first 5 bytes, first 3 represents length, 4th is packet number, 5th is response code
6868
if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
6969
return 0;
7070
}
@@ -77,6 +77,12 @@ int is_mysql_response(char *buf, __u64 buf_size) {
7777
return MYSQL_STATUS_OK;
7878
}
7979
if (b[4] == MYSQL_RESPONSE_OK) {
80+
if (request_type == MYSQL_COM_STMT_PREPARE) {
81+
// 6-9th bytes returns statement id
82+
if (bpf_probe_read(statement_id, sizeof(*statement_id), (void *)((char *)buf+5)) < 0) {
83+
return 0;
84+
}
85+
}
8086
return MYSQL_STATUS_OK;
8187
}
8288
if (b[4] == MYSQL_RESPONSE_ERROR) {

ebpf/l7_req/l7.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,13 @@ type bpfL7Event struct {
353353
Tid uint32
354354
KafkaApiVersion int16
355355
_ [2]byte
356+
PrepStatementId uint32 // for mysql
356357
Saddr uint32
357358
Sport uint16
358359
_ [2]byte
359360
Daddr uint32
360361
Dport uint16
361-
_ [2]byte
362+
_ [6]byte
362363
}
363364

364365
type bpfTraceEvent struct {
@@ -401,6 +402,7 @@ type L7Event struct {
401402
Tid uint32
402403
Seq uint32 // tcp seq num
403404
KafkaApiVersion int16
405+
MySqlPrepStmtId uint32
404406
Saddr uint32
405407
Sport uint16
406408
Daddr uint32
@@ -739,6 +741,7 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
739741
Tid: l7Event.Tid,
740742
Seq: l7Event.Seq,
741743
KafkaApiVersion: l7Event.KafkaApiVersion,
744+
MySqlPrepStmtId: l7Event.PrepStatementId,
742745
Saddr: l7Event.Saddr,
743746
Sport: l7Event.Sport,
744747
Daddr: l7Event.Daddr,
@@ -750,6 +753,8 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
750753
Str("payload", string(userspacel7Event.Payload[:userspacel7Event.PayloadSize])).
751754
Uint32("pid", userspacel7Event.Pid).
752755
Str("method", userspacel7Event.Method).
756+
Uint32("stmtId", userspacel7Event.MySqlPrepStmtId).
757+
Uint64("fd", userspacel7Event.Fd).
753758
Msg("mysql-event")
754759
}
755760

0 commit comments

Comments
 (0)