Skip to content

Commit

Permalink
database_observability: fetch indexes and foreign keys info
Browse files Browse the repository at this point in the history
Extend the tableSpec with indexes and foreign keys info.
  • Loading branch information
cristiangreco committed Feb 17, 2025
1 parent d414035 commit 4f82b7a
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ const (
WHERE
TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION ASC`

selectIndexNames = `
SELECT
index_name,
seq_in_index,
column_name,
nullable,
non_unique,
index_type
FROM
information_schema.statistics
WHERE
table_schema = ? and table_name = ?
ORDER BY table_name, index_name, seq_in_index`

// Ignore 'PRIMARY' constraints, as they're already covered by the query above
selectForeignKeys = `
SELECT
constraint_name,
column_name,
referenced_table_name,
referenced_column_name
FROM
information_schema.key_column_usage
WHERE
constraint_name <> 'PRIMARY'
AND referenced_table_schema is not null
AND table_schema = ? and table_name = ?
ORDER BY table_name, constraint_name, ordinal_position`
)

type SchemaTableArguments struct {
Expand Down Expand Up @@ -108,7 +137,9 @@ type tableInfo struct {
}

type tableSpec struct {
Columns []columnSpec `json:"columns"`
Columns []columnSpec `json:"columns"`
Indexes []indexSpec `json:"indexes,omitempty"`
ForeignKeys []foreignKey `json:"foreign_keys,omitempty"`
}
type columnSpec struct {
Name string `json:"name"`
Expand All @@ -119,6 +150,21 @@ type columnSpec struct {
DefaultValue string `json:"default_value,omitempty"`
}

type indexSpec struct {
Name string `json:"name"`
Type string `json:"type"`
Columns []string `json:"columns"`
Unique bool `json:"unique"`
Nullable bool `json:"nullable"`
}

type foreignKey struct {
Name string `json:"name"`
ColumnName string `json:"column_name"`
ReferencedTableName string `json:"referenced_table_name"`
ReferencedColumnName string `json:"referenced_column_name"`
}

func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) {
c := &SchemaTable{
dbConnection: args.DB,
Expand Down Expand Up @@ -375,22 +421,6 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st
}

extra = strings.ToUpper(extra)

notNull := false
if isNullable == "NO" {
notNull = true
}

autoIncrement := false
if strings.Contains(extra, "AUTO_INCREMENT") {
autoIncrement = true
}

primaryKey := false
if columnKey == "PRI" {
primaryKey = true
}

defaultValue := ""
if columnDefault.Valid {
defaultValue = columnDefault.String
Expand All @@ -402,9 +432,9 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st
colSpec := columnSpec{
Name: columnName,
Type: columnType,
NotNull: notNull,
AutoIncrement: autoIncrement,
PrimaryKey: primaryKey,
NotNull: isNullable == "NO",
AutoIncrement: strings.Contains(extra, "AUTO_INCREMENT"),
PrimaryKey: columnKey == "PRI",
DefaultValue: defaultValue,
}
tblSpec.Columns = append(tblSpec.Columns, colSpec)
Expand All @@ -415,5 +445,71 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st
return nil, err
}

rs, err = c.dbConnection.QueryContext(ctx, selectIndexNames, schemaName, tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query table indexes", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}
defer rs.Close()

for rs.Next() {
var indexName, columnName, indexType string
var seqInIndex, nonUnique int
var nullable sql.NullString
if err := rs.Scan(&indexName, &seqInIndex, &columnName, &nullable, &nonUnique, &indexType); err != nil {
level.Error(c.logger).Log("msg", "failed to scan table indexes", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

if nIndexes := len(tblSpec.Indexes); nIndexes > 0 && tblSpec.Indexes[nIndexes-1].Name == indexName {
lastIndex := &tblSpec.Indexes[nIndexes-1]
if len(lastIndex.Columns) != seqInIndex-1 {
level.Error(c.logger).Log("msg", "unexpected index column sequence", "schema", schemaName, "table", tableName, "index", indexName, "column", columnName)
continue
}
lastIndex.Columns = append(lastIndex.Columns, columnName)
} else {
tblSpec.Indexes = append(tblSpec.Indexes, indexSpec{
Name: indexName,
Type: indexType,
Columns: []string{columnName},
Unique: nonUnique == 0,
Nullable: nullable.Valid && nullable.String == "YES",
})
}
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over table indexes result set", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

rs, err = c.dbConnection.QueryContext(ctx, selectForeignKeys, schemaName, tableName)
if err != nil {
level.Error(c.logger).Log("msg", "failed to query table foreign keys", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}
defer rs.Close()

for rs.Next() {
var constraintName, columnName, referencedTableName, referencedColumnName string
if err := rs.Scan(&constraintName, &columnName, &referencedTableName, &referencedColumnName); err != nil {
level.Error(c.logger).Log("msg", "failed to scan foreign keys", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

tblSpec.ForeignKeys = append(tblSpec.ForeignKeys, foreignKey{
Name: constraintName,
ColumnName: columnName,
ReferencedTableName: referencedTableName,
ReferencedColumnName: referencedColumnName,
})
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over foreign keys result set", "schema", schemaName, "table", tableName, "err", err)
return nil, err
}

return tblSpec, nil
}
Loading

0 comments on commit 4f82b7a

Please sign in to comment.