Skip to content

Commit 394cc6c

Browse files
authored
Merge branch 'master' into feat/ssd-kv-cache-ttl
2 parents 89d04e2 + 16e1f78 commit 394cc6c

15 files changed

Lines changed: 937 additions & 230 deletions

File tree

coordinator/api/provider.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,20 @@ func (s *Server) providerReadLoop(ctx context.Context, conn *websocket.Conn, pro
420420
s.registry.ClearPendingModelLoad(providerID, statusMsg.ModelID)
421421
s.registry.DrainQueuedRequestsForModel(statusMsg.ModelID)
422422
case protocol.LoadModelStatusFailed:
423-
// Keep the pending entry (TTL cooldown suppresses retry storms).
424-
// If no other provider can serve this model, reject queued
425-
// requests immediately rather than making them wait 120s.
426-
s.registry.RejectUnservableQueuedRequests(statusMsg.ModelID)
423+
if statusMsg.Error == protocol.ProviderDrainingForUpdate {
424+
// Transient: the provider refused only because it is
425+
// draining ahead of an auto-update restart. Shorten the
426+
// cooldown so a failed restart (provider resumes serving)
427+
// becomes loadable again quickly; queued requests are NOT
428+
// rejected — the provider is back within the queue window
429+
// and other providers remain plannable.
430+
s.registry.BackoffPendingModelLoadForDrain(providerID, statusMsg.ModelID)
431+
} else {
432+
// Keep the pending entry (TTL cooldown suppresses retry storms).
433+
// If no other provider can serve this model, reject queued
434+
// requests immediately rather than making them wait 120s.
435+
s.registry.RejectUnservableQueuedRequests(statusMsg.ModelID)
436+
}
427437
}
428438
// "started" status: no action — load is in progress.
429439

coordinator/api/provider_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,58 @@ func TestSyncBinaryHashesPreservesAdditionalConfiguredHashes(t *testing.T) {
690690
}
691691
}
692692

