Skip to content

Commit

Permalink
fix: check brokers and broker discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
achoimet committed Jan 31, 2025
1 parent 5d4c17b commit d0aad18
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
10 changes: 6 additions & 4 deletions extkafka/broker_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,13 @@ func toBrokerTarget(broker kadm.BrokerDetail, controller int32) discovery_kit_ap
if broker.Rack != nil {
attributes["kafka.broker.rack"] = []string{*broker.Rack}
}
podName := strings.Split(broker.Host, ".")[0]
namespace := strings.Split(broker.Host, ".")[2]
if len(strings.Split(broker.Host, ".")) == 4 && strings.HasSuffix(broker.Host, ".svc") {
podName := strings.Split(broker.Host, ".")[0]
namespace := strings.Split(broker.Host, ".")[2]

attributes["kafka.pod.name"] = []string{podName}
attributes["kafka.pod.namespace"] = []string{namespace}
attributes["kafka.pod.name"] = []string{podName}
attributes["kafka.pod.namespace"] = []string{namespace}
}

return discovery_kit_api.Target{
Id: id,
Expand Down
16 changes: 9 additions & 7 deletions extkafka/check_brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CheckBrokersState struct {
End time.Time
ExpectedChanges []string
StateCheckMode string
StateCheckSuccess bool
StateCheckFailure bool
BrokerHosts []string
}

Expand Down Expand Up @@ -172,6 +172,7 @@ func (m *CheckBrokersAction) Prepare(ctx context.Context, state *CheckBrokersSta
state.End = end
state.ExpectedChanges = expectedState
state.StateCheckMode = stateCheckMode
state.StateCheckFailure = false
state.PreviousController = metadata.Controller
state.BrokerNodes = metadata.Brokers.NodeIDs()

Expand Down Expand Up @@ -228,22 +229,23 @@ func BrokerCheckStatus(ctx context.Context, state *CheckBrokersState) (*action_k
Title: fmt.Sprintf("Brokers got an unexpected change '%s' whereas '%s' is expected. Change(s) : %v",
c,
state.ExpectedChanges,
changes[c]),
changes),
Status: extutil.Ptr(action_kit_api.Failed),
})
}
}
} else if state.StateCheckMode == stateCheckModeAtLeastOnce {
for _, c := range keys {
if slices.Contains(state.ExpectedChanges, c) {
state.StateCheckSuccess = true
if !slices.Contains(state.ExpectedChanges, c) {
state.StateCheckFailure = true
}
}

if completed && !state.StateCheckSuccess {
if completed && state.StateCheckFailure {
checkError = extutil.Ptr(action_kit_api.ActionKitError{
Title: fmt.Sprintf("Brokers didn't get the expected changes '%s' at least once.",
state.ExpectedChanges),
Title: fmt.Sprintf("Brokers didn't get the expected changes '%s' at least once or got an unexpected change. Change(s) : %v",
state.ExpectedChanges,
changes),
Status: extutil.Ptr(action_kit_api.Failed),
})
}
Expand Down
8 changes: 4 additions & 4 deletions extkafka/check_brokers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestCheckBrokers_Prepare(t *testing.T) {
wantedState: &CheckBrokersState{
ExpectedChanges: []string{"test"},
StateCheckMode: "test",
StateCheckSuccess: true,
StateCheckFailure: false,
},
},
}
Expand All @@ -100,7 +100,7 @@ func TestCheckBrokers_Prepare(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "test", tt.wantedState.StateCheckMode)
assert.Equal(t, []string{"test"}, state.ExpectedChanges)
assert.Equal(t, false, state.StateCheckSuccess)
assert.Equal(t, false, state.StateCheckFailure)
assert.NotNil(t, state.End)
}
})
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCheckBrokers_Status(t *testing.T) {

wantedState: &CheckBrokersState{
StateCheckMode: "atLeastOnce",
StateCheckSuccess: true,
StateCheckFailure: false,
},
},
{
Expand All @@ -177,7 +177,7 @@ func TestCheckBrokers_Status(t *testing.T) {
wantedState: &CheckBrokersState{
ExpectedChanges: []string{"kafka broker with downtime"},
StateCheckMode: "allTheTime",
StateCheckSuccess: false,
StateCheckFailure: false,
BrokerNodes: []int32{1, 2, 3},
},
},
Expand Down

0 comments on commit d0aad18

Please sign in to comment.