Skip to content

Commit 3000e76

Browse files
committed
add pg_stat_wal collector
Signed-off-by: Harry Cho <[email protected]>
1 parent f9c7457 commit 3000e76

File tree

2 files changed

+372
-0
lines changed

2 files changed

+372
-0
lines changed

Diff for: collector/pg_stat_wal.go

+240
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"context"
17+
"database/sql"
18+
"fmt"
19+
"log/slog"
20+
"strings"
21+
22+
"github.com/prometheus/client_golang/prometheus"
23+
)
24+
25+
const statWALSubsystem = "stat_wal"
26+
27+
func init() {
28+
registerCollector(statWALSubsystem, defaultDisabled, NewPGStatWALCollector)
29+
}
30+
31+
type PGStatWALCollector struct {
32+
log *slog.Logger
33+
}
34+
35+
func NewPGStatWALCollector(config collectorConfig) (Collector, error) {
36+
return &PGStatWALCollector{log: config.logger}, nil
37+
}
38+
39+
var statsWALRecordsDesc = prometheus.NewDesc(
40+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_records"),
41+
"Total number of WAL records generated",
42+
[]string{},
43+
prometheus.Labels{},
44+
)
45+
46+
var statsWALFPIDesc = prometheus.NewDesc(
47+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_fpi"),
48+
"Total number of WAL full page images generated",
49+
[]string{},
50+
prometheus.Labels{},
51+
)
52+
53+
var statsWALBytesDesc = prometheus.NewDesc(
54+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_bytes"),
55+
"Total amount of WAL generated in bytes",
56+
[]string{},
57+
prometheus.Labels{},
58+
)
59+
60+
var statsWALBuffersFullDesc = prometheus.NewDesc(
61+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_buffers_full"),
62+
"Number of times WAL data was written to disk because WAL buffers became full",
63+
[]string{},
64+
prometheus.Labels{},
65+
)
66+
67+
var statsWALWriteDesc = prometheus.NewDesc(
68+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_write"),
69+
"Number of times WAL buffers were written out to disk via XLogWrite request. See Section 30.5 for more information about the internal WAL function XLogWrite.",
70+
[]string{},
71+
prometheus.Labels{},
72+
)
73+
74+
var statsWALSyncDesc = prometheus.NewDesc(
75+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_sync"),
76+
"Number of times WAL files were synced to disk via issue_xlog_fsync request (if fsync is on and wal_sync_method is either fdatasync, fsync or fsync_writethrough, otherwise zero). See Section 30.5 for more information about the internal WAL function issue_xlog_fsync.",
77+
[]string{},
78+
prometheus.Labels{},
79+
)
80+
81+
var statsWALWriteTimeDesc = prometheus.NewDesc(
82+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_write_time"),
83+
"Total amount of time spent writing WAL buffers to disk via XLogWrite request, in milliseconds (if track_wal_io_timing is enabled, otherwise zero). This includes the sync time when wal_sync_method is either open_datasync or open_sync.",
84+
[]string{},
85+
prometheus.Labels{},
86+
)
87+
88+
var statsWALSyncTimeDesc = prometheus.NewDesc(
89+
prometheus.BuildFQName(namespace, statWALSubsystem, "wal_sync_time"),
90+
"Total amount of time spent syncing WAL files to disk via issue_xlog_fsync request, in milliseconds (if track_wal_io_timing is enabled, fsync is on, and wal_sync_method is either fdatasync, fsync or fsync_writethrough, otherwise zero).",
91+
[]string{},
92+
prometheus.Labels{},
93+
)
94+
95+
var statsWALStatsResetDesc = prometheus.NewDesc(
96+
prometheus.BuildFQName(namespace, statWALSubsystem, "stats_reset"),
97+
"Time at which these statistics were last reset",
98+
[]string{},
99+
prometheus.Labels{},
100+
)
101+
102+
func statWALQuery(columns []string) string {
103+
return fmt.Sprintf("SELECT %s FROM pg_stat_wal;", strings.Join(columns, ","))
104+
}
105+
106+
func (c *PGStatWALCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
107+
db := instance.getDB()
108+
109+
columns := []string{
110+
"wal_records", // bigint
111+
"wal_fpi", // bigint
112+
"wal_bytes", // numeric
113+
"wal_buffers_full", // bigint
114+
"wal_write", // bigint
115+
"wal_sync", // bigint
116+
"wal_write_time", // double precision
117+
"wal_sync_time", // double precision
118+
"stats_reset", // timestamp with time zone
119+
}
120+
121+
rows, err := db.QueryContext(ctx,
122+
statWALQuery(columns),
123+
)
124+
if err != nil {
125+
return err
126+
}
127+
defer rows.Close()
128+
129+
for rows.Next() {
130+
var walRecords, walFPI, walBytes, walBuffersFull, walWrite, walSync sql.NullInt64
131+
var walWriteTime, walSyncTime sql.NullFloat64
132+
var statsReset sql.NullTime
133+
134+
err := rows.Scan(
135+
&walRecords,
136+
&walFPI,
137+
&walBytes,
138+
&walBuffersFull,
139+
&walWrite,
140+
&walSync,
141+
&walWriteTime,
142+
&walSyncTime,
143+
&statsReset,
144+
)
145+
if err != nil {
146+
return err
147+
}
148+
149+
walRecordsMetric := 0.0
150+
if walRecords.Valid {
151+
walRecordsMetric = float64(walRecords.Int64)
152+
}
153+
ch <- prometheus.MustNewConstMetric(
154+
statsWALRecordsDesc,
155+
prometheus.CounterValue,
156+
walRecordsMetric,
157+
)
158+
159+
walFPIMetric := 0.0
160+
if walFPI.Valid {
161+
walFPIMetric = float64(walFPI.Int64)
162+
}
163+
ch <- prometheus.MustNewConstMetric(
164+
statsWALFPIDesc,
165+
prometheus.CounterValue,
166+
walFPIMetric,
167+
)
168+
169+
walBytesMetric := 0.0
170+
if walBytes.Valid {
171+
walBytesMetric = float64(walBytes.Int64)
172+
}
173+
ch <- prometheus.MustNewConstMetric(
174+
statsWALBytesDesc,
175+
prometheus.CounterValue,
176+
walBytesMetric,
177+
)
178+
179+
walBuffersFullMetric := 0.0
180+
if walBuffersFull.Valid {
181+
walBuffersFullMetric = float64(walBuffersFull.Int64)
182+
}
183+
ch <- prometheus.MustNewConstMetric(
184+
statsWALBuffersFullDesc,
185+
prometheus.CounterValue,
186+
walBuffersFullMetric,
187+
)
188+
189+
walWriteMetric := 0.0
190+
if walWrite.Valid {
191+
walWriteMetric = float64(walWrite.Int64)
192+
}
193+
ch <- prometheus.MustNewConstMetric(
194+
statsWALWriteDesc,
195+
prometheus.CounterValue,
196+
walWriteMetric,
197+
)
198+
199+
walSyncMetric := 0.0
200+
if walSync.Valid {
201+
walSyncMetric = float64(walSync.Int64)
202+
}
203+
ch <- prometheus.MustNewConstMetric(
204+
statsWALSyncDesc,
205+
prometheus.CounterValue,
206+
walSyncMetric,
207+
)
208+
209+
walWriteTimeMetric := 0.0
210+
if walWriteTime.Valid {
211+
walWriteTimeMetric = float64(walWriteTime.Float64)
212+
}
213+
ch <- prometheus.MustNewConstMetric(
214+
statsWALWriteTimeDesc,
215+
prometheus.CounterValue,
216+
walWriteTimeMetric,
217+
)
218+
219+
walSyncTimeMetric := 0.0
220+
if walSyncTime.Valid {
221+
walSyncTimeMetric = float64(walSyncTime.Float64)
222+
}
223+
ch <- prometheus.MustNewConstMetric(
224+
statsWALSyncTimeDesc,
225+
prometheus.CounterValue,
226+
walSyncTimeMetric,
227+
)
228+
229+
resetMetric := 0.0
230+
if statsReset.Valid {
231+
resetMetric = float64(statsReset.Time.Unix())
232+
}
233+
ch <- prometheus.MustNewConstMetric(
234+
statsWALStatsResetDesc,
235+
prometheus.CounterValue,
236+
resetMetric,
237+
)
238+
}
239+
return nil
240+
}

