Skip to content

Commit 5a7a3fe

Browse files
authored
Diagnostic API
2 parents 9c7492f + 38e0ffe commit 5a7a3fe

File tree

12 files changed

+635
-32
lines changed

12 files changed

+635
-32
lines changed

manifests/configmap.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,5 @@ data:
2929
teams_api_url: http://fake-teams-api.default.svc.cluster.local
3030
workers: "4"
3131
enable_load_balancer: "true"
32+
api_port: "8080"
33+
ring_log_lines: "100"

pkg/apiserver/apiserver.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package apiserver
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"net/http/pprof"
9+
"regexp"
10+
"strconv"
11+
"sync"
12+
"time"
13+
14+
"github.com/Sirupsen/logrus"
15+
16+
"github.com/zalando-incubator/postgres-operator/pkg/spec"
17+
"github.com/zalando-incubator/postgres-operator/pkg/util"
18+
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
19+
)
20+
21+
const (
22+
httpAPITimeout = time.Minute * 1
23+
shutdownTimeout = time.Second * 10
24+
httpReadTimeout = time.Millisecond * 100
25+
)
26+
27+
// controllerInformer describes stats methods of a controller
28+
type controllerInformer interface {
29+
GetConfig() *spec.ControllerConfig
30+
GetOperatorConfig() *config.Config
31+
GetStatus() *spec.ControllerStatus
32+
TeamClusterList() map[string][]spec.NamespacedName
33+
ClusterStatus(team, cluster string) (*spec.ClusterStatus, error)
34+
ClusterLogs(team, cluster string) ([]*spec.LogEntry, error)
35+
WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
36+
ListQueue(workerID uint32) (*spec.QueueDump, error)
37+
GetWorkersCnt() uint32
38+
}
39+
40+
// Server describes HTTP API server
41+
type Server struct {
42+
logger *logrus.Entry
43+
http http.Server
44+
controller controllerInformer
45+
}
46+
47+
var (
48+
clusterStatusURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/?$`)
49+
clusterLogsURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/(?P<cluster>[a-zA-Z][a-zA-Z0-9]*)/logs/?$`)
50+
teamURL = regexp.MustCompile(`^/clusters/(?P<team>[a-zA-Z][a-zA-Z0-9]*)/?$`)
51+
workerLogsURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
52+
workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
53+
workerAllQueue = regexp.MustCompile(`^/workers/all/queue/?$`)
54+
clustersURL = "/clusters/"
55+
)
56+
57+
// New creates new HTTP API server
58+
func New(controller controllerInformer, port int, logger *logrus.Logger) *Server {
59+
s := &Server{
60+
logger: logger.WithField("pkg", "apiserver"),
61+
controller: controller,
62+
}
63+
mux := http.NewServeMux()
64+
65+
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
66+
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
67+
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
68+
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
69+
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
70+
71+
mux.Handle("/status/", http.HandlerFunc(s.controllerStatus))
72+
mux.Handle("/config/", http.HandlerFunc(s.operatorConfig))
73+
74+
mux.HandleFunc("/clusters/", s.clusters)
75+
mux.HandleFunc("/workers/", s.workers)
76+
77+
s.http = http.Server{
78+
Addr: fmt.Sprintf(":%d", port),
79+
Handler: http.TimeoutHandler(mux, httpAPITimeout, ""),
80+
ReadTimeout: httpReadTimeout,
81+
}
82+
83+
return s
84+
}
85+
86+
// Run starts the HTTP server
87+
func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
88+
defer wg.Done()
89+
90+
go func() {
91+
err := s.http.ListenAndServe()
92+
if err != http.ErrServerClosed {
93+
s.logger.Fatalf("Could not start http server: %v", err)
94+
}
95+
}()
96+
s.logger.Infof("Listening on %s", s.http.Addr)
97+
98+
<-stopCh
99+
100+
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
101+
defer cancel()
102+
err := s.http.Shutdown(ctx)
103+
if err == context.DeadlineExceeded {
104+
s.logger.Warnf("Shutdown timeout exceeded. closing http server")
105+
s.http.Close()
106+
} else if err != nil {
107+
s.logger.Errorf("Could not shutdown http server: %v", err)
108+
}
109+
s.logger.Infoln("Http server shut down")
110+
}
111+
112+
func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) {
113+
w.Header().Set("Content-Type", "application/json")
114+
if err != nil {
115+
w.WriteHeader(http.StatusInternalServerError)
116+
json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()})
117+
return
118+
}
119+
120+
err = json.NewEncoder(w).Encode(obj)
121+
if err != nil {
122+
w.WriteHeader(http.StatusInternalServerError)
123+
s.logger.Errorf("Could not encode: %v", err)
124+
}
125+
}
126+
127+
func (s *Server) controllerStatus(w http.ResponseWriter, req *http.Request) {
128+
s.respond(s.controller.GetStatus(), nil, w)
129+
}
130+
131+
func (s *Server) operatorConfig(w http.ResponseWriter, req *http.Request) {
132+
s.respond(map[string]interface{}{
133+
"controller": s.controller.GetConfig(),
134+
"operator": s.controller.GetOperatorConfig(),
135+
}, nil, w)
136+
}
137+
138+
func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
139+
var (
140+
resp interface{}
141+
err error
142+
)
143+
144+
if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil {
145+
resp, err = s.controller.ClusterStatus(matches["team"], matches["cluster"])
146+
} else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil {
147+
teamClusters := s.controller.TeamClusterList()
148+
clusters, found := teamClusters[matches["team"]]
149+
if !found {
150+
s.respond(nil, fmt.Errorf("could not find clusters for the team"), w)
151+
}
152+
153+
clusterNames := make([]string, 0)
154+
for _, cluster := range clusters {
155+
clusterNames = append(clusterNames, cluster.Name[len(matches["team"])+1:])
156+
}
157+
158+
s.respond(clusterNames, nil, w)
159+
return
160+
} else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
161+
resp, err = s.controller.ClusterLogs(matches["team"], matches["cluster"])
162+
} else if req.URL.Path == clustersURL {
163+
res := make(map[string][]string)
164+
for team, clusters := range s.controller.TeamClusterList() {
165+
for _, cluster := range clusters {
166+
res[team] = append(res[team], cluster.Name[len(team)+1:])
167+
}
168+
}
169+
170+
s.respond(res, nil, w)
171+
return
172+
} else {
173+
s.respond(nil, fmt.Errorf("page not found"), w)
174+
return
175+
}
176+
177+
s.respond(resp, err, w)
178+
}
179+
180+
func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
181+
var (
182+
resp interface{}
183+
err error
184+
)
185+
186+
if workerAllQueue.MatchString(req.URL.Path) {
187+
s.allQueues(w, req)
188+
return
189+
} else if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil {
190+
workerID, _ := strconv.Atoi(matches["id"])
191+
192+
resp, err = s.controller.WorkerLogs(uint32(workerID))
193+
} else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil {
194+
workerID, _ := strconv.Atoi(matches["id"])
195+
196+
resp, err = s.controller.ListQueue(uint32(workerID))
197+
} else {
198+
s.respond(nil, fmt.Errorf("page not found"), w)
199+
return
200+
}
201+
202+
s.respond(resp, err, w)
203+
}
204+
205+
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
206+
workersCnt := s.controller.GetWorkersCnt()
207+
resp := make(map[uint32]*spec.QueueDump, workersCnt)
208+
for i := uint32(0); i < workersCnt; i++ {
209+
queueDump, err := s.controller.ListQueue(i)
210+
if err != nil {
211+
s.respond(nil, err, w)
212+
return
213+
}
214+
215+
resp[i] = queueDump
216+
}
217+
218+
s.respond(resp, nil, w)
219+
}

