Skip to content

Commit 82d5583

Browse files
committed
add diagnostic api http server
1 parent 51fdfb9 commit 82d5583

File tree

3 files changed

+224
-1
lines changed

3 files changed

+224
-1
lines changed

pkg/apiserver/apiserver.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package apiserver
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"net/http/pprof"
9+
"regexp"
10+
"strconv"
11+
"sync"
12+
"time"
13+
14+
"github.com/Sirupsen/logrus"
15+
16+
"github.com/zalando-incubator/postgres-operator/pkg/spec"
17+
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
18+
)
19+
20+
const (
21+
httpAPITimeout = time.Minute * 1
22+
shutdownTimeout = time.Second * 10
23+
httpReadTimeout = time.Millisecond * 100
24+
)
25+
26+
// ControllerInformer describes stats methods of a controller
27+
type ControllerInformer interface {
28+
GetConfig() *spec.ControllerConfig
29+
GetOperatorConfig() *config.Config
30+
GetStatus() *spec.ControllerStatus
31+
TeamClusterList() map[string][]spec.NamespacedName
32+
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
33+
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
34+
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
35+
ListQueue(workerID uint32) (*spec.QueueDump, error)
36+
GetWorkersCnt() uint32
37+
}
38+
39+
// Server describes HTTP API server
40+
type Server struct {
41+
logger *logrus.Entry
42+
http http.Server
43+
controller ControllerInformer
44+
}
45+
46+
var (
47+
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
48+
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
49+
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
50+
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
51+
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
52+
workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`)
53+
clustersURL = "/clusters/"
54+
)
55+
56+
// New creates new HTTP API server
57+
func New(controller ControllerInformer, port int, logger *logrus.Logger) *Server {
58+
s := &Server{
59+
logger: logger.WithField("pkg", "apiserver"),
60+
controller: controller,
61+
}
62+
mux := http.NewServeMux()
63+
64+
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
65+
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
66+
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
67+
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
68+
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
69+
70+
mux.Handle("/status/", http.HandlerFunc(s.controllerStatus))
71+
mux.Handle("/config/", http.HandlerFunc(s.operatorConfig))
72+
73+
mux.HandleFunc("/clusters/", s.clusters)
74+
mux.HandleFunc("/workers/", s.workers)
75+
76+
s.http = http.Server{
77+
Addr: fmt.Sprintf(":%d", port),
78+
Handler: http.TimeoutHandler(mux, httpAPITimeout, ""),
79+
ReadTimeout: httpReadTimeout,
80+
}
81+
82+
return s
83+
}
84+
85+
// Run starts the HTTP server
86+
func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
87+
defer wg.Done()
88+
89+
go func() {
90+
err := s.http.ListenAndServe()
91+
if err != http.ErrServerClosed {
92+
s.logger.Fatalf("Could not start http server: %v", err)
93+
}
94+
}()
95+
s.logger.Infof("Listening on %s", s.http.Addr)
96+
97+
<-stopCh
98+
99+
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
100+
defer cancel()
101+
err := s.http.Shutdown(ctx)
102+
if err == context.DeadlineExceeded {
103+
s.logger.Warnf("Shutdown timeout exceeded. closing http server")
104+
s.http.Close()
105+
} else if err != nil {
106+
s.logger.Errorf("Could not shutdown http server: %v", err)
107+
}
108+
s.logger.Infoln("Http server shut down")
109+
}
110+
111+
func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) {
112+
w.Header().Set("Content-Type", "application/json")
113+
if err != nil {
114+
w.WriteHeader(http.StatusInternalServerError)
115+
json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()})
116+
return
117+
}
118+
119+
err = json.NewEncoder(w).Encode(obj)
120+
if err != nil {
121+
w.WriteHeader(http.StatusInternalServerError)
122+
s.logger.Errorf("Could not encode: %v", err)
123+
}
124+
}
125+
126+
func (s *Server) controllerStatus(w http.ResponseWriter, req *http.Request) {
127+
s.respond(s.controller.GetStatus(), nil, w)
128+
}
129+
130+
func (s *Server) operatorConfig(w http.ResponseWriter, req *http.Request) {
131+
s.respond(map[string]interface{}{
132+
"controller": s.controller.GetConfig(),
133+
"operator": s.controller.GetOperatorConfig(),
134+
}, nil, w)
135+
}
136+
137+
func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
138+
var (
139+
resp interface{}
140+
err error
141+
)
142+
143+
if matches := clusterStatusURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil {
144+
resp, err = s.controller.ClusterStatus(matches[0][1], matches[0][2])
145+
} else if matches := teamURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil {
146+
teamClusters := s.controller.TeamClusterList()
147+
clusters, found := teamClusters[matches[0][1]]
148+
if !found {
149+
s.respond(nil, fmt.Errorf("could not find clusters for the team"), w)
150+
}
151+
152+
clusterNames := make([]string, 0)
153+
for _, cluster := range clusters {
154+
clusterNames = append(clusterNames, cluster.Name[len(matches[0][1])+1:])
155+
}
156+
157+
s.respond(clusterNames, nil, w)
158+
return
159+
} else if matches := clusterLogsURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil {
160+
resp, err = s.controller.ClusterLogs(matches[0][1], matches[0][2])
161+
} else if req.URL.Path == clustersURL {
162+
res := make(map[string][]string)
163+
for team, clusters := range s.controller.TeamClusterList() {
164+
for _, cluster := range clusters {
165+
res[team] = append(res[team], cluster.Name[len(team)+1:])
166+
}
167+
}
168+
169+
s.respond(res, nil, w)
170+
return
171+
} else {
172+
s.respond(nil, fmt.Errorf("page not found"), w)
173+
return
174+
}
175+
176+
s.respond(resp, err, w)
177+
}
178+
179+
func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
180+
var (
181+
resp interface{}
182+
err error
183+
)
184+
if workerAllQueue.MatchString(req.URL.Path) {
185+
s.allQueues(w, req)
186+
return
187+
} else if matches := workerLogsURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil {
188+
workerID, _ := strconv.Atoi(matches[0][1])
189+
190+
resp, err = s.controller.WorkerLogs(uint32(workerID))
191+
} else if matches := workerEventsQueueURL.FindAllStringSubmatch(req.URL.Path, -1); matches != nil {
192+
workerID, _ := strconv.Atoi(matches[0][1])
193+
194+
resp, err = s.controller.ListQueue(uint32(workerID))
195+
} else {
196+
s.respond(nil, fmt.Errorf("page not found"), w)
197+
return
198+
}
199+
200+
s.respond(resp, err, w)
201+
}
202+
203+
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
204+
workersCnt := s.controller.GetWorkersCnt()
205+
resp := make(map[uint32]*spec.QueueDump, workersCnt)
206+
for i := uint32(0); i < workersCnt; i++ {
207+
queueDump, err := s.controller.ListQueue(i)
208+
if err != nil {
209+
s.respond(nil, err, w)
210+
return
211+
}
212+
213+
resp[i] = queueDump
214+
}
215+
216+
s.respond(resp, nil, w)
217+
}

pkg/controller/controller.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"k8s.io/client-go/rest"
1111
"k8s.io/client-go/tools/cache"
1212

13+
"github.com/zalando-incubator/postgres-operator/pkg/apiserver"
1314
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
1415
"github.com/zalando-incubator/postgres-operator/pkg/spec"
1516
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
@@ -26,6 +27,7 @@ type Controller struct {
2627
logger *logrus.Entry
2728
KubeClient k8sutil.KubernetesClient
2829
RestClient rest.Interface // kubernetes API group REST client
30+
apiserver *apiserver.Server
2931

3032
stopCh chan struct{}
3133

@@ -170,16 +172,19 @@ func (c *Controller) initController() {
170172
return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
171173
})
172174
}
175+
176+
c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
173177
}
174178

175179
// Run starts background controller processes
176180
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
177181
c.initController()
178182

179-
wg.Add(3)
183+
wg.Add(4)
180184
go c.runPodInformer(stopCh, wg)
181185
go c.runPostgresqlInformer(stopCh, wg)
182186
go c.clusterResync(stopCh, wg)
187+
go c.apiserver.Run(stopCh, wg)
183188

184189
for i := range c.clusterEventQueues {
185190
wg.Add(1)

pkg/util/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Config struct {
6161
MasterDNSNameFormat stringTemplate `name:"master_dns_name_format" default:"{cluster}.{team}.{hostedzone}"`
6262
ReplicaDNSNameFormat stringTemplate `name:"replica_dns_name_format" default:"{cluster}-repl.{team}.{hostedzone}"`
6363
Workers uint32 `name:"workers" default:"4"`
64+
APIPort int `name:"api_port" default:"8080"`
6465
RingLogLines int `name:"ring_log_lines" default:"100"`
6566
}
6667

0 commit comments

Comments
 (0)