Skip to content

Commit 1f67638

Browse files
author
Jakub Štiller
committed
Export query itself together with queryId in stat_statement metrics
The feature must be enabled via flag or via environment variable. The query is not added to every metrics, but instead of new metric stat_statement_query_id is introduced that contains mapping between queryId and query. Fix #813 Signed-off-by: Jakub Štiller <[email protected]>
1 parent e2892a7 commit 1f67638

File tree

3 files changed

+204
-11
lines changed

3 files changed

+204
-11
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ This will build the docker image as `prometheuscommunity/postgres_exporter:${bra
128128
* `[no-]collector.stat_statements`
129129
Enable the `stat_statements` collector (default: disabled).
130130

131+
* `[no-]collector.stat_statements.include_query`
132+
Enable selecting statement query together with queryId. (default: disabled)
133+
131134
* `[no-]collector.stat_user_tables`
132135
Enable the `stat_user_tables` collector (default: enabled).
133136

collector/pg_stat_statements.go

+65-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ package collector
1616
import (
1717
"context"
1818
"database/sql"
19+
"fmt"
20+
"github.com/alecthomas/kingpin/v2"
21+
"strings"
1922

2023
"github.com/blang/semver/v4"
2124
"github.com/go-kit/log"
@@ -24,19 +27,31 @@ import (
2427

2528
const statStatementsSubsystem = "stat_statements"
2629

30+
var includeQueryFlag *bool = nil
31+
2732
func init() {
2833
// WARNING:
2934
// Disabled by default because this set of metrics can be quite expensive on a busy server
3035
// Every unique query will cause a new timeseries to be created
3136
registerCollector(statStatementsSubsystem, defaultDisabled, NewPGStatStatementsCollector)
37+
38+
flagName := fmt.Sprintf("collector.%s.include_query", statStatementsSubsystem)
39+
flagEnvName := fmt.Sprintf("PG_EXPORTER_COLLECTOR_%s_INCLUDE_QUERY", strings.ToUpper(statStatementsSubsystem))
40+
flagHelp := "Enable selecting statement query together with queryId. (default: false)"
41+
defaultValue := fmt.Sprintf("%v", defaultDisabled)
42+
includeQueryFlag = kingpin.Flag(flagName, flagHelp).Default(defaultValue).Envar(flagEnvName).Bool()
3243
}
3344

3445
type PGStatStatementsCollector struct {
35-
log log.Logger
46+
log log.Logger
47+
includeQueryStatement bool
3648
}
3749

3850
func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) {
39-
return &PGStatStatementsCollector{log: config.logger}, nil
51+
return &PGStatStatementsCollector{
52+
log: config.logger,
53+
includeQueryStatement: *includeQueryFlag,
54+
}, nil
4055
}
4156

4257
var (
@@ -71,10 +86,20 @@ var (
7186
prometheus.Labels{},
7287
)
7388

89+
statStatementsQuery = prometheus.NewDesc(
90+
prometheus.BuildFQName(namespace, statStatementsSubsystem, "query_id"),
91+
"SQL Query to queryid mapping",
92+
[]string{"queryid", "query"},
93+
prometheus.Labels{},
94+
)
95+
96+
pgStatStatementQuerySelect = "pg_stat_statements.query,"
97+
7498
pgStatStatementsQuery = `SELECT
7599
pg_get_userbyid(userid) as user,
76100
pg_database.datname,
77101
pg_stat_statements.queryid,
102+
%s
78103
pg_stat_statements.calls as calls_total,
79104
pg_stat_statements.total_time / 1000.0 as seconds_total,
80105
pg_stat_statements.rows as rows_total,
@@ -96,6 +121,7 @@ var (
96121
pg_get_userbyid(userid) as user,
97122
pg_database.datname,
98123
pg_stat_statements.queryid,
124+
%s
99125
pg_stat_statements.calls as calls_total,
100126
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
101127
pg_stat_statements.rows as rows_total,
@@ -114,25 +140,37 @@ var (
114140
LIMIT 100;`
115141
)
116142

117-
func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
118-
query := pgStatStatementsQuery
143+
func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
144+
queryTemplate := pgStatStatementsQuery
119145
if instance.version.GE(semver.MustParse("13.0.0")) {
120-
query = pgStatStatementsNewQuery
146+
queryTemplate = pgStatStatementsNewQuery
147+
}
148+
var querySelect = ""
149+
if c.includeQueryStatement {
150+
querySelect = pgStatStatementQuerySelect
121151
}
152+
query := fmt.Sprintf(queryTemplate, querySelect)
122153

123154
db := instance.getDB()
124155
rows, err := db.QueryContext(ctx, query)
125156

157+
var presentQueryIds = make(map[string]struct{})
158+
126159
if err != nil {
127160
return err
128161
}
129162
defer rows.Close()
130163
for rows.Next() {
131-
var user, datname, queryid sql.NullString
164+
var user, datname, queryid, statement sql.NullString
132165
var callsTotal, rowsTotal sql.NullInt64
133166
var secondsTotal, blockReadSecondsTotal, blockWriteSecondsTotal sql.NullFloat64
134-
135-
if err := rows.Scan(&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal); err != nil {
167+
var columns []any
168+
if c.includeQueryStatement {
169+
columns = []any{&user, &datname, &queryid, &statement, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
170+
} else {
171+
columns = []any{&user, &datname, &queryid, &callsTotal, &secondsTotal, &rowsTotal, &blockReadSecondsTotal, &blockWriteSecondsTotal}
172+
}
173+
if err := rows.Scan(columns...); err != nil {
136174
return err
137175
}
138176

@@ -203,6 +241,25 @@ func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance,
203241
blockWriteSecondsTotalMetric,
204242
userLabel, datnameLabel, queryidLabel,
205243
)
244+
245+
if c.includeQueryStatement {
246+
_, ok := presentQueryIds[queryidLabel]
247+
if !ok {
248+
presentQueryIds[queryidLabel] = struct{}{}
249+
250+
queryLabel := "unknown"
251+
if statement.Valid {
252+
queryLabel = statement.String
253+
}
254+
255+
ch <- prometheus.MustNewConstMetric(
256+
statStatementsQuery,
257+
prometheus.CounterValue,
258+
1,
259+
queryidLabel, queryLabel,
260+
)
261+
}
262+
}
206263
}
207264
if err := rows.Err(); err != nil {
208265
return err

collector/pg_stat_statements_test.go

+136-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package collector
1414

1515
import (
1616
"context"
17+
"fmt"
1718
"testing"
1819

1920
"github.com/DATA-DOG/go-sqlmock"
@@ -35,7 +36,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
3536
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
3637
rows := sqlmock.NewRows(columns).
3738
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
38-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows)
39+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, ""))).WillReturnRows(rows)
3940

4041
ch := make(chan prometheus.Metric)
4142
go func() {
@@ -66,6 +67,50 @@ func TestPGStateStatementsCollector(t *testing.T) {
6667
}
6768
}
6869

70+
func TestPGStateStatementsCollectorWithStatement(t *testing.T) {
71+
db, mock, err := sqlmock.New()
72+
if err != nil {
73+
t.Fatalf("Error opening a stub db connection: %s", err)
74+
}
75+
defer db.Close()
76+
77+
inst := &instance{db: db, version: semver.MustParse("12.0.0")}
78+
79+
columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
80+
rows := sqlmock.NewRows(columns).
81+
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
82+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
83+
84+
ch := make(chan prometheus.Metric)
85+
go func() {
86+
defer close(ch)
87+
c := PGStatStatementsCollector{includeQueryStatement: true}
88+
89+
if err := c.Update(context.Background(), inst, ch); err != nil {
90+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
91+
}
92+
}()
93+
94+
expected := []MetricResult{
95+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
96+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
97+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
98+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
99+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
100+
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
101+
}
102+
103+
convey.Convey("Metrics comparison", t, func() {
104+
for _, expect := range expected {
105+
m := readMetric(<-ch)
106+
convey.So(expect, convey.ShouldResemble, m)
107+
}
108+
})
109+
if err := mock.ExpectationsWereMet(); err != nil {
110+
t.Errorf("there were unfulfilled exceptions: %s", err)
111+
}
112+
}
113+
69114
func TestPGStateStatementsCollectorNull(t *testing.T) {
70115
db, mock, err := sqlmock.New()
71116
if err != nil {
@@ -78,7 +123,7 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
78123
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
79124
rows := sqlmock.NewRows(columns).
80125
AddRow(nil, nil, nil, nil, nil, nil, nil, nil)
81-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
126+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)
82127

83128
ch := make(chan prometheus.Metric)
84129
go func() {
@@ -109,6 +154,50 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
109154
}
110155
}
111156

157+
func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) {
158+
db, mock, err := sqlmock.New()
159+
if err != nil {
160+
t.Fatalf("Error opening a stub db connection: %s", err)
161+
}
162+
defer db.Close()
163+
164+
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
165+
166+
columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
167+
rows := sqlmock.NewRows(columns).
168+
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil)
169+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
170+
171+
ch := make(chan prometheus.Metric)
172+
go func() {
173+
defer close(ch)
174+
c := PGStatStatementsCollector{includeQueryStatement: true}
175+
176+
if err := c.Update(context.Background(), inst, ch); err != nil {
177+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
178+
}
179+
}()
180+
181+
expected := []MetricResult{
182+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
183+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
184+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
185+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
186+
{labels: labelMap{"user": "unknown", "datname": "unknown", "queryid": "unknown"}, metricType: dto.MetricType_COUNTER, value: 0},
187+
{labels: labelMap{"queryid": "unknown", "query": "unknown"}, metricType: dto.MetricType_COUNTER, value: 1},
188+
}
189+
190+
convey.Convey("Metrics comparison", t, func() {
191+
for _, expect := range expected {
192+
m := readMetric(<-ch)
193+
convey.So(expect, convey.ShouldResemble, m)
194+
}
195+
})
196+
if err := mock.ExpectationsWereMet(); err != nil {
197+
t.Errorf("there were unfulfilled exceptions: %s", err)
198+
}
199+
}
200+
112201
func TestPGStateStatementsCollectorNewPG(t *testing.T) {
113202
db, mock, err := sqlmock.New()
114203
if err != nil {
@@ -121,7 +210,7 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
121210
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
122211
rows := sqlmock.NewRows(columns).
123212
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
124-
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)
213+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, ""))).WillReturnRows(rows)
125214

126215
ch := make(chan prometheus.Metric)
127216
go func() {
@@ -151,3 +240,47 @@ func TestPGStateStatementsCollectorNewPG(t *testing.T) {
151240
t.Errorf("there were unfulfilled exceptions: %s", err)
152241
}
153242
}
243+
244+
func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) {
245+
db, mock, err := sqlmock.New()
246+
if err != nil {
247+
t.Fatalf("Error opening a stub db connection: %s", err)
248+
}
249+
defer db.Close()
250+
251+
inst := &instance{db: db, version: semver.MustParse("13.3.7")}
252+
253+
columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
254+
rows := sqlmock.NewRows(columns).
255+
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
256+
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
257+
258+
ch := make(chan prometheus.Metric)
259+
go func() {
260+
defer close(ch)
261+
c := PGStatStatementsCollector{includeQueryStatement: true}
262+
263+
if err := c.Update(context.Background(), inst, ch); err != nil {
264+
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
265+
}
266+
}()
267+
268+
expected := []MetricResult{
269+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
270+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
271+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
272+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
273+
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
274+
{labels: labelMap{"queryid": "1500", "query": "select 1 from foo"}, metricType: dto.MetricType_COUNTER, value: 1},
275+
}
276+
277+
convey.Convey("Metrics comparison", t, func() {
278+
for _, expect := range expected {
279+
m := readMetric(<-ch)
280+
convey.So(expect, convey.ShouldResemble, m)
281+
}
282+
})
283+
if err := mock.ExpectationsWereMet(); err != nil {
284+
t.Errorf("there were unfulfilled exceptions: %s", err)
285+
}
286+
}

0 commit comments

Comments
 (0)