Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vai: do not discard closing statement errors #462

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/sqlcache/informer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package informer
import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (i *Indexer) AfterUpsert(key string, obj any, tx transaction.Client) error
/* Satisfy cache.Indexer */

// Index returns a list of items that match the given object on the index function
func (i *Indexer) Index(indexName string, obj any) ([]any, error) {
func (i *Indexer) Index(indexName string, obj any) (result []any, err error) {
i.indexersLock.RLock()
defer i.indexersLock.RUnlock()
indexFunc := i.indexers[indexName]
Expand All @@ -173,7 +174,13 @@ func (i *Indexer) Index(indexName string, obj any) ([]any, error) {
// HACK: sql.Statement.Query does not allow to pass slices in as of go 1.19 - create an ad-hoc statement
query := fmt.Sprintf(selectQueryFmt, db.Sanitize(i.GetName()), strings.Repeat(", ?", len(values)-1))
stmt := i.Prepare(query)
defer i.CloseStmt(stmt)

defer func() {
cerr := i.CloseStmt(stmt)
if cerr != nil {
err = errors.Join(err, &db.QueryError{QueryString: query, Err: cerr})
}
}()
// HACK: Query will accept []any but not []string
params := []any{indexName}
for _, value := range values {
Expand Down
19 changes: 14 additions & 5 deletions pkg/sqlcache/informer/listoption_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,17 @@ func (l *ListOptionIndexer) constructQuery(lo ListOptions, partitions []partitio
return queryInfo, nil
}

func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryInfo) (*unstructured.UnstructuredList, int, string, error) {
func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryInfo) (result *unstructured.UnstructuredList, total int, token string, err error) {
stmt := l.Prepare(queryInfo.query)
defer l.CloseStmt(stmt)
defer func() {
cerr := l.CloseStmt(stmt)
if cerr != nil {
err = errors.Join(err, &db.QueryError{QueryString: queryInfo.query, Err: cerr})
}
}()

var items []any
var total int
err := l.WithTransaction(ctx, false, func(tx transaction.Client) error {
err = l.WithTransaction(ctx, false, func(tx transaction.Client) error {
txStmt := tx.Stmt(stmt)
rows, err := txStmt.QueryContext(ctx, queryInfo.params...)
if err != nil {
Expand All @@ -474,7 +478,12 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn
total = len(items)
if queryInfo.countQuery != "" {
countStmt := l.Prepare(queryInfo.countQuery)
defer l.CloseStmt(countStmt)
defer func() {
cerr := l.CloseStmt(countStmt)
if cerr != nil {
err = errors.Join(err, &db.QueryError{QueryString: queryInfo.countQuery, Err: cerr})
}
}()
txStmt := tx.Stmt(countStmt)
rows, err := txStmt.QueryContext(ctx, queryInfo.countParams...)
if err != nil {
Expand Down