Diff for: collector/pg_stat_wal_test.go

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/DATA-DOG/go-sqlmock"
9+
"github.com/prometheus/client_golang/prometheus"
10+
dto "github.com/prometheus/client_model/go"
11+
"github.com/smartystreets/goconvey/convey"
12+
)
13+
14+
func TestPGStatWALCollector(t *testing.T) {
15+
db, mock, err := sqlmock.New()
16+
if err != nil {
17+
t.Fatalf("Error opening a stub db connection: %s", err)
18+
}
19+
defer db.Close()
20+
21+
inst := &instance{db: db}
22+
23+
columns := []string{
24+
"wal_records", // bigint
25+
"wal_fpi", // bigint
26+
"wal_bytes", // numeric
27+
"wal_buffers_full", // bigint
28+
"wal_write", // bigint
29+
"wal_sync", // bigint
30+
"wal_write_time", // double precision
31+
"wal_sync_time", // double precision
32+
"stats_reset", // timestamp with time zone
33+
}
34+
35+
srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07")
36+
if err != nil {
37+
t.Fatalf("Error parsing time: %s", err)
38+
}
39+
40+
rows := sqlmock.NewRows(columns).
41+
AddRow(354, 4945, 289097744, 1242257, int(3275602074), 89320867, 450.123439, 1234.5678, srT)
42+
mock.ExpectQuery(sanitizeQuery(statWALQuery(columns))).WillReturnRows(rows)
43+
44+
ch := make(chan prometheus.Metric)
45+
go func() {
46+
defer close(ch)
47+
c := PGStatWALCollector{}
48+
49+
if err := c.Update(context.Background(), inst, ch); err != nil {
50+
t.Errorf("Error calling PGStatWALCollector.Update: %s", err)
51+
}
52+
}()
53+
54+
expected := []MetricResult{
55+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 354},
56+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 4945},
57+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 289097744},
58+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1242257},
59+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 3275602074},
60+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 89320867},
61+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 450.123439},
62+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1234.5678},
63+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1685059842},
64+
}
65+
66+
convey.Convey("Metrics comparison", t, func() {
67+
for _, expect := range expected {
68+
m := readMetric(<-ch)
69+
convey.So(expect, convey.ShouldResemble, m)
70+
}
71+
})
72+
if err := mock.ExpectationsWereMet(); err != nil {
73+
t.Errorf("there were unfulfilled exceptions: %s", err)
74+
}
75+
}
76+
77+
func TestPGStatWALCollectorNullValues(t *testing.T) {
78+
db, mock, err := sqlmock.New()
79+
if err != nil {
80+
t.Fatalf("Error opening a stub db connection: %s", err)
81+
}
82+
defer db.Close()
83+
84+
inst := &instance{db: db}
85+
columns := []string{
86+
"wal_records", // bigint
87+
"wal_fpi", // bigint
88+
"wal_bytes", // numeric
89+
"wal_buffers_full", // bigint
90+
"wal_write", // bigint
91+
"wal_sync", // bigint
92+
"wal_write_time", // double precision
93+
"wal_sync_time", // double precision
94+
"stats_reset", // timestamp with time zone
95+
}
96+
97+
rows := sqlmock.NewRows(columns).
98+
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil)
99+
mock.ExpectQuery(sanitizeQuery(statWALQuery(columns))).WillReturnRows(rows)
100+
101+
ch := make(chan prometheus.Metric)
102+
go func() {
103+
defer close(ch)
104+
c := PGStatWALCollector{}
105+
106+
if err := c.Update(context.Background(), inst, ch); err != nil {
107+
t.Errorf("Error calling PGStatWALCollector.Update: %s", err)
108+
}
109+
}()
110+
111+
expected := []MetricResult{
112+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
113+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
114+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
115+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
116+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
117+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
118+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
119+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
120+
{labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0},
121+
}
122+
123+
convey.Convey("Metrics comparison", t, func() {
124+
for _, expect := range expected {
125+
m := readMetric(<-ch)
126+
convey.So(expect, convey.ShouldResemble, m)
127+
}
128+
})
129+
if err := mock.ExpectationsWereMet(); err != nil {
130+
t.Errorf("there were unfulfilled exceptions: %s", err)
131+
}
132+
}

0 commit comments

Comments
 (0)