Skip to content

Commit e01a342

Browse files
kphelpsxvandish-dd
authored andcommitted
Prevent broker delegation from getting stuck
1 parent 7ffc4fc commit e01a342

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

src/rdkafka_broker.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3327,6 +3327,7 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
33273327
rkb->rkb_toppar_cnt++;
33283328
rd_kafka_broker_unlock(rkb);
33293329
rktp->rktp_broker = rkb;
3330+
rktp->rktp_broker_id = rkb->rkb_nodeid;
33303331
rd_assert(!rktp->rktp_msgq_wakeup_q);
33313332
rktp->rktp_msgq_wakeup_q = rd_kafka_q_keep(rkb->rkb_ops);
33323333
rd_kafka_broker_keep(rkb);
@@ -3430,6 +3431,7 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
34303431
rktp->rktp_msgq_wakeup_q = NULL;
34313432
}
34323433
rktp->rktp_broker = NULL;
3434+
rktp->rktp_broker_id = -1;
34333435

34343436
rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_RKB);
34353437
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_RKB;

src/rdkafka_topic.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -605,17 +605,14 @@ int rd_kafka_toppar_broker_update(rd_kafka_toppar_t *rktp,
605605
int32_t broker_id,
606606
rd_kafka_broker_t *rkb,
607607
const char *reason) {
608-
609-
rktp->rktp_broker_id = broker_id;
610-
611608
if (!rkb) {
612609
int had_broker = rktp->rktp_broker ? 1 : 0;
613610
rd_kafka_toppar_broker_delegate(rktp, NULL);
614611
return had_broker ? -1 : 0;
615612
}
616613

617614
if (rktp->rktp_broker) {
618-
if (rktp->rktp_broker == rkb) {
615+
if (rktp->rktp_broker == rkb && !rktp->rktp_next_broker) {
619616
/* No change in broker */
620617
return 0;
621618
}

0 commit comments

Comments
 (0)