Skip to content

Commit 28ccd31

Browse files
dengzimingdengziming
authored andcommitted
[SPARK-29611][WEBUI] Sort Kafka metadata by the number of messages
### What changes were proposed in this pull request? Sort metadata by the number of messages in each Kafka partition ### Why are the changes needed? help to find the data skewness problem. ### Does this PR introduce any user-facing change? Yes, add a column count to the metadata and sort by count ![image](https://user-images.githubusercontent.com/26023240/67617886-63e06800-f81a-11e9-8718-be3a0100952e.png) If you set `minPartitions` configurations with structure structured-streaming which doesn't have the Streaming page, my code changes in `DirectKafkaInputDStream` won't affect the WEB UI page just as it shows in the follow image ![image](https://user-images.githubusercontent.com/26023240/68020762-79520800-fcda-11e9-96cd-f0c64a36f505.png) ### How was this patch tested? Manual test Closes apache#26266 from dengziming/feature_ui_optimize. Lead-authored-by: dengziming <[email protected]> Co-authored-by: dengziming <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 1e1b730 commit 28ccd31

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
237237
val description = offsetRanges.filter { offsetRange =>
238238
// Don't display empty ranges.
239239
offsetRange.fromOffset != offsetRange.untilOffset
240-
}.map { offsetRange =>
240+
}.toSeq.sortBy(-_.count()).map { offsetRange =>
241241
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
242-
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
242+
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}\t" +
243+
s"count: ${offsetRange.count()}"
243244
}.mkString("\n")
244245
// Copy offsetRanges to immutable.List to prevent from being modified by the user
245246
val metadata = Map(

0 commit comments

Comments
 (0)