From b81167bb2702a3f396d1d20447ebf9049a0b8bea Mon Sep 17 00:00:00 2001 From: "xiayu.lyt" Date: Fri, 2 Feb 2024 11:22:22 +0800 Subject: [PATCH] fix(exporter): fix agent reconnect Signed-off-by: xiayu.lyt --- pkg/exporter/task-agent/agent.go | 43 ++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/pkg/exporter/task-agent/agent.go b/pkg/exporter/task-agent/agent.go index 24f0753e..ee109b8b 100644 --- a/pkg/exporter/task-agent/agent.go +++ b/pkg/exporter/task-agent/agent.go @@ -2,7 +2,6 @@ package taskagent import ( "context" - "fmt" "os" "time" @@ -32,26 +31,21 @@ type Agent struct { func (a *Agent) Run() error { var opts []grpc.CallOption opts = append(opts, grpc.MaxCallSendMsgSize(102*1024*1024)) - conn, err := grpc.Dial(controllerAddr, grpc.WithDefaultCallOptions(opts...), - grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("failed to connect: %v", err) - } - a.grpcClient = rpc.NewControllerRegisterServiceClient(conn) - watchClient, err := a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{ - NodeName: a.NodeName, - Type: []rpc.TaskType{rpc.TaskType_Capture, rpc.TaskType_Ping}, - }) - if err != nil { - return fmt.Errorf("failed to watch tasks: %v", err) - } - reconn := func() { + var conn *grpc.ClientConn + var watchClient rpc.ControllerRegisterService_WatchTasksClient + + reconn := func() error { time.Sleep(1 * time.Second) + log.Infof("connecting to controller.") + if conn != nil { + _ = conn.Close() + } + var err error conn, err = grpc.Dial(controllerAddr, grpc.WithDefaultCallOptions(opts...), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Errorf("failed to connect: %v", err) - return + return err } a.grpcClient = rpc.NewControllerRegisterServiceClient(conn) watchClient, err = a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{ @@ -60,22 +54,33 @@ func (a *Agent) Run() error { }) if err != nil { log.Errorf("failed to watch: %v", err) + return err } + log.Infof("controller connected.") + return nil + } + + err := reconn() + if err != nil { + return err } go func() { - defer conn.Close() for { + if watchClient == nil { + _ = reconn() + continue + } select { case <-watchClient.Context().Done(): log.Errorf("watch client closed") - reconn() + _ = reconn() continue default: task, err := watchClient.Recv() if err != nil { log.Errorf("failed to receive task: %v", err) - reconn() + _ = reconn() continue } err = a.ProcessTasks(task)