-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[cdc-connector][mysql]Mysql add numRecordsOutBySnapshot and numRecordsOutByIncremental metrics #3456
Conversation
…sOutByIncremental metrics
public static final String IO_NUM_RECORDS_OUT_INCREMENTAL_INSERT = | ||
".numRecordsOutByIncrementalInsert"; | ||
|
||
public static final String IO_NUM_RECORDS_OUT_INCREMENTAL_DELETE = | ||
".numRecordsOutByIncrementalDelete"; | ||
|
||
public static final String IO_NUM_RECORDS_OUT_INCREMENTAL_UPDATE = | ||
".numRecordsOutByIncrementalUpdate"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for contirbution @ChengJie1053
Nit: How about changing INCREMENTAL
to BINLOG
or CHANGE_EVENT
? INCREMENTAL
can be understood as a Incremental Snapshot
step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your suggestion is great, and I will CHANGE it to CHANGE EVENT
if (RecordUtils.isDataChangeRecord(element)) { | ||
Struct value = (Struct) element.value(); | ||
if (value != null) { | ||
Envelope.Operation operation = | ||
Envelope.Operation.forCode( | ||
value.getString(Envelope.FieldName.OPERATION)); | ||
switch (operation) { | ||
case CREATE: | ||
sourceReaderMetrics.numRecordsOutInsertIncrease(tableId); | ||
break; | ||
case UPDATE: | ||
sourceReaderMetrics.numRecordsOutUpdateIncrease(tableId); | ||
break; | ||
case DELETE: | ||
sourceReaderMetrics.numRecordsOutDeleteIncrease(tableId); | ||
break; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We checked whether element is data change record or not in line 101.
How about removing this condition?
if (RecordUtils.isDataChangeRecord(element)) { | |
Struct value = (Struct) element.value(); | |
if (value != null) { | |
Envelope.Operation operation = | |
Envelope.Operation.forCode( | |
value.getString(Envelope.FieldName.OPERATION)); | |
switch (operation) { | |
case CREATE: | |
sourceReaderMetrics.numRecordsOutInsertIncrease(tableId); | |
break; | |
case UPDATE: | |
sourceReaderMetrics.numRecordsOutUpdateIncrease(tableId); | |
break; | |
case DELETE: | |
sourceReaderMetrics.numRecordsOutDeleteIncrease(tableId); | |
break; | |
} | |
} | |
} | |
Struct value = (Struct) element.value(); | |
if (value != null) { | |
Envelope.Operation operation = | |
Envelope.Operation.forCode( | |
value.getString(Envelope.FieldName.OPERATION)); | |
switch (operation) { | |
case CREATE: | |
sourceReaderMetrics.numRecordsOutInsertIncrease(tableId); | |
break; | |
case UPDATE: | |
sourceReaderMetrics.numRecordsOutUpdateIncrease(tableId); | |
break; | |
case DELETE: | |
sourceReaderMetrics.numRecordsOutDeleteIncrease(tableId); | |
break; | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping me review the code.
I will delete the isDataChangeRecord judgment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL
public void numRecordsOutSnapshotIncrease(TableId tableId) { | ||
Counter counter = | ||
snapshotNumRecordsOutMap.computeIfAbsent( | ||
tableId, | ||
k -> | ||
metricGroup.counter( | ||
tableId.identifier() + IO_NUM_RECORDS_OUT_SNAPSHOT)); | ||
counter.inc(); | ||
} | ||
|
||
public void numRecordsOutInsertIncrease(TableId tableId) { | ||
Counter counter = | ||
insertNumRecordsOutMap.computeIfAbsent( | ||
tableId, | ||
k -> | ||
metricGroup.counter( | ||
tableId.identifier() | ||
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT)); | ||
counter.inc(); | ||
} | ||
|
||
public void numRecordsOutUpdateIncrease(TableId tableId) { | ||
Counter counter = | ||
updateNumRecordsOutMap.computeIfAbsent( | ||
tableId, | ||
k -> | ||
metricGroup.counter( | ||
tableId.identifier() | ||
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE)); | ||
counter.inc(); | ||
} | ||
|
||
public void numRecordsOutDeleteIncrease(TableId tableId) { | ||
Counter counter = | ||
deleteNumRecordsOutMap.computeIfAbsent( | ||
tableId, | ||
k -> | ||
metricGroup.counter( | ||
tableId.identifier() | ||
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE)); | ||
counter.inc(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I applied this metrics to my code, and left some comments.
- Metric names should be general.
- In your case, It generate uniqe metrics for each tables. But in monitorig tool such as grafana, it distingush tables by property such as job name in metric, not by metric name
So I suggest to removetableId.identifier()
in metric name.
public void numRecordsOutSnapshotIncrease(TableId tableId) { | |
Counter counter = | |
snapshotNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter( | |
tableId.identifier() + IO_NUM_RECORDS_OUT_SNAPSHOT)); | |
counter.inc(); | |
} | |
public void numRecordsOutInsertIncrease(TableId tableId) { | |
Counter counter = | |
insertNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter( | |
tableId.identifier() | |
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT)); | |
counter.inc(); | |
} | |
public void numRecordsOutUpdateIncrease(TableId tableId) { | |
Counter counter = | |
updateNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter( | |
tableId.identifier() | |
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE)); | |
counter.inc(); | |
} | |
public void numRecordsOutDeleteIncrease(TableId tableId) { | |
Counter counter = | |
deleteNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter( | |
tableId.identifier() | |
+ IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE)); | |
counter.inc(); | |
} | |
public void numRecordsOutSnapshotIncrease(TableId tableId) { | |
Counter counter = | |
snapshotNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter(IO_NUM_RECORDS_OUT_SNAPSHOT)); | |
counter.inc(); | |
} | |
public void numRecordsOutInsertIncrease(TableId tableId) { | |
Counter counter = | |
insertNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT)); | |
counter.inc(); | |
} | |
public void numRecordsOutUpdateIncrease(TableId tableId) { | |
Counter counter = | |
updateNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE)); | |
counter.inc(); | |
} | |
public void numRecordsOutDeleteIncrease(TableId tableId) { | |
Counter counter = | |
deleteNumRecordsOutMap.computeIfAbsent( | |
tableId, | |
k -> | |
metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE)); | |
counter.inc(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks for reviewing the code for me, but the original purpose of this metric is to see each table at a granular level, if you need to see the total, I will add other metrics
public static final String IO_NUM_RECORDS_OUT_SNAPSHOT = ".numRecordsOutBySnapshot"; | ||
|
||
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT = | ||
".numRecordsOutByDataChangeEventInsert"; | ||
|
||
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE = | ||
".numRecordsOutByDataChangeEventDelete"; | ||
|
||
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE = | ||
".numRecordsOutByDataChangeEventUpdate"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to remove dot(.) from metric name.
public static final String IO_NUM_RECORDS_OUT_SNAPSHOT = ".numRecordsOutBySnapshot"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT = | |
".numRecordsOutByDataChangeEventInsert"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE = | |
".numRecordsOutByDataChangeEventDelete"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE = | |
".numRecordsOutByDataChangeEventUpdate"; | |
public static final String IO_NUM_RECORDS_OUT_SNAPSHOT = "numRecordsOutBySnapshot"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_INSERT = | |
"numRecordsOutByDataChangeEventInsert"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_DELETE = | |
"numRecordsOutByDataChangeEventDelete"; | |
public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_EVENT_UPDATE = | |
"numRecordsOutByDataChangeEventUpdate"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thank you for helping me review the code, I will remove (.)
Seems we can't come up with a "best" metric design for everyone: Some may want a task-level unified record counter, while some may want fine-grained counters for each table. Or one may want to disable it entirely to avoid any extra cost. I wonder if we should add an option to control metric behavior? cc @ruanhang1993 |
@ChengJie1053 cc. @yuxiqian |
This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. |
This pull request has been closed because it has not had recent activity. You could reopen it if you try to continue your work, and anyone who are interested in it are encouraged to continue work on this pull request. |
numRecordsOutBySnapshot
metrics collects snapshot statisticsnumRecordsOutByIncremental
metrics collects incremental statisticsnumRecordsOutByIncremental
has three metrics, respectively isnumRecordsOutByIncrementalInsert
,numRecordsOutByIncrementalUpdate
,numRecordsOutByIncrementalDelete
2024.7.12 change metrics name,change
![image](https://private-user-images.githubusercontent.com/125547374/348212556-892f1090-58a9-4212-95d1-9816f75e06e8.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MzkwMDYxMDEsIm5iZiI6MTczOTAwNTgwMSwicGF0aCI6Ii8xMjU1NDczNzQvMzQ4MjEyNTU2LTg5MmYxMDkwLTU4YTktNDIxMi05NWQxLTk4MTZmNzVlMDZlOC5wbmc_WC1BbXotQWxnb3JpdGhtPUFXUzQtSE1BQy1TSEEyNTYmWC1BbXotQ3JlZGVudGlhbD1BS0lBVkNPRFlMU0E1M1BRSzRaQSUyRjIwMjUwMjA4JTJGdXMtZWFzdC0xJTJGczMlMkZhd3M0X3JlcXVlc3QmWC1BbXotRGF0ZT0yMDI1MDIwOFQwOTEwMDFaJlgtQW16LUV4cGlyZXM9MzAwJlgtQW16LVNpZ25hdHVyZT04Mjc1ZjMyZGNmZTExZThlZWZjOTRkZmQ1NjkwNmU3NDY4ZjhlNjI5ZTJmNDE1ZmExNGU5Y2ZkMTQwYzgzYjBhJlgtQW16LVNpZ25lZEhlYWRlcnM9aG9zdCJ9.ecchdfvKyMG1vYzD1tXO_y-7Umpcsi3kLg0Cbs2KR7k)
numRecordsOutByIncrementalInsert
tonumRecordsOutByDataChangeEventInsert