pkg/cluster/cluster.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,3 +644,20 @@ func (c *Cluster) initInfrastructureRoles() error {
644644
}
645645
return nil
646646
}
647+
648+
// GetStatus provides status of the cluster
649+
func (c *Cluster) GetStatus() *spec.ClusterStatus {
650+
return &spec.ClusterStatus{
651+
Cluster: c.Spec.ClusterName,
652+
Team: c.Spec.TeamID,
653+
Status: c.Status,
654+
Spec: c.Spec,
655+
656+
MasterService: c.GetServiceMaster(),
657+
ReplicaService: c.GetServiceReplica(),
658+
Endpoint: c.GetEndpoint(),
659+
StatefulSet: c.GetStatefulSet(),
660+
661+
Error: c.Error,
662+
}
663+
}

pkg/cluster/resources.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,23 @@ func (c *Cluster) createRoles() (err error) {
419419
// TODO: figure out what to do with duplicate names (humans and robots) among pgUsers
420420
return c.syncRoles(false)
421421
}
422+
423+
// GetServiceMaster returns cluster's kubernetes master Service
424+
func (c *Cluster) GetServiceMaster() *v1.Service {
425+
return c.Service[master]
426+
}
427+
428+
// GetServiceReplica returns cluster's kubernetes replica Service
429+
func (c *Cluster) GetServiceReplica() *v1.Service {
430+
return c.Service[replica]
431+
}
432+
433+
// GetEndpoint returns cluster's kubernetes Endpoint
434+
func (c *Cluster) GetEndpoint() *v1.Endpoints {
435+
return c.Endpoint
436+
}
437+
438+
// GetStatefulSet returns cluster's kubernetes StatefulSet
439+
func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
440+
return c.Statefulset
441+
}