693+
func TestAdminDeleteReleaseBlocksActiveBinaryHashWhenEnforced(t *testing.T) {
694+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
695+
st := store.NewMemory(store.Config{AdminKey: "test-key"})
696+
reg := registry.New(logger)
697+
srv := NewServer(reg, st, ServerConfig{AdminKey: "admin-key"}, logger)
698+
srv.SetBinaryHashEnforcement(true)
699+
700+
releaseHash := strings.Repeat("c", 64)
701+
if err := st.SetRelease(&store.Release{
702+
Version: "1.0.0",
703+
Platform: "macos-arm64",
704+
BinaryHash: releaseHash,
705+
BundleHash: strings.Repeat("d", 64),
706+
URL: "https://r2.example.com/releases/v1.0.0/darkbloom-bundle-macos-arm64.tar.gz",
707+
}); err != nil {
708+
t.Fatalf("SetRelease: %v", err)
709+
}
710+
p := reg.Register("provider-old", nil, &protocol.RegisterMessage{
711+
Type: protocol.TypeRegister,
712+
Backend: "mlx-swift",
713+
Hardware: protocol.Hardware{
714+
MachineModel: "Mac15,8",
715+
ChipName: "Apple M3 Max",
716+
MemoryGB: 64,
717+
},
718+
Models: []protocol.ModelInfo{{ID: "test-model", ModelType: "chat", Quantization: "4bit"}},
719+
})
720+
p.SetAttestationResult(&attestation.VerificationResult{Valid: true, BinaryHash: releaseHash})
721+
722+
req := httptest.NewRequest(http.MethodDelete, "/v1/admin/releases", strings.NewReader(`{"version":"1.0.0","platform":"macos-arm64"}`))
723+
req.Header.Set("Authorization", "Bearer admin-key")
724+
w := httptest.NewRecorder()
725+
srv.handleAdminDeleteRelease(w, req)
726+
if w.Code != http.StatusConflict {
727+
t.Fatalf("delete without force status = %d, want %d; body=%s", w.Code, http.StatusConflict, w.Body.String())
728+
}
729+
if latest := st.GetLatestRelease("macos-arm64"); latest == nil || !latest.Active {
730+
t.Fatal("release should remain active after protected delete")
731+
}
732+
733+
forceReq := httptest.NewRequest(http.MethodDelete, "/v1/admin/releases", strings.NewReader(`{"version":"1.0.0","platform":"macos-arm64","force":true}`))
734+
forceReq.Header.Set("Authorization", "Bearer admin-key")
735+
forceW := httptest.NewRecorder()
736+
srv.handleAdminDeleteRelease(forceW, forceReq)
737+
if forceW.Code != http.StatusOK {
738+
t.Fatalf("force delete status = %d, want %d; body=%s", forceW.Code, http.StatusOK, forceW.Body.String())
739+
}
740+
if latest := st.GetLatestRelease("macos-arm64"); latest != nil {
741+
t.Fatal("release should be inactive after forced delete")
742+
}
743+
}
744+
693745
func TestBinaryHashPolicySnapshotConcurrentSync(t *testing.T) {
694746
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
695747
st := store.NewMemory(store.Config{AdminKey: "test-key"})

coordinator/api/release_handlers.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@ func (s *Server) handleAdminDeleteRelease(w http.ResponseWriter, r *http.Request
505505
var req struct {
506506
Version string `json:"version"`
507507
Platform string `json:"platform"`
508+
Force bool `json:"force,omitempty"`
508509
}
509510
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
510511
writeJSON(w, http.StatusBadRequest, errorResponse("invalid_request_error", "invalid JSON: "+err.Error()))
@@ -517,6 +518,17 @@ func (s *Server) handleAdminDeleteRelease(w http.ResponseWriter, r *http.Request
517518
if req.Platform == "" {
518519
req.Platform = "macos-arm64"
519520
}
521+
if s.binaryHashEnforce && !req.Force {
522+
if release, ok := findReleaseForDeactivation(s.store.ListReleases(), req.Version, req.Platform); ok {
523+
if activeProviders := s.registry.CountProvidersByBinaryHash(release.BinaryHash); activeProviders > 0 {
524+
writeJSON(w, http.StatusConflict, errorResponse(
525+
"release_in_use",
526+
fmt.Sprintf("release %s/%s binary hash is still used by %d connected provider(s); wait for rollout or set force=true", req.Version, req.Platform, activeProviders),
527+
))
528+
return
529+
}
530+
}
531+
}
520532

521533
if err := s.store.DeleteRelease(req.Version, req.Platform); err != nil {
522534
writeJSON(w, http.StatusNotFound, errorResponse("not_found", err.Error()))
@@ -535,6 +547,15 @@ func (s *Server) handleAdminDeleteRelease(w http.ResponseWriter, r *http.Request
535547
})
536548
}
537549

550+
func findReleaseForDeactivation(releases []store.Release, version, platform string) (store.Release, bool) {
551+
for _, release := range releases {
552+
if release.Version == version && release.Platform == platform && release.Active {
553+
return release, true
554+
}
555+
}
556+
return store.Release{}, false
557+
}
558+
538559
// isAdminAuthorized checks if the request is from an admin.
539560
// Accepts either Privy admin (email in admin list) OR EIGENINFERENCE_ADMIN_KEY.
540561
func (s *Server) isAdminAuthorized(w http.ResponseWriter, r *http.Request) bool {

coordinator/protocol/messages.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ const (
6363
LoadModelStatusFailed = "failed"
6464
)
6565

66+
// ProviderDrainingForUpdate is the exact error reason a provider sends on
// inference / load_model / prefetch_model rejections while it drains ahead of
// an auto-update restart. The coordinator compares a failed load_model's error
// against this string; on a match the failure is handled as transient (short
// retry backoff) instead of a genuine load failure that earns the full
// cooldown.
//
// The provider side declares the same literal in
// provider-swift/Sources/ProviderCore/Protocol/Types.swift; the two must stay
// byte-identical.
const ProviderDrainingForUpdate = "provider draining for update"
74+
6675
// PrefetchModelStatus is the lifecycle state reported by a provider in
6776
// response to a PrefetchModelMessage. Unlike a load, a prefetch only
6877
// downloads + verifies the model on disk; it does NOT load weights into
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package registry
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func hasPendingLoad(r *Registry, providerID string) bool {
9+
r.mu.RLock()
10+
defer r.mu.RUnlock()
11+
return r.providerHasPendingLoad(providerID)
12+
}
13+
14+
func TestPendingModelLoadReserveAndExpiry(t *testing.T) {
15+
r := New(testLogger())
16+
now := time.Now()
17+
18+
reserved := r.reservePendingModelLoads([]modelLoadAction{{providerID: "p1", modelID: "m1"}}, now)
19+
if len(reserved) != 1 {
20+
t.Fatalf("expected 1 reserved action, got %d", len(reserved))
21+
}
22+
23+
// While the entry lives, the provider must not be reserved again — not
24+
// even for a different model (single-slot swap oscillation guard).
25+
again := r.reservePendingModelLoads([]modelLoadAction{{providerID: "p1", modelID: "m2"}}, now.Add(time.Minute))
26+
if len(again) != 0 {
27+
t.Fatal("provider with a pending load was reserved again")
28+
}
29+
30+
r.expirePendingModelLoads(now.Add(pendingModelLoadTTL - time.Second))
31+
if !hasPendingLoad(r, "p1") {
32+
t.Fatal("pending load expired before the TTL")
33+
}
34+
35+
r.expirePendingModelLoads(now.Add(pendingModelLoadTTL + time.Second))
36+
if hasPendingLoad(r, "p1") {
37+
t.Fatal("pending load survived past the TTL")
38+
}
39+
}
40+
41+
func TestDrainBackoffShortensPendingLoadCooldown(t *testing.T) {
42+
r := New(testLogger())
43+
r.reservePendingModelLoads([]modelLoadAction{{providerID: "p1", modelID: "m1"}}, time.Now())
44+
45+
// A drain rejection re-stamps the entry with the short backoff: long
46+
// enough to keep the planner off a provider that is about to restart,
47+
// short enough that an aborted restart leaves it plannable again well
48+
// inside the queue window.
49+
r.BackoffPendingModelLoadForDrain("p1", "m1")
50+
51+
r.expirePendingModelLoads(time.Now().Add(pendingModelLoadDrainBackoff - 5*time.Second))
52+
if !hasPendingLoad(r, "p1") {
53+
t.Fatal("drain backoff cleared too early")
54+
}
55+
56+
r.expirePendingModelLoads(time.Now().Add(pendingModelLoadDrainBackoff + time.Second))
57+
if hasPendingLoad(r, "p1") {
58+
t.Fatal("drain backoff survived past pendingModelLoadDrainBackoff")
59+
}
60+
}
61+
62+
func TestDrainBackoffAppliesWithoutPriorReservation(t *testing.T) {
63+
// The coordinator may learn of a drain rejection for a load_model it sent
64+
// before a restart (entry already expired or cleared). The backoff must
65+
// still record the provider as temporarily unplannable.
66+
r := New(testLogger())
67+
r.BackoffPendingModelLoadForDrain("p1", "m1")
68+
69+
if !hasPendingLoad(r, "p1") {
70+
t.Fatal("drain backoff did not create a pending entry")
71+
}
72+
73+
r.expirePendingModelLoads(time.Now().Add(pendingModelLoadDrainBackoff + time.Second))
74+
if hasPendingLoad(r, "p1") {
75+
t.Fatal("drain backoff survived past pendingModelLoadDrainBackoff")
76+
}
77+
}

coordinator/registry/registry.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -639,13 +639,26 @@ type Registry struct {
639639
modelProvidersMu sync.Mutex
640640

641641
// pendingModelLoads tracks provider-model pairs that have been sent a
642-
// load_model command and are awaiting completion. Prevents duplicate
643-
// sends across heartbeat cycles.
644-
pendingModelLoads map[string]time.Time // key: "providerID:modelID"
642+
// load_model command and are awaiting completion, or are cooling down
643+
// after a failed one. The value is the entry's expiry time. While an
644+
// entry lives, the provider is skipped for new load_model sends
645+
// (bestModelLoadProviderLocked / reservePendingModelLoads).
646+
pendingModelLoads map[string]time.Time // key: "providerID:modelID", value: expiry
645647
}
646648

649+
const (
	// pendingModelLoadTTL is the lifetime given to a pendingModelLoads entry
	// when a load_model is reserved: for that long an outstanding (or failed)
	// load suppresses further load_model sends to the same provider.
	pendingModelLoadTTL = 2 * time.Minute

	// pendingModelLoadDrainBackoff is the shorter lifetime stamped onto an
	// entry when the provider rejected load_model because it is draining for
	// an auto-update restart. It keeps the planner off a provider that is
	// about to bounce, yet is short enough that a provider which aborts the
	// restart and resumes serving is not held back by the full 2-minute
	// cooldown while queued requests it could serve are waiting.
	pendingModelLoadDrainBackoff = 30 * time.Second
)
661+
649662
type modelLoadAction struct {
650663
providerID string
651664
modelID string
@@ -2102,8 +2115,8 @@ func (r *Registry) TriggerModelSwaps() {
21022115
func (r *Registry) expirePendingModelLoads(now time.Time) {
21032116
r.mu.Lock()
21042117
defer r.mu.Unlock()
2105-
for key, sentAt := range r.pendingModelLoads {
2106-
if now.Sub(sentAt) > pendingModelLoadTTL {
2118+
for key, expiresAt := range r.pendingModelLoads {
2119+
if now.After(expiresAt) {
21072120
delete(r.pendingModelLoads, key)
21082121
}
21092122
}
@@ -2290,7 +2303,7 @@ func (r *Registry) reservePendingModelLoads(actions []modelLoadAction, now time.
22902303
if r.providerHasPendingLoad(action.providerID) {
22912304
continue
22922305
}
2293-
r.pendingModelLoads[modelLoadKey(action.providerID, action.modelID)] = now
2306+
r.pendingModelLoads[modelLoadKey(action.providerID, action.modelID)] = now.Add(pendingModelLoadTTL)
22942307
reserved = append(reserved, action)
22952308
}
22962309
return reserved
@@ -2382,6 +2395,19 @@ func (r *Registry) ClearPendingModelLoad(providerID, modelID string) {
23822395
r.mu.Unlock()
23832396
}
23842397

2398+
// BackoffPendingModelLoadForDrain re-stamps a pending load entry with the
2399+
// short drain backoff. Called when a provider rejects load_model because it
2400+
// is draining ahead of an auto-update restart: clearing the entry outright
2401+
// would re-send load_model to the same draining provider on the very next
2402+
// TriggerModelSwaps pass, while the full failure cooldown would suppress the
2403+
// provider long after a failed restart resumed serving. A successful restart
2404+
// clears the entry anyway via Disconnect.
2405+
func (r *Registry) BackoffPendingModelLoadForDrain(providerID, modelID string) {
2406+
r.mu.Lock()
2407+
r.pendingModelLoads[modelLoadKey(providerID, modelID)] = time.Now().Add(pendingModelLoadDrainBackoff)
2408+
r.mu.Unlock()
2409+
}
2410+
23852411
// RejectUnservableQueuedRequests checks whether any eligible provider can
23862412
// serve the given model. If not, all queued requests for the model are
23872413
// rejected immediately rather than waiting for the 120s queue timeout.
@@ -2516,6 +2542,37 @@ func (r *Registry) GetProvider(id string) *Provider {
25162542
return r.providers[id]
25172543
}
25182544

2545+
// CountProvidersByBinaryHash returns the number of currently connected
2546+
// providers whose registration attested the given provider binary hash. Used by
2547+
// release administration to avoid removing a hash from the forced allowlist
2548+
// while old-but-still-connected providers are draining/restarting into a newer
2549+
// release.
2550+
func (r *Registry) CountProvidersByBinaryHash(hash string) int {
2551+
normalized := strings.ToLower(strings.TrimSpace(hash))
2552+
if normalized == "" {
2553+
return 0
2554+
}
2555+
2556+
r.mu.RLock()
2557+
defer r.mu.RUnlock()
2558+
2559+
count := 0
2560+
for _, p := range r.providers {
2561+
p.mu.Lock()
2562+
status := p.Status
2563+
attestedHash := ""
2564+
if p.AttestationResult != nil {
2565+
attestedHash = p.AttestationResult.BinaryHash
2566+
}
2567+
p.mu.Unlock()
2568+
2569+
if status != StatusOffline && strings.EqualFold(attestedHash, normalized) {
2570+
count++
2571+
}
2572+
}
2573+
return count
2574+
}
2575+
25192576
// MarkUntrusted sets a provider's status to untrusted for a hard/security
25202577
// reason (bad encrypted chunk, MDM/MDA failure, SIP disabled, binary or model
25212578
// hash mismatch, serial impersonation, attestation failure). The deroute is

provider-swift/Sources/ProviderCore/Protocol/Types.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
import Foundation
22

3+
/// Error reason the provider attaches to inference / load_model /
/// prefetch_model rejections while it is draining ahead of an auto-update
/// restart.
///
/// The coordinator compares load_model failures against this exact string and,
/// on a match, treats the failure as transient (short retry backoff) rather
/// than as a genuine load failure. The coordinator's copy is
/// `ProviderDrainingForUpdate` in coordinator/protocol/messages.go; the two
/// literals must stay byte-identical.
public let providerDrainingForUpdateReason = "provider draining for update"
10+
311
public struct HardwareInfo: Codable, Sendable, Equatable {
412
public var machineModel: String
513
public var chipName: String

0 commit comments

Comments
 (0)