diff --git a/pkg/resources/server.go b/pkg/resources/server.go index bb4fb0135..f750c821a 100644 --- a/pkg/resources/server.go +++ b/pkg/resources/server.go @@ -34,23 +34,25 @@ import ( ) type resourceServer struct { - resourcePool types.ResourcePool - pluginWatch bool - endPoint string // Socket file - sockPath string // Socket file path - resourceNamePrefix string - grpcServer *grpc.Server - termSignal chan bool - updateSignal chan bool - stopWatcher chan bool - checkIntervals int // health check intervals in seconds - useCdi bool - cdi cdiPkg.CDI + resourcePool types.ResourcePool + pluginWatch bool + endPoint string // Socket file + sockPath string // Socket file path + resourceNamePrefix string + grpcServer *grpc.Server + listAndWatchStopSignal chan bool + listAndWatchFinishedSignal chan bool + updateSignal chan bool + stopWatcher chan bool + checkIntervals int // health check intervals in seconds + useCdi bool + cdi cdiPkg.CDI } const ( - rsWatchInterval = 5 * time.Second - serverStartTimeout = 5 * time.Second + rsWatchInterval = 5 * time.Second + serverStartTimeout = 5 * time.Second + terminatedSignalTimeOut = 5 * time.Second ) // NewResourceServer returns an instance of ResourceServer @@ -61,18 +63,19 @@ func NewResourceServer(prefix, suffix string, pluginWatch, useCdi bool, rp types sockPath = filepath.Join(types.DeprecatedSockDir, sockName) } return &resourceServer{ - resourcePool: rp, - pluginWatch: pluginWatch, - endPoint: sockName, - sockPath: sockPath, - resourceNamePrefix: prefix, - useCdi: useCdi, - grpcServer: grpc.NewServer(), - termSignal: make(chan bool, 1), - updateSignal: make(chan bool), - stopWatcher: make(chan bool), - checkIntervals: 20, // updates every 20 seconds - cdi: cdiPkg.New(), + resourcePool: rp, + pluginWatch: pluginWatch, + endPoint: sockName, + sockPath: sockPath, + resourceNamePrefix: prefix, + useCdi: useCdi, + grpcServer: grpc.NewServer(), + listAndWatchStopSignal: make(chan bool, 1), + listAndWatchFinishedSignal: make(chan 
bool, 1), + updateSignal: make(chan bool), + stopWatcher: make(chan bool), + checkIntervals: 20, // updates every 20 seconds + cdi: cdiPkg.New(), } } @@ -177,9 +180,19 @@ func (rs *resourceServer) ListAndWatch(empty *pluginapi.Empty, stream pluginapi. // listen for events: if updateSignal send new list of devices for { select { - case <-rs.termSignal: - // Terminate signal received; return from mehtod call + case <-rs.listAndWatchStopSignal: + // Terminate signal received; send an empty list of devices and return from method call glog.Infof("%s: terminate signal received", methodID) + resp = new(pluginapi.ListAndWatchResponse) + resp.Devices = make([]*pluginapi.Device, 0) + + if err := stream.Send(resp); err != nil { + glog.Errorf("%s: error: cannot update device states: %v\n", methodID, err) + return err + } + // Signal restart()/Stop() that ListAndWatch has finished so the grpc server can be stopped + rs.listAndWatchFinishedSignal <- true + return nil case <-rs.updateSignal: // Device health changed; so send new device list @@ -304,11 +317,19 @@ func (rs *resourceServer) restart() error { if rs.grpcServer == nil { return fmt.Errorf("grpc server instance not found for %s", resourceName) } - rs.grpcServer.Stop() - rs.grpcServer = nil // Send terminate signal to ListAndWatch() - rs.termSignal <- true + rs.listAndWatchStopSignal <- true + + // wait up to terminatedSignalTimeOut (5 seconds) for ListAndWatch to signal that it has finished + select { + case <-rs.listAndWatchFinishedSignal: + break + case <-time.After(terminatedSignalTimeOut): + glog.Warningf("Timed out waiting for ListAndWatch terminated signal") + } + rs.grpcServer.Stop() + rs.grpcServer = nil rs.grpcServer = grpc.NewServer() // new instance of a grpc server return rs.Start() } @@ -320,11 +341,19 @@ func (rs *resourceServer) Stop() error { return nil } // Send terminate signal to ListAndWatch() - rs.termSignal <- true + rs.listAndWatchStopSignal <- true if !rs.pluginWatch { rs.stopWatcher <- true } + // wait up to terminatedSignalTimeOut (5 seconds) for ListAndWatch to signal that it has finished + select { + case
<-rs.listAndWatchFinishedSignal: + break + case <-time.After(terminatedSignalTimeOut): + glog.Warningf("Timed out waiting for ListAndWatch terminated signal") + } + rs.grpcServer.Stop() rs.grpcServer = nil diff --git a/pkg/resources/server_test.go b/pkg/resources/server_test.go index 5608b7413..9522077cc 100644 --- a/pkg/resources/server_test.go +++ b/pkg/resources/server_test.go @@ -169,17 +169,21 @@ var _ = Describe("Server", func() { err := rs.Start() Expect(err).NotTo(HaveOccurred()) + // simulate ListAndWatch: pre-send the finished signal so restart() does not wait for the timeout + rs.listAndWatchFinishedSignal <- true err = rs.restart() Expect(err).NotTo(HaveOccurred()) - Eventually(rs.termSignal, time.Second*10).Should(Receive()) + Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive()) go func() { rp.On("CleanDeviceInfoFile", "fake").Return(nil) + // simulate ListAndWatch: pre-send the finished signal so Stop() does not wait for the timeout + rs.listAndWatchFinishedSignal <- true err := rs.Stop() Expect(err).NotTo(HaveOccurred()) rp.AssertCalled(t, "CleanDeviceInfoFile", "fake") }() - Eventually(rs.termSignal, time.Second*10).Should(Receive()) + Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive()) Eventually(rs.stopWatcher, time.Second*10).Should(Receive()) close(done) @@ -210,11 +214,12 @@ var _ = Describe("Server", func() { go func() { rp.On("CleanDeviceInfoFile", "fake").Return(nil) + rs.listAndWatchFinishedSignal <- true err := rs.Stop() Expect(err).NotTo(HaveOccurred()) rp.AssertCalled(t, "CleanDeviceInfoFile", "fake") }() - Eventually(rs.termSignal, time.Second*10).Should(Receive()) + Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive()) close(done) }, 12.0) @@ -252,10 +257,11 @@ var _ = Describe("Server", func() { // sleep 1 second to let watcher perform at least a single socket-file check time.Sleep(time.Second) + rs.listAndWatchFinishedSignal <- true err = rs.Stop() Expect(err).NotTo(HaveOccurred()) - Eventually(rs.termSignal, time.Second*10).Should(Receive())
close(done) }, 12.0) @@ -459,7 +465,7 @@ var _ = Describe("Server", func() { Eventually(lwSrv.updates, time.Second*10).Should(Receive()) // finally send term signal - rs.termSignal <- true + rs.listAndWatchStopSignal <- true close(done) }, 30.0) @@ -501,7 +507,7 @@ var _ = Describe("Server", func() { Eventually(lwSrv.updates, time.Second*10).Should(Receive()) // finally send term signal - rs.termSignal <- true + rs.listAndWatchStopSignal <- true close(done) }, 30)