@@ -35,6 +35,7 @@ import (
35
35
"syscall"
36
36
"time"
37
37
38
+ "github.com/fsnotify/fsnotify"
38
39
"github.com/prometheus/client_golang/prometheus/promhttp"
39
40
"github.com/spf13/cobra"
40
41
"google.golang.org/grpc"
@@ -135,7 +136,7 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error {
135
136
}
136
137
137
138
klog .V (1 ).Infoln ("Starting agent server for tunnel connections." )
138
- err = p .runAgentServer (o , server )
139
+ err = p .runAgentServer (o , server , ctx . Done () )
139
140
if err != nil {
140
141
return fmt .Errorf ("failed to run the agent server: %v" , err )
141
142
}
@@ -353,33 +354,117 @@ func (p *Proxy) runMTLSFrontendServer(ctx context.Context, o *options.ProxyRunOp
353
354
return stop , nil
354
355
}
355
356
356
- func (p * Proxy ) runAgentServer (o * options.ProxyRunOptions , server * server.ProxyServer ) error {
357
+ func (p * Proxy ) runAgentServer (o * options.ProxyRunOptions , server * server.ProxyServer , stopCh <- chan struct {} ) error {
357
358
var tlsConfig * tls.Config
358
359
var err error
359
- if tlsConfig , err = p .getTLSConfig (o .ClusterCaCert , o .ClusterCert , o .ClusterKey , o .CipherSuites ); err != nil {
360
- return err
360
+
361
+ watcher , err := fsnotify .NewWatcher ()
362
+ if err != nil {
363
+ klog .Fatal (err )
361
364
}
365
+ defer func (watcher * fsnotify.Watcher ) {
366
+ if err := watcher .Close (); err != nil {
367
+ klog .ErrorS (err , "failed to close watcher" )
368
+ return
369
+ }
370
+ }(watcher )
362
371
363
- addr := net .JoinHostPort (o .AgentBindAddress , strconv .Itoa (o .AgentPort ))
364
- agentServerOptions := []grpc.ServerOption {
365
- grpc .Creds (credentials .NewTLS (tlsConfig )),
366
- grpc .KeepaliveParams (keepalive.ServerParameters {Time : o .KeepaliveTime }),
367
- grpc .KeepaliveEnforcementPolicy (keepalive.EnforcementPolicy {
368
- MinTime : 30 * time .Second ,
369
- PermitWithoutStream : true ,
370
- }),
372
+ // Watch the certificate files
373
+ if err := watcher .Add (o .ClusterCaCert ); err != nil {
374
+ return err
371
375
}
372
- grpcServer := grpc .NewServer (agentServerOptions ... )
373
- agent .RegisterAgentServiceServer (grpcServer , server )
374
- lis , err := net .Listen ("tcp" , addr )
375
- if err != nil {
376
- return fmt .Errorf ("failed to listen on %s: %v" , addr , err )
376
+ if err := watcher .Add (o .ClusterCert ); err != nil {
377
+ return err
377
378
}
378
- labels := runpprof .Labels (
379
- "core" , "agentListener" ,
380
- "port" , strconv .FormatUint (uint64 (o .AgentPort ), 10 ),
381
- )
382
- go runpprof .Do (context .Background (), labels , func (context.Context ) { grpcServer .Serve (lis ) })
379
+ if err := watcher .Add (o .ClusterKey ); err != nil {
380
+ return err
381
+ }
382
+
383
+ reload := make (chan bool )
384
+ restartServer := make (chan bool )
385
+
386
+ // Goroutine to watch for file changes
387
+ go func () {
388
+ for {
389
+ select {
390
+ case event , ok := <- watcher .Events :
391
+ if ! ok {
392
+ return
393
+ }
394
+ if event .Op & fsnotify .Write == fsnotify .Write {
395
+ reload <- true
396
+ }
397
+ case err , ok := <- watcher .Errors :
398
+ if ! ok {
399
+ return
400
+ }
401
+ klog .ErrorS (err , "failed to watch for file changes" )
402
+ case <- stopCh :
403
+ // Handle graceful shutdown
404
+ return
405
+ }
406
+ }
407
+ }()
408
+
409
+ addr := net .JoinHostPort (o .AgentBindAddress , strconv .Itoa (o .AgentPort ))
410
+
411
+ go func () {
412
+ var grpcServer * grpc.Server
413
+ var lis net.Listener
414
+
415
+ for {
416
+ select {
417
+ case <- reload :
418
+ if tlsConfig , err = p .getTLSConfig (o .ClusterCaCert , o .ClusterCert , o .ClusterKey , o .CipherSuites ); err != nil {
419
+ klog .ErrorS (err , "Failed to reload TLS config:" )
420
+ }
421
+ klog .V (1 ).Info ("TLS config reloaded" )
422
+ restartServer <- true
423
+
424
+ case <- restartServer :
425
+ if grpcServer != nil {
426
+ grpcServer .GracefulStop ()
427
+ klog .V (1 ).Info ("gRPC server stopped" )
428
+ }
429
+
430
+ agentServerOptions := []grpc.ServerOption {
431
+ grpc .Creds (credentials .NewTLS (tlsConfig )),
432
+ grpc .KeepaliveParams (keepalive.ServerParameters {Time : o .KeepaliveTime }),
433
+ grpc .KeepaliveEnforcementPolicy (keepalive.EnforcementPolicy {
434
+ MinTime : 30 * time .Second ,
435
+ PermitWithoutStream : true ,
436
+ }),
437
+ }
438
+ grpcServer = grpc .NewServer (agentServerOptions ... )
439
+ agent .RegisterAgentServiceServer (grpcServer , server )
440
+
441
+ lis , err = net .Listen ("tcp" , addr )
442
+ if err != nil {
443
+ klog .Fatalf ("failed to listen on %s: %v" , addr , err )
444
+ }
445
+
446
+ go func () {
447
+ if err := grpcServer .Serve (lis ); err != nil {
448
+ klog .Fatalf ("failed to serve: %v" , err )
449
+ }
450
+ }()
451
+
452
+ klog .V (1 ).Info ("gRPC server restarted" )
453
+ case <- stopCh :
454
+ // Handle graceful shutdown
455
+ if grpcServer != nil {
456
+ grpcServer .GracefulStop ()
457
+ }
458
+ if lis != nil {
459
+ lis .Close ()
460
+ }
461
+ return
462
+ }
463
+ }
464
+ }()
465
+
466
+ // Initial restart to start the server
467
+ restartServer <- true
383
468
384
469
return nil
385
470
}
0 commit comments