Skip to content

Commit

Permalink
database_observability: split out sqlparser functions (#2743)
Browse files Browse the repository at this point in the history
Move the sqlparser functions from query_sample to a new file. This
should allow swapping the parser for a different implementation,
if needed. We're still tied to types from the underlying library,
but I think it's fine for now.

Tests are moved around while maintaining the same coverage percentage.
Now the query_sample tests are more focussed on testing the collector
logic rather than the parsing of sql queries.
  • Loading branch information
cristiangreco authored Feb 19, 2025
1 parent e84c222 commit 38f2bd3
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 321 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Main (unreleased)
- `query_sample`: capture schema name for query samples (@cristiangreco)
- `query_sample`: fix error handling during result set iteration (@cristiangreco)
- `query_sample`: improve parsing of truncated queries (@cristiangreco)
- `query_sample`: split out sql parsing logic to a separate file (@cristiangreco)
- `schema_table`: add table columns parsing (@cristiagreco)
- `schema_table`: correctly quote schema and table name in SHOW CREATE (@cristiangreco)
- `schema_table`: fix handling of view table types when detecting schema (@matthewnolf)
Expand All @@ -57,7 +58,7 @@ Main (unreleased)

- Ensure consistent service_name label handling in `pyroscope.receive_http` to match Pyroscope's behavior. (@marcsanmi)

- Improved memory and CPU performance of Prometheus pipelines by changing the underlying implementation of targets (@thampiotr)
- Improved memory and CPU performance of Prometheus pipelines by changing the underlying implementation of targets (@thampiotr)

- Add `config_merge_strategy` in `prometheus.exporter.snmp` to optionally merge custom snmp config with embedded config instead of replacing. Useful for providing SNMP auths. (@v-zhuravlev)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/xwb1989/sqlparser"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/common/loki"
Expand Down Expand Up @@ -142,13 +141,13 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
}
}

stmt, err := sqlparser.Parse(sampleText)
stmt, err := ParseSql(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to parse sql query", "schema", schemaName, "digest", digest, "err", err)
continue
}

sampleRedactedText, err := sqlparser.RedactSQLQuery(sampleText)
sampleRedactedText, err := RedactSQL(sampleText)
if err != nil {
level.Error(c.logger).Log("msg", "failed to redact sql query", "schema", schemaName, "digest", digest, "err", err)
continue
Expand All @@ -164,12 +163,12 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
Timestamp: time.Unix(0, time.Now().UnixNano()),
Line: fmt.Sprintf(
`level=info msg="query samples fetched" schema="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`,
schemaName, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
schemaName, digest, StmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText,
),
},
}

tables := c.tablesFromQuery(digest, stmt)
tables := ExtractTableNames(c.logger, digest, stmt)
for _, table := range tables {
c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{
Expand All @@ -195,111 +194,3 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {

return nil
}

func (c QuerySample) stmtType(stmt sqlparser.Statement) string {
switch stmt.(type) {
case *sqlparser.Select:
return "select"
case *sqlparser.Insert:
return "insert"
case *sqlparser.Update:
return "update"
case *sqlparser.Delete:
return "delete"
case *sqlparser.Union:
return "select" // label union as a select
default:
return ""
}
}

func (c QuerySample) tablesFromQuery(digest string, stmt sqlparser.Statement) []string {
var parsedTables []string

switch stmt := stmt.(type) {
case *sqlparser.Select:
parsedTables = c.parseTableExprs(digest, stmt.From)
case *sqlparser.Update:
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Delete:
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Insert:
parsedTables = []string{c.parseTableName(stmt.Table)}
switch insRowsStmt := stmt.Rows.(type) {
case sqlparser.Values:
// ignore raw values
case *sqlparser.Select:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{insRowsStmt.Left, insRowsStmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown insert type", "digest", digest)
}
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{stmt.Left, stmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.Show:
if stmt.HasOnTable() {
parsedTables = append(parsedTables, c.parseTableName(stmt.OnTable))
}
case *sqlparser.DDL:
parsedTables = append(parsedTables, c.parseTableName(stmt.Table))
case *sqlparser.Begin, *sqlparser.Commit, *sqlparser.Rollback, *sqlparser.Set, *sqlparser.DBDDL:
// ignore
default:
level.Error(c.logger).Log("msg", "unknown statement type", "digest", digest)
}

return parsedTables
}

func (c QuerySample) parseTableExprs(digest string, tables sqlparser.TableExprs) []string {
parsedTables := []string{}
for i := 0; i < len(tables); i++ {
t := tables[i]
switch tableExpr := t.(type) {
case *sqlparser.AliasedTableExpr:
switch expr := tableExpr.Expr.(type) {
case sqlparser.TableName:
parsedTables = append(parsedTables, c.parseTableName(expr))
case *sqlparser.Subquery:
switch subqueryExpr := expr.Select.(type) {
case *sqlparser.Select:
parsedTables = append(parsedTables, c.parseTableExprs(digest, subqueryExpr.From)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{subqueryExpr.Left, subqueryExpr.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, subqueryExpr.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown subquery type", "digest", digest)
}
default:
level.Error(c.logger).Log("msg", "unknown nested table expression", "digest", digest, "table", tableExpr)
}
case *sqlparser.JoinTableExpr:
// continue parsing both sides of join
tables = append(tables, tableExpr.LeftExpr, tableExpr.RightExpr)
case *sqlparser.ParenTableExpr:
tables = append(tables, tableExpr.Exprs...)
default:
level.Error(c.logger).Log("msg", "unknown table type", "digest", digest, "table", t)
}
}
return parsedTables
}

func (c QuerySample) parseTableName(t sqlparser.TableName) string {
qualifier := t.Qualifier.String()
tableName := t.Name.String()
if qualifier != "" {
return qualifier + "." + tableName
}
return tableName
}
Loading

0 comments on commit 38f2bd3

Please sign in to comment.