[stats] Rewrite stat management to use single threaded event loop #8815
base: main
Conversation
@max-hoffman DOLT
Been chatting offline, so I'm going to land these comments, which mostly ignore tests and also don't include scheduler.go or provider.go. Offline we were discussing the structure a bit, related to the mutable state reads which happen outside of the queue, and whether there's a way to push more work through the queue so that we can get rid of most of the external synchronization primitives other than channels and statsMu.
sqlEngine, err = engine.NewSqlEngine(
	ctx,
	mrEnv,
	config,
)
if sc, ok := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.StatsProvider.(*statspro.StatsCoord); ok {
Feels wrong to tie this to the server, instead of engine.SqlEngine doing it. What's the logic behind only doing this restart when we're in server context?
//intervalSec := time.Duration(0)
//thresholdf64 := 0.
//bThreads := sql.NewBackgroundThreads()
//branches := []string{"main"}
Delete...
bThreads := sql.NewBackgroundThreads()

ctxGen := func(ctx context.Context) (*sql.Context, error) {
	return d.NewSession(), nil
Subtlety here around session vs. context might require some comment. Both here and down on line 329, maybe it's correct today for this factory to ignore the incoming ctx context.Context, but that's hard for a reader to verify and it looks wrong when they encounter it. We should probably refactor these methods on *DoltHarness a bit so that we can actually preserve the requested Context when we want/need to.
ctxGen := func(ctx context.Context) (*sql.Context, error) {
	return d.NewContext(), nil
}
bThreads := sql.NewBackgroundThreads()
Weird that we lost the BackgroundThreads from when we initialized / which our engine is using...
}
bThreads := sql.NewBackgroundThreads()
statsPro := statspro.NewStatsCoord(d.provider.(*sqle.DoltDatabaseProvider), ctxGen, ctx.Session.GetLogger().Logger, bThreads, d.multiRepoEnv.GetEnv(d.multiRepoEnv.GetFirstDatabase()))
require.NoError(t, statsPro.Restart(ctx))
Why doesn't this look like when we initialize it in the initialize block? No SetTimers for example?
if tableInfo.name == "is_restricted" {
	print()
}
?
bThreads := sql.NewBackgroundThreads()

ctxGen := func(ctx context.Context) (*sql.Context, error) {
	return d.NewContextWithClient(sql.Client{Address: "localhost", User: "root"}), nil
Might be OK in current implementation and to lose the original context on this factory, but feels weird. Maybe change DoltHarness so we have the methods we need to thread it here and down below?
@@ -132,7 +133,7 @@ func ApplyMutations[K ~[]byte, O Ordering[K], S message.Serializer](
 	prev := newKey
 	newKey, newValue = edits.NextMutation(ctx)
 	if newKey != nil {
-		assertTrue(order.Compare(K(newKey), K(prev)) > 0, "expected sorted edits")
+		assertTrue(order.Compare(K(newKey), K(prev)) > 0, "expected sorted edits"+fmt.Sprintf("%v, %v", prev, newKey))
Better as
func assertTrue(b bool, format string, args ...any) {
if !b {
panic(fmt.Sprintf("assertion failed: "+format, args...))
}
}
if we want to add this functionality...
@@ -77,7 +79,7 @@ func NewTupleBuilder(desc TupleDesc) *TupleBuilder {
 func (tb *TupleBuilder) Build(pool pool.BuffPool) (tup Tuple) {
 	for i, typ := range tb.Desc.Types {
 		if !typ.Nullable && tb.fields[i] == nil {
-			panic("cannot write NULL to non-NULL field")
+			log.Println("cannot write NULL to non-NULL field: " + strconv.Itoa(i) + " " + string(tb.fields[i]))
Revert...
@@ -639,7 +639,7 @@ func (td TupleDesc) formatValue(enc Encoding, i int, value []byte) string {
 	case StringAddrEnc:
 		return hex.EncodeToString(value)
 	case CommitAddrEnc:
-		return hex.EncodeToString(value)
+		return hash.New(value).String()[:5]
Revert? Or just use the full String() and pick up StringAddr and BytesAddr as well?
Reference statspro/doc.go for a more detailed overview. Replaced stats management with an event loop and shared bucket cache. Rudimentary GC and branch syncing seem like a must for thousands of branches. Branch add/delete seems to work as I'd expect under high concurrency. The GC and branch syncing concurrency is a bit sloppy right now. It doesn't deadlock, but thousands of concurrent adds a second while running GC/branch sync every <10ms doesn't true up the way I would want.
Early perf testing shows little/no impact on TPC-C when the job ticker is 100-500ms. Every few minutes during TPC-C, stats throws spurious errors that appear to self-resolve. It doesn't exactly clutter output, but I think users would be happier if I figured out how to reduce those errors.