Refactor how diagnostics are pushed through incremental; add `queries.AST` (#404)

The `NonFatal` functionality of the incremental package has proven to
be a poor abstraction. What I essentially wanted was a way for queries
to generate diagnostics, which I could then deduplicate and collect at
the end.

Collecting diagnostics from dependencies directly is incorrect: in a
diamond dependency graph A -> B, A -> C, B -> D, C -> D, the
diagnostics collected for A would contain D's diagnostics twice (once
via B and once via C). The rough idea was instead to stash reports in
the `NonFatal` area, which required the somewhat unnatural-feeling
`report.AsError` type. But this is unnecessary ceremony: all that
`NonFatal` will ever be used for is stashing reports, so the
incremental framework should Just Do That. It's not a general-purpose
library, after all.
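
A minimal sketch of the deduplication idea, using hypothetical types
rather than this package's API, just to show why a seen-set walk over
the graph avoids the double count:

```go
// Illustrative only: collect diagnostics from a dependency graph exactly
// once per node, even when the graph is a diamond (A -> B, A -> C, B -> D,
// C -> D). These types are not the incremental package's API.
package main

import "fmt"

type node struct {
	diagnostics []string
	deps        []*node
}

// collect appends each reachable node's diagnostics exactly once.
func collect(n *node, seen map[*node]bool, out *[]string) {
	if seen[n] {
		return
	}
	seen[n] = true
	*out = append(*out, n.diagnostics...)
	for _, dep := range n.deps {
		collect(dep, seen, out)
	}
}

func main() {
	d := &node{diagnostics: []string{"warning from D"}}
	b := &node{deps: []*node{d}}
	c := &node{deps: []*node{d}}
	a := &node{deps: []*node{b, c}}

	var diags []string
	collect(a, map[*node]bool{}, &diags)
	fmt.Println(diags) // [warning from D], collected once rather than twice
}
```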

This PR replaces `Task.NonFatal` with `Task.Report`, a report attached
to each task. When `Run` completes, it collects the set of all queries
that were computed (possibly from cache), merges their reports, and
sorts the result to eliminate non-determinism.
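
A hedged sketch of the new call shape (the helper and its names are
illustrative; `exec` and `q` are assumed to be built elsewhere):

```go
package example

import (
	"context"
	"fmt"

	"github.com/bufbuild/protocompile/experimental/incremental"
)

// runAndLog shows how a caller consumes the merged report returned by Run.
func runAndLog(ctx context.Context, exec *incremental.Executor, q incremental.Query[int]) error {
	results, rep, err := incremental.Run(ctx, exec, q)
	if err != nil {
		return err // Run itself only fails when the context expires.
	}
	// Every diagnostic from every computed query, deduplicated and sorted.
	for _, d := range rep.Diagnostics {
		fmt.Println(d) // printing the raw value; real code would render it
	}
	if results[0].Fatal != nil {
		return results[0].Fatal // per-query fatal errors still live on Result
	}
	fmt.Println(results[0].Value)
	return nil
}
```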

It is not immediately clear to me whether having tasks return `(v T,
fatal error)` is still useful. Perhaps `(v T, ok bool)` would be more
appropriate, since that error should be logged as a diagnostic and will
probably just get thrown away.
mcy authored Jan 8, 2025
1 parent 1618dd6 commit b5374ec
Showing 11 changed files with 134 additions and 116 deletions.
43 changes: 31 additions & 12 deletions experimental/incremental/executor.go
@@ -23,13 +23,16 @@ import (
"sync/atomic"

"golang.org/x/sync/semaphore"

"github.com/bufbuild/protocompile/experimental/report"
)

// Executor is a caching executor for incremental queries.
//
// See [New], [Run], and [Invalidate].
type Executor struct {
dirty sync.RWMutex
reportOptions report.Options
dirty sync.RWMutex

// TODO: Evaluate alternatives. sync.Map is pretty bad at having predictable
// performance, and we may want to add eviction to keep memoization costs
@@ -64,6 +67,12 @@ func WithParallelism(n int64) ExecutorOption {
return func(e *Executor) { e.sema = semaphore.NewWeighted(n) }
}

// WithReportOptions sets the report options for reports generated by this
// executor.
func WithReportOptions(options report.Options) ExecutorOption {
return func(e *Executor) { e.reportOptions = options }
}

// Keys returns a snapshot of the keys of which queries are present (and
// memoized) in an Executor.
//
@@ -100,7 +109,7 @@ var runExecutorKey byte
//
// Note: this function really wants to be a method of [Executor], but it isn't
// because it's generic.
func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results []Result[T], expired error) {
func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) ([]Result[T], *report.Report, error) {
e.dirty.RLock()
defer e.dirty.RUnlock()

@@ -119,7 +128,7 @@ func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results
// Need to acquire a hold on the global semaphore to represent the root
// task we're about to execute.
if e.sema.Acquire(ctx, 1) != nil {
return nil, context.Cause(ctx)
return nil, nil, context.Cause(ctx)
}
defer e.sema.Release(1)

@@ -132,22 +141,32 @@ func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results
runID: generation,
}

results, expired = Resolve(root, queries...)
results, expired := Resolve(root, queries...)
if expired != nil {
return nil, expired
return nil, nil, expired
}

// Now, for each result, we need to walk their dependencies and collect
// their dependencies' non-fatal errors.
for i, query := range queries {
// Record all diagnostics generated by the queries.
report := &report.Report{Options: e.reportOptions}
dedup := make(map[*task]struct{})
record := func(t *task) {
if _, ok := dedup[t]; ok {
return
}

dedup[t] = struct{}{}
report.Diagnostics = append(report.Diagnostics, t.report.Diagnostics...)
}
for _, query := range queries {
task := e.getTask(query.Key())
record(task) // NOTE: task.deps does not contain task.
for dep := range task.deps {
r := &results[i]
r.NonFatal = append(r.NonFatal, dep.result.Load().NonFatal...)
record(dep)
}
}
report.Sort()

return results, nil
return results, report, nil
}

// Evict marks query keys as invalid, requiring those queries, and their
@@ -204,6 +223,6 @@ func (e *Executor) getTask(key any) *task {
return t.(*task) //nolint:errcheck
}

t, _ := e.tasks.LoadOrStore(key, new(task))
t, _ := e.tasks.LoadOrStore(key, &task{report: report.Report{Options: e.reportOptions}})
return t.(*task) //nolint:errcheck
}
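
A hedged sketch of wiring the new `WithReportOptions` option into an
executor. It assumes `New` accepts `ExecutorOption`s the same way the
tests below pass `WithParallelism`; the `Tracing` value is only
illustrative.

```go
package example

import (
	"github.com/bufbuild/protocompile/experimental/incremental"
	"github.com/bufbuild/protocompile/experimental/report"
)

// newExecutor builds an executor whose merged reports carry these options.
func newExecutor() *incremental.Executor {
	return incremental.New(
		incremental.WithParallelism(4),
		incremental.WithReportOptions(report.Options{Tracing: 10}),
	)
}
```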
24 changes: 12 additions & 12 deletions experimental/incremental/executor_test.go
@@ -59,7 +59,7 @@ func (i ParseInt) Execute(t incremental.Task) (int, error) {

v, err := strconv.Atoi(i.Input)
if err != nil {
t.NonFatal(err)
t.Report().Errorf("%s", err)
}
if v < 0 {
return 0, fmt.Errorf("negative value: %v", v)
@@ -129,10 +129,10 @@ func TestSum(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
result, report, err := incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
require.NoError(t, err)
assert.Equal(12, result[0].Value)
assert.Empty(result[0].NonFatal)
assert.Empty(report.Diagnostics)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
@@ -142,10 +142,10 @@ func TestSum(t *testing.T) {
`incremental_test.Sum{Input:"1,2,2,3,4"}`,
}, exec.Keys())

result, err = incremental.Run(ctx, exec, Sum{"1,2,2,oops,4"})
result, report, err = incremental.Run(ctx, exec, Sum{"1,2,2,oops,4"})
require.NoError(t, err)
assert.Equal(9, result[0].Value)
assert.Len(result[0].NonFatal, 1)
assert.Len(report.Diagnostics, 1)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
@@ -166,10 +166,10 @@ func TestSum(t *testing.T) {
`incremental_test.Root{}`,
}, exec.Keys())

result, err = incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
result, report, err = incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
require.NoError(t, err)
assert.Equal(12, result[0].Value)
assert.Empty(result[0].NonFatal)
assert.Empty(report.Diagnostics)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
@@ -190,7 +190,7 @@ func TestFatal(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Sum{"1,2,-3,-4"})
result, _, err := incremental.Run(ctx, exec, Sum{"1,2,-3,-4"})
require.NoError(t, err)
// NOTE: This error is deterministic, because it's chosen by Sum.Execute.
assert.Equal("negative value: -3", result[0].Fatal.Error())
@@ -213,7 +213,7 @@ func TestCyclic(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Cyclic{Mod: 5, Step: 3})
result, _, err := incremental.Run(ctx, exec, Cyclic{Mod: 5, Step: 3})
require.NoError(t, err)
assert.Equal(
`cycle detected: `+
@@ -250,7 +250,7 @@ func TestUnchanged(t *testing.T) {

for i := 0; i < runs; i++ {
exec.Evict(ParseInt{"42"})
results, _ := incremental.Run(ctx, exec, queries...)
results, _, _ := incremental.Run(ctx, exec, queries...)
for j, r := range results[1:] {
// All calls after an eviction should return true for Changed.
assert.True(r.Changed, "%d", j)
@@ -270,7 +270,7 @@ func TestUnchanged(t *testing.T) {
barrier.Wait() // Ensure all goroutines start together.
defer wg.Done()

results, _ := incremental.Run(ctx, exec, queries...)
results, _, _ := incremental.Run(ctx, exec, queries...)
for j, r := range results {
// We don't know who the winning g that gets to do the
// computation will be, so just require that all of the
@@ -289,7 +289,7 @@ func TestUnchanged(t *testing.T) {
// Exactly one of the gs should have seen a change.
assert.Equal(int32(1), changed.Load())

results, _ = incremental.Run(ctx, exec, queries...)
results, _, _ = incremental.Run(ctx, exec, queries...)
for j, r := range results[1:] {
// All calls after computation should return false for Changed.
assert.False(r.Changed, "%d", j)
60 changes: 60 additions & 0 deletions experimental/incremental/queries/ast.go
@@ -0,0 +1,60 @@
// Copyright 2020-2025 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queries

import (
"github.com/bufbuild/protocompile/experimental/ast"
"github.com/bufbuild/protocompile/experimental/incremental"
"github.com/bufbuild/protocompile/experimental/parser"
"github.com/bufbuild/protocompile/experimental/source"
)

// AST is an [incremental.Query] for the parsed AST of a file as provided
// by a [source.Opener].
//
// AST queries with different Openers are considered distinct.
type AST struct {
source.Opener // Must be comparable.
Path string
}

var _ incremental.Query[ast.File] = AST{}

// Key implements [incremental.Query].
//
// The key for an AST query is the query itself. This means that a single
// [incremental.Executor] can host AST queries for multiple Openers. It
// also means that the Openers must all be comparable. As the [Opener]
// documentation states, implementations should take a pointer receiver so that
// comparison uses object identity.
func (a AST) Key() any {
return a
}

// Execute implements [incremental.Query].
func (a AST) Execute(t incremental.Task) (ast.File, error) {
t.Report().Options.Stage += stageAST

r, err := incremental.Resolve(t, File(a))
if err != nil {
return ast.File{}, err
}
if r[0].Fatal != nil {
return ast.File{}, r[0].Fatal
}

file, _ := parser.Parse(r[0].Value, t.Report())
return file, nil
}
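
A hedged sketch of how another query might resolve a parsed AST through
the new `queries.AST` query. The `parseDep` helper is hypothetical;
`opener` is assumed to be a comparable `source.Opener` (ideally one
using a pointer receiver).

```go
package example

import (
	"github.com/bufbuild/protocompile/experimental/ast"
	"github.com/bufbuild/protocompile/experimental/incremental"
	"github.com/bufbuild/protocompile/experimental/incremental/queries"
	"github.com/bufbuild/protocompile/experimental/source"
)

// parseDep resolves the AST for path from inside another query's Execute.
func parseDep(t incremental.Task, opener source.Opener, path string) (ast.File, error) {
	results, err := incremental.Resolve(t, queries.AST{Opener: opener, Path: path})
	if err != nil {
		return ast.File{}, err // the run expired
	}
	if results[0].Fatal != nil {
		return ast.File{}, results[0].Fatal // opening or parsing failed fatally
	}
	// Non-fatal parse diagnostics already live on the task's report.
	return results[0].Value, nil
}
```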
19 changes: 11 additions & 8 deletions experimental/incremental/queries/file.go
@@ -38,18 +38,21 @@ var _ incremental.Query[*report.File] = File{}
// also means that the Openers must all be comparable. As the [Opener]
// documentation states, implementations should take a pointer receiver so that
// comparison uses object identity.
func (t File) Key() any {
return t
func (f File) Key() any {
return f
}

// Execute implements [incremental.Query].
func (t File) Execute(incremental.Task) (*report.File, error) {
text, err := t.Open(t.Path)
func (f File) Execute(t incremental.Task) (*report.File, error) {
t.Report().Options.Stage += stageFile

text, err := f.Open(f.Path)
if err != nil {
r := newReport(stageFile)
r.Report.Error(&report.ErrInFile{Err: err, Path: t.Path})
return nil, r
t.Report().Errorf("%v", err).Apply(
report.InFile(f.Path),
)
return nil, err
}

return report.NewFile(t.Path, text), nil
return report.NewFile(f.Path, text), nil
}
@@ -16,3 +16,10 @@
// of Protocompile. It is separate from package incremental itself because it is
// Protocompile-specific.
package queries

// Values for [report.Report].SortOrder, which determine how diagnostics
// generated across parts of the compiler are sorted.
const (
stageFile int = iota * 10
stageAST
)
30 changes: 0 additions & 30 deletions experimental/incremental/queries/report.go

This file was deleted.

16 changes: 6 additions & 10 deletions experimental/incremental/task.go
@@ -21,6 +21,7 @@ import (
"sync"
"sync/atomic"

"github.com/bufbuild/protocompile/experimental/report"
"github.com/bufbuild/protocompile/internal/iter"
)

@@ -50,14 +51,10 @@ func (t *Task) Context() context.Context {
return t.ctx
}

// Error adds errors to the current query, which will be propagated to all
// queries which depend on it.
//
// This will not cause the query to fail; instead, [Query].Execute should
// return false for the ok value to signal failure.
func (t *Task) NonFatal(errs ...error) {
// Report returns the diagnostic report for this task.
func (t *Task) Report() *report.Report {
t.checkDone()
t.result.NonFatal = append(t.result.NonFatal, errs...)
return &t.task.report
}

// Resolve executes a set of queries in parallel. Each query is run on its own
@@ -111,7 +108,6 @@ func Resolve[T any](caller Task, queries ...Query[T]) (results []Result[T], expi
results[i].Value = r.Value.(T) //nolint:errcheck
}

results[i].NonFatal = r.NonFatal
results[i].Fatal = r.Fatal
results[i].Changed = r.runID == caller.runID
}
@@ -182,15 +178,15 @@ type task struct {
// If this task has not been started yet, this is nil.
// Otherwise, if it is complete, result.done will be closed.
result atomic.Pointer[result]
report report.Report
}

// Result is the Result of executing a query on an [Executor], either via
// [Run] or [Resolve].
type Result[T any] struct {
Value T // Value is unspecified if Fatal is non-nil.

NonFatal []error
Fatal error
Fatal error

// Set if this result has possibly changed since the last time [Run] call in
// which this query was computed.
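
With `Task.NonFatal` gone, queries emit non-fatal diagnostics through
the `Task.Report` accessor above. A hedged sketch with a hypothetical
`Lint` query:

```go
package example

import (
	"strings"

	"github.com/bufbuild/protocompile/experimental/incremental"
)

// Lint is a toy query that records a diagnostic without failing.
type Lint struct{ Path string }

var _ incremental.Query[bool] = Lint{}

func (l Lint) Key() any { return l }

func (l Lint) Execute(t incremental.Task) (bool, error) {
	if !strings.HasSuffix(l.Path, ".proto") {
		// Recorded on the task's report; Run merges, dedups, and sorts these.
		t.Report().Errorf("not a .proto file: %q", l.Path)
		return false, nil // non-fatal, so no error is returned
	}
	return true, nil
}
```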
2 changes: 1 addition & 1 deletion experimental/parser/lex_test.go
@@ -43,7 +43,7 @@ func TestLexer(t *testing.T) {
corpus.Run(t, func(t *testing.T, path, text string, outputs []string) {
text = unescapeTestCase(text)

errs := &report.Report{Tracing: 10}
errs := &report.Report{Options: report.Options{Tracing: 10}}
ctx := ast.NewContext(report.NewFile(path, text))
lex(ctx, errs)

2 changes: 1 addition & 1 deletion experimental/parser/parse_test.go
@@ -42,7 +42,7 @@ func TestParse(t *testing.T) {
}

corpus.Run(t, func(t *testing.T, path, text string, outputs []string) {
errs := &report.Report{Tracing: 10}
errs := &report.Report{Options: report.Options{Tracing: 10}}
file, _ := Parse(report.NewFile(path, text), errs)

errs.Sort()