@@ -19,23 +19,24 @@
 package com.dtstack.flink.sql.source.kafka;
 
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
+import com.dtstack.flink.sql.util.ReflectionUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.types.Row;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
 
 import static com.dtstack.flink.sql.metric.MetricConstant.DT_PARTITION_GROUP;
 import static com.dtstack.flink.sql.metric.MetricConstant.DT_TOPIC_GROUP;
@@ -52,77 +53,89 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrapper {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaDeserializationMetricWrapper.class);
 
-    private AbstractFetcher<Row, ?> fetcher;
-
-    private AtomicBoolean firstMsg = new AtomicBoolean(true);
-
     private Calculate calculate;
 
     public KafkaDeserializationMetricWrapper(TypeInformation<Row> typeInfo, DeserializationSchema<Row> deserializationSchema, Calculate calculate) {
         super(typeInfo, deserializationSchema);
         this.calculate = calculate;
     }
 
-    @Override
-    protected void beforeDeserialize() throws IOException {
-        super.beforeDeserialize();
-        if (firstMsg.compareAndSet(true, false)) {
-            try {
-                registerPtMetric(fetcher);
-            } catch (Exception e) {
-                LOG.error("register topic partition metric error.", e);
-            }
-        }
-    }
-
     protected void registerPtMetric(AbstractFetcher<Row, ?> fetcher) throws Exception {
-        Field consumerThreadField = getConsumerThreadField(fetcher);
+        Field consumerThreadField = ReflectionUtils.getDeclaredField(fetcher, "consumerThread");
         consumerThreadField.setAccessible(true);
         KafkaConsumerThread consumerThread = (KafkaConsumerThread) consumerThreadField.get(fetcher);
 
         Field hasAssignedPartitionsField = consumerThread.getClass().getDeclaredField("hasAssignedPartitions");
         hasAssignedPartitionsField.setAccessible(true);
 
-        //wait until assignedPartitions
-
-        boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
+        // get the kafka topic partitions subscribed by this subtask (the consumer may not have assigned them yet)
+        Field subscribedPartitionStatesField = ReflectionUtils.getDeclaredField(fetcher, "subscribedPartitionStates");
+        subscribedPartitionStatesField.setAccessible(true);
+        List<KafkaTopicPartitionState<KafkaTopicPartition>> subscribedPartitionStates = (List<KafkaTopicPartitionState<KafkaTopicPartition>>) subscribedPartitionStatesField.get(fetcher);
 
-        if (!hasAssignedPartitions) {
-            throw new RuntimeException("wait 50 secs, but not assignedPartitions");
-        }
+        // init partition lag metric
+        for (KafkaTopicPartitionState<KafkaTopicPartition> kafkaTopicPartitionState : subscribedPartitionStates) {
+            KafkaTopicPartition kafkaTopicPartition = kafkaTopicPartitionState.getKafkaTopicPartition();
+            MetricGroup topicMetricGroup = getRuntimeContext().getMetricGroup().addGroup(DT_TOPIC_GROUP, kafkaTopicPartition.getTopic());
 
-        Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
-        consumerField.setAccessible(true);
-
-        KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
-        Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
-        subscriptionStateField.setAccessible(true);
-
-        //topic partitions lag
-        SubscriptionState subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
-        Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
-
-        for (TopicPartition topicPartition : assignedPartitions) {
-            MetricGroup metricGroup = getRuntimeContext().getMetricGroup().addGroup(DT_TOPIC_GROUP, topicPartition.topic())
-                    .addGroup(DT_PARTITION_GROUP, topicPartition.partition() + "");
+            MetricGroup metricGroup = topicMetricGroup.addGroup(DT_PARTITION_GROUP, kafkaTopicPartition.getPartition() + "");
             metricGroup.gauge(DT_TOPIC_PARTITION_LAG_GAUGE, new Gauge<Long>() {
+                // resolved lazily on first use, since the consumer may not have been assigned partitions when the gauge is registered
+                boolean initLag = true;
+                int partitionIndex;
+                SubscriptionState subscriptionState;
+                TopicPartition topicPartition;
+
                 @Override
                 public Long getValue() {
-                    return calculate.calc(subscriptionState, topicPartition);
+                    // first call after the gauge is registered: remember the partition index and report -1 until the consumer is ready
+                    if (initLag) {
+                        partitionIndex = kafkaTopicPartition.getPartition();
+                        initLag = false;
+                        return -1L;
+                    }
+                    // once the consumer has been assigned partitions, resolve its subscription state so the lag can be calculated
+                    if (subscriptionState == null) {
+                        try {
+                            Field consumerField = consumerThread.getClass().getDeclaredField("consumer");
+                            consumerField.setAccessible(true);
+
+                            KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerField.get(consumerThread);
+                            Field subscriptionStateField = kafkaConsumer.getClass().getDeclaredField("subscriptions");
+                            subscriptionStateField.setAccessible(true);
+
+                            boolean hasAssignedPartitions = (boolean) hasAssignedPartitionsField.get(consumerThread);
+
+                            if (!hasAssignedPartitions) {
+                                LOG.error("wait 50 secs, but not assignedPartitions");
+                            }
+
+                            subscriptionState = (SubscriptionState) subscriptionStateField.get(kafkaConsumer);
+
+                            topicPartition = subscriptionState
+                                    .assignedPartitions()
+                                    .stream()
+                                    .filter(x -> x.partition() == partitionIndex)
+                                    .findFirst()
+                                    .get();
+
+                        } catch (Exception e) {
+                            LOG.error(e.getMessage(), e);
+                        }
+                        return -1L;
+                    } else {
+                        return calculate.calc(subscriptionState, topicPartition);
+                    }
                 }
             });
         }
     }
 
     public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
-        this.fetcher = fetcher;
-    }
-
-    private Field getConsumerThreadField(AbstractFetcher fetcher) throws NoSuchFieldException {
         try {
-            return fetcher.getClass().getDeclaredField("consumerThread");
+            registerPtMetric(fetcher);
         } catch (Exception e) {
-            return fetcher.getClass().getSuperclass().getDeclaredField("consumerThread");
+            LOG.error("register topic partition metric error.", e);
         }
     }
 }
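Two illustrative sketches follow; neither is part of this commit.

The new `registerPtMetric` resolves fetcher fields through `ReflectionUtils.getDeclaredField(fetcher, "consumerThread")` instead of the removed `getConsumerThreadField`, which only checked the fetcher class and its direct superclass. Below is a minimal sketch of what such a helper presumably does, assuming it simply walks the whole class hierarchy; the class name `ReflectionUtilsSketch` is hypothetical, not the dtstack utility itself.

```java
import java.lang.reflect.Field;

public final class ReflectionUtilsSketch {

    private ReflectionUtilsSketch() {
    }

    /**
     * Walks up the class hierarchy of {@code target} and returns the first declared
     * field with the given name, or null if no class in the hierarchy declares it.
     */
    public static Field getDeclaredField(Object target, String fieldName) {
        for (Class<?> clazz = target.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                return clazz.getDeclaredField(fieldName);
            } catch (NoSuchFieldException e) {
                // not declared on this class, keep looking in the superclass
            }
        }
        return null;
    }
}
```

The gauge itself delegates the lag computation to `calculate.calc(subscriptionState, topicPartition)`. A possible `Calculate` implementation is sketched below, assuming the interface declares `Long calc(SubscriptionState, TopicPartition)` as used above, and assuming a Kafka client version whose internal `SubscriptionState` exposes `partitionLag(TopicPartition, IsolationLevel)`; the `IsolationLevel` package and the exact method signature differ between client versions.

```java
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.IsolationLevel;

public class PartitionLagCalculate implements Calculate {

    @Override
    public Long calc(SubscriptionState subscriptionState, TopicPartition topicPartition) {
        // records-lag of the partition; may be null until the consumer has fetched from it
        Long lag = subscriptionState.partitionLag(topicPartition, IsolationLevel.READ_UNCOMMITTED);
        return lag == null ? -1L : lag;
    }
}
```

Returning -1L for an unresolved lag mirrors the placeholder the gauge reports before the consumer has been assigned partitions, so a dashboard can tell "not yet assigned" apart from a real lag of 0.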