20
20
21
21
import com .dtstack .flink .sql .dirtyManager .manager .DirtyDataManager ;
22
22
import com .dtstack .flink .sql .format .DeserializationMetricWrapper ;
23
+ import com .dtstack .flink .sql .util .ReflectionUtils ;
23
24
import org .apache .flink .api .common .serialization .DeserializationSchema ;
24
25
import org .apache .flink .api .common .typeinfo .TypeInformation ;
25
26
import org .apache .flink .metrics .Gauge ;
26
27
import org .apache .flink .metrics .MetricGroup ;
27
28
import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
28
29
import org .apache .flink .streaming .connectors .kafka .internals .AbstractFetcher ;
30
+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
31
+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartitionState ;
29
32
import org .apache .flink .types .Row ;
30
33
import org .apache .kafka .clients .consumer .KafkaConsumer ;
31
34
import org .apache .kafka .clients .consumer .internals .SubscriptionState ;
32
35
import org .apache .kafka .common .TopicPartition ;
33
36
import org .slf4j .Logger ;
34
37
import org .slf4j .LoggerFactory ;
35
38
36
- import java .io .IOException ;
37
39
import java .lang .reflect .Field ;
38
- import java .util .Set ;
39
- import java .util .concurrent .atomic .AtomicBoolean ;
40
+ import java .util .List ;
40
41
41
42
import static com .dtstack .flink .sql .metric .MetricConstant .DT_PARTITION_GROUP ;
42
43
import static com .dtstack .flink .sql .metric .MetricConstant .DT_TOPIC_GROUP ;
@@ -53,10 +54,6 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap
53
54
54
55
private static final Logger LOG = LoggerFactory .getLogger (KafkaDeserializationMetricWrapper .class );
55
56
56
- private AbstractFetcher <Row , ?> fetcher ;
57
-
58
- private AtomicBoolean firstMsg = new AtomicBoolean (true );
59
-
60
57
private Calculate calculate ;
61
58
62
59
public KafkaDeserializationMetricWrapper (
@@ -68,66 +65,82 @@ public KafkaDeserializationMetricWrapper(
68
65
this .calculate = calculate ;
69
66
}
70
67
71
- @ Override
72
- protected void beforeDeserialize () throws IOException {
73
- super .beforeDeserialize ();
74
- if (firstMsg .compareAndSet (true , false )) {
75
- try {
76
- registerPtMetric (fetcher );
77
- } catch (Exception e ) {
78
- LOG .error ("register topic partition metric error." , e );
79
- }
80
- }
81
- }
82
-
83
68
protected void registerPtMetric (AbstractFetcher <Row , ?> fetcher ) throws Exception {
84
- Field consumerThreadField = getConsumerThreadField (fetcher );
69
+ Field consumerThreadField = ReflectionUtils . getDeclaredField (fetcher , "consumerThread" );
85
70
consumerThreadField .setAccessible (true );
86
71
KafkaConsumerThread consumerThread = (KafkaConsumerThread ) consumerThreadField .get (fetcher );
87
72
88
73
Field hasAssignedPartitionsField = consumerThread .getClass ().getDeclaredField ("hasAssignedPartitions" );
89
74
hasAssignedPartitionsField .setAccessible (true );
90
75
91
- //wait until assignedPartitions
92
-
93
- boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
76
+ // get subtask unassigned kafka topic partition
77
+ Field subscribedPartitionStatesField = ReflectionUtils .getDeclaredField (fetcher , "subscribedPartitionStates" );
78
+ subscribedPartitionStatesField .setAccessible (true );
79
+ List <KafkaTopicPartitionState <KafkaTopicPartition >> subscribedPartitionStates = (List <KafkaTopicPartitionState <KafkaTopicPartition >>) subscribedPartitionStatesField .get (fetcher );
94
80
95
- if (!hasAssignedPartitions ) {
96
- throw new RuntimeException ("wait 50 secs, but not assignedPartitions" );
97
- }
81
+ // init partition lag metric
82
+ for (KafkaTopicPartitionState <KafkaTopicPartition > kafkaTopicPartitionState : subscribedPartitionStates ) {
83
+ KafkaTopicPartition kafkaTopicPartition = kafkaTopicPartitionState .getKafkaTopicPartition ();
84
+ MetricGroup topicMetricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , kafkaTopicPartition .getTopic ());
98
85
99
- Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
100
- consumerField .setAccessible (true );
101
-
102
- KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
103
- Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
104
- subscriptionStateField .setAccessible (true );
105
-
106
- //topic partitions lag
107
- SubscriptionState subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
108
- Set <TopicPartition > assignedPartitions = subscriptionState .assignedPartitions ();
109
-
110
- for (TopicPartition topicPartition : assignedPartitions ) {
111
- MetricGroup metricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , topicPartition .topic ())
112
- .addGroup (DT_PARTITION_GROUP , topicPartition .partition () + "" );
86
+ MetricGroup metricGroup = topicMetricGroup .addGroup (DT_PARTITION_GROUP , kafkaTopicPartition .getPartition () + "" );
113
87
metricGroup .gauge (DT_TOPIC_PARTITION_LAG_GAUGE , new Gauge <Long >() {
88
+ // tmp variable
89
+ boolean initLag = true ;
90
+ int partitionIndex ;
91
+ SubscriptionState subscriptionState ;
92
+ TopicPartition topicPartition ;
93
+
114
94
@ Override
115
95
public Long getValue () {
116
- return calculate .calc (subscriptionState , topicPartition );
96
+ // first time register metrics
97
+ if (initLag ) {
98
+ partitionIndex = kafkaTopicPartition .getPartition ();
99
+ initLag = false ;
100
+ return -1L ;
101
+ }
102
+ // when kafka topic partition assigned calc metrics
103
+ if (subscriptionState == null ) {
104
+ try {
105
+ Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
106
+ consumerField .setAccessible (true );
107
+
108
+ KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
109
+ Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
110
+ subscriptionStateField .setAccessible (true );
111
+
112
+ boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
113
+
114
+ if (!hasAssignedPartitions ) {
115
+ LOG .error ("wait 50 secs, but not assignedPartitions" );
116
+ }
117
+
118
+ subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
119
+
120
+ topicPartition = subscriptionState
121
+ .assignedPartitions ()
122
+ .stream ()
123
+ .filter (x -> x .partition () == partitionIndex )
124
+ .findFirst ()
125
+ .get ();
126
+
127
+ } catch (Exception e ) {
128
+ LOG .error (e .getMessage ());
129
+ }
130
+ return -1L ;
131
+ } else {
132
+ return calculate .calc (subscriptionState , topicPartition );
133
+ }
117
134
}
118
135
});
119
136
}
120
137
}
121
138
122
139
public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
123
- this .fetcher = fetcher ;
124
- }
125
-
126
- private Field getConsumerThreadField (AbstractFetcher fetcher ) throws NoSuchFieldException {
127
140
try {
128
- return fetcher . getClass (). getDeclaredField ( "consumerThread" );
141
+ registerPtMetric ( fetcher );
129
142
} catch (Exception e ) {
130
- return fetcher . getClass (). getSuperclass (). getDeclaredField ( "consumerThread" );
143
+ LOG . error ( "register topic partition metric error." , e );
131
144
}
132
145
}
133
146
}
0 commit comments