Skip to content

Commit

Permalink
add healthcheck and refactor stats
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Dec 27, 2024
1 parent 5c75cdf commit 2b98789
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 37 deletions.
42 changes: 42 additions & 0 deletions lncd/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"encoding/json"
"errors"
"net/http"
)

type HealthStatus struct {
Status string `json:"status"`
Stats Stats
Message string `json:"message"`

}

func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
var stats *Stats = getStats()
var err error = nil
if stats == nil {
err = errors.New("starting")
}

if err == nil {
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(HealthStatus{
Status: "OK",
Stats: *stats,
Message: "",
})
}

if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(HealthStatus{
Status: "FAIL",
Stats: Stats{},
Message: err.Error(),
})

}
}
72 changes: 35 additions & 37 deletions lncd/lncd.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type RpcRequest struct {
type RpcResponse struct {
Connection ConnectionInfo
Result string
err error
errCode int
}

func NewConnectionPool() *ConnectionPool {
Expand Down Expand Up @@ -305,52 +307,68 @@ func (pool *ConnectionPool) execute(info ConnectionInfo, req Action) {
}
}

func writeJSONError(w http.ResponseWriter, message string, statusCode int) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(map[string]string{"error": message})
}

func rpcHandler(pool *ConnectionPool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var request RpcRequest
defer r.Body.Close()

if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
writeJSONError(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var request RpcRequest

if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
writeJSONError(w, err.Error(), http.StatusBadRequest)
return
}
defer r.Body.Close()

done := make(chan struct{})

log.Infof("Incoming RPC request: %v", request.Method)
if UNSAFE_LOGS {
log.Debugf("Full request: %v", request)
}

var waitResponse chan RpcResponse = make(chan RpcResponse)

var response RpcResponse
pool.execute(request.Connection, Action{
method: request.Method,
payload: request.Payload,
onError: func(err error) {
log.Errorf("RPC error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
close(done)
waitResponse <- RpcResponse{err: err, errCode: http.StatusInternalServerError}
close(waitResponse)
},
onResponse: func(info ConnectionInfo, result string) {
log.Debugf("RPC response: %v", result)
if UNSAFE_LOGS {
log.Debugf("Connection: %v", info)
}
response = RpcResponse{
waitResponse <- RpcResponse{
Connection: info,
Result: result,
err: nil,
errCode: http.StatusOK,
}
close(done)
close(waitResponse)
},
})

<-done
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
var resp RpcResponse = <-waitResponse
if resp.err == nil {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
resp.err = err
resp.errCode = http.StatusInternalServerError
}
}

if resp.err != nil {
writeJSONError(w, resp.err.Error(), resp.errCode)
}
}
}
Expand Down Expand Up @@ -420,27 +438,6 @@ func parseKeys(localPrivKey, remotePubKey string) (



func stats(pool *ConnectionPool) {
ticker := time.NewTicker(LNCD_STATS_INTERVAL)
go func() {
for range ticker.C {
pool.mutex.Lock()
numConnections := len(pool.connections)
log.Infof("Number of active connections: %d", numConnections)

index := 0
for _, conn := range pool.connections {
log.Infof("Connection %d", index)
pendingActions := len(conn.actions)
log.Infof(" Pending actions: %d", pendingActions)
log.Infof(" Connection status: %v", conn.connInfo.Status)
index++
}
pool.mutex.Unlock()
}
}()
}


func main() {
shutdownInterceptor, err := signal.Intercept()
Expand All @@ -463,10 +460,11 @@ func main() {
}

var pool *ConnectionPool = NewConnectionPool()
stats(pool)
startStatsLoop(pool)

http.HandleFunc("/rpc", rpcHandler(pool))
http.HandleFunc("/", formHandler)
http.HandleFunc("/health", healthCheckHandler)

log.Infof("Server started at "+LNCD_RECEIVER_HOST+":" + LNCD_RECEIVER_PORT)
if err := http.ListenAndServe(LNCD_RECEIVER_HOST+":"+LNCD_RECEIVER_PORT, nil); err != nil {
Expand Down
74 changes: 74 additions & 0 deletions lncd/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"fmt"
"time"
)

var (
lastStats *Stats = nil
)

type ConnectionStats struct {
NumPendingActions int
Status string
}

type Stats struct {
NumConnections int
Connections []ConnectionStats
}

func refreshStats(pool *ConnectionPool, stats *Stats) *Stats {
pool.mutex.Lock()
defer pool.mutex.Unlock()

if stats == nil {
stats = &Stats{
NumConnections: 0,
Connections: nil,
}
}

stats.NumConnections = len(pool.connections)
if stats.Connections == nil || len(stats.Connections) != len(pool.connections) {
stats.Connections = make([]ConnectionStats, len(pool.connections))
}

var i int
for _, conn := range pool.connections {
stats.Connections[i] = ConnectionStats{
NumPendingActions: len(conn.actions),
Status: conn.connInfo.Status,
}
i++
}

return stats
}

func getStats() *Stats {
return lastStats
}

func startStatsLoop(pool *ConnectionPool) {
ticker := time.NewTicker(LNCD_STATS_INTERVAL)
go func() {
for range ticker.C {
lastStats = refreshStats(pool, lastStats)

if lastStats != nil {
var statsString string = ""
statsString += fmt.Sprintf("Active connections: %d\n", lastStats.NumConnections)
for i, conn := range lastStats.Connections {
statsString += fmt.Sprintf(" Connection id: %d\n", i)
statsString += fmt.Sprintf(" Pending actions: %d\n", conn.NumPendingActions)
statsString += fmt.Sprintf(" Status: %s", conn.Status)
}
log.Debugf("Stats:\n %s", statsString)
}
}
}()
}


0 comments on commit 2b98789

Please sign in to comment.