Skip to content

Commit

Permalink
fix(exporter): fix agent reconnect
Browse files: browse the repository at this point in the history
Signed-off-by: xiayu.lyt <[email protected]>
  • Loading branch information
Lyt99 committed Feb 2, 2024
1 parent 4f7a11d commit b81167b
Showing 1 changed file with 24 additions and 19 deletions.
43 changes: 24 additions & 19 deletions pkg/exporter/task-agent/agent.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,7 +2,6 @@ package taskagent

import (
"context"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -32,26 +31,21 @@ type Agent struct {
func (a *Agent) Run() error {
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallSendMsgSize(102*1024*1024))
conn, err := grpc.Dial(controllerAddr, grpc.WithDefaultCallOptions(opts...),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("failed to connect: %v", err)
}
a.grpcClient = rpc.NewControllerRegisterServiceClient(conn)
watchClient, err := a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{
NodeName: a.NodeName,
Type: []rpc.TaskType{rpc.TaskType_Capture, rpc.TaskType_Ping},
})
if err != nil {
return fmt.Errorf("failed to watch tasks: %v", err)
}
reconn := func() {
var conn *grpc.ClientConn
var watchClient rpc.ControllerRegisterService_WatchTasksClient

reconn := func() error {
time.Sleep(1 * time.Second)
log.Infof("connecting to controller.")
if conn != nil {
_ = conn.Close()
}
var err error
conn, err = grpc.Dial(controllerAddr, grpc.WithDefaultCallOptions(opts...),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Errorf("failed to connect: %v", err)
return
return err
}
a.grpcClient = rpc.NewControllerRegisterServiceClient(conn)
watchClient, err = a.grpcClient.WatchTasks(context.TODO(), &rpc.TaskFilter{
Expand All @@ -60,22 +54,33 @@ func (a *Agent) Run() error {
})
if err != nil {
log.Errorf("failed to watch: %v", err)
return err
}
log.Infof("controller connected.")
return nil
}

err := reconn()
if err != nil {
return err
}

go func() {
defer conn.Close()
for {
if watchClient == nil {
_ = reconn()
continue
}
select {
case <-watchClient.Context().Done():
log.Errorf("watch client closed")
reconn()
_ = reconn()
continue
default:
task, err := watchClient.Recv()
if err != nil {
log.Errorf("failed to receive task: %v", err)
reconn()
_ = reconn()
continue
}
err = a.ProcessTasks(task)
Expand Down

0 comments on commit b81167b

Please sign in to comment.