diff --git a/extkafka/broker_discovery.go b/extkafka/broker_discovery.go index c92d42c..939820a 100644 --- a/extkafka/broker_discovery.go +++ b/extkafka/broker_discovery.go @@ -243,11 +243,13 @@ func toBrokerTarget(broker kadm.BrokerDetail, controller int32) discovery_kit_ap if broker.Rack != nil { attributes["kafka.broker.rack"] = []string{*broker.Rack} } - podName := strings.Split(broker.Host, ".")[0] - namespace := strings.Split(broker.Host, ".")[2] + if len(strings.Split(broker.Host, ".")) == 4 && strings.HasSuffix(broker.Host, ".svc") { + podName := strings.Split(broker.Host, ".")[0] + namespace := strings.Split(broker.Host, ".")[2] - attributes["kafka.pod.name"] = []string{podName} - attributes["kafka.pod.namespace"] = []string{namespace} + attributes["kafka.pod.name"] = []string{podName} + attributes["kafka.pod.namespace"] = []string{namespace} + } return discovery_kit_api.Target{ Id: id, diff --git a/extkafka/check_brokers.go b/extkafka/check_brokers.go index 6a1dbbe..88be2da 100644 --- a/extkafka/check_brokers.go +++ b/extkafka/check_brokers.go @@ -32,7 +32,7 @@ type CheckBrokersState struct { End time.Time ExpectedChanges []string StateCheckMode string - StateCheckSuccess bool + StateCheckFailure bool BrokerHosts []string } @@ -172,6 +172,7 @@ func (m *CheckBrokersAction) Prepare(ctx context.Context, state *CheckBrokersSta state.End = end state.ExpectedChanges = expectedState state.StateCheckMode = stateCheckMode + state.StateCheckFailure = false state.PreviousController = metadata.Controller state.BrokerNodes = metadata.Brokers.NodeIDs() @@ -228,22 +229,23 @@ func BrokerCheckStatus(ctx context.Context, state *CheckBrokersState) (*action_k Title: fmt.Sprintf("Brokers got an unexpected change '%s' whereas '%s' is expected. Change(s) : %v", c, state.ExpectedChanges, - changes[c]), + changes), Status: extutil.Ptr(action_kit_api.Failed), }) } } } else if state.StateCheckMode == stateCheckModeAtLeastOnce { for _, c := range keys { - if slices.Contains(state.ExpectedChanges, c) { - state.StateCheckSuccess = true + if !slices.Contains(state.ExpectedChanges, c) { + state.StateCheckFailure = true } } - if completed && !state.StateCheckSuccess { + if completed && state.StateCheckFailure { checkError = extutil.Ptr(action_kit_api.ActionKitError{ - Title: fmt.Sprintf("Brokers didn't get the expected changes '%s' at least once.", - state.ExpectedChanges), + Title: fmt.Sprintf("Brokers didn't get the expected changes '%s' at least once or got an unexpected change. Change(s) : %v", + state.ExpectedChanges, + changes), Status: extutil.Ptr(action_kit_api.Failed), }) } diff --git a/extkafka/check_brokers_test.go b/extkafka/check_brokers_test.go index 47beb77..14a093f 100644 --- a/extkafka/check_brokers_test.go +++ b/extkafka/check_brokers_test.go @@ -79,7 +79,7 @@ func TestCheckBrokers_Prepare(t *testing.T) { wantedState: &CheckBrokersState{ ExpectedChanges: []string{"test"}, StateCheckMode: "test", - StateCheckSuccess: true, + StateCheckFailure: false, }, }, } @@ -100,7 +100,7 @@ func TestCheckBrokers_Prepare(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "test", tt.wantedState.StateCheckMode) assert.Equal(t, []string{"test"}, state.ExpectedChanges) - assert.Equal(t, false, state.StateCheckSuccess) + assert.Equal(t, false, state.StateCheckFailure) assert.NotNil(t, state.End) } }) @@ -155,7 +155,7 @@ func TestCheckBrokers_Status(t *testing.T) { wantedState: &CheckBrokersState{ StateCheckMode: "atLeastOnce", - StateCheckSuccess: true, + StateCheckFailure: false, }, }, { @@ -177,7 +177,7 @@ func TestCheckBrokers_Status(t *testing.T) { wantedState: &CheckBrokersState{ ExpectedChanges: []string{"kafka broker with downtime"}, StateCheckMode: "allTheTime", - StateCheckSuccess: false, + StateCheckFailure: false, BrokerNodes: []int32{1, 2, 3}, }, },