Skip to content

Commit 49bc562

Browse files
committed
feature xkcoding#12.4第三次提交
1 parent b49cc38 commit 49bc562

File tree

5 files changed

+1031
-0
lines changed

5 files changed

+1031
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package com.xkcoding.kafkatrans;
2+
3+
import com.xkcoding.swagger.entity.Constant;
4+
5+
import java.io.FileInputStream;
6+
import java.io.IOException;
7+
import java.nio.ByteBuffer;
8+
import java.nio.ByteOrder;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
public class BinaryParserUtils {
13+
14+
// 定义 Kafka 二进制头结构
15+
static class KafkaTransBinaryHead {
16+
byte magic;
17+
byte headerLen;
18+
byte ipVersion;
19+
byte reserve;
20+
byte isFirst;
21+
byte isDone;
22+
byte netlinkId;
23+
int dateLen;
24+
short sequence;
25+
short pktCount;
26+
}
27+
28+
// 定义字段结构
29+
static class Field {
30+
int type;
31+
String name;
32+
33+
Field(int type, String name) {
34+
this.type = type;
35+
this.name = name;
36+
}
37+
}
38+
39+
// 读取文件内容
40+
public static byte[] readFile(String filePath) throws IOException {
41+
try (FileInputStream fis = new FileInputStream(filePath)) {
42+
byte[] content = new byte[fis.available()];
43+
fis.read(content);
44+
return content;
45+
}
46+
}
47+
48+
// 解析 Kafka 二进制头
49+
public static KafkaTransBinaryHead parseKafkaHead(ByteBuffer buffer) {
50+
KafkaTransBinaryHead head = new KafkaTransBinaryHead();
51+
head.magic = buffer.get();
52+
head.headerLen = buffer.get();
53+
byte flags = buffer.get();
54+
head.ipVersion = (byte) (flags & 0x0F);
55+
head.reserve = (byte) ((flags >> 4) & 0x03);
56+
head.isFirst = (byte) ((flags >> 6) & 0x01);
57+
head.isDone = (byte) ((flags >> 7) & 0x01);
58+
head.netlinkId = buffer.get();
59+
head.dateLen = buffer.getInt();
60+
head.sequence = buffer.getShort();
61+
head.pktCount = buffer.getShort();
62+
return head;
63+
}
64+
65+
// 打印数据
66+
public static void printData(String prefix, byte[] data, int len) {
67+
StringBuilder sb = new StringBuilder();
68+
sb.append(prefix).append("[").append(len).append("]:\n");
69+
70+
for (int i = 0; i < len; i += 16) {
71+
sb.append("| ");
72+
for (int j = i, k = 0; k < 16 && j < len; j++, k++) {
73+
sb.append(String.format("%02x ", data[j]));
74+
}
75+
for (int k = (len - i) % 16; k < 16; k++) {
76+
sb.append(" ");
77+
}
78+
sb.append("|");
79+
for (int j = i, k = 0; k < 16 && j < len; j++, k++) {
80+
char c = (char) data[j];
81+
if (!Character.isISOControl(c) && !Character.isWhitespace(c)) {
82+
sb.append(c);
83+
} else {
84+
sb.append('.');
85+
}
86+
}
87+
for (int k = (len - i) % 16; k < 16; k++) {
88+
sb.append(" ");
89+
}
90+
sb.append("|\n");
91+
}
92+
93+
System.out.println(sb.toString());
94+
}
95+
96+
// 主函数
97+
public static void main(String[] args) {
98+
try {
99+
// 读取文件
100+
byte[] content = readFile(Constant.KAFKA_LOG_PATH);
101+
ByteBuffer buffer = ByteBuffer.wrap(content).order(ByteOrder.BIG_ENDIAN);
102+
103+
// 解析 Kafka 头
104+
KafkaTransBinaryHead head = parseKafkaHead(buffer);
105+
System.out.printf("headlen: %d, ipVersion: %d, reserve: %d, isFirst: %d, isDone: %d, netlinkId: %d, dataLen: %d, seq: %d, pktCount: %d\n",
106+
head.headerLen, head.ipVersion, head.reserve, head.isFirst, head.isDone, head.netlinkId, head.dateLen, head.sequence, head.pktCount);
107+
108+
// 打印数据
109+
printData("data1", content, content.length);
110+
111+
// 处理数据
112+
byte[] sData = new byte[1024 * 1024 * 4];
113+
int sDataLen = 0;
114+
System.arraycopy(content, buffer.position(), sData, sDataLen, head.dateLen - 2);
115+
sDataLen += head.dateLen - 2;
116+
System.out.printf("proc %d %d\n", head.dateLen, sDataLen);
117+
118+
// 解析字段
119+
List<Field> fieldT = new ArrayList<>();
120+
int pos = 0;
121+
122+
buffer.order(ByteOrder.LITTLE_ENDIAN);
123+
short fieldCountShort = buffer.getShort(pos);
124+
int fieldCount = Short.toUnsignedInt(fieldCountShort);
125+
pos += 2;
126+
127+
128+
System.out.printf("fieldCount = %d\n", fieldCount);
129+
while (fieldCount > 0) {
130+
int len = buffer.getInt(pos);
131+
pos += 4;
132+
byte[] fieldNameBytes = new byte[len];
133+
System.arraycopy(sData, pos, fieldNameBytes, 0, len);
134+
pos += len;
135+
byte fieldType = sData[pos];
136+
pos += 1;
137+
String fieldName = new String(fieldNameBytes);
138+
fieldT.add(new Field(fieldType, fieldName));
139+
System.out.printf("field: %s %d %d\n", fieldName, len, fieldType);
140+
fieldCount--;
141+
}
142+
143+
// 解析记录
144+
byte netlinkCount = sData[pos];
145+
pos += 1;
146+
short netlinkId = buffer.getShort(pos);
147+
pos += 2;
148+
int netlinkRecord = buffer.getInt(pos);
149+
pos += 4;
150+
long netlinkTime = buffer.getLong(pos);
151+
pos += 8;
152+
int teimRecord = buffer.getInt(pos);
153+
pos += 4;
154+
155+
System.out.printf("%d %d %d %d %d\n", netlinkCount, netlinkId, netlinkRecord, netlinkTime, teimRecord);
156+
157+
while (teimRecord > 0 && pos < sDataLen) {
158+
teimRecord--;
159+
for (Field f : fieldT) {
160+
System.out.printf("%s %d %d ", f.name, f.type, pos);
161+
switch (f.type) {
162+
case 1:
163+
case 12:
164+
System.out.printf("%d\n", sData[pos]);
165+
pos += 1;
166+
break;
167+
case 2:
168+
System.out.printf("%d\n", buffer.getShort(pos));
169+
pos += 2;
170+
break;
171+
case 3:
172+
case 8:
173+
System.out.printf("%d\n", buffer.getInt(pos));
174+
pos += 4;
175+
break;
176+
case 4:
177+
case 5:
178+
case 6:
179+
case 9:
180+
System.out.printf("%d\n", buffer.getLong(pos));
181+
pos += 8;
182+
break;
183+
case 7:
184+
int len = buffer.getInt(pos);
185+
pos += 4;
186+
pos += len;
187+
System.out.printf("%d\n", len);
188+
break;
189+
case 10:
190+
byte ver = sData[pos];
191+
pos += 1;
192+
if (ver == 4) {
193+
pos += 4;
194+
} else if (ver == 6) {
195+
pos += 16;
196+
}
197+
System.out.printf("%d\n", ver);
198+
break;
199+
case 11:
200+
int strLen = buffer.getInt(pos);
201+
pos += 4;
202+
byte[] strBytes = new byte[strLen];
203+
System.arraycopy(sData, pos, strBytes, 0, strLen);
204+
pos += strLen;
205+
System.out.printf("%s\n", new String(strBytes));
206+
break;
207+
default:
208+
throw new IllegalArgumentException("Unknown field type: " + f.type);
209+
}
210+
}
211+
}
212+
213+
System.out.printf("%d %d %d\n", teimRecord, pos, sDataLen);
214+
215+
} catch (IOException e) {
216+
e.printStackTrace();
217+
}
218+
219+
}
220+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.xkcoding.kafkatrans;
2+
3+
public class ByteOrderUtils {
4+
public static short ntohs(short value) {
5+
return (short) (((value & 0xFF00) >> 8) | ((value & 0x00FF) << 8));
6+
}
7+
8+
public static int ntohl(int value) {
9+
return ((value & 0xFF000000) >> 24) |
10+
((value & 0x00FF0000) >> 8) |
11+
((value & 0x0000FF00) << 8) |
12+
((value & 0x000000FF) << 24);
13+
}
14+
15+
public static long ntohll(long value) {
16+
return ((value & 0xFF00000000000000L) >> 56) |
17+
((value & 0x00FF000000000000L) >> 40) |
18+
((value & 0x0000FF0000000000L) >> 24) |
19+
((value & 0x000000FF00000000L) >> 8) |
20+
((value & 0x00000000FF000000L) << 8) |
21+
((value & 0x0000000000FF0000L) << 24) |
22+
((value & 0x000000000000FF00L) << 40) |
23+
((value & 0x00000000000000FFL) << 56);
24+
}
25+
26+
public static short htons(short value) {
27+
return ntohs(value);
28+
}
29+
30+
public static int htonl(int value) {
31+
return ntohl(value);
32+
}
33+
34+
public static long htonll(long value) {
35+
return ntohll(value);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.xkcoding.kafkatrans;
2+
3+
public class KafkaTransBinaryHead {
4+
private byte magic;
5+
private byte headerLen;
6+
private byte ipVersion; // 4 bits
7+
private byte reserve; // 2 bits
8+
private boolean isFirst; // 1 bit
9+
private boolean isDone; // 1 bit
10+
private byte netlinkId;
11+
private int dataLen;
12+
private short sequence;
13+
private short pktCount;
14+
private byte[] ipaddr; // variable length
15+
16+
// Getters and setters
17+
public byte getMagic() { return magic; }
18+
public void setMagic(byte magic) { this.magic = magic; }
19+
20+
public byte getHeaderLen() { return headerLen; }
21+
public void setHeaderLen(byte headerLen) { this.headerLen = headerLen; }
22+
23+
public byte getIpVersion() { return ipVersion; }
24+
public void setIpVersion(byte ipVersion) { this.ipVersion = ipVersion; }
25+
26+
public byte getReserve() { return reserve; }
27+
public void setReserve(byte reserve) { this.reserve = reserve; }
28+
29+
public boolean isFirst() { return isFirst; }
30+
public void setFirst(boolean first) { isFirst = first; }
31+
32+
public boolean isDone() { return isDone; }
33+
public void setDone(boolean done) { isDone = done; }
34+
35+
public byte getNetlinkId() { return netlinkId; }
36+
public void setNetlinkId(byte netlinkId) { this.netlinkId = netlinkId; }
37+
38+
public int getDataLen() { return dataLen; }
39+
public void setDataLen(int dataLen) { this.dataLen = dataLen; }
40+
41+
public short getSequence() { return sequence; }
42+
public void setSequence(short sequence) { this.sequence = sequence; }
43+
44+
public short getPktCount() { return pktCount; }
45+
public void setPktCount(short pktCount) { this.pktCount = pktCount; }
46+
47+
public byte[] getIpaddr() { return ipaddr; }
48+
public void setIpaddr(byte[] ipaddr) { this.ipaddr = ipaddr; }
49+
}

0 commit comments

Comments
 (0)