Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor how diagnostics are pushed through incremental; add queries.AST #404

Merged
merged 7 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions experimental/incremental/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"sync/atomic"

"golang.org/x/sync/semaphore"

"github.com/bufbuild/protocompile/experimental/report"
)

// Executor is a caching executor for incremental queries.
//
// See [New], [Run], and [Invalidate].
type Executor struct {
dirty sync.RWMutex
reportOptions report.Options
dirty sync.RWMutex

// TODO: Evaluate alternatives. sync.Map is pretty bad at having predictable
// performance, and we may want to add eviction to keep memoization costs
Expand Down Expand Up @@ -64,6 +67,12 @@ func WithParallelism(n int64) ExecutorOption {
return func(e *Executor) { e.sema = semaphore.NewWeighted(n) }
}

// WithReportOptions sets the report options for reports generated by this
// executor.
func WithReportOptions(options report.Options) ExecutorOption {
return func(e *Executor) { e.reportOptions = options }
}

// Keys returns a snapshot of the keys of which queries are present (and
// memoized) in an Executor.
//
Expand Down Expand Up @@ -100,7 +109,7 @@ var runExecutorKey byte
//
// Note: this function really wants to be a method of [Executor], but it isn't
// because it's generic.
func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results []Result[T], expired error) {
func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) ([]Result[T], *report.Report, error) {
e.dirty.RLock()
defer e.dirty.RUnlock()

Expand All @@ -119,7 +128,7 @@ func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results
// Need to acquire a hold on the global semaphore to represent the root
// task we're about to execute.
if e.sema.Acquire(ctx, 1) != nil {
return nil, context.Cause(ctx)
return nil, nil, context.Cause(ctx)
}
defer e.sema.Release(1)

Expand All @@ -132,22 +141,32 @@ func Run[T any](ctx context.Context, e *Executor, queries ...Query[T]) (results
runID: generation,
}

results, expired = Resolve(root, queries...)
results, expired := Resolve(root, queries...)
if expired != nil {
return nil, expired
return nil, nil, expired
}

// Now, for each result, we need to walk their dependencies and collect
// their dependencies' non-fatal errors.
for i, query := range queries {
// Record all diagnostics generates by the queries.
report := &report.Report{Options: e.reportOptions}
dedup := make(map[*task]struct{})
record := func(t *task) {
if _, ok := dedup[t]; ok {
return
}

dedup[t] = struct{}{}
report.Diagnostics = append(report.Diagnostics, t.report.Diagnostics...)
}
for _, query := range queries {
task := e.getTask(query.Key())
record(task) // NOTE: task.deps does not contain task.
for dep := range task.deps {
r := &results[i]
r.NonFatal = append(r.NonFatal, dep.result.Load().NonFatal...)
record(dep)
}
}
report.Sort()

return results, nil
return results, report, nil
}

// Evict marks query keys as invalid, requiring those queries, and their
Expand Down Expand Up @@ -204,6 +223,6 @@ func (e *Executor) getTask(key any) *task {
return t.(*task) //nolint:errcheck
}

t, _ := e.tasks.LoadOrStore(key, new(task))
t, _ := e.tasks.LoadOrStore(key, &task{report: report.Report{Options: e.reportOptions}})
return t.(*task) //nolint:errcheck
}
24 changes: 12 additions & 12 deletions experimental/incremental/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (i ParseInt) Execute(t incremental.Task) (int, error) {

v, err := strconv.Atoi(i.Input)
if err != nil {
t.NonFatal(err)
t.Report().Errorf("%s", err)
}
if v < 0 {
return 0, fmt.Errorf("negative value: %v", v)
Expand Down Expand Up @@ -129,10 +129,10 @@ func TestSum(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
result, report, err := incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
require.NoError(t, err)
assert.Equal(12, result[0].Value)
assert.Empty(result[0].NonFatal)
assert.Empty(report.Diagnostics)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
Expand All @@ -142,10 +142,10 @@ func TestSum(t *testing.T) {
`incremental_test.Sum{Input:"1,2,2,3,4"}`,
}, exec.Keys())

result, err = incremental.Run(ctx, exec, Sum{"1,2,2,oops,4"})
result, report, err = incremental.Run(ctx, exec, Sum{"1,2,2,oops,4"})
require.NoError(t, err)
assert.Equal(9, result[0].Value)
assert.Len(result[0].NonFatal, 1)
assert.Len(report.Diagnostics, 1)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
Expand All @@ -166,10 +166,10 @@ func TestSum(t *testing.T) {
`incremental_test.Root{}`,
}, exec.Keys())

result, err = incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
result, report, err = incremental.Run(ctx, exec, Sum{"1,2,2,3,4"})
require.NoError(t, err)
assert.Equal(12, result[0].Value)
assert.Empty(result[0].NonFatal)
assert.Empty(report.Diagnostics)
assert.Equal([]string{
`incremental_test.ParseInt{Input:"1"}`,
`incremental_test.ParseInt{Input:"2"}`,
Expand All @@ -190,7 +190,7 @@ func TestFatal(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Sum{"1,2,-3,-4"})
result, _, err := incremental.Run(ctx, exec, Sum{"1,2,-3,-4"})
require.NoError(t, err)
// NOTE: This error is deterministic, because it's chosen by Sum.Execute.
assert.Equal("negative value: -3", result[0].Fatal.Error())
Expand All @@ -213,7 +213,7 @@ func TestCyclic(t *testing.T) {
incremental.WithParallelism(4),
)

result, err := incremental.Run(ctx, exec, Cyclic{Mod: 5, Step: 3})
result, _, err := incremental.Run(ctx, exec, Cyclic{Mod: 5, Step: 3})
require.NoError(t, err)
assert.Equal(
`cycle detected: `+
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUnchanged(t *testing.T) {

for i := 0; i < runs; i++ {
exec.Evict(ParseInt{"42"})
results, _ := incremental.Run(ctx, exec, queries...)
results, _, _ := incremental.Run(ctx, exec, queries...)
for j, r := range results[1:] {
// All calls after an eviction should return true for Changed.
assert.True(r.Changed, "%d", j)
Expand All @@ -270,7 +270,7 @@ func TestUnchanged(t *testing.T) {
barrier.Wait() // Ensure all goroutines start together.
defer wg.Done()

results, _ := incremental.Run(ctx, exec, queries...)
results, _, _ := incremental.Run(ctx, exec, queries...)
for j, r := range results {
// We don't know who the winning g that gets to do the
// computation will be be, so just require that all of the
Expand All @@ -289,7 +289,7 @@ func TestUnchanged(t *testing.T) {
// Exactly one of the gs should have seen a change.
assert.Equal(int32(1), changed.Load())

results, _ = incremental.Run(ctx, exec, queries...)
results, _, _ = incremental.Run(ctx, exec, queries...)
for j, r := range results[1:] {
// All calls after computation should return false for Changed.
assert.False(r.Changed, "%d", j)
Expand Down
60 changes: 60 additions & 0 deletions experimental/incremental/queries/ast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2020-2025 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queries

import (
"github.com/bufbuild/protocompile/experimental/ast"
"github.com/bufbuild/protocompile/experimental/incremental"
"github.com/bufbuild/protocompile/experimental/parser"
"github.com/bufbuild/protocompile/experimental/source"
)

// AST is an [incremental.Query] for the contents of a file as provided
// by a [source.Opener].
//
// AST queries with different Openers are considered distinct.
type AST struct {
source.Opener // Must be comparable.
Path string
}

var _ incremental.Query[ast.File] = AST{}

// Key implements [incremental.Query].
//
// The key for a Contents query is the query itself. This means that a single
// [incremental.Executor] can host Contents queries for multiple Openers. It
// also means that the Openers must all be comparable. As the [Opener]
// documentation states, implementations should take a pointer receiver so that
// comparison uses object identity.
func (a AST) Key() any {
return a
}

// Execute implements [incremental.Query].
func (a AST) Execute(t incremental.Task) (ast.File, error) {
t.Report().Options.Stage += stageAST

r, err := incremental.Resolve(t, File(a))
if err != nil {
return ast.File{}, err
}
if r[0].Fatal != nil {
return ast.File{}, r[0].Fatal
}

file, _ := parser.Parse(r[0].Value, t.Report())
return file, nil
}
19 changes: 11 additions & 8 deletions experimental/incremental/queries/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ var _ incremental.Query[*report.File] = File{}
// also means that the Openers must all be comparable. As the [Opener]
// documentation states, implementations should take a pointer receiver so that
// comparison uses object identity.
func (t File) Key() any {
return t
func (f File) Key() any {
return f
}

// Execute implements [incremental.Query].
func (t File) Execute(incremental.Task) (*report.File, error) {
text, err := t.Open(t.Path)
func (f File) Execute(t incremental.Task) (*report.File, error) {
t.Report().Options.Stage += stageFile

text, err := f.Open(f.Path)
if err != nil {
r := newReport(stageFile)
r.Report.Error(&report.ErrInFile{Err: err, Path: t.Path})
return nil, r
t.Report().Errorf("%v", err).Apply(
report.InFile(f.Path),
)
return nil, err
}

return report.NewFile(t.Path, text), nil
return report.NewFile(f.Path, text), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,10 @@
// of Protocompile. It is separate from package incremental itself because it is
// Protocompile-specific.
package queries

// Values for [report.Report].SortOrder, which determine how diagnostics
// generated across parts of the compiler are sorted.
const (
stageFile int = iota * 10
stageAST
)
30 changes: 0 additions & 30 deletions experimental/incremental/queries/report.go

This file was deleted.

16 changes: 6 additions & 10 deletions experimental/incremental/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"sync/atomic"

"github.com/bufbuild/protocompile/experimental/report"
"github.com/bufbuild/protocompile/internal/iter"
)

Expand Down Expand Up @@ -50,14 +51,10 @@ func (t *Task) Context() context.Context {
return t.ctx
}

// Error adds errors to the current query, which will be propagated to all
// queries which depend on it.
//
// This will not cause the query to fail; instead, [Query].Execute should
// return false for the ok value to signal failure.
func (t *Task) NonFatal(errs ...error) {
// Report returns the diagnostic report for this task.
func (t *Task) Report() *report.Report {
t.checkDone()
t.result.NonFatal = append(t.result.NonFatal, errs...)
return &t.task.report
}

// Resolve executes a set of queries in parallel. Each query is run on its own
Expand Down Expand Up @@ -111,7 +108,6 @@ func Resolve[T any](caller Task, queries ...Query[T]) (results []Result[T], expi
results[i].Value = r.Value.(T) //nolint:errcheck
}

results[i].NonFatal = r.NonFatal
results[i].Fatal = r.Fatal
results[i].Changed = r.runID == caller.runID
}
Expand Down Expand Up @@ -182,15 +178,15 @@ type task struct {
// If this task has not been started yet, this is nil.
// Otherwise, if it is complete, result.done will be closed.
result atomic.Pointer[result]
report report.Report
}

// Result is the Result of executing a query on an [Executor], either via
// [Run] or [Resolve].
type Result[T any] struct {
Value T // Value is unspecified if Fatal is non-nil.

NonFatal []error
Fatal error
Fatal error

// Set if this result has possibly changed since the last time [Run] call in
// which this query was computed.
Expand Down
2 changes: 1 addition & 1 deletion experimental/parser/lex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestLexer(t *testing.T) {
corpus.Run(t, func(t *testing.T, path, text string, outputs []string) {
text = unescapeTestCase(text)

errs := &report.Report{Tracing: 10}
errs := &report.Report{Options: report.Options{Tracing: 10}}
ctx := ast.NewContext(report.NewFile(path, text))
lex(ctx, errs)

Expand Down
2 changes: 1 addition & 1 deletion experimental/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestParse(t *testing.T) {
}

corpus.Run(t, func(t *testing.T, path, text string, outputs []string) {
errs := &report.Report{Tracing: 10}
errs := &report.Report{Options: report.Options{Tracing: 10}}
file, _ := Parse(report.NewFile(path, text), errs)

errs.Sort()
Expand Down
Loading
Loading