Skip to content

Commit

Permalink
update the capacity to zero on shutdown/reset
Browse files Browse the repository at this point in the history
When the device plugin is restarted, kubelet marks the resource as unhealthy, but still reports the resource as existing for a grace period (5 mins). If a pod is scheduled before the device plugin comes up, the pod create fails without a retryloop with an error message Pod was rejected: Allocate failed due to no healthy devices present; cannot allocate unhealthy devices <DEVICE_NAME>, which is unexpected.

This commit allow the device plugin to send an empty list of devices before the reset or shutdown

Signed-off-by: Sebastian Sch <[email protected]>
  • Loading branch information
SchSeba committed Oct 21, 2023
1 parent 2cc723d commit c1a6852
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 38 deletions.
93 changes: 61 additions & 32 deletions pkg/resources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,25 @@ import (
)

type resourceServer struct {
resourcePool types.ResourcePool
pluginWatch bool
endPoint string // Socket file
sockPath string // Socket file path
resourceNamePrefix string
grpcServer *grpc.Server
termSignal chan bool
updateSignal chan bool
stopWatcher chan bool
checkIntervals int // health check intervals in seconds
useCdi bool
cdi cdiPkg.CDI
resourcePool types.ResourcePool
pluginWatch bool
endPoint string // Socket file
sockPath string // Socket file path
resourceNamePrefix string
grpcServer *grpc.Server
listAndWatchStopSignal chan bool
listAndWatchFinishedSignal chan bool
updateSignal chan bool
stopWatcher chan bool
checkIntervals int // health check intervals in seconds
useCdi bool
cdi cdiPkg.CDI
}

const (
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
terminatedSignalTimeOut = 5 * time.Second
)

// NewResourceServer returns an instance of ResourceServer
Expand All @@ -61,18 +63,19 @@ func NewResourceServer(prefix, suffix string, pluginWatch, useCdi bool, rp types
sockPath = filepath.Join(types.DeprecatedSockDir, sockName)
}
return &resourceServer{
resourcePool: rp,
pluginWatch: pluginWatch,
endPoint: sockName,
sockPath: sockPath,
resourceNamePrefix: prefix,
useCdi: useCdi,
grpcServer: grpc.NewServer(),
termSignal: make(chan bool, 1),
updateSignal: make(chan bool),
stopWatcher: make(chan bool),
checkIntervals: 20, // updates every 20 seconds
cdi: cdiPkg.New(),
resourcePool: rp,
pluginWatch: pluginWatch,
endPoint: sockName,
sockPath: sockPath,
resourceNamePrefix: prefix,
useCdi: useCdi,
grpcServer: grpc.NewServer(),
listAndWatchStopSignal: make(chan bool, 1),
listAndWatchFinishedSignal: make(chan bool, 1),
updateSignal: make(chan bool),
stopWatcher: make(chan bool),
checkIntervals: 20, // updates every 20 seconds
cdi: cdiPkg.New(),
}
}

Expand Down Expand Up @@ -177,9 +180,19 @@ func (rs *resourceServer) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.
// listen for events: if updateSignal send new list of devices
for {
select {
case <-rs.termSignal:
// Terminate signal received; return from mehtod call
case <-rs.listAndWatchStopSignal:
// Terminate signal received; send an empty list of devices and return from method call
glog.Infof("%s: terminate signal received", methodID)
resp = new(pluginapi.ListAndWatchResponse)
resp.Devices = make([]*pluginapi.Device, 0)

if err := stream.Send(resp); err != nil {
glog.Errorf("%s: error: cannot update device states: %v\n", methodID, err)
return err
}
// Releasing the terminate process to close the grpc server
rs.listAndWatchFinishedSignal <- true

return nil
case <-rs.updateSignal:
// Device health changed; so send new device list
Expand Down Expand Up @@ -304,11 +317,19 @@ func (rs *resourceServer) restart() error {
if rs.grpcServer == nil {
return fmt.Errorf("grpc server instance not found for %s", resourceName)
}
rs.grpcServer.Stop()
rs.grpcServer = nil
// Send terminate signal to ListAndWatch()
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

// wait for the terminated signal or 5 second
select {
case <-rs.listAndWatchFinishedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer.Stop()
rs.grpcServer = nil
rs.grpcServer = grpc.NewServer() // new instance of a grpc server
return rs.Start()
}
Expand All @@ -320,11 +341,19 @@ func (rs *resourceServer) Stop() error {
return nil
}
// Send terminate signal to ListAndWatch()
rs.termSignal <- true
rs.listAndWatchStopSignal <- true
if !rs.pluginWatch {
rs.stopWatcher <- true
}

// wait for the terminated signal or 5 second
select {
case <-rs.listAndWatchFinishedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer.Stop()
rs.grpcServer = nil

Expand Down
18 changes: 12 additions & 6 deletions pkg/resources/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,21 @@ var _ = Describe("Server", func() {
err := rs.Start()
Expect(err).NotTo(HaveOccurred())

// demo the listAndWatch
rs.listAndWatchFinishedSignal <- true
err = rs.restart()
Expect(err).NotTo(HaveOccurred())
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
// demo the listAndWatch
rs.listAndWatchFinishedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
}()
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())
Eventually(rs.stopWatcher, time.Second*10).Should(Receive())

close(done)
Expand Down Expand Up @@ -210,11 +214,12 @@ var _ = Describe("Server", func() {

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
rs.listAndWatchFinishedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
}()
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

close(done)
}, 12.0)
Expand Down Expand Up @@ -252,10 +257,11 @@ var _ = Describe("Server", func() {

// sleep 1 second to let watcher perform at least a single socket-file check
time.Sleep(time.Second)
rs.listAndWatchFinishedSignal <- true
err = rs.Stop()
Expect(err).NotTo(HaveOccurred())

Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

close(done)
}, 12.0)
Expand Down Expand Up @@ -459,7 +465,7 @@ var _ = Describe("Server", func() {
Eventually(lwSrv.updates, time.Second*10).Should(Receive())

// finally send term signal
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

close(done)
}, 30.0)
Expand Down Expand Up @@ -501,7 +507,7 @@ var _ = Describe("Server", func() {
Eventually(lwSrv.updates, time.Second*10).Should(Receive())

// finally send term signal
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

close(done)
}, 30)
Expand Down

0 comments on commit c1a6852

Please sign in to comment.