Skip to content

Commit 62adaaf

Browse files
author
Runze Cui
committed
[DBS-000] route cluster pipelines through withConn for circuit breaker protection
1 parent d72ad2f commit 62adaaf

1 file changed

Lines changed: 27 additions & 22 deletions

File tree

osscluster.go

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,23 +1476,31 @@ func (c *ClusterClient) processPipelineNode(
14761476
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
14771477
) {
14781478
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1479-
cn, err := node.Client.getConn(ctx)
1480-
if err != nil {
1479+
// Route through withConn so cluster pipelines get circuit breaker
1480+
// protection (Allow + Execute/cb.Do). The upstream v9 OSS uses
1481+
// getConn/releaseConn directly here, which is functionally equivalent
1482+
// without a CB limiter, but bypasses the CB logic in our fork's withConn.
1483+
// This matches the approach used in go-redis v8.
1484+
//
1485+
// The connErr flag tracks whether the fn callback ran. If withConn
1486+
// returns an error before fn executes (CB rejected, pool exhausted,
1487+
// dial failed), we handle error mapping here. If fn ran and failed,
1488+
// processPipelineNodeConn already handled its own errors internally
1489+
// (MarkAsFailing, mapCmdsByNode, setCmdsErr), so we skip to avoid
1490+
// double-handling.
1491+
connErr := true
1492+
err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1493+
connErr = false
1494+
return c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1495+
})
1496+
if err != nil && connErr {
14811497
if !isContextError(err) {
14821498
node.MarkAsFailing()
14831499
}
14841500
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
14851501
setCmdsErr(cmds, err)
1486-
return err
14871502
}
1488-
1489-
var processErr error
1490-
defer func() {
1491-
node.Client.releaseConn(ctx, cn, processErr)
1492-
}()
1493-
processErr = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1494-
1495-
return processErr
1503+
return err
14961504
})
14971505
}
14981506

@@ -1693,20 +1701,17 @@ func (c *ClusterClient) processTxPipelineNode(
16931701
) {
16941702
cmds = wrapMultiExec(ctx, cmds)
16951703
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1696-
cn, err := node.Client.getConn(ctx)
1697-
if err != nil {
1704+
// See processPipelineNode for explanation of withConn and connErr pattern.
1705+
connErr := true
1706+
err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1707+
connErr = false
1708+
return c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1709+
})
1710+
if err != nil && connErr {
16981711
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
16991712
setCmdsErr(cmds, err)
1700-
return err
17011713
}
1702-
1703-
var processErr error
1704-
defer func() {
1705-
node.Client.releaseConn(ctx, cn, processErr)
1706-
}()
1707-
processErr = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1708-
1709-
return processErr
1714+
return err
17101715
})
17111716
}
17121717

0 commit comments

Comments
 (0)