Skip to content

Commit

Permalink
Merge pull request #594 from dolthub/aaron/join-iter-memory-leak
Browse files Browse the repository at this point in the history
sql/plan: Fix join iterators to always Close() their secondary RowIter. Fix Subquery to Dispose its subquery.
  • Loading branch information
zachmu authored Oct 20, 2021
2 parents 4b5e169 + 60e129c commit 400af43
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 9 deletions.
2 changes: 2 additions & 0 deletions enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3630,6 +3630,8 @@ func TestQueryWithContext(t *testing.T, ctx *sql.Context, e *sqle.Engine, q stri
require.NoError(err, "Unexpected error for query %s", q)

checkResults(t, require, expected, expectedCols, sch, rows, q)

require.Equal(0, ctx.Memory.NumCaches())
}

func checkResults(t *testing.T, require *require.Assertions, expected []sql.Row, expectedCols []*sql.Column, sch sql.Schema, rows []sql.Row, q string) {
Expand Down
6 changes: 6 additions & 0 deletions sql/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,9 @@ func (m *MemoryManager) Free() {
}
}
}

func (m *MemoryManager) NumCaches() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.caches)
}
4 changes: 4 additions & 0 deletions sql/plan/indexed_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ func (i *indexedJoinIter) loadSecondary() (sql.Row, error) {
secondaryRow, err := i.secondary.Next()
if err != nil {
if err == io.EOF {
err = i.secondary.Close(i.ctx)
i.secondary = nil
if err != nil {
return nil, err
}
i.primaryRow = nil
return nil, io.EOF
}
Expand Down
13 changes: 12 additions & 1 deletion sql/plan/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,14 +527,21 @@ func (i *joinIter) loadSecondaryInMemory() error {
break
}
if err != nil {
iter.Close(i.ctx)
return err
}

if err := i.secondaryRows.Add(row); err != nil {
iter.Close(i.ctx)
return err
}
}

err = iter.Close(i.ctx)
if err != nil {
return err
}

if len(i.secondaryRows.Get()) == 0 {
return io.EOF
}
Expand Down Expand Up @@ -578,7 +585,11 @@ func (i *joinIter) loadSecondary() (row sql.Row, err error) {
rightRow, err := i.secondary.Next()
if err != nil {
if err == io.EOF {
err = i.secondary.Close(i.ctx)
i.secondary = nil
if err != nil {
return nil, err
}
i.primaryRow = nil

// If we got to this point and the mode is still unknown it means
Expand Down Expand Up @@ -679,7 +690,6 @@ func (i *joinIter) buildRow(primary, secondary sql.Row) sql.Row {

func (i *joinIter) Close(ctx *sql.Context) (err error) {
i.Dispose()
i.secondary = nil

if i.primary != nil {
if err = i.primary.Close(ctx); err != nil {
Expand All @@ -693,6 +703,7 @@ func (i *joinIter) Close(ctx *sql.Context) (err error) {

if i.secondary != nil {
err = i.secondary.Close(ctx)
i.secondary = nil
}

return err
Expand Down
20 changes: 12 additions & 8 deletions sql/plan/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,23 @@ func (i *trackedRowIter) done() {
}
}

func (i *trackedRowIter) Dispose() {
if i.node != nil {
Inspect(i.node, func(node sql.Node) bool {
sql.Dispose(node)
return true
})
}
InspectExpressions(i.node, func(e sql.Expression) bool {
func disposeNode(n sql.Node) {
Inspect(n, func(node sql.Node) bool {
sql.Dispose(node)
return true
})
InspectExpressions(n, func(e sql.Expression) bool {
sql.Dispose(e)
return true
})
}

func (i *trackedRowIter) Dispose() {
if i.node != nil {
disposeNode(i.node)
}
}

func (i *trackedRowIter) Next() (sql.Row, error) {
row, err := i.iter.Next()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions sql/plan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,5 @@ func (s *Subquery) Dispose() {
s.disposeFunc()
s.disposeFunc = nil
}
disposeNode(s.Query)
}

0 comments on commit 400af43

Please sign in to comment.