pkg/controller/controller.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010
"k8s.io/client-go/rest"
1111
"k8s.io/client-go/tools/cache"
1212

13+
"github.com/zalando-incubator/postgres-operator/pkg/apiserver"
1314
"github.com/zalando-incubator/postgres-operator/pkg/cluster"
1415
"github.com/zalando-incubator/postgres-operator/pkg/spec"
1516
"github.com/zalando-incubator/postgres-operator/pkg/util/config"
1617
"github.com/zalando-incubator/postgres-operator/pkg/util/constants"
1718
"github.com/zalando-incubator/postgres-operator/pkg/util/k8sutil"
19+
"github.com/zalando-incubator/postgres-operator/pkg/util/ringlog"
1820
)
1921

2022
// Controller represents operator controller
@@ -25,30 +27,40 @@ type Controller struct {
2527
logger *logrus.Entry
2628
KubeClient k8sutil.KubernetesClient
2729
RestClient rest.Interface // kubernetes API group REST client
30+
apiserver *apiserver.Server
2831

29-
clustersMu sync.RWMutex
30-
clusters map[spec.NamespacedName]*cluster.Cluster
31-
stopChs map[spec.NamespacedName]chan struct{}
32+
stopCh chan struct{}
33+
34+
clustersMu sync.RWMutex
35+
clusters map[spec.NamespacedName]*cluster.Cluster
36+
clusterLogs map[spec.NamespacedName]ringlog.RingLogger
37+
teamClusters map[string][]spec.NamespacedName
3238

3339
postgresqlInformer cache.SharedIndexInformer
3440
podInformer cache.SharedIndexInformer
3541
podCh chan spec.PodEvent
3642

3743
clusterEventQueues []*cache.FIFO // [workerID]Queue
3844
lastClusterSyncTime int64
45+
46+
workerLogs map[uint32]ringlog.RingLogger
3947
}
4048

4149
// NewController creates a new controller
4250
func NewController(controllerConfig *spec.ControllerConfig) *Controller {
4351
logger := logrus.New()
4452

4553
c := &Controller{
46-
config: *controllerConfig,
47-
opConfig: &config.Config{},
48-
logger: logger.WithField("pkg", "controller"),
49-
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
50-
podCh: make(chan spec.PodEvent),
54+
config: *controllerConfig,
55+
opConfig: &config.Config{},
56+
logger: logger.WithField("pkg", "controller"),
57+
clusters: make(map[spec.NamespacedName]*cluster.Cluster),
58+
clusterLogs: make(map[spec.NamespacedName]ringlog.RingLogger),
59+
teamClusters: make(map[string][]spec.NamespacedName),
60+
stopCh: make(chan struct{}),
61+
podCh: make(chan spec.PodEvent),
5162
}
63+
logger.Hooks.Add(c)
5264

5365
return c
5466
}
@@ -149,6 +161,7 @@ func (c *Controller) initController() {
149161
})
150162

151163
c.clusterEventQueues = make([]*cache.FIFO, c.opConfig.Workers)
164+
c.workerLogs = make(map[uint32]ringlog.RingLogger, c.opConfig.Workers)
152165
for i := range c.clusterEventQueues {
153166
c.clusterEventQueues[i] = cache.NewFIFO(func(obj interface{}) (string, error) {
154167
e, ok := obj.(spec.ClusterEvent)
@@ -159,19 +172,23 @@ func (c *Controller) initController() {
159172
return fmt.Sprintf("%s-%s", e.EventType, e.UID), nil
160173
})
161174
}
175+
176+
c.apiserver = apiserver.New(c, c.opConfig.APIPort, c.logger.Logger)
162177
}
163178

164179
// Run starts background controller processes
165180
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
166181
c.initController()
167182

168-
wg.Add(3)
183+
wg.Add(4)
169184
go c.runPodInformer(stopCh, wg)
170185
go c.runPostgresqlInformer(stopCh, wg)
171186
go c.clusterResync(stopCh, wg)
187+
go c.apiserver.Run(stopCh, wg)
172188

173189
for i := range c.clusterEventQueues {
174190
wg.Add(1)
191+
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
175192
go c.processClusterEventsQueue(i, stopCh, wg)
176193
}
177194

0 commit comments

Comments
 (0)