From daf69cc8abda364b0dc16826272ee629361753fa Mon Sep 17 00:00:00 2001 From: Rahul Reddy Date: Wed, 12 Feb 2025 17:22:16 +0530 Subject: [PATCH] feat(query-performance-monitoring): Implement query performance monitoring (#189) feat(query-performance-monitoring): Implement query performance monitoring --- Makefile | 16 +- go.mod | 1 + go.sum | 2 + postgresql-config.yml.sample | 9 + src/args/argument_list.go | 46 +-- src/connection/pgsql_connection.go | 3 +- src/main.go | 9 +- src/metrics/metrics.go | 5 +- src/metrics/version_test.go | 10 +- .../common-parameters/common_parameters.go | 55 +++ .../common-utils/common_helpers.go | 42 ++ .../common-utils/common_helpers_test.go | 38 ++ .../common-utils/constants.go | 22 ++ .../common-utils/ingestion-helpers.go | 120 ++++++ .../common-utils/ingestion_helper_test.go | 87 +++++ .../common-utils/query_fetch_helpers.go | 38 ++ .../common-utils/query_fetch_helpers_test.go | 70 ++++ .../datamodels/performance_data_models.go | 66 ++++ .../performance-metrics/blocking_sessions.go | 71 ++++ .../blocking_sessions_test.go | 74 ++++ .../execution_plan_metrics.go | 122 ++++++ .../execution_plan_metrics_test.go | 89 +++++ .../individual_query_metrics.go | 121 ++++++ .../individual_query_metrics_test.go | 47 +++ .../performance-metrics/slow_query_metrics.go | 67 ++++ .../slow_query_metrics_test.go | 74 ++++ .../performance-metrics/wait_event_metrics.go | 61 +++ .../wait_event_metrics_test.go | 49 +++ .../queries/queries.go | 217 +++++++++++ .../query_performance_main.go | 79 ++++ .../performance_metrics_validations.go | 50 +++ .../performance_metrics_validations_test.go | 103 +++++ tests/docker-compose-performance.yml | 70 ++++ tests/docker-compose.yml | 15 +- tests/perf-testing/integration/Dockerfile | 9 + .../latest_supported/01-init-extensions.sql | 5 + .../latest_supported/02-create-database.sql | 1 + .../latest_supported/03-import-data.sql | 11 + .../perf-testing/latest_supported/Dockerfile | 40 ++ .../oldest_supported/01-init-extensions.sql | 5 + .../oldest_supported/02-create-database.sql | 1 + .../oldest_supported/03-import-data.sql | 11 + .../perf-testing/oldest_supported/Dockerfile | 40 ++ tests/postgresql_test.go | 133 ++----- tests/postgresqlperf_test.go | 185 +++++++++ tests/simulation/helpers.go | 123 ++++++ tests/simulation/sim_queries.go | 359 ++++++++++++++++++ tests/testdata/blocking-sessions-schema.json | 98 +++++ tests/testdata/execution-plan-schema.json | 187 +++++++++ tests/testdata/individual-queries-schema.json | 105 +++++ tests/testdata/slow-queries-schema.json | 121 ++++++ tests/testdata/wait-events-schema.json | 95 +++++ 52 files changed, 3337 insertions(+), 140 deletions(-) create mode 100644 src/query-performance-monitoring/common-parameters/common_parameters.go create mode 100644 src/query-performance-monitoring/common-utils/common_helpers.go create mode 100644 src/query-performance-monitoring/common-utils/common_helpers_test.go create mode 100644 src/query-performance-monitoring/common-utils/constants.go create mode 100644 src/query-performance-monitoring/common-utils/ingestion-helpers.go create mode 100644 src/query-performance-monitoring/common-utils/ingestion_helper_test.go create mode 100644 src/query-performance-monitoring/common-utils/query_fetch_helpers.go create mode 100644 src/query-performance-monitoring/common-utils/query_fetch_helpers_test.go create mode 100644 src/query-performance-monitoring/datamodels/performance_data_models.go create mode 100644 
src/query-performance-monitoring/performance-metrics/blocking_sessions.go create mode 100644 src/query-performance-monitoring/performance-metrics/blocking_sessions_test.go create mode 100644 src/query-performance-monitoring/performance-metrics/execution_plan_metrics.go create mode 100644 src/query-performance-monitoring/performance-metrics/execution_plan_metrics_test.go create mode 100644 src/query-performance-monitoring/performance-metrics/individual_query_metrics.go create mode 100644 src/query-performance-monitoring/performance-metrics/individual_query_metrics_test.go create mode 100644 src/query-performance-monitoring/performance-metrics/slow_query_metrics.go create mode 100644 src/query-performance-monitoring/performance-metrics/slow_query_metrics_test.go create mode 100644 src/query-performance-monitoring/performance-metrics/wait_event_metrics.go create mode 100644 src/query-performance-monitoring/performance-metrics/wait_event_metrics_test.go create mode 100644 src/query-performance-monitoring/queries/queries.go create mode 100644 src/query-performance-monitoring/query_performance_main.go create mode 100644 src/query-performance-monitoring/validations/performance_metrics_validations.go create mode 100644 src/query-performance-monitoring/validations/performance_metrics_validations_test.go create mode 100644 tests/docker-compose-performance.yml create mode 100644 tests/perf-testing/integration/Dockerfile create mode 100644 tests/perf-testing/latest_supported/01-init-extensions.sql create mode 100644 tests/perf-testing/latest_supported/02-create-database.sql create mode 100644 tests/perf-testing/latest_supported/03-import-data.sql create mode 100644 tests/perf-testing/latest_supported/Dockerfile create mode 100644 tests/perf-testing/oldest_supported/01-init-extensions.sql create mode 100644 tests/perf-testing/oldest_supported/02-create-database.sql create mode 100644 tests/perf-testing/oldest_supported/03-import-data.sql create mode 100644 tests/perf-testing/oldest_supported/Dockerfile create mode 100644 tests/postgresqlperf_test.go create mode 100644 tests/simulation/helpers.go create mode 100644 tests/simulation/sim_queries.go create mode 100644 tests/testdata/blocking-sessions-schema.json create mode 100644 tests/testdata/execution-plan-schema.json create mode 100644 tests/testdata/individual-queries-schema.json create mode 100644 tests/testdata/slow-queries-schema.json create mode 100644 tests/testdata/wait-events-schema.json diff --git a/Makefile b/Makefile index 39ffffc8..8d1aacc4 100644 --- a/Makefile +++ b/Makefile @@ -32,9 +32,19 @@ test: integration-test: @echo "=== $(INTEGRATION) === [ test ]: running integration tests..." - @docker compose -f tests/docker-compose.yml pull - @go test -v -tags=integration -count 1 ./tests/. || (ret=$$?; docker compose -f tests/docker-compose.yml down && exit $$ret) - @docker compose -f tests/docker-compose.yml down + @docker compose -f tests/docker-compose.yml up -d + # Sleep added to allow postgres with test data and extensions to start up + @sleep 10 + @go test -v -tags=integration -count 1 ./tests/postgresql_test.go -timeout 300s || (ret=$$?; docker compose -f tests/docker-compose.yml down -v && exit $$ret) + @docker compose -f tests/docker-compose.yml down -v + @echo "=== $(INTEGRATION) === [ test ]: running integration tests for query performance monitoring..." + @echo "Starting containers for performance tests..." 
+ @docker compose -f tests/docker-compose-performance.yml up -d + # Sleep added to allow postgres with test data and extensions to start up + @sleep 30 + @go test -v -tags=query_performance ./tests/postgresqlperf_test.go -timeout 600s || (ret=$$?; docker compose -f tests/docker-compose-performance.yml down -v && exit $$ret) + @echo "Stopping performance test containers..." + @docker compose -f tests/docker-compose-performance.yml down -v install: compile @echo "=== $(INTEGRATION) === [ install ]: installing bin/$(BINARY_NAME)..." diff --git a/go.mod b/go.mod index aab0ac89..e9b00c4c 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/yaml.v3 v3.0.1 + github.com/go-viper/mapstructure/v2 v2.2.1 ) require ( diff --git a/go.sum b/go.sum index 54454a08..b3486667 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/postgresql-config.yml.sample b/postgresql-config.yml.sample index 66ca782e..17f087e0 100644 --- a/postgresql-config.yml.sample +++ b/postgresql-config.yml.sample @@ -50,6 +50,15 @@ integrations: # True if SSL is to be used. Defaults to false. ENABLE_SSL: "false" + # Enable query performance monitoring - Defaults to false + # ENABLE_QUERY_MONITORING: "false" + + # Threshold in milliseconds for query response time to fetch individual query performance metrics - Defaults to 500 + # QUERY_MONITORING_RESPONSE_TIME_THRESHOLD: "500" + + # The number of records to fetch for each query performance metric type - Defaults to 20 + # QUERY_MONITORING_COUNT_THRESHOLD: "20" + # True if the SSL certificate should be trusted without validating. # Setting this to true may open up the monitoring service to MITM attacks. # Defaults to false. diff --git a/src/args/argument_list.go b/src/args/argument_list.go index 6c02c3b5..4fa62361 100644 --- a/src/args/argument_list.go +++ b/src/args/argument_list.go @@ -3,7 +3,6 @@ package args import ( "errors" - sdkArgs "github.com/newrelic/infra-integrations-sdk/v3/args" "github.com/newrelic/infra-integrations-sdk/v3/log" ) @@ -11,26 +10,29 @@ // ArgumentList struct that holds all PostgreSQL arguments type ArgumentList struct { sdkArgs.DefaultArgumentList - Username string `default:"" help:"The username for the PostgreSQL database"` - Password string `default:"" help:"The password for the specified username"` - Hostname string `default:"localhost" help:"The PostgreSQL hostname to connect to"` - Database string `default:"postgres" help:"The PostgreSQL database name to connect to"` - Port string `default:"5432" help:"The port to connect to the PostgreSQL database"` - CollectionList string `default:"{}" help:"A JSON object which defines the databases, schemas, tables, and indexes to collect.
Can also be a JSON array that list databases to be collected. Can also be the string literal 'ALL' to collect everything. Collects nothing by default."` - CollectionIgnoreDatabaseList string `default:"[]" help:"A JSON array that list databases that will be excluded from collection. Nothing is excluded by default."` - CollectionIgnoreTableList string `default:"[]" help:"A JSON array that list tables that will be excluded from collection. Nothing is excluded by default."` - SSLRootCertLocation string `default:"" help:"Absolute path to PEM encoded root certificate file"` - SSLCertLocation string `default:"" help:"Absolute path to PEM encoded client cert file"` - SSLKeyLocation string `default:"" help:"Absolute path to PEM encoded client key file"` - Timeout string `default:"10" help:"Maximum wait for connection, in seconds. Set 0 for no timeout"` - CustomMetricsQuery string `default:"" help:"A SQL query to collect custom metrics. Must have the columns metric_name, metric_type, and metric_value. Additional columns are added as attributes"` - CustomMetricsConfig string `default:"" help:"YAML configuration with one or more custom SQL queries to collect"` - EnableSSL bool `default:"false" help:"If true will use SSL encryption, false will not use encryption"` - TrustServerCertificate bool `default:"false" help:"If true server certificate is not verified for SSL. If false certificate will be verified against supplied certificate"` - Pgbouncer bool `default:"false" help:"Collects metrics from PgBouncer instance. Assumes connection is through PgBouncer."` - CollectDbLockMetrics bool `default:"false" help:"If true, enables collection of lock metrics for the specified database. (Note: requires that the 'tablefunc' extension is installed)"` //nolint: stylecheck - CollectBloatMetrics bool `default:"true" help:"Enable collecting bloat metrics which can be performance intensive"` - ShowVersion bool `default:"false" help:"Print build information and exit"` + Username string `default:"" help:"The username for the PostgreSQL database"` + Password string `default:"" help:"The password for the specified username"` + Hostname string `default:"localhost" help:"The PostgreSQL hostname to connect to"` + Database string `default:"postgres" help:"The PostgreSQL database name to connect to"` + Port string `default:"5432" help:"The port to connect to the PostgreSQL database"` + CollectionList string `default:"{}" help:"A JSON object which defines the databases, schemas, tables, and indexes to collect. Can also be a JSON array that list databases to be collected. Can also be the string literal 'ALL' to collect everything. Collects nothing by default."` + CollectionIgnoreDatabaseList string `default:"[]" help:"A JSON array that list databases that will be excluded from collection. Nothing is excluded by default."` + CollectionIgnoreTableList string `default:"[]" help:"A JSON array that list tables that will be excluded from collection. Nothing is excluded by default."` + SSLRootCertLocation string `default:"" help:"Absolute path to PEM encoded root certificate file"` + SSLCertLocation string `default:"" help:"Absolute path to PEM encoded client cert file"` + SSLKeyLocation string `default:"" help:"Absolute path to PEM encoded client key file"` + Timeout string `default:"10" help:"Maximum wait for connection, in seconds. Set 0 for no timeout"` + CustomMetricsQuery string `default:"" help:"A SQL query to collect custom metrics. Must have the columns metric_name, metric_type, and metric_value. 
Additional columns are added as attributes"` + CustomMetricsConfig string `default:"" help:"YAML configuration with one or more custom SQL queries to collect"` + EnableSSL bool `default:"false" help:"If true will use SSL encryption, false will not use encryption"` + TrustServerCertificate bool `default:"false" help:"If true server certificate is not verified for SSL. If false certificate will be verified against supplied certificate"` + Pgbouncer bool `default:"false" help:"Collects metrics from PgBouncer instance. Assumes connection is through PgBouncer."` + CollectDbLockMetrics bool `default:"false" help:"If true, enables collection of lock metrics for the specified database. (Note: requires that the 'tablefunc' extension is installed)"` //nolint: stylecheck + CollectBloatMetrics bool `default:"true" help:"Enable collecting bloat metrics which can be performance intensive"` + ShowVersion bool `default:"false" help:"Print build information and exit"` + EnableQueryMonitoring bool `default:"false" help:"Enable collection of detailed query performance metrics."` + QueryMonitoringResponseTimeThreshold int `default:"500" help:"Threshold in milliseconds for query response time. Individual queries whose response time exceeds this threshold are reported in metrics"` + QueryMonitoringCountThreshold int `default:"20" help:"The number of records to fetch for each query performance metric type"` } // Validate validates PostgreSQL arguments @@ -38,11 +40,9 @@ func (al ArgumentList) Validate() error { if al.Username == "" || al.Password == "" { return errors.New("invalid configuration: must specify a username and password") } - if err := al.validateSSL(); err != nil { return err } - return nil } diff --git a/src/connection/pgsql_connection.go b/src/connection/pgsql_connection.go index e31bf124..fd762f2a 100644 --- a/src/connection/pgsql_connection.go +++ b/src/connection/pgsql_connection.go @@ -149,7 +149,7 @@ func (p PGSQLConnection) HaveExtensionInSchema(extensionName, schemaName string) return true } -// createConnectionURL creates the connection string. A list of paramters +// createConnectionURL creates the connection string.
A list of parameters // can be found here https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters func createConnectionURL(ci *connectionInfo, database string) string { connectionURL := &url.URL{ @@ -170,7 +170,6 @@ func createConnectionURL(ci *connectionInfo, database string) string { } connectionURL.RawQuery = query.Encode() - return connectionURL.String() } diff --git a/src/main.go b/src/main.go index 42bdcc5d..5f4bf6ba 100644 --- a/src/main.go +++ b/src/main.go @@ -7,6 +7,8 @@ import ( "runtime" "strings" + queryperformancemonitoring "github.com/newrelic/nri-postgresql/src/query-performance-monitoring" + "github.com/newrelic/infra-integrations-sdk/v3/integration" "github.com/newrelic/infra-integrations-sdk/v3/log" "github.com/newrelic/nri-postgresql/src/args" @@ -27,6 +29,7 @@ var ( ) func main() { + var args args.ArgumentList // Create Integration pgIntegration, err := integration.New(integrationName, integrationVersion, integration.Args(&args)) @@ -62,7 +65,6 @@ func main() { log.Error("Error creating list of entities to collect: %s", err) os.Exit(1) } - instance, err := pgIntegration.Entity(fmt.Sprintf("%s:%s", args.Hostname, args.Port), "pg-instance") if err != nil { log.Error("Error creating instance entity: %s", err.Error()) @@ -89,4 +91,9 @@ func main() { if err = pgIntegration.Publish(); err != nil { log.Error(err.Error()) } + + if args.EnableQueryMonitoring { + queryperformancemonitoring.QueryPerformanceMain(args, pgIntegration, collectionList) + } + } diff --git a/src/metrics/metrics.go b/src/metrics/metrics.go index c5043b9d..0a3cbddb 100644 --- a/src/metrics/metrics.go +++ b/src/metrics/metrics.go @@ -37,7 +37,7 @@ func PopulateMetrics( } defer con.Close() - version, err := collectVersion(con) + version, err := CollectVersion(con) if err != nil { log.Error("Metrics collection failed: error collecting version number: %s", err.Error()) return @@ -223,7 +223,7 @@ type serverVersionRow struct { Version string `db:"server_version"` } -func collectVersion(connection *connection.PGSQLConnection) (*semver.Version, error) { +func CollectVersion(connection *connection.PGSQLConnection) (*semver.Version, error) { var versionRows []*serverVersionRow if err := connection.Query(&versionRows, versionQuery); err != nil { return nil, err @@ -231,7 +231,6 @@ func collectVersion(connection *connection.PGSQLConnection) (*semver.Version, er re := regexp.MustCompile(`[0-9]+\.[0-9]+(\.[0-9])?`) version := re.FindString(versionRows[0].Version) - // special cases for ubuntu/debian parsing //version := versionRows[0].Version //if strings.Contains(version, "Ubuntu") { diff --git a/src/metrics/version_test.go b/src/metrics/version_test.go index f151212f..d6963966 100644 --- a/src/metrics/version_test.go +++ b/src/metrics/version_test.go @@ -22,7 +22,7 @@ func Test_collectVersion(t *testing.T) { Minor: 3, } - version, err := collectVersion(testConnection) + version, err := CollectVersion(testConnection) assert.Nil(t, err) assert.Equal(t, expected, version) @@ -42,7 +42,7 @@ func Test_collectVersion_EnterpriseDB(t *testing.T) { Patch: 7, } - version, err := collectVersion(testConnection) + version, err := CollectVersion(testConnection) assert.Nil(t, err) assert.Equal(t, expected, version) @@ -61,7 +61,7 @@ func Test_collectVersion_Ubuntu(t *testing.T) { Minor: 4, } - version, err := collectVersion(testConnection) + version, err := CollectVersion(testConnection) assert.Nil(t, err) assert.Equal(t, expected, version) @@ -80,7 +80,7 @@ func Test_collectVersion_Debian(t *testing.T) { Minor: 4, } 
- version, err := collectVersion(testConnection) + version, err := CollectVersion(testConnection) assert.Nil(t, err) assert.Equal(t, expected, version) @@ -94,7 +94,7 @@ func Test_collectVersion_Err(t *testing.T) { mock.ExpectQuery(versionQuery).WillReturnRows(versionRows) - _, err := collectVersion(testConnection) + _, err := CollectVersion(testConnection) assert.NotNil(t, err) } diff --git a/src/query-performance-monitoring/common-parameters/common_parameters.go b/src/query-performance-monitoring/common-parameters/common_parameters.go new file mode 100644 index 00000000..c4d00da6 --- /dev/null +++ b/src/query-performance-monitoring/common-parameters/common_parameters.go @@ -0,0 +1,55 @@ +package commonparameters + +import ( + "github.com/newrelic/infra-integrations-sdk/v3/log" + "github.com/newrelic/nri-postgresql/src/args" +) + +// MaxQueryCountThreshold is the maximum number of records that can be fetched for each query performance metric type. +const MaxQueryCountThreshold = 30 + +// DefaultQueryMonitoringCountThreshold is the default threshold for the number of queries to monitor. +const DefaultQueryMonitoringCountThreshold = 20 + +// DefaultQueryResponseTimeThreshold is the default threshold for the response time of a query. +const DefaultQueryResponseTimeThreshold = 500 + +// CommonParameters holds the connection details and validated thresholds shared by all performance metric collectors. +type CommonParameters struct { + Version uint64 + Databases string + QueryMonitoringCountThreshold int + QueryMonitoringResponseTimeThreshold int + Host string + Port string +} + +// SetCommonParameters builds a CommonParameters from the integration arguments, validating both monitoring thresholds. +func SetCommonParameters(args args.ArgumentList, version uint64, databases string) *CommonParameters { + return &CommonParameters{ + Version: version, + Databases: databases, // comma separated database names + QueryMonitoringCountThreshold: validateAndGetQueryMonitoringCountThreshold(args), + QueryMonitoringResponseTimeThreshold: validateAndGetQueryMonitoringResponseTimeThreshold(args), + Host: args.Hostname, + Port: args.Port, + } +} + +func validateAndGetQueryMonitoringResponseTimeThreshold(args args.ArgumentList) int { + if args.QueryMonitoringResponseTimeThreshold < 0 { + log.Warn("QueryResponseTimeThreshold should be greater than or equal to 0 but the input is %d, setting value to default which is %d", args.QueryMonitoringResponseTimeThreshold, DefaultQueryResponseTimeThreshold) + return DefaultQueryResponseTimeThreshold + } + return args.QueryMonitoringResponseTimeThreshold +} + +func validateAndGetQueryMonitoringCountThreshold(args args.ArgumentList) int { + if args.QueryMonitoringCountThreshold < 0 { + log.Warn("QueryCountThreshold should be greater than or equal to 0 but the input is %d, setting value to default which is %d", args.QueryMonitoringCountThreshold, DefaultQueryMonitoringCountThreshold) + return DefaultQueryMonitoringCountThreshold + } + if args.QueryMonitoringCountThreshold > MaxQueryCountThreshold { + log.Warn("QueryCountThreshold should be less than or equal to max limit but the input is %d, setting value to max limit which is %d", args.QueryMonitoringCountThreshold, MaxQueryCountThreshold) + return MaxQueryCountThreshold + } + return args.QueryMonitoringCountThreshold +} diff --git a/src/query-performance-monitoring/common-utils/common_helpers.go b/src/query-performance-monitoring/common-utils/common_helpers.go new file mode 100644 index 00000000..1fa690c8 --- /dev/null +++ b/src/query-performance-monitoring/common-utils/common_helpers.go @@ -0,0 +1,42 @@ +package commonutils + +import ( + "crypto/rand" + "fmt" + "math/big" + "regexp" + "strings" + "time" + + "github.com/newrelic/nri-postgresql/src/collection" +) +
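Reviewer note: the `re` pattern declared just below drives AnonymizeQueryText. A minimal, self-contained sketch of its behaviour (a hypothetical standalone program, not part of this change) — note that it masks double-quoted identifiers as well as string and numeric literals:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as the `re` variable below: single-quoted strings,
	// bare integers, and double-quoted tokens all collapse to "?".
	re := regexp.MustCompile(`'[^']*'|\d+|".*?"`)
	q := `SELECT "id" FROM users WHERE age >= 21 AND name = 'alice'`
	fmt.Println(re.ReplaceAllString(q, "?"))
	// Output: SELECT ? FROM users WHERE age >= ? AND name = ?
}
```

+// re is a regular expression that matches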
single-quoted strings, numbers, or double-quoted strings +var re = regexp.MustCompile(`'[^']*'|\d+|".*?"`) + +// GetDatabaseListInString returns the database names from the collection list as a comma-separated string of quoted names. +func GetDatabaseListInString(dbMap collection.DatabaseList) string { + if len(dbMap) == 0 { + return "" + } + var quotedNames = make([]string, 0) + for dbName := range dbMap { + quotedNames = append(quotedNames, fmt.Sprintf("'%s'", dbName)) + } + return strings.Join(quotedNames, ",") +} + +// AnonymizeQueryText replaces literals in the query text with the placeholder '?'. +func AnonymizeQueryText(query string) string { + anonymizedQuery := re.ReplaceAllString(query, "?") + return anonymizedQuery +} + +// GeneratePlanID generates a unique plan ID for a query. +func GeneratePlanID() (string, error) { + randomInt, err := rand.Int(rand.Reader, big.NewInt(RandomIntRange)) + if err != nil { + return "", ErrUnExpectedError + } + currentTime := time.Now().Format(TimeFormat) + result := fmt.Sprintf("%d-%s", randomInt.Int64(), currentTime) + return result, nil +} diff --git a/src/query-performance-monitoring/common-utils/common_helpers_test.go b/src/query-performance-monitoring/common-utils/common_helpers_test.go new file mode 100644 index 00000000..921c1c02 --- /dev/null +++ b/src/query-performance-monitoring/common-utils/common_helpers_test.go @@ -0,0 +1,38 @@ +package commonutils + +import ( + "sort" + "testing" + + "github.com/newrelic/nri-postgresql/src/collection" + "github.com/stretchr/testify/assert" +) + +func TestGetDatabaseListInString(t *testing.T) { + dbListKeys := []string{"db1"} + sort.Strings(dbListKeys) // Sort the keys to ensure consistent order + dbList := collection.DatabaseList{} + for _, key := range dbListKeys { + dbList[key] = collection.SchemaList{} + } + expected := "'db1'" + result := GetDatabaseListInString(dbList) + assert.Equal(t, expected, result) + + // Test with empty database list + dbList = collection.DatabaseList{} + expected = "" + result = GetDatabaseListInString(dbList) + assert.Equal(t, expected, result) +} + +func TestAnonymizeQueryText(t *testing.T) { + query := "SELECT * FROM users WHERE id = 1 AND name = 'John'" + expected := "SELECT * FROM users WHERE id = ? AND name = ?" + result := AnonymizeQueryText(query) + assert.Equal(t, expected, result) + query = "SELECT * FROM employees WHERE id = 10 OR name <> 'John Doe' OR name != 'John Doe' OR age < 30 OR age <= 30 OR salary > 50000OR salary >= 50000 OR department LIKE 'Sales%' OR department ILIKE 'sales%'OR join_date BETWEEN '2023-01-01' AND '2023-12-31' OR department IN ('HR', 'Engineering', 'Marketing') OR department IS NOT NULL OR department IS NULL;" + expected = "SELECT * FROM employees WHERE id = ? OR name <> ? OR name != ? OR age < ? OR age <= ? OR salary > ?OR salary >= ? OR department LIKE ? OR department ILIKE ?OR join_date BETWEEN ? AND ? OR department IN (?, ?, ?)
OR department IS NOT NULL OR department IS NULL;" + result = AnonymizeQueryText(query) + assert.Equal(t, expected, result) +} diff --git a/src/query-performance-monitoring/common-utils/constants.go b/src/query-performance-monitoring/common-utils/constants.go new file mode 100644 index 00000000..3d993942 --- /dev/null +++ b/src/query-performance-monitoring/common-utils/constants.go @@ -0,0 +1,22 @@ +package commonutils + +import "errors" + +// PublishThreshold is the maximum number of metrics to be published in a single batch. +const PublishThreshold = 600 +const RandomIntRange = 1000000 +const TimeFormat = "20060102150405" + +// MaxIndividualQueryCountThreshold caps how many individual query samples are fetched per statement; the samples share the same query text and differ only in parameters, so 10 samples are enough to check the execution plan. +const MaxIndividualQueryCountThreshold = 10 + +var ErrUnsupportedVersion = errors.New("unsupported PostgreSQL version") +var ErrUnExpectedError = errors.New("unexpected error") + +var ErrInvalidModelType = errors.New("invalid model type") +var ErrNotEligible = errors.New("not eligible to fetch metrics") + +const PostgresVersion11 = 11 +const PostgresVersion12 = 12 +const PostgresVersion13 = 13 +const PostgresVersion14 = 14 diff --git a/src/query-performance-monitoring/common-utils/ingestion-helpers.go b/src/query-performance-monitoring/common-utils/ingestion-helpers.go new file mode 100644 index 00000000..b6cf01fe --- /dev/null +++ b/src/query-performance-monitoring/common-utils/ingestion-helpers.go @@ -0,0 +1,120 @@ +package commonutils + +import ( + "fmt" + "reflect" + + commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + + "github.com/newrelic/infra-integrations-sdk/v3/data/metric" + "github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/infra-integrations-sdk/v3/log" +) + +// SetMetric records a single value on the metric set, ingesting as ATTRIBUTE when sourceType is `attribute` and as GAUGE for `gauge` or any unrecognised source type. +func SetMetric(metricSet *metric.Set, name string, value interface{}, sourceType string) { + metricType := metric.GAUGE + if sourceType == `attribute` { + metricType = metric.ATTRIBUTE + } + if err := metricSet.SetMetric(name, value, metricType); err != nil { + log.Error("Error setting metric: %v", err) + } +} +
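Reviewer note: ingestion is driven entirely by struct tags on the data models — `metric_name`, `source_type`, and `ingest_data` — which ProcessModel (below) reads via reflection before calling SetMetric. A minimal sketch of that contract; the `sampleMetric` type and its field names are hypothetical, not part of this change:

```go
// Hypothetical model: ProcessModel walks the struct fields, skips any
// field tagged ingest_data:"false", and publishes the rest under the
// metric_name/source_type given in the tags.
type sampleMetric struct {
	QueryID     *string  `metric_name:"query_id" source_type:"attribute"`
	ElapsedMs   *float64 `metric_name:"elapsed_ms" source_type:"gauge"`
	RawInternal *string  `ingest_data:"false"` // never published
}
```

+// IngestMetric publishes the given metric models in batches of PublishThreshold; batching avoids publishing a large payload in one go, which New Relic limits.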
+func IngestMetric(metricList []interface{}, eventName string, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters) error { + instanceEntity, err := CreateEntity(pgIntegration, cp) + if err != nil { + log.Error("Error creating entity: %v", err) + return err + } + + metricCount := 0 + + for _, model := range metricList { + if model == nil { + continue + } + metricCount++ + metricSet := instanceEntity.NewMetricSet(eventName) + + processErr := ProcessModel(model, metricSet) + if processErr != nil { + log.Error("Error processing model: %v", processErr) + continue + } + + if metricCount == PublishThreshold { + metricCount = 0 + if err := PublishMetrics(pgIntegration, &instanceEntity, cp); err != nil { + log.Error("Error publishing metrics: %v", err) + return err + } + } + } + if metricCount > 0 { + if err := PublishMetrics(pgIntegration, &instanceEntity, cp); err != nil { + log.Error("Error publishing metrics: %v", err) + return err + } + } + return nil +} + +// CreateEntity returns the pg-instance entity to which the performance metric sets are attached. +func CreateEntity(pgIntegration *integration.Integration, cp *commonparameters.CommonParameters) (*integration.Entity, error) { + return pgIntegration.Entity(fmt.Sprintf("%s:%s", cp.Host, cp.Port), "pg-instance") +} + +// ProcessModel copies the tagged fields of a metrics model onto the given metric set. +func ProcessModel(model interface{}, metricSet *metric.Set) error { + modelValue := reflect.ValueOf(model) + if modelValue.Kind() == reflect.Ptr { + modelValue = modelValue.Elem() + } + if !modelValue.IsValid() || modelValue.Kind() != reflect.Struct { + log.Error("Invalid model type: %v", modelValue.Kind()) + return ErrInvalidModelType + } + + // Derive the type from the dereferenced value so pointer models do not panic on Field(i) below. + modelType := modelValue.Type() + + for i := 0; i < modelValue.NumField(); i++ { + field := modelValue.Field(i) + fieldType := modelType.Field(i) + metricName := fieldType.Tag.Get("metric_name") + sourceType := fieldType.Tag.Get("source_type") + ingestData := fieldType.Tag.Get("ingest_data") + + if ingestData == "false" { + continue + } + + if field.Kind() == reflect.Ptr && !field.IsNil() { + SetMetric(metricSet, metricName, field.Elem().Interface(), sourceType) + } else if field.Kind() != reflect.Ptr { + SetMetric(metricSet, metricName, field.Interface(), sourceType) + } + } + return nil +} + +// PublishMetrics flushes the integration payload and recreates the instance entity for subsequent batches. +func PublishMetrics(pgIntegration *integration.Integration, instanceEntity **integration.Entity, cp *commonparameters.CommonParameters) error { + if err := pgIntegration.Publish(); err != nil { + return err + } + var err error + *instanceEntity, err = CreateEntity(pgIntegration, cp) + return err +} diff --git a/src/query-performance-monitoring/common-utils/ingestion_helper_test.go b/src/query-performance-monitoring/common-utils/ingestion_helper_test.go new file mode 100644 index 00000000..579c72fc --- /dev/null +++ b/src/query-performance-monitoring/common-utils/ingestion_helper_test.go @@ -0,0 +1,87 @@ +package commonutils_test + +import ( + "testing" + + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + + "github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/nri-postgresql/src/args" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/stretchr/testify/assert" +) + +func TestSetMetric(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + entity, _ := pgIntegration.Entity("test-entity", "test-type") + metricSet := entity.NewMetricSet("test-event") + commonutils.SetMetric(metricSet, "testGauge", 123.0, "gauge") + assert.Equal(t, 123.0, metricSet.Metrics["testGauge"]) + commonutils.SetMetric(metricSet,
"testAttribute", "value", "attribute") + assert.Equal(t, "value", metricSet.Metrics["testAttribute"]) + commonutils.SetMetric(metricSet, "testDefault", 456.0, "unknown") + assert.Equal(t, 456.0, metricSet.Metrics["testDefault"]) +} + +func TestIngestMetric(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + args := args.ArgumentList{ + Hostname: "localhost", + Port: "5432", + } + cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb") + metricList := []interface{}{ + struct { + TestField int `metric_name:"testField" source_type:"gauge"` + }{TestField: 123}, + } + err := commonutils.IngestMetric(metricList, "testEvent", pgIntegration, cp) + if err != nil { + t.Error(err) + return + } + assert.NotEmpty(t, pgIntegration.Entities) +} + +func TestCreateEntity(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + args := args.ArgumentList{ + Hostname: "localhost", + Port: "5432", + } + cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb") + + entity, err := commonutils.CreateEntity(pgIntegration, cp) + assert.NoError(t, err) + assert.NotNil(t, entity) + assert.Equal(t, "localhost:5432", entity.Metadata.Name) +} + +func TestProcessModel(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + entity, _ := pgIntegration.Entity("test-entity", "test-type") + + metricSet := entity.NewMetricSet("test-event") + + model := struct { + TestField int `metric_name:"testField" source_type:"gauge"` + }{TestField: 123} + + err := commonutils.ProcessModel(model, metricSet) + assert.NoError(t, err) + assert.Equal(t, 123.0, metricSet.Metrics["testField"]) +} + +func TestPublishMetrics(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + args := args.ArgumentList{ + Hostname: "localhost", + Port: "5432", + } + cp := common_parameters.SetCommonParameters(args, uint64(14), "testdb") + entity, _ := commonutils.CreateEntity(pgIntegration, cp) + + err := commonutils.PublishMetrics(pgIntegration, &entity, cp) + assert.NoError(t, err) + assert.NotNil(t, entity) +} diff --git a/src/query-performance-monitoring/common-utils/query_fetch_helpers.go b/src/query-performance-monitoring/common-utils/query_fetch_helpers.go new file mode 100644 index 00000000..b2409eb1 --- /dev/null +++ b/src/query-performance-monitoring/common-utils/query_fetch_helpers.go @@ -0,0 +1,38 @@ +package commonutils + +import ( + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" +) + +func FetchVersionSpecificSlowQueries(version uint64) (string, error) { + switch { + case version == PostgresVersion12: + return queries.SlowQueriesForV12, nil + case version >= PostgresVersion13: + return queries.SlowQueriesForV13AndAbove, nil + default: + return "", ErrUnsupportedVersion + } +} + +func FetchVersionSpecificBlockingQueries(version uint64) (string, error) { + switch { + case version == PostgresVersion12, version == PostgresVersion13: + return queries.BlockingQueriesForV12AndV13, nil + case version >= PostgresVersion14: + return queries.BlockingQueriesForV14AndAbove, nil + default: + return "", ErrUnsupportedVersion + } +} + +func FetchVersionSpecificIndividualQueries(version uint64) (string, error) { + switch { + case version == PostgresVersion12: + return queries.IndividualQuerySearchV12, nil + case version > PostgresVersion12: + return queries.IndividualQuerySearchV13AndAbove, nil + default: + return "", ErrUnsupportedVersion + } +} diff --git a/src/query-performance-monitoring/common-utils/query_fetch_helpers_test.go 
b/src/query-performance-monitoring/common-utils/query_fetch_helpers_test.go new file mode 100644 index 00000000..b2b9bbe8 --- /dev/null +++ b/src/query-performance-monitoring/common-utils/query_fetch_helpers_test.go @@ -0,0 +1,70 @@ +package commonutils_test + +import ( + "testing" + + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/stretchr/testify/assert" +) + +func runTestCases(t *testing.T, tests []struct { + version uint64 + expected string + expectErr bool +}, fetchFunc func(uint64) (string, error)) { + for _, test := range tests { + result, err := fetchFunc(test.version) + if test.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, result) + } + } +} + +func TestFetchVersionSpecificSlowQueries(t *testing.T) { + tests := []struct { + version uint64 + expected string + expectErr bool + }{ + {commonutils.PostgresVersion12, queries.SlowQueriesForV12, false}, + {commonutils.PostgresVersion13, queries.SlowQueriesForV13AndAbove, false}, + {commonutils.PostgresVersion11, "", true}, + } + + runTestCases(t, tests, commonutils.FetchVersionSpecificSlowQueries) +} + +func TestFetchVersionSpecificBlockingQueries(t *testing.T) { + tests := []struct { + version uint64 + expected string + expectErr bool + }{ + {commonutils.PostgresVersion12, queries.BlockingQueriesForV12AndV13, false}, + {commonutils.PostgresVersion13, queries.BlockingQueriesForV12AndV13, false}, + {commonutils.PostgresVersion14, queries.BlockingQueriesForV14AndAbove, false}, + {commonutils.PostgresVersion11, "", true}, + } + + runTestCases(t, tests, commonutils.FetchVersionSpecificBlockingQueries) +} + +func TestFetchVersionSpecificIndividualQueries(t *testing.T) { + tests := []struct { + version uint64 + expected string + expectErr bool + }{ + {commonutils.PostgresVersion12, queries.IndividualQuerySearchV12, false}, + {commonutils.PostgresVersion13, queries.IndividualQuerySearchV13AndAbove, false}, + {commonutils.PostgresVersion14, queries.IndividualQuerySearchV13AndAbove, false}, + {commonutils.PostgresVersion11, "", true}, + } + + runTestCases(t, tests, commonutils.FetchVersionSpecificIndividualQueries) +} diff --git a/src/query-performance-monitoring/datamodels/performance_data_models.go b/src/query-performance-monitoring/datamodels/performance_data_models.go new file mode 100644 index 00000000..16f46146 --- /dev/null +++ b/src/query-performance-monitoring/datamodels/performance_data_models.go @@ -0,0 +1,66 @@ +package datamodels + +type SlowRunningQueryMetrics struct { + Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"` + QueryID *string `db:"query_id" metric_name:"query_id" source_type:"attribute"` + QueryText *string `db:"query_text" metric_name:"query_text" source_type:"attribute"` + DatabaseName *string `db:"database_name" metric_name:"database_name" source_type:"attribute"` + SchemaName *string `db:"schema_name" metric_name:"schema_name" source_type:"attribute"` + ExecutionCount *int64 `db:"execution_count" metric_name:"execution_count" source_type:"gauge"` + AvgElapsedTimeMs *float64 `db:"avg_elapsed_time_ms" metric_name:"avg_elapsed_time_ms" source_type:"gauge"` + AvgDiskReads *float64 `db:"avg_disk_reads" metric_name:"avg_disk_reads" source_type:"gauge"` + AvgDiskWrites *float64 `db:"avg_disk_writes" metric_name:"avg_disk_writes" source_type:"gauge"` + StatementType *string 
`db:"statement_type" metric_name:"statement_type" source_type:"attribute"` + CollectionTimestamp *string `db:"collection_timestamp" metric_name:"collection_timestamp" source_type:"attribute"` +} +type WaitEventMetrics struct { + WaitEventName *string `db:"wait_event_name" metric_name:"wait_event_name" source_type:"attribute"` + WaitCategory *string `db:"wait_category" metric_name:"wait_category" source_type:"attribute"` + TotalWaitTimeMs *float64 `db:"total_wait_time_ms" metric_name:"total_wait_time_ms" source_type:"gauge"` + CollectionTimestamp *string `db:"collection_timestamp" metric_name:"collection_timestamp" source_type:"attribute"` + QueryID *string `db:"query_id" metric_name:"query_id" source_type:"attribute"` + QueryText *string `db:"query_text" metric_name:"query_text" source_type:"attribute"` + DatabaseName *string `db:"database_name" metric_name:"database_name" source_type:"attribute"` +} +type BlockingSessionMetrics struct { + Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"` + BlockedPid *int64 `db:"blocked_pid" metric_name:"blocked_pid" source_type:"gauge"` + BlockedQuery *string `db:"blocked_query" metric_name:"blocked_query" source_type:"attribute"` + BlockedQueryID *string `db:"blocked_query_id" metric_name:"blocked_query_id" source_type:"attribute"` + BlockedQueryStart *string `db:"blocked_query_start" metric_name:"blocked_query_start" source_type:"attribute"` + BlockedDatabase *string `db:"database_name" metric_name:"database_name" source_type:"attribute"` + BlockingPid *int64 `db:"blocking_pid" metric_name:"blocking_pid" source_type:"gauge"` + BlockingQuery *string `db:"blocking_query" metric_name:"blocking_query" source_type:"attribute"` + BlockingQueryID *string `db:"blocking_query_id" metric_name:"blocking_query_id" source_type:"attribute"` + BlockingQueryStart *string `db:"blocking_query_start" metric_name:"blocking_query_start" source_type:"attribute"` +} + +type IndividualQueryMetrics struct { + QueryText *string `json:"query" db:"query" metric_name:"query_text" source_type:"attribute"` + QueryID *string `json:"queryid" db:"queryid" metric_name:"query_id" source_type:"attribute"` + DatabaseName *string `json:"datname" db:"datname" metric_name:"database_name" source_type:"attribute"` + AvgCPUTimeInMS *float64 `json:"cpu_time_ms" db:"cpu_time_ms" metric_name:"cpu_time_ms" source_type:"gauge"` + PlanID *string `json:"planid" db:"planid" metric_name:"plan_id" source_type:"attribute"` + RealQueryText *string `ingest_data:"false"` + AvgExecTimeInMs *float64 `json:"exec_time_ms" db:"exec_time_ms" metric_name:"exec_time_ms" source_type:"gauge"` + Newrelic *string `db:"newrelic" metric_name:"newrelic" source_type:"attribute" ingest_data:"false"` +} + +type QueryExecutionPlanMetrics struct { + NodeType string `mapstructure:"Node Type" json:"Node Type" metric_name:"node_type" source_type:"attribute"` + ParallelAware bool `mapstructure:"Parallel Aware" json:"Parallel Aware" metric_name:"parallel_aware" source_type:"gauge"` + AsyncCapable bool `mapstructure:"Async Capable" json:"Async Capable" metric_name:"async_capable" source_type:"gauge"` + ScanDirection string `mapstructure:"Scan Direction" json:"Scan Direction" metric_name:"scan_direction" source_type:"attribute"` + IndexName string `mapstructure:"Index Name" json:"Index Name" metric_name:"index_name" source_type:"attribute"` + RelationName string `mapstructure:"Relation Name" json:"Relation Name" metric_name:"relation_name" source_type:"attribute"` + Alias string 
`mapstructure:"Alias" json:"Alias" metric_name:"alias" source_type:"attribute"` + StartupCost float64 `mapstructure:"Startup Cost" json:"Startup Cost" metric_name:"startup_cost" source_type:"gauge"` + TotalCost float64 `mapstructure:"Total Cost" json:"Total Cost" metric_name:"total_cost" source_type:"gauge"` + PlanRows int64 `mapstructure:"Plan Rows" json:"Plan Rows" metric_name:"plan_rows" source_type:"gauge"` + PlanWidth int64 `mapstructure:"Plan Width" json:"Plan Width" metric_name:"plan_width" source_type:"gauge"` + RowsRemovedByFilter int64 `mapstructure:"Rows Removed by Filter" json:"Rows Removed by Filter" metric_name:"rows_removed_by_filter" source_type:"gauge"` + DatabaseName string `mapstructure:"Database" json:"Database" metric_name:"database_name" source_type:"attribute"` + QueryID string `mapstructure:"Query Id" json:"Query Id" metric_name:"query_id" source_type:"attribute"` + PlanID string `mapstructure:"Plan Id" json:"Plan Id" metric_name:"plan_id" source_type:"attribute"` + Level int `mapstructure:"Level" json:"Level" metric_name:"level_id" source_type:"gauge"` +} diff --git a/src/query-performance-monitoring/performance-metrics/blocking_sessions.go b/src/query-performance-monitoring/performance-metrics/blocking_sessions.go new file mode 100644 index 00000000..48df7371 --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/blocking_sessions.go @@ -0,0 +1,71 @@ +package performancemetrics + +import ( + "fmt" + + commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + + "github.com/newrelic/infra-integrations-sdk/v3/integration" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations" + + "github.com/newrelic/infra-integrations-sdk/v3/log" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" +) + +func PopulateBlockingMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool) { + isEligible, enableCheckError := validations.CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, cp.Version) + if enableCheckError != nil { + log.Error("Error executing query: %v in PopulateBlockingMetrics", enableCheckError) + return + } + if !isEligible { + log.Debug("Extension 'pg_stat_statements' is not enabled or unsupported version.") + return + } + blockingQueriesMetricsList, blockQueryFetchErr := getBlockingMetrics(conn, cp) + if blockQueryFetchErr != nil { + log.Error("Error fetching Blocking queries: %v", blockQueryFetchErr) + return + } + if len(blockingQueriesMetricsList) == 0 { + log.Debug("No Blocking queries found.") + return + } + err := commonutils.IngestMetric(blockingQueriesMetricsList, "PostgresBlockingSessions", pgIntegration, cp) + if err != nil { + log.Error("Error ingesting Blocking queries: %v", err) + return + } +} + +func getBlockingMetrics(conn *performancedbconnection.PGSQLConnection, cp *commonparameters.CommonParameters) ([]interface{}, error) { + var blockingQueriesMetricsList []interface{} + versionSpecificBlockingQuery, err := commonutils.FetchVersionSpecificBlockingQueries(cp.Version) + if err != nil { + log.Error("Unsupported postgres version: %v", err) + return nil, err + } + var query = fmt.Sprintf(versionSpecificBlockingQuery, cp.Databases, 
cp.QueryMonitoringCountThreshold) + rows, err := conn.Queryx(query) + if err != nil { + log.Error("Failed to execute query: %v", err) + return nil, commonutils.ErrUnExpectedError + } + defer rows.Close() + for rows.Next() { + var blockingQueryMetric datamodels.BlockingSessionMetrics + if scanError := rows.StructScan(&blockingQueryMetric); scanError != nil { + return nil, scanError + } + // For PostgreSQL versions 13 and 12, anonymization of queries does not occur for blocking sessions, so it's necessary to explicitly anonymize them. + if cp.Version == commonutils.PostgresVersion13 || cp.Version == commonutils.PostgresVersion12 { + *blockingQueryMetric.BlockedQuery = commonutils.AnonymizeQueryText(*blockingQueryMetric.BlockedQuery) + *blockingQueryMetric.BlockingQuery = commonutils.AnonymizeQueryText(*blockingQueryMetric.BlockingQuery) + } + blockingQueriesMetricsList = append(blockingQueriesMetricsList, blockingQueryMetric) + } + + return blockingQueriesMetricsList, nil +} diff --git a/src/query-performance-monitoring/performance-metrics/blocking_sessions_test.go b/src/query-performance-monitoring/performance-metrics/blocking_sessions_test.go new file mode 100644 index 00000000..09779e36 --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/blocking_sessions_test.go @@ -0,0 +1,74 @@ +package performancemetrics + +import ( + "database/sql/driver" + "fmt" + "regexp" + "testing" + + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" + + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/connection" + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/stretchr/testify/assert" + "gopkg.in/DATA-DOG/go-sqlmock.v1" +) + +func TestGetBlockingMetrics(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + version := uint64(13) + cp := common_parameters.SetCommonParameters(args, version, databaseName) + expectedQuery := queries.BlockingQueriesForV12AndV13 + query := fmt.Sprintf(expectedQuery, databaseName, args.QueryMonitoringCountThreshold) + rowData := []driver.Value{ + "newrelic_value", int64(123), "SELECT 1", "1233444", "2023-01-01 00:00:00", "testdb", + int64(456), "SELECT 2", "4566", "2023-01-01 00:00:00", + } + expectedRows := [][]driver.Value{ + rowData, rowData, + } + mockRows := sqlmock.NewRows([]string{ + "newrelic", "blocked_pid", "blocked_query", "blocked_query_id", "blocked_query_start", "database_name", + "blocking_pid", "blocking_query", "blocking_query_id", "blocking_query_start", + }).AddRow(rowData...).AddRow(rowData...) 
+ mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(mockRows) + blockingQueriesMetricsList, err := getBlockingMetrics(conn, cp) + compareMockRowsWithMetrics(t, expectedRows, blockingQueriesMetricsList) + assert.NoError(t, err) + assert.Len(t, blockingQueriesMetricsList, 2) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func compareMockRowsWithMetrics(t *testing.T, expectedRows [][]driver.Value, blockingQueriesMetricsList []interface{}) { + assert.Equal(t, 2, len(blockingQueriesMetricsList)) + for index := range blockingQueriesMetricsList { + anonymizeQuery := commonutils.AnonymizeQueryText(expectedRows[index][2].(string)) + blockingSession := blockingQueriesMetricsList[index].(datamodels.BlockingSessionMetrics) + assert.Equal(t, expectedRows[index][0], *blockingSession.Newrelic) + assert.Equal(t, expectedRows[index][1], *blockingSession.BlockedPid) + assert.Equal(t, anonymizeQuery, *blockingSession.BlockedQuery) + assert.Equal(t, expectedRows[index][3], *blockingSession.BlockedQueryID) + assert.Equal(t, expectedRows[index][4], *blockingSession.BlockedQueryStart) + assert.Equal(t, expectedRows[index][5], *blockingSession.BlockedDatabase) + assert.Equal(t, expectedRows[index][6], *blockingSession.BlockingPid) + assert.Equal(t, anonymizeQuery, *blockingSession.BlockingQuery) + assert.Equal(t, expectedRows[index][8], *blockingSession.BlockingQueryID) + assert.Equal(t, expectedRows[index][9], *blockingSession.BlockingQueryStart) + } +} + +func TestGetBlockingMetricsErr(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + version := uint64(13) + cp := common_parameters.SetCommonParameters(args, version, databaseName) + _, err := getBlockingMetrics(conn, cp) + assert.EqualError(t, err, commonutils.ErrUnExpectedError.Error()) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/src/query-performance-monitoring/performance-metrics/execution_plan_metrics.go b/src/query-performance-monitoring/performance-metrics/execution_plan_metrics.go new file mode 100644 index 00000000..b74f8f4f --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/execution_plan_metrics.go @@ -0,0 +1,122 @@ +package performancemetrics + +import ( + "encoding/json" + + "github.com/go-viper/mapstructure/v2" + "github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/infra-integrations-sdk/v3/log" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" +) + +func PopulateExecutionPlanMetrics(results []datamodels.IndividualQueryMetrics, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, connectionInfo performancedbconnection.Info) { + if len(results) == 0 { + log.Debug("No individual queries found.") + return + } + executionDetailsList := getExecutionPlanMetrics(results, connectionInfo) + err := commonutils.IngestMetric(executionDetailsList, "PostgresExecutionPlanMetrics", pgIntegration, cp) + if err != nil { + log.Error("Error ingesting Execution Plan metrics: %v", err) + return + } +} + +func getExecutionPlanMetrics(results []datamodels.IndividualQueryMetrics, connectionInfo performancedbconnection.Info) []interface{} { + var 
executionPlanMetricsList []interface{} + var groupIndividualQueriesByDatabase = groupQueriesByDatabase(results) + for dbName, individualQueriesList := range groupIndividualQueriesByDatabase { + dbConn, err := connectionInfo.NewConnection(dbName) + if err != nil { + log.Error("Error opening database connection: %v", err) + continue + } + processExecutionPlanOfQueries(individualQueriesList, dbConn, &executionPlanMetricsList) + dbConn.Close() + } + + return executionPlanMetricsList +} + +func processExecutionPlanOfQueries(individualQueriesList []datamodels.IndividualQueryMetrics, dbConn *performancedbconnection.PGSQLConnection, executionPlanMetricsList *[]interface{}) { + for _, individualQuery := range individualQueriesList { + if individualQuery.RealQueryText == nil || individualQuery.QueryID == nil || individualQuery.DatabaseName == nil { + log.Error("QueryText, QueryID or Database Name is nil") + continue + } + query := "EXPLAIN (FORMAT JSON) " + *individualQuery.RealQueryText + rows, err := dbConn.Queryx(query) + if err != nil { + log.Debug("Error executing query: %v", err) + continue + } + defer rows.Close() + if !rows.Next() { + log.Debug("Execution plan not found for queryId %v", *individualQuery.QueryID) + continue + } + var execPlanJSON string + if scanErr := rows.Scan(&execPlanJSON); scanErr != nil { + log.Error("Error scanning row: %v", scanErr.Error()) + continue + } + + var execPlan []map[string]interface{} + err = json.Unmarshal([]byte(execPlanJSON), &execPlan) + if err != nil { + log.Error("Failed to unmarshal execution plan: %v", err) + continue + } + validateAndFetchNestedExecPlan(execPlan, individualQuery, executionPlanMetricsList) + } +} + +func validateAndFetchNestedExecPlan(execPlan []map[string]interface{}, individualQuery datamodels.IndividualQueryMetrics, executionPlanMetricsList *[]interface{}) { + level := 0 + if len(execPlan) > 0 { + if plan, ok := execPlan[0]["Plan"].(map[string]interface{}); ok { + fetchNestedExecutionPlanDetails(individualQuery, &level, plan, executionPlanMetricsList) + } else { + log.Debug("execPlan is not in correct datatype") + } + } else { + log.Debug("execPlan is empty") + } +} + +func groupQueriesByDatabase(results []datamodels.IndividualQueryMetrics) map[string][]datamodels.IndividualQueryMetrics { + databaseMap := make(map[string][]datamodels.IndividualQueryMetrics) + for _, individualQueryMetric := range results { + if individualQueryMetric.DatabaseName == nil { + continue + } + dbName := *individualQueryMetric.DatabaseName + databaseMap[dbName] = append(databaseMap[dbName], individualQueryMetric) + } + return databaseMap +} +
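Reviewer note: fetchNestedExecutionPlanDetails below flattens the EXPLAIN plan tree depth-first, emitting one metric row per node with an increasing Level. For reference, a trimmed sketch of the JSON shape it walks; the table names and costs are illustrative only:

```go
// Illustrative only: EXPLAIN (FORMAT JSON) returns an array holding one
// object whose "Plan" node may nest children under "Plans"; each node
// becomes one QueryExecutionPlanMetrics entry, children at Level+1.
const samplePlanJSON = `[{"Plan": {
	"Node Type": "Hash Join", "Startup Cost": 1.04, "Total Cost": 2.11,
	"Plans": [
		{"Node Type": "Seq Scan", "Relation Name": "orders"},
		{"Node Type": "Hash", "Plans": [{"Node Type": "Seq Scan", "Relation Name": "users"}]}
	]}}]`
```

+func fetchNestedExecutionPlanDetails(individualQuery datamodels.IndividualQueryMetrics, level *int, execPlan map[string]interface{}, executionPlanMetricsList *[]interface{}) { + var execPlanMetrics datamodels.QueryExecutionPlanMetrics + err := mapstructure.Decode(execPlan, &execPlanMetrics) + if err != nil { + log.Error("Failed to decode execPlan to execPlanMetrics: %v", err) + return + } + execPlanMetrics.QueryID = *individualQuery.QueryID + execPlanMetrics.DatabaseName = *individualQuery.DatabaseName + execPlanMetrics.Level = *level + *level++ + execPlanMetrics.PlanID = *individualQuery.PlanID + *executionPlanMetricsList = append(*executionPlanMetricsList, execPlanMetrics) + if nestedPlans, ok := execPlan["Plans"].([]interface{}); ok { + for _, nestedPlan := range nestedPlans { + if nestedPlanMap, nestedOk := nestedPlan.(map[string]interface{}); nestedOk { + fetchNestedExecutionPlanDetails(individualQuery, level,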
nestedPlanMap, executionPlanMetricsList) + } + } + } +} diff --git a/src/query-performance-monitoring/performance-metrics/execution_plan_metrics_test.go b/src/query-performance-monitoring/performance-metrics/execution_plan_metrics_test.go new file mode 100644 index 00000000..f84ded36 --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/execution_plan_metrics_test.go @@ -0,0 +1,89 @@ +package performancemetrics + +import ( + "testing" + + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + + "github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" + "github.com/stretchr/testify/assert" +) + +func TestPopulateExecutionPlanMetrics(t *testing.T) { + pgIntegration, _ := integration.New("test", "1.0.0") + args := args.ArgumentList{} + results := []datamodels.IndividualQueryMetrics{} + cp := common_parameters.SetCommonParameters(args, uint64(13), "testdb") + connectionInfo := performancedbconnection.DefaultConnectionInfo(&args) + PopulateExecutionPlanMetrics(results, pgIntegration, cp, connectionInfo) + assert.Empty(t, pgIntegration.Entities) +} + +func TestGroupQueriesByDatabase(t *testing.T) { + databaseName := "testdb" + queryID := "queryid1" + queryText := "SELECT 1" + results := []datamodels.IndividualQueryMetrics{ + { + QueryID: &queryID, + QueryText: &queryText, + DatabaseName: &databaseName, + }, + } + + groupedQueries := groupQueriesByDatabase(results) + assert.Len(t, groupedQueries, 1) + assert.Contains(t, groupedQueries, databaseName) + assert.Len(t, groupedQueries[databaseName], 1) +} + +func TestFetchNestedExecutionPlanDetails(t *testing.T) { + queryID := "queryid1" + queryText := "SELECT 1" + databaseName := "testdb" + planID := "planid1" + individualQuery := datamodels.IndividualQueryMetrics{ + QueryID: &queryID, + QueryText: &queryText, + DatabaseName: &databaseName, + PlanID: &planID, + } + execPlan := map[string]interface{}{ + "Node Type": "Seq Scan", + "Relation Name": "test_table", + "Alias": "test_table", + "Startup Cost": 0.00, + "Total Cost": 1000.00, + "Plan Rows": 100000, + "Plan Width": 4, + } + execPlanLevel2 := map[string]interface{}{ + "Node Type": "Seq Scan", + "Relation Name": "test_table", + "Alias": "test_table", + "Startup Cost": 0.00, + "Total Cost": 1000.00, + "Plan Rows": 100000, + "Plan Width": 4, + "Plans": []interface{}{execPlan}, + } + execPlanLevel3 := map[string]interface{}{ + "Node Type": "Seq Scan", + "Relation Name": "test_table", + "Alias": "test_table", + "Startup Cost": 0.00, + "Total Cost": 1000.00, + "Plan Rows": 100000, + "Plan Width": 4, + "Plans": []interface{}{execPlanLevel2}, + } + var executionPlanMetricsList []interface{} + level := 0 + + fetchNestedExecutionPlanDetails(individualQuery, &level, execPlanLevel3, &executionPlanMetricsList) + assert.Len(t, executionPlanMetricsList, 3) +} diff --git a/src/query-performance-monitoring/performance-metrics/individual_query_metrics.go b/src/query-performance-monitoring/performance-metrics/individual_query_metrics.go new file mode 100644 index 00000000..eb98335d --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/individual_query_metrics.go @@ -0,0 +1,121 @@ +package performancemetrics + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + + 
"github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/infra-integrations-sdk/v3/log" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations" +) + +type queryInfoMap map[string]string +type databaseQueryInfoMap map[string]queryInfoMap + +func PopulateIndividualQueryMetrics(conn *performancedbconnection.PGSQLConnection, slowRunningQueries []datamodels.SlowRunningQueryMetrics, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool) []datamodels.IndividualQueryMetrics { + isEligible, err := validations.CheckIndividualQueryMetricsFetchEligibility(enabledExtensions) + if err != nil { + log.Error("Error executing query: %v", err) + return nil + } + if !isEligible { + log.Debug("Extension 'pg_stat_monitor' is not enabled or unsupported version.") + return nil + } + log.Debug("Extension 'pg_stat_monitor' enabled.") + individualQueryMetricsInterface, individualQueriesList := getIndividualQueryMetrics(conn, slowRunningQueries, cp) + if len(individualQueryMetricsInterface) == 0 { + log.Debug("No individual queries found.") + return nil + } + err = commonutils.IngestMetric(individualQueryMetricsInterface, "PostgresIndividualQueries", pgIntegration, cp) + if err != nil { + log.Error("Error ingesting individual queries: %v", err) + return nil + } + return individualQueriesList +} + +func getIndividualQueryMetrics(conn *performancedbconnection.PGSQLConnection, slowRunningQueries []datamodels.SlowRunningQueryMetrics, cp *commonparameters.CommonParameters) ([]interface{}, []datamodels.IndividualQueryMetrics) { + if len(slowRunningQueries) == 0 { + log.Debug("No slow running queries found.") + return nil, nil + } + var individualQueryMetricsList []datamodels.IndividualQueryMetrics + var individualQueryMetricsListInterface []interface{} + anonymizedQueriesByDB := processForAnonymizeQueryMap(slowRunningQueries) + versionSpecificIndividualQuery, err := commonutils.FetchVersionSpecificIndividualQueries(cp.Version) + if err != nil { + log.Error("Unsupported postgres version: %v", err) + return nil, nil + } + for _, slowRunningMetric := range slowRunningQueries { + if slowRunningMetric.QueryID == nil { + continue + } + query := fmt.Sprintf(versionSpecificIndividualQuery, *slowRunningMetric.QueryID, cp.Databases, cp.QueryMonitoringResponseTimeThreshold, min(cp.QueryMonitoringCountThreshold, commonutils.MaxIndividualQueryCountThreshold)) + rows, err := conn.Queryx(query) + if err != nil { + log.Debug("Error executing query in individual query: %v", err) + return nil, nil + } + defer rows.Close() + individualQuerySamplesList := processRows(rows, anonymizedQueriesByDB) + for _, individualQuery := range individualQuerySamplesList { + individualQueryMetricsList = append(individualQueryMetricsList, individualQuery) + individualQueryMetricsListInterface = append(individualQueryMetricsListInterface, individualQuery) + } + } + return individualQueryMetricsListInterface, individualQueryMetricsList +} + +func processRows(rows *sqlx.Rows, anonymizedQueriesByDB databaseQueryInfoMap) []datamodels.IndividualQueryMetrics { + var individualQueryMetricsList 
[]datamodels.IndividualQueryMetrics + for rows.Next() { + var model datamodels.IndividualQueryMetrics + if scanErr := rows.StructScan(&model); scanErr != nil { + log.Error("Could not scan row: ", scanErr) + continue + } + if model.QueryID == nil || model.DatabaseName == nil { + log.Error("QueryID or DatabaseName is nil") + continue + } + individualQueryMetric := model + anonymizedQueryText := anonymizedQueriesByDB[*model.DatabaseName][*model.QueryID] + queryText := *model.QueryText + individualQueryMetric.RealQueryText = &queryText + individualQueryMetric.QueryText = &anonymizedQueryText + generatedPlanID, err := commonutils.GeneratePlanID() + if err != nil { + log.Error("Error generating plan ID: %v", err) + continue + } + individualQueryMetric.PlanID = &generatedPlanID + individualQueryMetricsList = append(individualQueryMetricsList, individualQueryMetric) + } + return individualQueryMetricsList +} + +func processForAnonymizeQueryMap(slowRunningMetricList []datamodels.SlowRunningQueryMetrics) databaseQueryInfoMap { + anonymizeQueryMapByDB := make(databaseQueryInfoMap) + for _, metric := range slowRunningMetricList { + if metric.DatabaseName == nil || metric.QueryID == nil || metric.QueryText == nil { + continue + } + dbName := *metric.DatabaseName + queryID := *metric.QueryID + anonymizedQuery := *metric.QueryText + + if _, exists := anonymizeQueryMapByDB[dbName]; !exists { + anonymizeQueryMapByDB[dbName] = make(map[string]string) + } + anonymizeQueryMapByDB[dbName][queryID] = anonymizedQuery + } + return anonymizeQueryMapByDB +} diff --git a/src/query-performance-monitoring/performance-metrics/individual_query_metrics_test.go b/src/query-performance-monitoring/performance-metrics/individual_query_metrics_test.go new file mode 100644 index 00000000..466e3b0c --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/individual_query_metrics_test.go @@ -0,0 +1,47 @@ +package performancemetrics + +import ( + "fmt" + "regexp" + "testing" + + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/connection" + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/stretchr/testify/assert" + "gopkg.in/DATA-DOG/go-sqlmock.v1" +) + +func TestGetIndividualQueryMetrics(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + version := uint64(13) + mockQueryID := "-123" + mockQueryText := "SELECT 1" + cp := common_parameters.SetCommonParameters(args, version, databaseName) + + // Mock the individual query + query := fmt.Sprintf(queries.IndividualQuerySearchV13AndAbove, mockQueryID, databaseName, args.QueryMonitoringResponseTimeThreshold, args.QueryMonitoringCountThreshold) + mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(sqlmock.NewRows([]string{ + "newrelic", "query", "queryid", "datname", "planid", "cpu_time_ms", "exec_time_ms", + }).AddRow( + "newrelic_value", "SELECT 1", "queryid1", "testdb", "planid1", 10.0, 20.0, + )) + + slowRunningQueries := []datamodels.SlowRunningQueryMetrics{ + { + QueryID: &mockQueryID, + QueryText: &mockQueryText, + DatabaseName: &databaseName, + }, + } + + individualQueryMetricsInterface, individualQueryMetrics := getIndividualQueryMetrics(conn, slowRunningQueries, cp) + + assert.Len(t, 
individualQueryMetricsInterface, 1)
+ assert.Len(t, individualQueryMetrics, 1)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
diff --git a/src/query-performance-monitoring/performance-metrics/slow_query_metrics.go b/src/query-performance-monitoring/performance-metrics/slow_query_metrics.go
new file mode 100644
index 00000000..6b7adf52
--- /dev/null
+++ b/src/query-performance-monitoring/performance-metrics/slow_query_metrics.go
@@ -0,0 +1,67 @@
+package performancemetrics
+
+import (
+ "fmt"
+
+ "github.com/newrelic/infra-integrations-sdk/v3/integration"
+ "github.com/newrelic/infra-integrations-sdk/v3/log"
+ performancedbconnection "github.com/newrelic/nri-postgresql/src/connection"
+ commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters"
+ commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils"
+ "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels"
+ "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations"
+)
+
+func getSlowRunningMetrics(conn *performancedbconnection.PGSQLConnection, cp *commonparameters.CommonParameters) ([]datamodels.SlowRunningQueryMetrics, []interface{}, error) {
+ var slowQueryMetricsList []datamodels.SlowRunningQueryMetrics
+ var slowQueryMetricsListInterface []interface{}
+ versionSpecificSlowQuery, err := commonutils.FetchVersionSpecificSlowQueries(cp.Version)
+ if err != nil {
+ log.Error("Unsupported postgres version: %v", err)
+ return nil, nil, err
+ }
+ var query = fmt.Sprintf(versionSpecificSlowQuery, cp.Databases, cp.QueryMonitoringCountThreshold)
+ rows, err := conn.Queryx(query)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer rows.Close()
+ for rows.Next() {
+ var slowQuery datamodels.SlowRunningQueryMetrics
+ if scanErr := rows.StructScan(&slowQuery); scanErr != nil {
+ // Return the scan error itself; the query error is nil at this point.
+ return nil, nil, scanErr
+ }
+ slowQueryMetricsList = append(slowQueryMetricsList, slowQuery)
+ slowQueryMetricsListInterface = append(slowQueryMetricsListInterface, slowQuery)
+ }
+ return slowQueryMetricsList, slowQueryMetricsListInterface, nil
+}
+
+func PopulateSlowRunningMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool) []datamodels.SlowRunningQueryMetrics {
+ isEligible, err := validations.CheckSlowQueryMetricsFetchEligibility(enabledExtensions)
+ if err != nil {
+ log.Error("Error executing query: %v", err)
+ return nil
+ }
+ if !isEligible {
+ log.Debug("Extension 'pg_stat_statements' is not enabled or unsupported version.")
+ return nil
+ }
+
+ slowQueryMetricsList, slowQueryMetricsListInterface, err := getSlowRunningMetrics(conn, cp)
+ if err != nil {
+ log.Error("Error fetching slow-running queries: %v", err)
+ return nil
+ }
+
+ if len(slowQueryMetricsList) == 0 {
+ log.Debug("No slow-running queries found.")
+ return nil
+ }
+ err = commonutils.IngestMetric(slowQueryMetricsListInterface, "PostgresSlowQueries", pgIntegration, cp)
+ if err != nil {
+ log.Error("Error ingesting slow-running queries: %v", err)
+ return nil
+ }
+ return slowQueryMetricsList
+}
diff --git a/src/query-performance-monitoring/performance-metrics/slow_query_metrics_test.go b/src/query-performance-monitoring/performance-metrics/slow_query_metrics_test.go
new file mode 100644
index 00000000..2709bb88
--- /dev/null
+++ b/src/query-performance-monitoring/performance-metrics/slow_query_metrics_test.go
@@ -0,0 +1,74 @@
+package
performancemetrics + +import ( + "fmt" + "regexp" + "testing" + + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/connection" + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/stretchr/testify/assert" + "gopkg.in/DATA-DOG/go-sqlmock.v1" +) + +func runSlowQueryTest(t *testing.T, query string, version uint64, expectedLength int) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + cp := common_parameters.SetCommonParameters(args, version, databaseName) + + query = fmt.Sprintf(query, "testdb", args.QueryMonitoringCountThreshold) + mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(sqlmock.NewRows([]string{ + "newrelic", "query_id", "query_text", "database_name", "schema_name", "execution_count", + "avg_elapsed_time_ms", "avg_disk_reads", "avg_disk_writes", "statement_type", "collection_timestamp", + }).AddRow( + "newrelic_value", "queryid1", "SELECT 1", "testdb", "public", 10, + 15.0, 5, 2, "SELECT", "2023-01-01T00:00:00Z", + )) + slowQueryList, _, err := getSlowRunningMetrics(conn, cp) + assert.NoError(t, err) + assert.Len(t, slowQueryList, expectedLength) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestGetSlowRunningMetrics(t *testing.T) { + runSlowQueryTest(t, queries.SlowQueriesForV13AndAbove, 13, 1) +} + +func TestGetSlowRunningMetricsV12(t *testing.T) { + runSlowQueryTest(t, queries.SlowQueriesForV12, 12, 1) +} + +func TestGetSlowRunningEmptyMetrics(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + version := uint64(13) + cp := common_parameters.SetCommonParameters(args, version, databaseName) + expectedQuery := queries.SlowQueriesForV13AndAbove + query := fmt.Sprintf(expectedQuery, "testdb", args.QueryMonitoringCountThreshold) + mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(sqlmock.NewRows([]string{ + "newrelic", "query_id", "query_text", "database_name", "schema_name", "execution_count", + "avg_elapsed_time_ms", "avg_disk_reads", "avg_disk_writes", "statement_type", "collection_timestamp", + })) + slowQueryList, _, err := getSlowRunningMetrics(conn, cp) + + assert.NoError(t, err) + assert.Len(t, slowQueryList, 0) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestGetSlowRunningMetricsUnsupportedVersion(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + version := uint64(11) + cp := common_parameters.SetCommonParameters(args, version, databaseName) + slowQueryList, _, err := getSlowRunningMetrics(conn, cp) + assert.EqualError(t, err, commonutils.ErrUnsupportedVersion.Error()) + assert.Len(t, slowQueryList, 0) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/src/query-performance-monitoring/performance-metrics/wait_event_metrics.go b/src/query-performance-monitoring/performance-metrics/wait_event_metrics.go new file mode 100644 index 00000000..b1a8ea56 --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/wait_event_metrics.go @@ -0,0 +1,61 @@ +package performancemetrics + +import ( + "fmt" + + 
"github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/infra-integrations-sdk/v3/log" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + commonparameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/datamodels" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations" +) + +func PopulateWaitEventMetrics(conn *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *commonparameters.CommonParameters, enabledExtensions map[string]bool) error { + var isEligible bool + var eligibleCheckErr error + isEligible, eligibleCheckErr = validations.CheckWaitEventMetricsFetchEligibility(enabledExtensions) + if eligibleCheckErr != nil { + log.Error("Error executing query: %v", eligibleCheckErr) + return commonutils.ErrUnExpectedError + } + if !isEligible { + log.Debug("Extension 'pg_wait_sampling' or 'pg_stat_statement' is not enabled or unsupported version.") + return commonutils.ErrNotEligible + } + waitEventMetricsList, waitEventErr := getWaitEventMetrics(conn, cp) + if waitEventErr != nil { + log.Error("Error fetching wait event queries: %v", waitEventErr) + return commonutils.ErrUnExpectedError + } + if len(waitEventMetricsList) == 0 { + log.Debug("No wait event queries found.") + return nil + } + err := commonutils.IngestMetric(waitEventMetricsList, "PostgresWaitEvents", pgIntegration, cp) + if err != nil { + log.Error("Error ingesting wait event queries: %v", err) + return err + } + return nil +} + +func getWaitEventMetrics(conn *performancedbconnection.PGSQLConnection, cp *commonparameters.CommonParameters) ([]interface{}, error) { + var waitEventMetricsList []interface{} + var query = fmt.Sprintf(queries.WaitEvents, cp.Databases, cp.QueryMonitoringCountThreshold) + rows, err := conn.Queryx(query) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var waitEvent datamodels.WaitEventMetrics + if waitScanErr := rows.StructScan(&waitEvent); waitScanErr != nil { + return nil, err + } + waitEventMetricsList = append(waitEventMetricsList, waitEvent) + } + return waitEventMetricsList, nil +} diff --git a/src/query-performance-monitoring/performance-metrics/wait_event_metrics_test.go b/src/query-performance-monitoring/performance-metrics/wait_event_metrics_test.go new file mode 100644 index 00000000..3b5f153c --- /dev/null +++ b/src/query-performance-monitoring/performance-metrics/wait_event_metrics_test.go @@ -0,0 +1,49 @@ +package performancemetrics + +import ( + "fmt" + "regexp" + "testing" + + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/connection" + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/queries" + "github.com/stretchr/testify/assert" + "gopkg.in/DATA-DOG/go-sqlmock.v1" +) + +func TestGetWaitEventMetrics(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + cp := common_parameters.SetCommonParameters(args, uint64(14), databaseName) + + var query = fmt.Sprintf(queries.WaitEvents, databaseName, 
args.QueryMonitoringCountThreshold) + mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(sqlmock.NewRows([]string{ + "wait_event_name", "wait_category", "total_wait_time_ms", "collection_timestamp", "query_id", "query_text", "database_name", + }).AddRow( + "Locks:Lock", "Locks", 1000.0, "2023-01-01T00:00:00Z", "queryid1", "SELECT 1", "testdb", + )) + waitEventsList, err := getWaitEventMetrics(conn, cp) + + assert.NoError(t, err) + assert.Len(t, waitEventsList, 1) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestGetWaitEventEmptyMetrics(t *testing.T) { + conn, mock := connection.CreateMockSQL(t) + args := args.ArgumentList{QueryMonitoringCountThreshold: 10} + databaseName := "testdb" + cp := common_parameters.SetCommonParameters(args, uint64(14), databaseName) + + var query = fmt.Sprintf(queries.WaitEvents, databaseName, args.QueryMonitoringCountThreshold) + mock.ExpectQuery(regexp.QuoteMeta(query)).WillReturnRows(sqlmock.NewRows([]string{ + "wait_event_name", "wait_category", "total_wait_time_ms", "collection_timestamp", "query_id", "query_text", "database_name", + })) + waitEventsList, err := getWaitEventMetrics(conn, cp) + assert.NoError(t, err) + assert.Len(t, waitEventsList, 0) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/src/query-performance-monitoring/queries/queries.go b/src/query-performance-monitoring/queries/queries.go new file mode 100644 index 00000000..42ee0829 --- /dev/null +++ b/src/query-performance-monitoring/queries/queries.go @@ -0,0 +1,217 @@ +// Package queries contains the collection methods to parse and build the collection schema +package queries + +const ( + SlowQueriesForV13AndAbove = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + pss.queryid AS query_id, -- Unique identifier for the query + LEFT(pss.query, 4095) AS query_text, -- Query text truncated to 4095 characters + pd.datname AS database_name, -- Name of the database + current_schema() AS schema_name, -- Name of the current schema + pss.calls AS execution_count, -- Number of times the query was executed + ROUND((pss.total_exec_time / pss.calls)::numeric, 3) AS avg_elapsed_time_ms, -- Average execution time in milliseconds + pss.shared_blks_read / pss.calls AS avg_disk_reads, -- Average number of disk reads per execution + pss.shared_blks_written / pss.calls AS avg_disk_writes, -- Average number of disk writes per execution + CASE + WHEN pss.query ILIKE 'SELECT%%' THEN 'SELECT' -- Query type is SELECT + WHEN pss.query ILIKE 'INSERT%%' THEN 'INSERT' -- Query type is INSERT + WHEN pss.query ILIKE 'UPDATE%%' THEN 'UPDATE' -- Query type is UPDATE + WHEN pss.query ILIKE 'DELETE%%' THEN 'DELETE' -- Query type is DELETE + ELSE 'OTHER' -- Query type is OTHER + END AS statement_type, -- Type of SQL statement + to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') AS collection_timestamp -- Timestamp of data collection + FROM + pg_stat_statements pss + JOIN + pg_database pd ON pss.dbid = pd.oid + WHERE + pd.datname in (%s) -- List of database names + AND pss.query NOT ILIKE 'EXPLAIN (FORMAT JSON)%%' -- Exclude EXPLAIN queries + AND pss.query NOT ILIKE 'SELECT $1 as newrelic%%' -- Exclude specific New Relic queries + AND pss.query NOT ILIKE 'WITH wait_history AS%%' -- Exclude specific WITH queries + AND pss.query NOT ILIKE 'select -- BLOATQUERY%%' -- Exclude BLOATQUERY + AND pss.query NOT ILIKE 'select -- INDEXQUERY%%' -- Exclude INDEXQUERY + AND pss.query NOT ILIKE 'SELECT -- TABLEQUERY%%' -- Exclude TABLEQUERY + 
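+ -- Taken together, these NOT ILIKE filters exclude the integration's own
+ -- workload (EXPLAIN probes, the 'newrelic' sentinel query, and the
+ -- BLOATQUERY/INDEXQUERY/TABLEQUERY marker queries) so that monitoring
+ -- traffic is not reported back as slow application queries.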
AND pss.query NOT ILIKE 'SELECT table_schema%%' -- Exclude table_schema queries + ORDER BY + avg_elapsed_time_ms DESC -- Order by the average elapsed time in descending order + LIMIT %d;` + + // SlowQueriesForV12 retrieves slow queries and their statistics for PostgreSQL version 12 + SlowQueriesForV12 = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + pss.queryid AS query_id, -- Unique identifier for the query + LEFT(pss.query, 4095) AS query_text, -- Query text truncated to 4095 characters + pd.datname AS database_name, -- Name of the database + current_schema() AS schema_name, -- Name of the current schema + pss.calls AS execution_count, -- Number of times the query was executed + ROUND((pss.total_time / pss.calls)::numeric, 3) AS avg_elapsed_time_ms, -- Average execution time in milliseconds + pss.shared_blks_read / pss.calls AS avg_disk_reads, -- Average number of disk reads per execution + pss.shared_blks_written / pss.calls AS avg_disk_writes, -- Average number of disk writes per execution + CASE + WHEN pss.query ILIKE 'SELECT%%' THEN 'SELECT' -- Query type is SELECT + WHEN pss.query ILIKE 'INSERT%%' THEN 'INSERT' -- Query type is INSERT + WHEN pss.query ILIKE 'UPDATE%%' THEN 'UPDATE' -- Query type is UPDATE + WHEN pss.query ILIKE 'DELETE%%' THEN 'DELETE' -- Query type is DELETE + ELSE 'OTHER' -- Query type is OTHER + END AS statement_type, -- Type of SQL statement + to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') AS collection_timestamp -- Timestamp of data collection + FROM + pg_stat_statements pss + JOIN + pg_database pd ON pss.dbid = pd.oid + WHERE + pd.datname in (%s) -- List of database names + AND pss.query NOT ILIKE 'EXPLAIN (FORMAT JSON) %%' -- Exclude EXPLAIN queries + AND pss.query NOT ILIKE 'SELECT $1 as newrelic%%' -- Exclude specific New Relic queries + AND pss.query NOT ILIKE 'WITH wait_history AS%%' -- Exclude specific WITH queries + AND pss.query NOT ILIKE 'select -- BLOATQUERY%%' -- Exclude BLOATQUERY + AND pss.query NOT ILIKE 'select -- INDEXQUERY%%' -- Exclude INDEXQUERY + AND pss.query NOT ILIKE 'SELECT -- TABLEQUERY%%' -- Exclude TABLEQUERY + AND pss.query NOT ILIKE 'SELECT table_schema%%' -- Exclude table_schema queries + AND pss.query NOT ILIKE 'SELECT D.datname%%' -- Exclude specific datname queries + ORDER BY + avg_elapsed_time_ms DESC -- Order by the average elapsed time in descending order + LIMIT + %d; -- Limit the number of results` + + // WaitEvents retrieves wait events and their statistics + WaitEvents = `WITH wait_history AS ( + SELECT + wh.pid, -- Process ID + wh.event_type, -- Type of the wait event + wh.event, -- Wait event + wh.ts, -- Timestamp of the wait event + pg_database.datname AS database_name, -- Name of the database + LEAD(wh.ts) OVER (PARTITION BY wh.pid ORDER BY wh.ts) - wh.ts AS duration, -- Duration of the wait event + LEFT(sa.query, 4095) AS query_text, -- Query text truncated to 4095 characters + sa.queryid AS query_id -- Unique identifier for the query + FROM + pg_wait_sampling_history wh + LEFT JOIN + pg_stat_statements sa ON wh.queryid = sa.queryid + LEFT JOIN + pg_database ON pg_database.oid = sa.dbid + WHERE pg_database.datname in (%s) -- List of database names + ) + SELECT + event_type || ':' || event AS wait_event_name, -- Concatenated wait event name + CASE + WHEN event_type IN ('LWLock', 'Lock') THEN 'Locks' -- Wait category is Locks + WHEN event_type = 'IO' THEN 'Disk IO' -- Wait category is Disk IO + WHEN event_type = 'CPU' THEN 'CPU' -- Wait category is 
CPU + ELSE 'Other' -- Wait category is Other + END AS wait_category, -- Category of the wait event + EXTRACT(EPOCH FROM SUM(duration)) * 1000 AS total_wait_time_ms, -- Convert duration to milliseconds + to_char(NOW() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') AS collection_timestamp, -- Timestamp of data collection + query_id, -- Unique identifier for the query + query_text, -- Query text + database_name -- Name of the database + FROM wait_history + WHERE query_text NOT LIKE 'EXPLAIN (FORMAT JSON) %%' AND query_id IS NOT NULL AND event_type IS NOT NULL + GROUP BY event_type, event, query_id, query_text, database_name + ORDER BY total_wait_time_ms DESC -- Order by the total wait time in descending order + LIMIT %d; -- Limit the number of results` + + // BlockingQueriesForV14AndAbove retrieves information about blocking and blocked queries for PostgreSQL version 14 and above + BlockingQueriesForV14AndAbove = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + blocked_activity.pid AS blocked_pid, -- Process ID of the blocked query + LEFT(blocked_statements.query, 4095) AS blocked_query, -- Blocked query text truncated to 4095 characters + blocked_statements.queryid AS blocked_query_id, -- Unique identifier for the blocked query + blocked_activity.query_start AS blocked_query_start, -- Start time of the blocked query + blocked_activity.datname AS database_name, -- Name of the database + blocking_activity.pid AS blocking_pid, -- Process ID of the blocking query + LEFT(blocking_statements.query, 4095) AS blocking_query, -- Blocking query text truncated to 4095 characters + blocking_statements.queryid AS blocking_query_id, -- Unique identifier for the blocking query + blocking_activity.query_start AS blocking_query_start -- Start time of the blocking query + FROM pg_stat_activity AS blocked_activity + JOIN pg_stat_statements AS blocked_statements ON blocked_activity.query_id = blocked_statements.queryid + JOIN pg_locks blocked_locks ON blocked_activity.pid = blocked_locks.pid + JOIN pg_locks blocking_locks ON blocked_locks.locktype = blocking_locks.locktype + AND blocked_locks.database IS NOT DISTINCT FROM blocking_locks.database + AND blocked_locks.relation IS NOT DISTINCT FROM blocking_locks.relation + AND blocked_locks.page IS NOT DISTINCT FROM blocking_locks.page + AND blocked_locks.tuple IS NOT DISTINCT FROM blocking_locks.tuple + AND blocked_locks.transactionid IS NOT DISTINCT FROM blocking_locks.transactionid + AND blocked_locks.classid IS NOT DISTINCT FROM blocking_locks.classid + AND blocked_locks.objid IS NOT DISTINCT FROM blocking_locks.objid + AND blocked_locks.objsubid IS NOT DISTINCT FROM blocking_locks.objsubid + AND blocked_locks.pid <> blocking_locks.pid + JOIN pg_stat_activity AS blocking_activity ON blocking_locks.pid = blocking_activity.pid + JOIN pg_stat_statements AS blocking_statements ON blocking_activity.query_id = blocking_statements.queryid + WHERE NOT blocked_locks.granted + AND blocked_activity.datname IN (%s) -- List of database names + AND blocked_statements.query NOT LIKE 'EXPLAIN (FORMAT JSON) %%' -- Exclude EXPLAIN queries + AND blocking_statements.query NOT LIKE 'EXPLAIN (FORMAT JSON) %%' -- Exclude EXPLAIN queries + ORDER BY blocked_activity.query_start ASC -- Order by the start time of the blocked query in ascending order + LIMIT %d; -- Limit the number of results` + + // BlockingQueriesForV12AndV13 retrieves information about blocking and blocked queries for PostgreSQL versions 12 and 13 + 
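+ // Unlike the V14+ variant above, this query cannot join pg_stat_statements
+ // on a query identifier, because pg_stat_activity gained the query_id
+ // column only in PostgreSQL 14; the raw query text from pg_stat_activity is
+ // used instead. Both format verbs are filled by the caller, e.g.
+ // (illustrative): fmt.Sprintf(BlockingQueriesForV12AndV13, "'demo'", 20)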
BlockingQueriesForV12AndV13 = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + blocked_activity.pid AS blocked_pid, -- Process ID of the blocked query + LEFT(blocked_activity.query, 4095) AS blocked_query, -- Blocked query text truncated to 4095 characters + blocked_activity.query_start AS blocked_query_start, -- Start time of the blocked query + blocked_activity.datname AS database_name, -- Name of the database + blocking_activity.pid AS blocking_pid, -- Process ID of the blocking query + LEFT(blocking_activity.query, 4095) AS blocking_query, -- Blocking query text truncated to 4095 characters + blocking_activity.query_start AS blocking_query_start -- Start time of the blocking query + FROM pg_stat_activity AS blocked_activity + JOIN pg_locks blocked_locks ON blocked_activity.pid = blocked_locks.pid + JOIN pg_locks blocking_locks ON blocked_locks.locktype = blocking_locks.locktype + AND blocked_locks.database IS NOT DISTINCT FROM blocking_locks.database + AND blocked_locks.relation IS NOT DISTINCT FROM blocking_locks.relation + AND blocked_locks.page IS NOT DISTINCT FROM blocking_locks.page + AND blocked_locks.tuple IS NOT DISTINCT FROM blocking_locks.tuple + AND blocked_locks.transactionid IS NOT DISTINCT FROM blocking_locks.transactionid + AND blocked_locks.classid IS NOT DISTINCT FROM blocking_locks.classid + AND blocked_locks.objid IS NOT DISTINCT FROM blocking_locks.objid + AND blocked_locks.objsubid IS NOT DISTINCT FROM blocking_locks.objsubid + AND blocked_locks.pid <> blocking_locks.pid + JOIN pg_stat_activity AS blocking_activity ON blocking_locks.pid = blocking_activity.pid + WHERE NOT blocked_locks.granted + AND blocked_activity.datname IN (%s) -- List of database names + AND blocked_activity.query NOT LIKE 'EXPLAIN (FORMAT JSON) %%' -- Exclude EXPLAIN queries + AND blocking_activity.query NOT LIKE 'EXPLAIN (FORMAT JSON) %%' -- Exclude EXPLAIN queries + ORDER BY blocked_activity.query_start ASC -- Order by the start time of the blocked query in ascending order + LIMIT %d; -- Limit the number of results` + + // IndividualQuerySearchV13AndAbove retrieves individual query statistics for PostgreSQL version 13 and above + IndividualQuerySearchV13AndAbove = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + LEFT(query, 4095) as query, -- Query text truncated to 4095 characters + queryid, -- Unique identifier for the query + datname, -- Name of the database + planid, -- Plan identifier + ROUND(((cpu_user_time + cpu_sys_time) / NULLIF(calls, 0))::numeric, 3) AS cpu_time_ms, -- Average CPU time in milliseconds + total_exec_time / NULLIF(calls, 0) AS exec_time_ms -- Average execution time in milliseconds + FROM + pg_stat_monitor + WHERE + queryid = %s -- Query identifier + AND datname IN (%s) -- List of database names + AND (total_exec_time / NULLIF(calls, 0)) > %d -- Minimum average execution time + AND bucket_start_time >= NOW() - INTERVAL '60 seconds' -- Time interval + GROUP BY + query, queryid, datname, planid, cpu_user_time, cpu_sys_time, calls, total_exec_time + ORDER BY + exec_time_ms DESC -- Order by average execution time in descending order + LIMIT %d; -- Limit the number of results` + + // IndividualQuerySearchV12 retrieves individual query statistics for PostgreSQL version 12 + IndividualQuerySearchV12 = `SELECT 'newrelic' as newrelic, -- Common value to filter with like operator in slow query metrics + LEFT(query, 4095) as query, -- Query text truncated to 4095 characters + 
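+ -- planid and the CPU-time counters selected below come from
+ -- pg_stat_monitor and have no pg_stat_statements equivalent, which is
+ -- why individual-query sampling requires that extension.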
queryid, -- Unique identifier for the query + datname, -- Name of the database + planid, -- Plan identifier + ROUND(((cpu_user_time + cpu_sys_time) / NULLIF(calls, 0))::numeric, 3) AS cpu_time_ms, -- Average CPU time in milliseconds + total_time / NULLIF(calls, 0) AS exec_time_ms -- Average execution time in milliseconds + FROM + pg_stat_monitor + WHERE + queryid = %s -- Query identifier + AND datname IN (%s) -- List of database names + AND (total_time / NULLIF(calls, 0)) > %d -- Minimum average execution time + AND bucket_start_time >= NOW() - INTERVAL '60 seconds' -- Time interval + GROUP BY + query, queryid, datname, planid, cpu_user_time, cpu_sys_time, calls, total_time + ORDER BY + exec_time_ms DESC -- Order by average execution time in descending order + LIMIT %d; -- Limit the number of results` +) diff --git a/src/query-performance-monitoring/query_performance_main.go b/src/query-performance-monitoring/query_performance_main.go new file mode 100644 index 00000000..8640f5c8 --- /dev/null +++ b/src/query-performance-monitoring/query_performance_main.go @@ -0,0 +1,79 @@ +package queryperformancemonitoring + +// this is the main go file for the query_monitoring package +import ( + "time" + + "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/validations" + + common_parameters "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-parameters" + + "github.com/newrelic/infra-integrations-sdk/v3/integration" + "github.com/newrelic/infra-integrations-sdk/v3/log" + "github.com/newrelic/nri-postgresql/src/args" + "github.com/newrelic/nri-postgresql/src/collection" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + "github.com/newrelic/nri-postgresql/src/metrics" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" + performancemetrics "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/performance-metrics" +) + +func QueryPerformanceMain(args args.ArgumentList, pgIntegration *integration.Integration, databaseMap collection.DatabaseList) { + connectionInfo := performancedbconnection.DefaultConnectionInfo(&args) + if len(databaseMap) == 0 { + log.Debug("No databases found") + return + } + newConnection, err := connectionInfo.NewConnection(connectionInfo.DatabaseName()) + if err != nil { + log.Error("Error creating connection: ", err) + return + } + defer newConnection.Close() + + version, versionErr := metrics.CollectVersion(newConnection) + if versionErr != nil { + log.Error("Error fetching version: ", versionErr) + return + } + versionInt := version.Major + if !validations.CheckPostgresVersionSupportForQueryMonitoring(versionInt) { + log.Debug("Postgres version: %d is not supported for query monitoring", versionInt) + return + } + cp := common_parameters.SetCommonParameters(args, versionInt, commonutils.GetDatabaseListInString(databaseMap)) + + populateQueryPerformanceMetrics(newConnection, pgIntegration, cp, connectionInfo) +} + +func populateQueryPerformanceMetrics(newConnection *performancedbconnection.PGSQLConnection, pgIntegration *integration.Integration, cp *common_parameters.CommonParameters, connectionInfo performancedbconnection.Info) { + enabledExtensions, err := validations.FetchAllExtensions(newConnection) + if err != nil { + log.Error("Error fetching extensions: ", err) + return + } + start := time.Now() + log.Debug("Starting PopulateSlowRunningMetrics at ", start) + slowRunningQueries := performancemetrics.PopulateSlowRunningMetrics(newConnection, 
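+ // The stages below are ordered by data dependency: slow-query results seed
+ // the individual-query sample, whose rows (carrying RealQueryText) in turn
+ // drive execution-plan collection, so the sequence cannot be reordered.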
pgIntegration, cp, enabledExtensions) + log.Debug("PopulateSlowRunningMetrics completed in ", time.Since(start)) + + start = time.Now() + log.Debug("Starting PopulateWaitEventMetrics at ", start) + _ = performancemetrics.PopulateWaitEventMetrics(newConnection, pgIntegration, cp, enabledExtensions) + log.Debug("PopulateWaitEventMetrics completed in ", time.Since(start)) + + start = time.Now() + log.Debug("Starting PopulateBlockingMetrics at ", start) + performancemetrics.PopulateBlockingMetrics(newConnection, pgIntegration, cp, enabledExtensions) + log.Debug("PopulateBlockingMetrics completed in ", time.Since(start)) + + start = time.Now() + log.Debug("Starting PopulateIndividualQueryMetrics at ", start) + individualQueries := performancemetrics.PopulateIndividualQueryMetrics(newConnection, slowRunningQueries, pgIntegration, cp, enabledExtensions) + log.Debug("PopulateIndividualQueryMetrics completed in ", time.Since(start)) + + start = time.Now() + log.Debug("Starting PopulateExecutionPlanMetrics at ", start) + performancemetrics.PopulateExecutionPlanMetrics(individualQueries, pgIntegration, cp, connectionInfo) + log.Debug("PopulateExecutionPlanMetrics completed in ", time.Since(start)) +} diff --git a/src/query-performance-monitoring/validations/performance_metrics_validations.go b/src/query-performance-monitoring/validations/performance_metrics_validations.go new file mode 100644 index 00000000..356080e8 --- /dev/null +++ b/src/query-performance-monitoring/validations/performance_metrics_validations.go @@ -0,0 +1,50 @@ +package validations + +import ( + "github.com/newrelic/infra-integrations-sdk/v3/log" + performancedbconnection "github.com/newrelic/nri-postgresql/src/connection" + commonutils "github.com/newrelic/nri-postgresql/src/query-performance-monitoring/common-utils" +) + +func FetchAllExtensions(conn *performancedbconnection.PGSQLConnection) (map[string]bool, error) { + rows, err := conn.Queryx("SELECT extname FROM pg_extension") + if err != nil { + log.Error("Error executing query: ", err.Error()) + return nil, err + } + defer rows.Close() + var enabledExtensions = make(map[string]bool) + for rows.Next() { + var extname string + if err := rows.Scan(&extname); err != nil { + log.Error("Error scanning rows: ", err.Error()) + return nil, err + } + enabledExtensions[extname] = true + } + return enabledExtensions, nil +} + +func CheckSlowQueryMetricsFetchEligibility(enabledExtensions map[string]bool) (bool, error) { + return enabledExtensions["pg_stat_statements"], nil +} + +func CheckWaitEventMetricsFetchEligibility(enabledExtensions map[string]bool) (bool, error) { + return enabledExtensions["pg_wait_sampling"] && enabledExtensions["pg_stat_statements"], nil +} + +func CheckBlockingSessionMetricsFetchEligibility(enabledExtensions map[string]bool, version uint64) (bool, error) { + // Version 12 and 13 do not require the pg_stat_statements extension + if version == commonutils.PostgresVersion12 || version == commonutils.PostgresVersion13 { + return true, nil + } + return enabledExtensions["pg_stat_statements"], nil +} + +func CheckIndividualQueryMetricsFetchEligibility(enabledExtensions map[string]bool) (bool, error) { + return enabledExtensions["pg_stat_monitor"], nil +} + +func CheckPostgresVersionSupportForQueryMonitoring(version uint64) bool { + return version >= commonutils.PostgresVersion12 +} diff --git a/src/query-performance-monitoring/validations/performance_metrics_validations_test.go b/src/query-performance-monitoring/validations/performance_metrics_validations_test.go 
new file mode 100644
index 00000000..cd4ef1d8
--- /dev/null
+++ b/src/query-performance-monitoring/validations/performance_metrics_validations_test.go
@@ -0,0 +1,103 @@
+package validations
+
+import (
+ "regexp"
+ "testing"
+
+ "github.com/newrelic/nri-postgresql/src/connection"
+ "github.com/stretchr/testify/assert"
+ "gopkg.in/DATA-DOG/go-sqlmock.v1"
+)
+
+func TestCheckBlockingSessionMetricsFetchEligibilityExtensionNotRequired(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ version := uint64(12)
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version)
+ assert.Equal(t, isExtensionEnabledTest, true)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestCheckBlockingSessionMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ version := uint64(14)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_statements"))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version)
+ assert.Equal(t, isExtensionEnabledTest, true)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestCheckBlockingSessionMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ version := uint64(14)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ // Failure case: pg_stat_statements is absent, so v14+ is not eligible.
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckBlockingSessionMetricsFetchEligibility(enabledExtensions, version)
+ assert.Equal(t, isExtensionEnabledTest, false)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestIndividualQueryMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_monitor"))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckIndividualQueryMetricsFetchEligibility(enabledExtensions)
+ assert.Equal(t, isExtensionEnabledTest, true)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestIndividualQueryMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckIndividualQueryMetricsFetchEligibility(enabledExtensions)
+ assert.Equal(t, isExtensionEnabledTest, false)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestCheckWaitEventMetricsFetchEligibility(t *testing.T) {
+ validationQuery := "SELECT extname FROM pg_extension"
+ testCases := []struct {
+ waitExt string
+ statExt string
+ expected bool
+ }{
+ {"pg_wait_sampling", "pg_stat_statements", true}, // Success
+ {"pg_wait_sampling", "", false}, // Fail V1
+ {"", "pg_stat_statements", false}, //
Fail V2
+ }
+
+ conn, mock := connection.CreateMockSQL(t)
+ for _, tc := range testCases {
+ mock.ExpectQuery(regexp.QuoteMeta(validationQuery)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow(tc.waitExt).AddRow(tc.statExt))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckWaitEventMetricsFetchEligibility(enabledExtensions)
+ assert.Equal(t, isExtensionEnabledTest, tc.expected)
+ assert.NoError(t, mock.ExpectationsWereMet())
+ }
+}
+
+func TestCheckSlowQueryMetricsFetchEligibilitySupportedVersionSuccess(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}).AddRow("pg_stat_statements"))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckSlowQueryMetricsFetchEligibility(enabledExtensions)
+ assert.Equal(t, isExtensionEnabledTest, true)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
+
+func TestCheckSlowQueryMetricsFetchEligibilitySupportedVersionFail(t *testing.T) {
+ conn, mock := connection.CreateMockSQL(t)
+ validationQueryStatStatements := "SELECT extname FROM pg_extension"
+ mock.ExpectQuery(regexp.QuoteMeta(validationQueryStatStatements)).WillReturnRows(sqlmock.NewRows([]string{"extname"}))
+ enabledExtensions, _ := FetchAllExtensions(conn)
+ isExtensionEnabledTest, _ := CheckSlowQueryMetricsFetchEligibility(enabledExtensions)
+ assert.Equal(t, isExtensionEnabledTest, false)
+ assert.NoError(t, mock.ExpectationsWereMet())
+}
diff --git a/tests/docker-compose-performance.yml b/tests/docker-compose-performance.yml
new file mode 100644
index 00000000..55d7425e
--- /dev/null
+++ b/tests/docker-compose-performance.yml
@@ -0,0 +1,70 @@
+services:
+
+ postgres13:
+ build:
+ context: ./perf-testing/oldest_supported/
+ dockerfile: Dockerfile
+ container_name: "postgresql-perf-oldest"
+ restart: always
+ environment:
+ - POSTGRES_USER=dbuser
+ - POSTGRES_PASSWORD=dbpassword
+ - POSTGRES_DB=demo
+ volumes:
+ - postgres13:/var/lib/postgresql/data
+ ports:
+ - "6432:5432"
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U postgres"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+
+ postgresql-latest:
+ build:
+ context: ./perf-testing/latest_supported/
+ dockerfile: Dockerfile
+ restart: always
+ container_name: "postgresql-perf-latest"
+ environment:
+ - POSTGRES_USER=dbuser
+ - POSTGRES_PASSWORD=dbpassword
+ - POSTGRES_DB=demo
+ volumes:
+ - pgdata_latest:/var/lib/postgresql/data
+ ports:
+ - "5432:5432"
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U postgres"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+
+ postgres-without-extensions:
+ image: postgres:17.0
+ restart: always
+ container_name: "postgresql-noext"
+ environment:
+ - POSTGRES_USER=dbuser
+ - POSTGRES_PASSWORD=dbpassword
+ - POSTGRES_DB=demo
+ volumes:
+ - pgdata_noext:/var/lib/postgresql/data
+ ports:
+ - "7432:5432"
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U postgres"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+
+ nri-postgresql:
+ container_name: nri_postgresql
+ build:
+ context: ../
+ dockerfile: tests/perf-testing/integration/Dockerfile
+
+volumes:
+ pgdata_latest:
+ postgres13:
+ pgdata_noext:
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index 2e741d36..3484cdcb 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -2,6 +2,7 @@ services:
 postgres-9-6:
 image: postgres:9.6
 restart: always
+
container_name: postgres-9-6 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: example @@ -10,18 +11,14 @@ services: postgres-latest-supported: image: postgres:17.0 restart: always + container_name: postgres-latest-supported environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: example POSTGRES_DB: demo nri-postgresql: - image: golang:1.23.5-bookworm - container_name: nri_postgresql - working_dir: /code - depends_on: - - postgres-9-6 - - postgres-latest-supported - volumes: - - ../:/code - entrypoint: go run /code/src/main.go + container_name: nri-postgresql + build: + context: ../ + dockerfile: tests/perf-testing/integration/Dockerfile diff --git a/tests/perf-testing/integration/Dockerfile b/tests/perf-testing/integration/Dockerfile new file mode 100644 index 00000000..513025d6 --- /dev/null +++ b/tests/perf-testing/integration/Dockerfile @@ -0,0 +1,9 @@ +FROM golang:1.23.5-bookworm as builder +ARG CGO_ENABLED=0 +WORKDIR /go/src/github.com/newrelic/nri-postgresql +COPY . . +RUN make clean compile + +FROM alpine:latest +COPY --from=builder /go/src/github.com/newrelic/nri-postgresql/bin / +CMD ["sleep", "1h"] \ No newline at end of file diff --git a/tests/perf-testing/latest_supported/01-init-extensions.sql b/tests/perf-testing/latest_supported/01-init-extensions.sql new file mode 100644 index 00000000..66210525 --- /dev/null +++ b/tests/perf-testing/latest_supported/01-init-extensions.sql @@ -0,0 +1,5 @@ +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; + +CREATE EXTENSION IF NOT EXISTS pg_wait_sampling; + +CREATE EXTENSION IF NOT EXISTS pg_stat_monitor; \ No newline at end of file diff --git a/tests/perf-testing/latest_supported/02-create-database.sql b/tests/perf-testing/latest_supported/02-create-database.sql new file mode 100644 index 00000000..e053c036 --- /dev/null +++ b/tests/perf-testing/latest_supported/02-create-database.sql @@ -0,0 +1 @@ +CREATE DATABASE titanic; \ No newline at end of file diff --git a/tests/perf-testing/latest_supported/03-import-data.sql b/tests/perf-testing/latest_supported/03-import-data.sql new file mode 100644 index 00000000..28cc43b2 --- /dev/null +++ b/tests/perf-testing/latest_supported/03-import-data.sql @@ -0,0 +1,11 @@ +-- Connect to titanic database +\c titanic; + +-- Import the titanic.sql file that was downloaded during Docker build +\i /docker-entrypoint-initdb.d/titanic.sql; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO dbuser; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO dbuser; + +-- Analyze tables for better query planning +ANALYZE VERBOSE; diff --git a/tests/perf-testing/latest_supported/Dockerfile b/tests/perf-testing/latest_supported/Dockerfile new file mode 100644 index 00000000..ffc0530f --- /dev/null +++ b/tests/perf-testing/latest_supported/Dockerfile @@ -0,0 +1,40 @@ +FROM postgres:17.0 + +# Dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + wget \ + postgresql-server-dev-17 \ + && rm -rf /var/lib/apt/lists/* + +# Postgres Docker Images copy contents of postgresql.conf.sample to postgresql.conf during initialization +# COPY custom.conf /usr/share/postgresql/postgresql.conf.sample -- DO NOT USE +RUN echo "shared_preload_libraries = 'pg_stat_statements,pg_wait_sampling,pg_stat_monitor'" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_statements.track = all" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_statements.save = on" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_monitor.pgsm_enable_query_plan = on" 
>> /usr/share/postgresql/postgresql.conf.sample + +# Install pg_wait_sampling +RUN git clone https://github.com/postgrespro/pg_wait_sampling.git \ + && cd pg_wait_sampling \ + && make USE_PGXS=1 \ + && make USE_PGXS=1 install \ + && cd .. \ + && rm -rf pg_wait_sampling + +# Install pg_stat_monitor +RUN git clone https://github.com/percona/pg_stat_monitor.git \ + && cd pg_stat_monitor \ + && make USE_PGXS=1 \ + && make USE_PGXS=1 install \ + && cd .. \ + && rm -rf pg_stat_monitor + +# Download the titanic database +RUN wget https://raw.githubusercontent.com/neondatabase/postgres-sample-dbs/main/titanic.sql -P /docker-entrypoint-initdb.d/ + +# Enable the extensions and setup the titanic database +COPY 01-init-extensions.sql /docker-entrypoint-initdb.d/01-init-extensions.sql +COPY 02-create-database.sql /docker-entrypoint-initdb.d/02-create-database.sql +COPY 03-import-data.sql /docker-entrypoint-initdb.d/03-import-data.sql \ No newline at end of file diff --git a/tests/perf-testing/oldest_supported/01-init-extensions.sql b/tests/perf-testing/oldest_supported/01-init-extensions.sql new file mode 100644 index 00000000..66210525 --- /dev/null +++ b/tests/perf-testing/oldest_supported/01-init-extensions.sql @@ -0,0 +1,5 @@ +CREATE EXTENSION IF NOT EXISTS pg_stat_statements; + +CREATE EXTENSION IF NOT EXISTS pg_wait_sampling; + +CREATE EXTENSION IF NOT EXISTS pg_stat_monitor; \ No newline at end of file diff --git a/tests/perf-testing/oldest_supported/02-create-database.sql b/tests/perf-testing/oldest_supported/02-create-database.sql new file mode 100644 index 00000000..e053c036 --- /dev/null +++ b/tests/perf-testing/oldest_supported/02-create-database.sql @@ -0,0 +1 @@ +CREATE DATABASE titanic; \ No newline at end of file diff --git a/tests/perf-testing/oldest_supported/03-import-data.sql b/tests/perf-testing/oldest_supported/03-import-data.sql new file mode 100644 index 00000000..28cc43b2 --- /dev/null +++ b/tests/perf-testing/oldest_supported/03-import-data.sql @@ -0,0 +1,11 @@ +-- Connect to titanic database +\c titanic; + +-- Import the titanic.sql file that was downloaded during Docker build +\i /docker-entrypoint-initdb.d/titanic.sql; + +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO dbuser; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO dbuser; + +-- Analyze tables for better query planning +ANALYZE VERBOSE; diff --git a/tests/perf-testing/oldest_supported/Dockerfile b/tests/perf-testing/oldest_supported/Dockerfile new file mode 100644 index 00000000..288ca624 --- /dev/null +++ b/tests/perf-testing/oldest_supported/Dockerfile @@ -0,0 +1,40 @@ +FROM postgres:13 + +# Dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + wget \ + postgresql-server-dev-13 \ + && rm -rf /var/lib/apt/lists/* + +# Postgres Docker Images copy contents of postgresql.conf.sample to postgresql.conf during initialization +# COPY custom.conf /usr/share/postgresql/postgresql.conf.sample -- DO NOT USE +RUN echo "shared_preload_libraries = 'pg_stat_statements,pg_wait_sampling,pg_stat_monitor'" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_statements.track = all" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_statements.save = on" >> /usr/share/postgresql/postgresql.conf.sample +RUN echo "pg_stat_monitor.pgsm_enable_query_plan = on" >> /usr/share/postgresql/postgresql.conf.sample + +# Install pg_wait_sampling +RUN git clone https://github.com/postgrespro/pg_wait_sampling.git \ + && cd pg_wait_sampling \ + && make USE_PGXS=1 \ + 
&& make USE_PGXS=1 install \ + && cd .. \ + && rm -rf pg_wait_sampling + +# Install pg_stat_monitor +RUN git clone https://github.com/percona/pg_stat_monitor.git \ + && cd pg_stat_monitor \ + && make USE_PGXS=1 \ + && make USE_PGXS=1 install \ + && cd .. \ + && rm -rf pg_stat_monitor + +# Download the titanic database +RUN wget https://raw.githubusercontent.com/neondatabase/postgres-sample-dbs/main/titanic.sql -P /docker-entrypoint-initdb.d/ + +# Enable the extensions and setup the titanic database +COPY 01-init-extensions.sql /docker-entrypoint-initdb.d/01-init-extensions.sql +COPY 02-create-database.sql /docker-entrypoint-initdb.d/02-create-database.sql +COPY 03-import-data.sql /docker-entrypoint-initdb.d/03-import-data.sql \ No newline at end of file diff --git a/tests/postgresql_test.go b/tests/postgresql_test.go index 93d91af1..60fee100 100644 --- a/tests/postgresql_test.go +++ b/tests/postgresql_test.go @@ -3,46 +3,29 @@ package tests import ( - "bytes" "flag" - "fmt" "os" - "os/exec" - "path/filepath" - "strings" "testing" - "github.com/newrelic/infra-integrations-sdk/v3/log" + "github.com/newrelic/nri-postgresql/tests/simulation" "github.com/stretchr/testify/assert" - "github.com/xeipuuv/gojsonschema" +) + +var ( + defaultPassword = flag.String("password", "example", "Default password for postgres") + defaultUser = flag.String("username", "postgres", "Default username for postgres") + defaultDB = flag.String("database", "demo", "Default database name") + container = flag.String("container", "nri-postgresql", "Container name for the integration") ) const ( // docker compose service names - serviceNameNRI = "nri-postgresql" serviceNamePostgres96 = "postgres-9-6" serviceNamePostgresLatest = "postgres-latest-supported" + defaultBinaryPath = "/nri-postgresql" + integrationContainer = "nri-postgresql" ) -func executeDockerCompose(serviceName string, envVars []string) (string, string, error) { - cmdLine := []string{"compose", "run"} - for i := range envVars { - cmdLine = append(cmdLine, "-e") - cmdLine = append(cmdLine, envVars[i]) - } - cmdLine = append(cmdLine, serviceName) - fmt.Printf("executing: docker %s\n", strings.Join(cmdLine, " ")) - cmd := exec.Command("docker", cmdLine...) 
- var outbuf, errbuf bytes.Buffer - cmd.Stdout = &outbuf - cmd.Stderr = &errbuf - err := cmd.Run() - - stdout := outbuf.String() - stderr := errbuf.String() - return stdout, stderr, err -} - func TestMain(m *testing.M) { flag.Parse() result := m.Run() @@ -51,35 +34,27 @@ func TestMain(m *testing.M) { func TestSuccessConnection(t *testing.T) { t.Parallel() - defaultEnvVars := []string{ - "USERNAME=postgres", - "PASSWORD=example", - "DATABASE=demo", - "COLLECTION_LIST=ALL", - } testCases := []struct { - Name string - Hostname string - Schema string - EnvVars []string + Name string + Hostname string + Schema string + ExtraFlags []string }{ { Name: "Testing Metrics and inventory for Postgres v9.6.x", Hostname: serviceNamePostgres96, Schema: "jsonschema-latest.json", - EnvVars: []string{}, }, { Name: "Testing Metrics and inventory for latest Postgres supported version", Hostname: serviceNamePostgresLatest, Schema: "jsonschema-latest.json", - EnvVars: []string{}, }, { - Name: "Inventory only for latest Postgres supported version", - Hostname: serviceNamePostgresLatest, - Schema: "jsonschema-inventory-latest.json", - EnvVars: []string{"INVENTORY=true"}, + Name: "Inventory only for latest Postgres supported version", + Hostname: serviceNamePostgresLatest, + Schema: "jsonschema-inventory-latest.json", + ExtraFlags: []string{`-inventory=true`}, }, } @@ -87,68 +62,38 @@ func TestSuccessConnection(t *testing.T) { tc := tc t.Run(tc.Name, func(t *testing.T) { t.Parallel() - envVars := []string{ - fmt.Sprintf("HOSTNAME=%s", tc.Hostname), - } - envVars = append(envVars, defaultEnvVars...) - envVars = append(envVars, tc.EnvVars...) - stdout, _, err := executeDockerCompose(serviceNameNRI, envVars) - assert.Nil(t, err) + args := append([]string{`-collection_list=all`}, tc.ExtraFlags...) + stdout, stderr, err := simulation.RunIntegration(tc.Hostname, integrationContainer, defaultBinaryPath, defaultUser, defaultPassword, defaultDB, args...) 
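+ // simulation.RunIntegration (tests/simulation/helpers.go) appears to exec
+ // the integration binary inside the harness container against the given
+ // Postgres service, forwarding any trailing args as CLI flags (for
+ // example -collection_list=all), and returns the run's stdout and stderr.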
+ assert.Empty(t, stderr) + assert.NoError(t, err) assert.NotEmpty(t, stdout) - err = validateJSONSchema(tc.Schema, stdout) - assert.Nil(t, err) + err = simulation.ValidateJSONSchema(tc.Schema, stdout) + assert.NoError(t, err) }) } } func TestMissingRequiredVars(t *testing.T) { - envVars := []string{ - "HOSTNAME=" + serviceNamePostgresLatest, - "DATABASE=demo", - } - _, stderr, err := executeDockerCompose(serviceNameNRI, envVars) - assert.NotNil(t, err) + // Temporarily set username and password to nil to test missing credentials + origUser, origPsw := defaultUser, defaultPassword + defaultUser, defaultPassword = nil, nil + defer func() { + defaultUser, defaultPassword = origUser, origPsw + }() + + _, stderr, err := simulation.RunIntegration(serviceNamePostgresLatest, integrationContainer, defaultBinaryPath, defaultUser, defaultPassword, defaultDB) + assert.Error(t, err) assert.Contains(t, stderr, "invalid configuration: must specify a username and password") } func TestIgnoringDB(t *testing.T) { - envVars := []string{ - "HOSTNAME=" + serviceNamePostgresLatest, - "USERNAME=postgres", - "PASSWORD=example", - "DATABASE=demo", - "COLLECTION_LIST=ALL", // The instance has 2 DB: 'demo' and 'postgres' - `COLLECTION_IGNORE_DATABASE_LIST=["demo"]`, + args := []string{ + `-collection_list=all`, + `-collection_ignore_database_list=["demo"]`, } - stdout, _, err := executeDockerCompose(serviceNameNRI, envVars) - assert.Nil(t, err) + stdout, stderr, err := simulation.RunIntegration(serviceNamePostgresLatest, integrationContainer, defaultBinaryPath, defaultUser, defaultPassword, defaultDB, args...) + assert.NoError(t, err) + assert.Empty(t, stderr) assert.Contains(t, stdout, `"database:postgres"`) assert.NotContains(t, stdout, `"database:demo"`) } - -func validateJSONSchema(fileName string, input string) error { - pwd, err := os.Getwd() - if err != nil { - log.Error(err.Error()) - return err - } - schemaURI := fmt.Sprintf("file://%s", filepath.Join(pwd, "testdata", fileName)) - log.Info("loading schema from %s", schemaURI) - schemaLoader := gojsonschema.NewReferenceLoader(schemaURI) - documentLoader := gojsonschema.NewStringLoader(input) - - result, err := gojsonschema.Validate(schemaLoader, documentLoader) - if err != nil { - return fmt.Errorf("Error loading JSON schema, error: %v", err) - } - - if result.Valid() { - return nil - } - fmt.Printf("Errors for JSON schema: '%s'\n", schemaURI) - for _, desc := range result.Errors() { - fmt.Printf("\t- %s\n", desc) - } - fmt.Printf("\n") - return fmt.Errorf("The output of the integration doesn't have expected JSON format") -} diff --git a/tests/postgresqlperf_test.go b/tests/postgresqlperf_test.go new file mode 100644 index 00000000..0b3cfbd8 --- /dev/null +++ b/tests/postgresqlperf_test.go @@ -0,0 +1,185 @@ +//go:build integration + +package tests + +import ( + "encoding/json" + "flag" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/newrelic/nri-postgresql/tests/simulation" + "github.com/stretchr/testify/assert" +) + +var ( + oldestSupportedPerf = "postgresql-perf-oldest" + latestSupportedPerf = "postgresql-perf-latest" + unsupportedPerf = "postgresql-noext" + perfContainers = []string{oldestSupportedPerf, latestSupportedPerf} + nonPerfContainers = []string{unsupportedPerf} + integrationContainer = "nri_postgresql" + + defaultBinPath = "/nri-postgresql" + defaultUser = "dbuser" + defaultPass = "dbpassword" + defaultPort = 5432 + defaultDB = "demo" + testDB = "titanic" + + // cli flags + container = flag.String("container", 
integrationContainer, "container where the integration is installed") + binaryPath = flag.String("bin", defaultBinPath, "Integration binary path") + user = flag.String("user", defaultUser, "Postgresql user name") + psw = flag.String("psw", defaultPass, "Postgresql user password") + port = flag.Int("port", defaultPort, "Postgresql port") + database = flag.String("database", defaultDB, "Postgresql database") +) + +func TestMain(m *testing.M) { + flag.Parse() + result := m.Run() + os.Exit(result) +} + +func TestIntegrationWithDatabaseLoadPerfEnabled(t *testing.T) { + tests := []struct { + name string + expectedOrder []string + containers []string + args []string + }{ + { + name: "Performance metrics collection test", + expectedOrder: []string{ + "PostgresqlInstanceSample", + "PostgresSlowQueries", + "PostgresWaitEvents", + "PostgresBlockingSessions", + "PostgresIndividualQueries", + "PostgresExecutionPlanMetrics", + }, + containers: perfContainers, + args: []string{`-collection_list=all`, `-enable_query_monitoring=true`}, + }, + { + name: "Performance metrics collection test without collection list", + expectedOrder: []string{ + "PostgresqlInstanceSample", + }, + containers: perfContainers, + args: []string{`-enable_query_monitoring=true`}, + }, + { + name: "Performance metrics collection test without query monitoring enabled", + expectedOrder: []string{ + "PostgresqlInstanceSample", + }, + containers: perfContainers, + args: []string{`-collection_list=all`}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for _, container := range tt.containers { + t.Run(container, func(t *testing.T) { + // Create simulation controller + controller := simulation.NewSimulationController(container) + // Start all simulations + done := controller.StartAllSimulations(t) + + time.Sleep(30 * time.Second) + + stdout, stderr, err := simulation.RunIntegration(container, integrationContainer, *binaryPath, user, psw, database, tt.args...) 
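+					// The checks below assume the integration emits one JSON document
+					// per line: each non-empty line of stdout is parsed, matched
+					// against the expected sample order, and validated against the
+					// JSON schema for its sample type.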
+					if stderr != "" {
+						fmt.Println(stderr)
+					}
+					assert.NoError(t, err, "Running Integration Failed")
+
+					samples := strings.Split(stdout, "\n")
+					count := 0
+
+					for idx, sample := range samples {
+						sample = strings.TrimSpace(sample)
+						if sample == "" {
+							continue
+						}
+
+						// Guard against more samples than expected sample types, so the
+						// test fails cleanly instead of panicking on the index below
+						if count >= len(tt.expectedOrder) {
+							t.Fatalf("unexpected extra sample at index %d: %s", idx, sample)
+						}
+
+						// Validate JSON
+						var j map[string]interface{}
+						err = json.Unmarshal([]byte(sample), &j)
+						assert.NoError(t, err, "Sample %d - Integration Output Is Invalid JSON", idx)
+
+						// Validate sample type
+						t.Run(fmt.Sprintf("Validating JSON Schema For %s", tt.expectedOrder[count]), func(t *testing.T) {
+							sampleType := tt.expectedOrder[count]
+							if !strings.Contains(sample, sampleType) {
+								t.Errorf("Integration output does not contain: %s", sampleType)
+							}
+
+							// Validate against schema
+							schemaFileName := simulation.GetSchemaFileName(sampleType)
+							err = simulation.ValidateJSONSchema(schemaFileName, sample)
+							assert.NoError(t, err, "Sample %d (%s) failed schema validation", idx, sampleType)
+						})
+
+						count++
+					}
+
+					// Wait for all simulations to complete
+					<-done
+				})
+			}
+		})
+	}
+}
+
+func TestIntegrationUnsupportedDatabase(t *testing.T) {
+	tests := []struct {
+		name       string
+		containers []string
+		args       []string
+	}{
+		{
+			name:       "Performance metrics collection with unsupported database - perf enabled",
+			containers: nonPerfContainers,
+			args:       []string{`-collection_list=all`, `-enable_query_monitoring=true`},
+		},
+		{
+			name:       "Performance metrics collection with unsupported database - perf disabled",
+			containers: nonPerfContainers,
+			args:       []string{`-collection_list=all`, `-enable_query_monitoring=false`},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			for _, container := range tt.containers {
+				t.Run(container, func(t *testing.T) {
+					stdout, stderr, err := simulation.RunIntegration(container, integrationContainer, *binaryPath, user, psw, database, tt.args...)
+					if stderr != "" {
+						fmt.Println(stderr)
+					}
+					assert.NoError(t, err, "Running Integration Failed")
+
+					// Validate JSON format
+					var j map[string]interface{}
+					err = json.Unmarshal([]byte(stdout), &j)
+					assert.NoError(t, err, "Integration Output Is Invalid JSON")
+
+					// Verify it's a PostgresqlInstanceSample
+					assert.Contains(t, stdout, "PostgresqlInstanceSample",
+						"Integration output does not contain PostgresqlInstanceSample")
+
+					// Validate against schema
+					err = simulation.ValidateJSONSchema("jsonschema-latest.json", stdout)
+					assert.NoError(t, err, "Output failed schema validation")
+				})
+			}
+		})
+	}
+}
diff --git a/tests/simulation/helpers.go b/tests/simulation/helpers.go
new file mode 100644
index 00000000..839b5d9c
--- /dev/null
+++ b/tests/simulation/helpers.go
@@ -0,0 +1,123 @@
+//nolint:all
+package simulation
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+
+	"github.com/newrelic/infra-integrations-sdk/v3/log"
+	"github.com/xeipuuv/gojsonschema"
+)
+
+// ExecInContainer executes a command in a specified container
+func ExecInContainer(container string, command []string, envVars ...string) (string, string, error) {
+	cmdLine := make([]string, 0, 3+len(command))
+	cmdLine = append(cmdLine, "exec", "-i")
+
+	for _, envVar := range envVars {
+		cmdLine = append(cmdLine, "-e", envVar)
+	}
+
+	cmdLine = append(cmdLine, container)
+	cmdLine = append(cmdLine, command...)
+
+	log.Debug("executing: docker %s", strings.Join(cmdLine, " "))
+
+	cmd := exec.Command("docker", cmdLine...)
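+	// Run the assembled docker command, buffering stdout and stderr
+	// separately so callers can assert on each stream independently.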
+
+	var outbuf, errbuf bytes.Buffer
+	cmd.Stdout = &outbuf
+	cmd.Stderr = &errbuf
+
+	err := cmd.Run()
+	stdout := outbuf.String()
+	stderr := errbuf.String()
+
+	return stdout, stderr, err
+}
+
+// RunIntegration executes the integration binary with the provided arguments
+func RunIntegration(targetContainer, integrationContainer, binaryPath string, username, password *string, database *string, args ...string) (string, string, error) {
+	command := []string{binaryPath}
+
+	if username != nil {
+		command = append(command, "-username", *username)
+	}
+	if password != nil {
+		command = append(command, "-password", *password)
+	}
+
+	// Always use port 5432 for integration runs; the integration connects
+	// over the docker network, where Postgres listens on its default port
+	command = append(command, "-port", "5432")
+
+	if database != nil {
+		command = append(command, "-database", *database)
+	}
+	if targetContainer != "" {
+		command = append(command, "-hostname", targetContainer)
+	}
+
+	command = append(command, args...)
+
+	stdout, stderr, err := ExecInContainer(integrationContainer, command)
+	if stderr != "" {
+		log.Debug("Integration command standard error: %s", stderr)
+	}
+
+	return stdout, stderr, err
+}
+
+// ValidateJSONSchema validates a JSON string against a schema file
+func ValidateJSONSchema(fileName string, input string) error {
+	pwd, err := os.Getwd()
+	if err != nil {
+		log.Error(err.Error())
+		return err
+	}
+
+	schemaURI := fmt.Sprintf("file://%s", filepath.Join(pwd, "testdata", fileName))
+	log.Info("loading schema from %s", schemaURI)
+
+	schemaLoader := gojsonschema.NewReferenceLoader(schemaURI)
+	documentLoader := gojsonschema.NewStringLoader(input)
+
+	result, err := gojsonschema.Validate(schemaLoader, documentLoader)
+	if err != nil {
+		return fmt.Errorf("error loading JSON schema: %v", err)
+	}
+
+	if result.Valid() {
+		return nil
+	}
+
+	fmt.Printf("Errors for JSON schema: '%s'\n", schemaURI)
+	for _, desc := range result.Errors() {
+		fmt.Printf("\t- %s\n", desc)
+	}
+	fmt.Println()
+
+	return fmt.Errorf("the output of the integration doesn't have the expected JSON format")
+}
+
+// GetSchemaFileName returns the appropriate schema filename for a given sample type
+func GetSchemaFileName(sampleType string) string {
+	schemaMap := map[string]string{
+		"PostgresqlInstanceSample":     "jsonschema-latest.json",
+		"PostgresSlowQueries":          "slow-queries-schema.json",
+		"PostgresWaitEvents":           "wait-events-schema.json",
+		"PostgresBlockingSessions":     "blocking-sessions-schema.json",
+		"PostgresIndividualQueries":    "individual-queries-schema.json",
+		"PostgresExecutionPlanMetrics": "execution-plan-schema.json",
+	}
+	return schemaMap[sampleType]
+}
diff --git a/tests/simulation/sim_queries.go b/tests/simulation/sim_queries.go
new file mode 100644
index 00000000..bfe3bcf9
--- /dev/null
+++ b/tests/simulation/sim_queries.go
@@ -0,0 +1,359 @@
+//nolint:all
+package simulation
+
+import (
+	"fmt"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/jmoiron/sqlx"
+	_ "github.com/lib/pq"
+
+	"github.com/newrelic/infra-integrations-sdk/v3/log"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	defaultUser = "dbuser"
+	defaultPass = "dbpassword"
+	testDB      = "titanic"
+)
+
+func openDB(targetContainer string) (*sqlx.DB, error) {
+	dbPort := getPortForContainer(targetContainer)
+
+	connectionURL := &url.URL{
+		Scheme: "postgres",
+		User:   url.UserPassword(defaultUser, defaultPass),
+		Host:   fmt.Sprintf("%s:%d", "localhost", dbPort),
+		Path:   testDB,
+	}
+
+	query := url.Values{}
+	query.Add("connect_timeout",
"10") + query.Add("sslmode", "disable") + + connectionURL.RawQuery = query.Encode() + dsn := connectionURL.String() + db, err := sqlx.Open("postgres", dsn) + if err != nil { + return nil, fmt.Errorf("cannot connect to db: %s", err) + } + return db, nil +} + +func getPortForContainer(container string) int { + switch container { + case "postgresql-perf-latest": + return 5432 + case "postgresql-perf-oldest": + return 6432 + case "postgresql-noext": + return 7432 + default: + return 5432 + } +} + +// SimulationController handles coordinating multiple database simulations +type SimulationController struct { + targetContainer string + envVars []string +} + +// NewSimulationController creates a new controller for database simulations +func NewSimulationController(targetContainer string, envVars ...string) *SimulationController { + return &SimulationController{ + targetContainer: targetContainer, + envVars: envVars, + } +} + +// StartAllSimulations starts all simulation routines concurrently +func (sc *SimulationController) StartAllSimulations(t *testing.T) chan struct{} { + done := make(chan struct{}) + + go func() { + defer close(done) + + // Create error channel to collect errors from goroutines + errChan := make(chan error, 6) + + // Start all simulations in separate goroutines + go func() { + SimulateQueries(t, sc.targetContainer) + errChan <- nil + }() + + go func() { + SimulateSlowQueries(t, sc.targetContainer) + errChan <- nil + }() + + for pclass := 1; pclass <= 3; pclass++ { + go func() { + SimulateWaitEvents(t, sc.targetContainer, pclass) + errChan <- nil + }() + } + + go func() { + SimulateBlockingSessions(t, sc.targetContainer) + errChan <- nil + }() + + // Wait for all goroutines to complete + for i := 0; i < 6; i++ { + if err := <-errChan; err != nil { + log.Error("Error in simulation routine: %v", err) + t.Error(err) + } + } + }() + + return done +} + +func ExecuteQuery(t *testing.T, query string, targetContainer string, delay int) { + db, err := openDB(targetContainer) + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(query) + require.NoError(t, err) + time.Sleep(time.Duration(delay) * time.Millisecond) +} + +func SimulateQueries(t *testing.T, targetContainer string) { + t.Helper() + for _, query := range SimpleQueries() { + ExecuteQuery(t, query, targetContainer, 100) + } +} + +func SimulateSlowQueries(t *testing.T, targetContainer string) { + t.Helper() + for _, query := range SlowQueries() { + ExecuteQuery(t, query, targetContainer, 500) + } +} + +func SimulateWaitEvents(t *testing.T, targetContainer string, pclass int) { + t.Helper() + + queries := WaitEventQueries(pclass) + + // Start the locking transaction in a goroutine + go func() { + ExecuteQuery(t, queries.LockingQuery, targetContainer, 100) + }() + + // Wait for first transaction started + time.Sleep(2 * time.Second) + + // Run the blocked transaction + ExecuteQuery(t, queries.BlockedQuery, targetContainer, 100) + + time.Sleep(30 * time.Second) +} + +func SimulateBlockingSessions(t *testing.T, targetContainer string) { + t.Helper() + + queries := BlockingQueries() + + db, err := openDB(targetContainer) + require.NoError(t, err) + defer db.Close() + + // Start the first transaction that will hold the lock + tx1, err := db.Begin() + require.NoError(t, err) + defer tx1.Rollback() + + // Execute the locking query + _, err = tx1.Exec(queries.HoldLockQuery) + require.NoError(t, err) + + // Start the blocking query in a separate goroutine + go func() { + time.Sleep(2 * time.Second) // Wait for a bit before 
trying to acquire lock + + tx2, err := db.Begin() + if err != nil { + t.Error(err) + return + } + defer tx2.Rollback() + + // This query will block waiting for tx1's lock + tx2.Exec(queries.BlockedQuery) + // We don't check for errors here since this might timeout + }() + + // Hold the lock for a few seconds, then release it + time.Sleep(5 * time.Second) + tx1.Commit() +} + +func SimpleQueries() []string { + return []string{ + // Basic queries that will generate typical workload + "SELECT COUNT(*) FROM passenger WHERE survived = 1", + "SELECT class, COUNT(*) FROM passenger GROUP BY class", + "SELECT * FROM passenger WHERE fare > 100 ORDER BY fare DESC LIMIT 10", + "SELECT sex, AVG(age) as avg_age FROM passenger GROUP BY sex", + "SELECT * FROM passenger WHERE name LIKE '%John%'", + } +} + +func SlowQueries() []string { + return []string{ + // Age-based survival analysis + `WITH age_groups AS ( + SELECT + CASE + WHEN age < 18 THEN 'child' + WHEN age < 50 THEN 'adult' + ELSE 'elderly' + END as age_group, + survived + FROM passenger + ) + SELECT + age_group, + COUNT(*) as total, + SUM(survived::int) as survived_count, + ROUND(AVG(survived::int) * 100, 2) as survival_rate + FROM age_groups + GROUP BY age_group + ORDER BY survival_rate DESC`, + + // Multiple self-joins analysis + `SELECT DISTINCT p1.name, p1.class, p1.fare + FROM passenger p1 + JOIN passenger p2 ON p1.fare = p2.fare AND p1.passengerid != p2.passengerid + JOIN passenger p3 ON p2.class = p3.class AND p2.passengerid != p3.passengerid + WHERE p1.survived = 1 + ORDER BY p1.fare DESC`, + + // Subquery with expensive sort + `SELECT *, + (SELECT COUNT(*) + FROM passenger p2 + WHERE p2.fare > p1.fare) as more_expensive_tickets + FROM passenger p1 + ORDER BY more_expensive_tickets DESC`, + + // Complex aggregation with JSON + `SELECT + p1.class, + p1.survived, + COUNT(*) as group_size, + AVG(p1.age) as avg_age, + STRING_AGG(DISTINCT p1.name, ', ' ORDER BY p1.name) as passenger_names, + ( + SELECT JSON_AGG( + JSON_BUILD_OBJECT( + 'name', p2.name, + 'fare', p2.fare + ) + ) + FROM passenger p2 + WHERE p2.class = p1.class + AND p2.survived = p1.survived + ) as similar_fare_passengers + FROM passenger p1 + GROUP BY p1.class, p1.survived + ORDER BY p1.class, p1.survived`, + + // Statistical analysis with percentiles + `SELECT + p1.class, + p1.sex, + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY p1.age) as median_age, + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY p1.fare) as median_fare, + COUNT(*) FILTER (WHERE p1.survived = 1)::float / COUNT(*) as survival_rate, + ( + SELECT array_agg(DISTINCT p2.embarked) + FROM passenger p2 + WHERE p2.class = p1.class AND p2.sex = p1.sex + ) as embarkation_points + FROM passenger p1 + GROUP BY p1.class, p1.sex + HAVING COUNT(*) > 10 + ORDER BY p1.class, p1.sex`, + + // Decile analysis with window functions + `WITH fare_ranks AS ( + SELECT + *, + NTILE(10) OVER (ORDER BY fare) as fare_decile + FROM passenger + ), + age_ranks AS ( + SELECT + *, + NTILE(10) OVER (ORDER BY age) as age_decile + FROM passenger + WHERE age IS NOT NULL + ) + SELECT + fr.fare_decile, + ar.age_decile, + COUNT(*) as passenger_count, + SUM(fr.survived::int) as survivors, + AVG(fr.fare) as avg_fare, + AVG(ar.age) as avg_age, + array_agg(DISTINCT fr.class) as class_distribution + FROM fare_ranks fr + JOIN age_ranks ar ON fr.passengerid = ar.passengerid + GROUP BY fr.fare_decile, ar.age_decile + ORDER BY fr.fare_decile, ar.age_decile`, + } +} + +func BlockingQueries() struct { + HoldLockQuery string + BlockedQuery string +} { + return struct 
{ + HoldLockQuery string + BlockedQuery string + }{ + HoldLockQuery: ` +BEGIN; +SELECT * FROM passenger WHERE passengerid = 100 FOR UPDATE; +`, + BlockedQuery: ` +BEGIN; +SELECT * FROM passenger WHERE passengerid = 100 FOR UPDATE; +`, + } +} + +func WaitEventQueries(pclass int) struct { + LockingQuery string + BlockedQuery string +} { + return struct { + LockingQuery string + BlockedQuery string + }{ + LockingQuery: fmt.Sprintf(` +BEGIN; +UPDATE passenger +SET fare = fare * 1.01 +WHERE pclass = %d; +SELECT pg_sleep(30); +COMMIT;`, pclass), + + BlockedQuery: fmt.Sprintf(` +BEGIN; +UPDATE passenger +SET fare = fare * 0.99 +WHERE pclass = %d; +COMMIT;`, pclass), + } +} diff --git a/tests/testdata/blocking-sessions-schema.json b/tests/testdata/blocking-sessions-schema.json new file mode 100644 index 00000000..677db04e --- /dev/null +++ b/tests/testdata/blocking-sessions-schema.json @@ -0,0 +1,98 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["name", "protocol_version", "integration_version", "data"], + "properties": { + "name": { + "type": "string", + "const": "com.newrelic.postgresql" + }, + "protocol_version": { + "type": "string" + }, + "integration_version": { + "type": "string" + }, + "data": { + "type": "array", + "items": { + "type": "object", + "required": ["entity", "metrics", "inventory", "events"], + "properties": { + "entity": { + "type": "object", + "required": ["name", "type", "id_attributes"], + "properties": { + "name": { + "type": "string" + }, + "type": { + "type": "string", + "const": "pg-instance" + }, + "id_attributes": { + "type": "array" + } + } + }, + "metrics": { + "type": "array", + "items": { + "type": "object", + "required": [ + "blocked_pid", + "blocked_query", + "blocked_query_start", + "blocking_pid", + "blocking_query", + "blocking_query_start", + "database_name", + "event_type" + ], + "properties": { + "blocked_pid": { + "type": "integer", + "minimum": 0 + }, + "blocked_query": { + "type": "string" + }, + "blocked_query_start": { + "type": "string", + "format": "date-time" + }, + "blocking_pid": { + "type": "integer", + "minimum": 0 + }, + "blocking_query": { + "type": "string" + }, + "blocking_query_start": { + "type": "string", + "format": "date-time" + }, + "database_name": { + "type": "string" + }, + "event_type": { + "type": "string", + "const": "PostgresBlockingSessions" + } + }, + "additionalProperties": false + } + }, + "inventory": { + "type": "object" + }, + "events": { + "type": "array" + } + }, + "additionalProperties": false + } + } + }, + "additionalProperties": false + } \ No newline at end of file diff --git a/tests/testdata/execution-plan-schema.json b/tests/testdata/execution-plan-schema.json new file mode 100644 index 00000000..19091b2d --- /dev/null +++ b/tests/testdata/execution-plan-schema.json @@ -0,0 +1,187 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["name", "protocol_version", "integration_version", "data"], + "properties": { + "name": { + "type": "string", + "const": "com.newrelic.postgresql" + }, + "protocol_version": { + "type": "string" + }, + "integration_version": { + "type": "string" + }, + "data": { + "type": "array", + "items": { + "type": "object", + "required": ["entity", "metrics", "inventory", "events"], + "properties": { + "entity": { + "type": "object", + "required": ["name", "type", "id_attributes"], + "properties": { + "name": { + "type": "string" + }, + "type": { + "type": "string", + "const": "pg-instance" + }, + 
"id_attributes": { + "type": "array" + } + } + }, + "metrics": { + "type": "array", + "items": { + "type": "object", + "required": [ + "database_name", + "event_type", + "level_id", + "node_type", + "plan_id", + "plan_rows", + "query_id", + "startup_cost", + "total_cost" + ], + "properties": { + "actual_loops": { + "type": "integer", + "minimum": 0 + }, + "actual_rows": { + "type": "integer", + "minimum": 0 + }, + "actual_startup_time": { + "type": "integer", + "minimum": 0 + }, + "actual_total_time": { + "type": "integer", + "minimum": 0 + }, + "alias": { + "type": "string" + }, + "async_capable": { + "type": ["boolean", "integer"] + }, + "database_name": { + "type": "string" + }, + "event_type": { + "type": "string", + "const": "PostgresExecutionPlanMetrics" + }, + "index_name": { + "type": "string" + }, + "level_id": { + "type": "integer", + "minimum": 0 + }, + "local_dirtied_blocks": { + "type": "integer", + "minimum": 0 + }, + "local_hit_blocks": { + "type": "integer", + "minimum": 0 + }, + "local_read_blocks": { + "type": "integer", + "minimum": 0 + }, + "local_written_blocks": { + "type": "integer", + "minimum": 0 + }, + "node_type": { + "type": "string" + }, + "parallel_aware": { + "type": ["boolean", "integer"] + }, + "plan_id": { + "type": "string" + }, + "plan_rows": { + "type": "integer", + "minimum": 0 + }, + "plan_width": { + "type": "integer", + "minimum": 0 + }, + "query_id": { + "type": "string" + }, + "query_text": { + "type": "string" + }, + "relation_name": { + "type": "string" + }, + "rows_removed_by_filter": { + "type": "integer", + "minimum": 0 + }, + "scan_direction": { + "type": "string" + }, + "shared_dirtied_blocks": { + "type": "integer", + "minimum": 0 + }, + "shared_hit_blocks": { + "type": "integer", + "minimum": 0 + }, + "shared_read_blocks": { + "type": "integer", + "minimum": 0 + }, + "shared_written_blocks": { + "type": "integer", + "minimum": 0 + }, + "startup_cost": { + "type": "number", + "minimum": 0 + }, + "temp_read_blocks": { + "type": "integer", + "minimum": 0 + }, + "temp_written_blocks": { + "type": "integer", + "minimum": 0 + }, + "total_cost": { + "type": "number", + "minimum": 0 + } + }, + "additionalProperties": false + } + }, + "inventory": { + "type": "object" + }, + "events": { + "type": "array" + } + }, + "additionalProperties": false + } + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/tests/testdata/individual-queries-schema.json b/tests/testdata/individual-queries-schema.json new file mode 100644 index 00000000..387810b3 --- /dev/null +++ b/tests/testdata/individual-queries-schema.json @@ -0,0 +1,105 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [ + "name", + "protocol_version", + "integration_version", + "data" + ], + "properties": { + "name": { + "type": "string", + "const": "com.newrelic.postgresql" + }, + "protocol_version": { + "type": "string" + }, + "integration_version": { + "type": "string" + }, + "data": { + "type": "array", + "items": { + "type": "object", + "required": [ + "entity", + "metrics", + "inventory", + "events" + ], + "properties": { + "entity": { + "type": "object", + "required": [ + "name", + "type", + "id_attributes" + ], + "properties": { + "name": { + "type": "string" + }, + "type": { + "type": "string", + "const": "pg-instance" + }, + "id_attributes": { + "type": "array" + } + } + }, + "metrics": { + "type": "array", + "items": { + "type": "object", + "required": [ + "event_type", + "query_id", + "query_text", + 
"database_name", + "plan_id", + "exec_time_ms" + ], + "properties": { + "cpu_time_ms": { + "type": "number", + "minimum": 0 + }, + "exec_time_ms": { + "type": "number", + "minimum": 0 + }, + "database_name": { + "type": "string" + }, + "event_type": { + "type": "string", + "const": "PostgresIndividualQueries" + }, + "plan_id": { + "type": "string" + }, + "query_id": { + "type": "string" + }, + "query_text": { + "type": "string" + } + }, + "additionalProperties": false + } + }, + "inventory": { + "type": "object" + }, + "events": { + "type": "array" + } + }, + "additionalProperties": false + } + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/tests/testdata/slow-queries-schema.json b/tests/testdata/slow-queries-schema.json new file mode 100644 index 00000000..cc3427fd --- /dev/null +++ b/tests/testdata/slow-queries-schema.json @@ -0,0 +1,121 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [ + "name", + "protocol_version", + "integration_version", + "data" + ], + "properties": { + "name": { + "type": "string", + "const": "com.newrelic.postgresql" + }, + "protocol_version": { + "type": "string" + }, + "integration_version": { + "type": "string" + }, + "data": { + "type": "array", + "items": { + "type": "object", + "required": [ + "entity", + "metrics", + "inventory", + "events" + ], + "properties": { + "entity": { + "type": "object", + "required": [ + "name", + "type", + "id_attributes" + ], + "properties": { + "name": { + "type": "string" + }, + "type": { + "type": "string", + "const": "pg-instance" + }, + "id_attributes": { + "type": "array" + } + } + }, + "metrics": { + "type": "array", + "items": { + "type": "object", + "required": [ + "event_type", + "query_id", + "query_text", + "database_name", + "avg_elapsed_time_ms", + "execution_count", + "collection_timestamp" + ], + "properties": { + "avg_disk_reads": { + "type": "integer", + "minimum": 0 + }, + "avg_disk_writes": { + "type": "integer", + "minimum": 0 + }, + "avg_elapsed_time_ms": { + "type": "number", + "minimum": 0 + }, + "collection_timestamp": { + "type": "string", + "format": "date-time" + }, + "database_name": { + "type": "string" + }, + "event_type": { + "type": "string", + "const": "PostgresSlowQueries" + }, + "execution_count": { + "type": "integer", + "minimum": 0 + }, + "query_id": { + "type": "string" + }, + "query_text": { + "type": "string" + }, + "schema_name": { + "type": "string" + }, + "statement_type": { + "type": "string" + } + }, + "additionalProperties": false + } + }, + "inventory": { + "type": "object" + }, + "events": { + "type": "array" + } + }, + "additionalProperties": false + } + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/tests/testdata/wait-events-schema.json b/tests/testdata/wait-events-schema.json new file mode 100644 index 00000000..44a39904 --- /dev/null +++ b/tests/testdata/wait-events-schema.json @@ -0,0 +1,95 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["name", "protocol_version", "integration_version", "data"], + "properties": { + "name": { + "type": "string", + "const": "com.newrelic.postgresql" + }, + "protocol_version": { + "type": "string" + }, + "integration_version": { + "type": "string" + }, + "data": { + "type": "array", + "items": { + "type": "object", + "required": ["entity", "metrics", "inventory", "events"], + "properties": { + "entity": { + "type": "object", + "required": ["name", "type", "id_attributes"], + 
"properties": { + "name": { + "type": "string" + }, + "type": { + "type": "string", + "const": "pg-instance" + }, + "id_attributes": { + "type": "array" + } + } + }, + "metrics": { + "type": "array", + "items": { + "type": "object", + "required": [ + "collection_timestamp", + "database_name", + "event_type", + "query_id", + "query_text", + "wait_category", + "wait_event_name" + ], + "properties": { + "collection_timestamp": { + "type": "string", + "format": "date-time" + }, + "database_name": { + "type": "string" + }, + "event_type": { + "type": "string", + "const": "PostgresWaitEvents" + }, + "query_id": { + "type": "string" + }, + "query_text": { + "type": "string" + }, + "total_wait_time_ms": { + "type": "number", + "minimum": 0 + }, + "wait_category": { + "type": "string" + }, + "wait_event_name": { + "type": "string" + } + }, + "additionalProperties": false + } + }, + "inventory": { + "type": "object" + }, + "events": { + "type": "array" + } + }, + "additionalProperties": false + } + } + }, + "additionalProperties": false + } \ No newline at end of file