Skip to content

Commit

Permalink
clusterLivenesscheck #17
Browse files Browse the repository at this point in the history
  • Loading branch information
alitari committed Jun 10, 2018
1 parent fa6adfc commit b7ed7eb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 15 deletions.
57 changes: 42 additions & 15 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,19 @@ type watchType struct {
}

type backendType struct {
cfg *configType
context contextType
resItems map[string][]interface{}
watches map[string]*watchType
sorter sorterType
restExecutor func(httpMethod, url, body string, timout time.Duration) (*http.Response, error)
cfg *configType
context contextType
resItems map[string][]interface{}
watches map[string]*watchType
sorter sorterType
restExecutor func(httpMethod, url, body string, timout int) (*http.Response, error)
clusterLivenessCheck <-chan time.Time
clusterLivenessDone chan bool
}

func newBackend(cfg *configType, context contextType) *backendType {
return &backendType{cfg: cfg, context: context,
restExecutor: func(httpMethod, url, body string, timeout time.Duration) (*http.Response, error) {
restExecutor: func(httpMethod, url, body string, timeout int) (*http.Response, error) {
tracelog.Printf("rest call: %s %s", httpMethod, url)
if body != "" {
tracelog.Printf("body: '%s'", body)
Expand All @@ -138,7 +140,7 @@ func newBackend(cfg *configType, context contextType) *backendType {
},
}
if timeout > 0 {
client.Timeout = timeout * time.Second
client.Timeout = time.Duration(timeout) * time.Second
}
req, err := http.NewRequest(httpMethod, url, strings.NewReader(body))
if err != nil {
Expand All @@ -156,7 +158,9 @@ func newBackend(cfg *configType, context contextType) *backendType {
return response, err
},

sorter: &nameSorterType{ascending: true},
sorter: &nameSorterType{ascending: true},
clusterLivenessCheck: time.NewTicker(time.Duration(5) * time.Second).C,
clusterLivenessDone: make(chan bool),
}
}

Expand Down Expand Up @@ -199,6 +203,27 @@ func (b *backendType) createWatches() error {
}
}
}
go func() {
for {
select {
case <-b.clusterLivenessCheck:
err := b.availabiltyCheck()
if err != nil {
for _, w := range b.watches {
w.online = false
}
errorlog.Printf("cluster heart beat no ok: %v", err)
updateResource(false)
} else {
tracelog.Printf("cluster heart beat ok")
}

case <-b.clusterLivenessDone:
infolog.Printf("clusterHeartbeatDoneDone")
return
}
}
}()

return nil
}
Expand All @@ -211,6 +236,7 @@ func (b *backendType) closeWatches() {
(*v.reader).Close()
}
}
b.clusterLivenessDone <- true
}

func (b *backendType) delete(ns string, resource resourceType, resourceItem string, noGracePeriod bool) (interface{}, error) {
Expand Down Expand Up @@ -264,7 +290,7 @@ func (b *backendType) scale(ns string, resource resourceType, deploymentName str

func (b *backendType) handleResponse(httpMethod, url, reqBody string, resp *http.Response, err error) (string, error) {
if err != nil {
mes := fmt.Sprintf("\nError calling '%s %s %s', error: %s", httpMethod, url, reqBody, err)
mes := fmt.Sprintf("\nError calling '%s %s %s'\ndetails: %s", httpMethod, url, reqBody, err)
errorlog.Print(mes)
return mes, err
}
Expand All @@ -291,32 +317,32 @@ func (b *backendType) handleResponse(httpMethod, url, reqBody string, resp *http

func (b *backendType) availabiltyCheck() error {
url := fmt.Sprintf("%s/%s/%s", b.context.Cluster.URL, "api/v1", "nodes")
resp, err := b.restExecutor(http.MethodGet, url, "", 3)
resp, err := b.restExecutor(http.MethodGet, url, "", restCallTimeout)
_, err = b.handleResponse(http.MethodGet, url, "", resp, err)
return err
}

func (b *backendType) restCallAll(apiPrefix, ress string, body string) (string, error) {
url := fmt.Sprintf("%s/%s/%s", b.context.Cluster.URL, apiPrefix, ress)
resp, err := b.restExecutor(http.MethodGet, url, body, -1)
resp, err := b.restExecutor(http.MethodGet, url, body, restCallTimeout)
return b.handleResponse(http.MethodGet, url, body, resp, err)
}

func (b *backendType) restCall(httpMethod, apiPrefix, ress, ns string, body string) (string, error) {
url := fmt.Sprintf("%s/%s/namespaces/%s/%s", b.context.Cluster.URL, apiPrefix, ns, ress)
resp, err := b.restExecutor(httpMethod, url, body, -1)
resp, err := b.restExecutor(httpMethod, url, body, restCallTimeout)
return b.handleResponse(httpMethod, url, body, resp, err)
}

func (b *backendType) restCallNoNs(httpMethod, apiPrefix, ress string, body string) (string, error) {
url := fmt.Sprintf("%s/%s/%s", b.context.Cluster.URL, apiPrefix, ress)
resp, err := b.restExecutor(httpMethod, url, body, -1)
resp, err := b.restExecutor(httpMethod, url, body, restCallTimeout)
return b.handleResponse(httpMethod, url, body, resp, err)
}

func (b *backendType) restCallBatch(httpMethod, ress string, ns string, body string) (string, error) {
url := fmt.Sprintf("%s/apis/batch/v1/namespaces/%s/%s", b.context.Cluster.URL, ns, ress)
resp, err := b.restExecutor(httpMethod, url, body, -1)
resp, err := b.restExecutor(httpMethod, url, body, restCallTimeout)
return b.handleResponse(httpMethod, url, body, resp, err)
}

Expand Down Expand Up @@ -354,6 +380,7 @@ func (b *backendType) watch(apiPrefix string, resName string) error {
}
}
}()

b.watches[resName] = &watchType{reader: body, online: false}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ var portforwardProxies = map[string][]*portforwardProxy{}

var portforwardStartPort int
var currentPortforwardPort int
var restCallTimeout int
var clusterLivenessPeriod int

var clusterList *nlist
var clusterResourcesWidget *textWidget
Expand Down Expand Up @@ -334,6 +336,9 @@ func parseFlags() {
logLevel = flag.String("logLevel", "info", "verbosity of log output. Values: 'trace','info','warn','error'")
logFilePath = flag.String("logFile", "./kubexp.log", "fullpath to log file, set empty ( -logFile='') if no logfile should be used")
flag.IntVar(&portforwardStartPort, "portForwardStartPort", 32100, "start of portforward range")
flag.IntVar(&restCallTimeout, "restCallTimeout", 3, "time out for rest calls in seconds")
flag.IntVar(&clusterLivenessPeriod, "clusterLivenessPeriod", 5, "cluster liveness check period in seconds")

flag.Parse()
}

Expand Down

0 comments on commit b7ed7eb

Please sign in to comment.