Skip to content

Commit e1816f3

Browse files
authored
SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692)
1 parent 66495b6 commit e1816f3

File tree

2 files changed

+9
-0
lines changed

2 files changed

+9
-0
lines changed

samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ public void stop() {
110110
isStarted = false;
111111
}
112112

113+
/**
114+
* Flushes underlying system producer.
115+
* */
116+
public void flush(String source) {
117+
log.info("Flushing coordinator stream producer.");
118+
systemProducer.flush(source);
119+
}
120+
113121
/**
114122
* Serialize and send a coordinator stream message.
115123
*

samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void start() {
6565
*/
6666
public void stop() {
6767
log.info("Stopping the coordinator stream producer.");
68+
coordinatorStreamSystemProducer.flush(SOURCE);
6869
coordinatorStreamSystemProducer.stop();
6970
}
7071

0 commit comments

Comments
 (0)