Skip to content

Commit 751d6a0

Browse files
committed
feature xkcoding#12.4第三次提交
1 parent 80de748 commit 751d6a0

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package com.xkcoding.swagger.controller;
2+
3+
import java.io.FileInputStream;
4+
import java.io.IOException;
5+
import java.nio.ByteBuffer;
6+
import java.nio.ByteOrder;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
10+
public class BinaryParserUtils {
11+
12+
// 定义 Kafka 二进制头结构
13+
static class KafkaTransBinaryHead {
14+
byte magic;
15+
byte headerLen;
16+
byte ipVersion;
17+
byte reserve;
18+
byte isFirst;
19+
byte isDone;
20+
byte netlinkId;
21+
int dateLen;
22+
short sequence;
23+
short pktCount;
24+
}
25+
26+
// 定义字段结构
27+
static class Field {
28+
int type;
29+
String name;
30+
31+
Field(int type, String name) {
32+
this.type = type;
33+
this.name = name;
34+
}
35+
}
36+
37+
// 读取文件内容
38+
public static byte[] readFile(String filePath) throws IOException {
39+
try (FileInputStream fis = new FileInputStream(filePath)) {
40+
byte[] content = new byte[fis.available()];
41+
fis.read(content);
42+
return content;
43+
}
44+
}
45+
46+
// 解析 Kafka 二进制头
47+
public static KafkaTransBinaryHead parseKafkaHead(ByteBuffer buffer) {
48+
KafkaTransBinaryHead head = new KafkaTransBinaryHead();
49+
head.magic = buffer.get();
50+
head.headerLen = buffer.get();
51+
byte flags = buffer.get();
52+
head.ipVersion = (byte) (flags & 0x0F);
53+
head.reserve = (byte) ((flags >> 4) & 0x03);
54+
head.isFirst = (byte) ((flags >> 6) & 0x01);
55+
head.isDone = (byte) ((flags >> 7) & 0x01);
56+
head.netlinkId = buffer.get();
57+
head.dateLen = buffer.getInt();
58+
head.sequence = buffer.getShort();
59+
head.pktCount = buffer.getShort();
60+
return head;
61+
}
62+
63+
// 打印数据
64+
public static void printData(String prefix, byte[] data, int len) {
65+
StringBuilder sb = new StringBuilder();
66+
sb.append(prefix).append("[").append(len).append("]:\n");
67+
68+
for (int i = 0; i < len; i += 16) {
69+
sb.append("| ");
70+
for (int j = i, k = 0; k < 16 && j < len; j++, k++) {
71+
sb.append(String.format("%02x ", data[j]));
72+
}
73+
for (int k = (len - i) % 16; k < 16; k++) {
74+
sb.append(" ");
75+
}
76+
sb.append("|");
77+
for (int j = i, k = 0; k < 16 && j < len; j++, k++) {
78+
char c = (char) data[j];
79+
if (!Character.isISOControl(c) && !Character.isWhitespace(c)) {
80+
sb.append(c);
81+
} else {
82+
sb.append('.');
83+
}
84+
}
85+
for (int k = (len - i) % 16; k < 16; k++) {
86+
sb.append(" ");
87+
}
88+
sb.append("|\n");
89+
}
90+
91+
System.out.println(sb.toString());
92+
}
93+
94+
// 主函数
95+
public static void main(String[] args) {
96+
try {
97+
// 读取文件
98+
byte[] content = readFile("E:\\myfile\\mywork\\lingcloud\\cpic\\2579_1_2020.txt");
99+
ByteBuffer buffer = ByteBuffer.wrap(content).order(ByteOrder.BIG_ENDIAN);
100+
101+
// 解析 Kafka 头
102+
KafkaTransBinaryHead head = parseKafkaHead(buffer);
103+
System.out.printf("headlen: %d, ipVersion: %d, reserve: %d, isFirst: %d, isDone: %d, netlinkId: %d, dataLen: %d, seq: %d, pktCount: %d\n",
104+
head.headerLen, head.ipVersion, head.reserve, head.isFirst, head.isDone, head.netlinkId, head.dateLen, head.sequence, head.pktCount);
105+
106+
// 打印数据
107+
printData("data1", content, content.length);
108+
109+
// 处理数据
110+
byte[] sData = new byte[1024 * 1024 * 4];
111+
int sDataLen = 0;
112+
System.arraycopy(content, buffer.position(), sData, sDataLen, head.dateLen - 2);
113+
sDataLen += head.dateLen - 2;
114+
System.out.printf("proc %d %d\n", head.dateLen, sDataLen);
115+
116+
// 解析字段
117+
List<Field> fieldT = new ArrayList<>();
118+
int pos = 0;
119+
120+
buffer.order(ByteOrder.LITTLE_ENDIAN);
121+
short fieldCountShort = buffer.getShort(pos);
122+
int fieldCount = Short.toUnsignedInt(fieldCountShort);
123+
pos += 2;
124+
125+
126+
System.out.printf("fieldCount = %d\n", fieldCount);
127+
while (fieldCount > 0) {
128+
int len = buffer.getInt(pos);
129+
pos += 4;
130+
byte[] fieldNameBytes = new byte[len];
131+
System.arraycopy(sData, pos, fieldNameBytes, 0, len);
132+
pos += len;
133+
byte fieldType = sData[pos];
134+
pos += 1;
135+
String fieldName = new String(fieldNameBytes);
136+
fieldT.add(new Field(fieldType, fieldName));
137+
System.out.printf("field: %s %d %d\n", fieldName, len, fieldType);
138+
fieldCount--;
139+
}
140+
141+
// 解析记录
142+
byte netlinkCount = sData[pos];
143+
pos += 1;
144+
short netlinkId = buffer.getShort(pos);
145+
pos += 2;
146+
int netlinkRecord = buffer.getInt(pos);
147+
pos += 4;
148+
long netlinkTime = buffer.getLong(pos);
149+
pos += 8;
150+
int teimRecord = buffer.getInt(pos);
151+
pos += 4;
152+
153+
System.out.printf("%d %d %d %d %d\n", netlinkCount, netlinkId, netlinkRecord, netlinkTime, teimRecord);
154+
155+
while (teimRecord > 0 && pos < sDataLen) {
156+
teimRecord--;
157+
for (Field f : fieldT) {
158+
System.out.printf("%s %d %d ", f.name, f.type, pos);
159+
switch (f.type) {
160+
case 1:
161+
case 12:
162+
System.out.printf("%d\n", sData[pos]);
163+
pos += 1;
164+
break;
165+
case 2:
166+
System.out.printf("%d\n", buffer.getShort(pos));
167+
pos += 2;
168+
break;
169+
case 3:
170+
case 8:
171+
System.out.printf("%d\n", buffer.getInt(pos));
172+
pos += 4;
173+
break;
174+
case 4:
175+
case 5:
176+
case 6:
177+
case 9:
178+
System.out.printf("%d\n", buffer.getLong(pos));
179+
pos += 8;
180+
break;
181+
case 7:
182+
int len = buffer.getInt(pos);
183+
pos += 4;
184+
pos += len;
185+
System.out.printf("%d\n", len);
186+
break;
187+
case 10:
188+
byte ver = sData[pos];
189+
pos += 1;
190+
if (ver == 4) {
191+
pos += 4;
192+
} else if (ver == 6) {
193+
pos += 16;
194+
}
195+
System.out.printf("%d\n", ver);
196+
break;
197+
case 11:
198+
int strLen = buffer.getInt(pos);
199+
pos += 4;
200+
byte[] strBytes = new byte[strLen];
201+
System.arraycopy(sData, pos, strBytes, 0, strLen);
202+
pos += strLen;
203+
System.out.printf("%s\n", new String(strBytes));
204+
break;
205+
default:
206+
throw new IllegalArgumentException("Unknown field type: " + f.type);
207+
}
208+
}
209+
}
210+
211+
System.out.printf("%d %d %d\n", teimRecord, pos, sDataLen);
212+
213+
} catch (IOException e) {
214+
e.printStackTrace();
215+
}
216+
217+
}
218+
}

0 commit comments

Comments
 (0)