Skip to content

Commit

Permalink
feat: add circuit breaker tproxy (#28)
Browse files Browse the repository at this point in the history
* feat: add circuit breaker tproxy

* refactor: delete unused comment
  • Loading branch information
spring-young authored Jan 16, 2024
1 parent 251089b commit 7efa14b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func main() {
}

{
go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr).Start()
go tproxy.NewTProxy(*proxyIptablePort, faultInjectionMgr, breakerMgr).Start()
}

serveHTTP(ctx, readyHandler)
Expand Down
8 changes: 0 additions & 8 deletions pkg/proxy/apiserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
var (
upgradeSubresources = sets.NewString("exec", "attach")
enableIpTable = os.Getenv(constants.EnvIPTable) == "true"
enableWebhookProxy = os.Getenv(constants.EnvEnableWebHookProxy) == "true"

disableCircuitBreaker = os.Getenv(constants.EnvDisableCircuitBreaker) == "true"
disableFaultInjection = os.Getenv(constants.EnvDisableCircuitBreaker) == "true"
Expand Down Expand Up @@ -141,15 +140,9 @@ type handler struct {
electionHandler leaderelection.Handler
}

func getReqInfoStr(r *apirequest.RequestInfo) string {
return fmt.Sprintf("RequestInfo: { Path: %s, APIGroup: %s, Resource: %s, Subresource: %s, Verb: %s, Namespace: %s, Name: %s, APIVersion: %s }", r.Path, r.APIGroup, r.Resource, r.Subresource, r.Verb, r.Namespace, r.Name, r.APIVersion)
}

func (h *handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
startTime := time.Now()
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
klog.Infof("handle http req %s", r.URL.String())
klog.Infof(getReqInfoStr(requestInfo))
if !ok {
klog.Errorf("%s %s %s, no request info in context", r.Method, r.Header.Get("Content-Type"), r.URL)
http.Error(rw, "no request info in context", http.StatusBadRequest)
Expand Down Expand Up @@ -216,7 +209,6 @@ func (h *handler) getURL(r *http.Request) *url.URL {
u, _ := url.Parse(fmt.Sprintf("https://%s", r.Host))
if !enableIpTable {
u, _ = url.Parse(fmt.Sprintf(h.cfg.Host))
klog.Infof("disable IPTABLE, proxy apiServer with real host %s", u.String())
r.Host = ""
}
return u
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/faultinjection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (m *manager) doFaultInjection(faultInjections []*ctrlmeshproto.HTTPFaultInj
if isInpercentRange(faultInjections[idx].Delay.Percent) {
delay := faultInjections[idx].Delay.GetFixedDelay()
delayDuration := delay.AsDuration()
fmt.Println("Delaying for ", delayDuration)
logger.Info("Delaying time ", "for", delayDuration)
time.Sleep(delayDuration)
}
}
Expand Down
52 changes: 42 additions & 10 deletions pkg/proxy/http/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package http
import (
"encoding/json"
"fmt"
"os"

"net/http"
"net/url"
Expand All @@ -27,29 +28,34 @@ import (
"k8s.io/klog/v2"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/constants"
meshhttp "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/http"
"github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker"
"github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection"
"github.com/KusionStack/controller-mesh/pkg/utils"
utilshttp "github.com/KusionStack/controller-mesh/pkg/utils/http"
)

var (
logger = logf.Log.WithName("http-proxy")
enableRestBreaker = os.Getenv(constants.EnvEnableRestCircuitBreaker) == "true"
logger = logf.Log.WithName("http-proxy")
)

type ITProxy interface {
Start()
}

type tproxy struct {
port int
FaultInjector faultinjection.ManagerInterface
port int
FaultInjector faultinjection.ManagerInterface
CircuitInjector circuitbreaker.ManagerInterface
}

func NewTProxy(port int, faultInjector faultinjection.ManagerInterface) ITProxy {
func NewTProxy(port int, faultInjector faultinjection.ManagerInterface, circuitInjector circuitbreaker.ManagerInterface) ITProxy {
return &tproxy{
port: port,
FaultInjector: faultInjector,
port: port,
FaultInjector: faultInjector,
CircuitInjector: circuitInjector,
}
}

Expand All @@ -59,7 +65,7 @@ func (t *tproxy) Start() {
Addr: fmt.Sprintf(":%d", t.port),
Handler: http.HandlerFunc(t.handleHTTP),
}
logger.Info("%s", server.ListenAndServe())
klog.Infof("%s", server.ListenAndServe())
}

func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) {
Expand All @@ -76,9 +82,10 @@ func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) {
return
}
realEndPointUrl = epUrl
logger.Info("receive", "proxy-host", realEndPointUrl.Host, "proxy-method", req.Method, "Mesh-Real-Endpoint", realEp)
klog.Infof("receive, proxy-host: %s, proxy-method: %s, Mesh-Real-Endpoint: %s", realEndPointUrl.Host, req.Method, realEp)
}
logger.Info("handel http request", "url", realEndPointUrl.String())
klog.Infof("handel http request, url: %s ", realEndPointUrl.String())
// faultinjection
result := t.FaultInjector.FaultInjectionRest(req.Header.Get(meshhttp.HeaderMeshRealEndpoint), req.Method)
if result.Abort {
apiErr := utils.HttpToAPIError(int(result.ErrCode), req.Method, result.Message)
Expand All @@ -87,10 +94,35 @@ func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) {
if err := json.NewEncoder(resp).Encode(apiErr); err != nil {
http.Error(resp, fmt.Sprintf("fail to inject fault %v", err), http.StatusInternalServerError)
}
logger.Info("faultInjection rule", "rule", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode))
klog.Infof("faultInjection rule, rule: %s", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode))
return
}

// circuitbreaker
if enableRestBreaker {
// check request is in the whitelist
klog.Infof("start checktrafficrule %s", realEndPointUrl.Host)
result := t.CircuitInjector.ValidateTrafficIntercept(realEndPointUrl.Host, req.Method)
if !result.Allowed {
klog.Infof("ErrorTProxy: %s %s ValidateTrafficIntercept NOPASSED ,checkresult:\t%s", realEndPointUrl.Host, req.Method, result.Reason)
http.Error(resp, fmt.Sprintf("Forbidden by ValidateTrafficIntercept breaker, %s, %s", result.Message, result.Reason), http.StatusForbidden)
return
}
}

// ValidateTrafficIntercept check pass or enableRestBreaker is false run http proxy
klog.Infof("TProxy: %s %s ValidateTrafficIntercept check PASSED or enableRestBreaker is false", realEndPointUrl.Host, req.Method)

// ValidateRest check
klog.Infof("start ValidateRest checkrule %s %s", realEndPointUrl.Host, req.Method)
validateresult := t.CircuitInjector.ValidateRest(req.Header.Get("Mesh-Real-Endpoint"), req.Method)
if !validateresult.Allowed {
klog.Infof("ErrorTProxy: %s %s ValidateRest NOPASSED ,checkresult:%t, validateresultReason:%s", req.Header.Get("Mesh-Real-Endpoint"), req.Method, validateresult.Allowed, validateresult.Reason)
http.Error(resp, fmt.Sprintf("Forbidden by circuit ValidateRest breaker, %s, %s", validateresult.Message, validateresult.Reason), http.StatusForbidden)
return
}
klog.Infof("TProxy: %s %s ValidateRest check PASSED", realEndPointUrl.Host, req.Method)

// modify request
director := func(target *http.Request) {
target.Header.Set("Pass-Via-Go-TProxy", "1")
Expand Down
4 changes: 3 additions & 1 deletion pkg/proxy/http/http_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

meshhttp "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/http"
ctrlmeshproto "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/proto"
"github.com/KusionStack/controller-mesh/pkg/proxy/circuitbreaker"
"github.com/KusionStack/controller-mesh/pkg/proxy/faultinjection"
)

Expand All @@ -52,6 +53,7 @@ func TestTProxy(t *testing.T) {

func StartProxy() {
faultInjectionMgr := faultinjection.NewManager(context.TODO())
circuitInjectionMgr := circuitbreaker.NewManager(context.TODO())
_, err := faultInjectionMgr.Sync(&ctrlmeshproto.FaultInjection{
Option: ctrlmeshproto.FaultInjection_UPDATE,
ConfigHash: "123",
Expand Down Expand Up @@ -119,6 +121,6 @@ func StartProxy() {
},
})
utilruntime.Must(err)
tProxy := NewTProxy(15002, faultInjectionMgr)
tProxy := NewTProxy(15002, faultInjectionMgr, circuitInjectionMgr)
tProxy.Start()
}

0 comments on commit 7efa14b

Please sign in to comment.