diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/RdbParseContext.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/RdbParseContext.java index e8bfe10889..391bf25166 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/RdbParseContext.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/RdbParseContext.java @@ -60,7 +60,7 @@ public interface RdbParseContext { enum RdbType { STRING(RdbConstant.REDIS_RDB_TYPE_STRING, false, RdbStringParser::new), -// LIST(RdbConstant.REDIS_RDB_TYPE_LIST), + LIST(RdbConstant.REDIS_RDB_TYPE_LIST,false,RdbListParser::new), SET(RdbConstant.REDIS_RDB_TYPE_SET, false, RdbSetParser::new), // ZSET(RdbConstant.REDIS_RDB_TYPE_ZSET), HASH(RdbConstant.REDIS_RDB_TYPE_HASH, false, RdbHashParser::new), diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbListParser.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbListParser.java new file mode 100644 index 0000000000..522da57367 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbListParser.java @@ -0,0 +1,125 @@ +package com.ctrip.xpipe.redis.core.redis.rdb.parser; + +import com.ctrip.xpipe.redis.core.redis.exception.RdbParseEmptyKeyException; +import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType; +import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey; +import com.ctrip.xpipe.redis.core.redis.rdb.RdbLength; +import com.ctrip.xpipe.redis.core.redis.rdb.RdbParseContext; +import com.ctrip.xpipe.redis.core.redis.rdb.RdbParser; +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author hailu + * @date 2024/1/17 19:06 + */ +public class RdbListParser extends AbstractRdbParser implements RdbParser { + + private RdbParseContext context; + + private RdbParser rdbStringParser; + + private RdbLength len; + + private int readCnt; + + private STATE state = STATE.READ_INIT; + + private static final Logger logger = LoggerFactory.getLogger(RdbListParser.class); + + enum STATE { + READ_INIT, + READ_LEN, + READ_VALUE, + READ_END + } + + public RdbListParser(RdbParseContext parseContext) { + this.context = parseContext; + this.rdbStringParser = (RdbParser) context.getOrCreateParser(RdbParseContext.RdbType.STRING); + } + + @Override + public Integer read(ByteBuf byteBuf) { + + while (!isFinish() && byteBuf.readableBytes() > 0) { + + switch (state) { + + case READ_INIT: + len = null; + readCnt = 0; + state = STATE.READ_LEN; + break; + + case READ_LEN: + len = parseRdbLength(byteBuf); + if (null != len) { + if (len.getLenValue() > 0) { + state = STATE.READ_VALUE; + } else { + throw new RdbParseEmptyKeyException("set key " + context.getKey()); + } + } + break; + + case READ_VALUE: + byte[] value = rdbStringParser.read(byteBuf); + if (null != value) { + rdbStringParser.reset(); + propagateCmdIfNeed(value); + + readCnt++; + if (readCnt >= len.getLenValue()) { + state = STATE.READ_END; + } else { + state = STATE.READ_VALUE; + } + } + break; + + case READ_END: + default: + + } + + if (isFinish()) { + propagateExpireAtIfNeed(context.getKey(), context.getExpireMilli()); + } + } + + if (isFinish()) return len.getLenValue(); + else return null; + } + + private void propagateCmdIfNeed(byte[] value) { + if (null == value || null == context.getKey()) { + return; + } + + notifyRedisOp(new RedisOpSingleKey( + RedisOpType.RPUSH, + new byte[][] {RedisOpType.RPUSH.name().getBytes(), context.getKey().get(), value}, + context.getKey(), value)); + } + + @Override + public boolean isFinish() { + return STATE.READ_END.equals(state); + } + + @Override + public void reset() { + super.reset(); + if (rdbStringParser != null) { + rdbStringParser.reset(); + } + this.state = STATE.READ_INIT; + } + + @Override + protected Logger getLogger() { + return logger; + } +} diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/DefaultRdbParserTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/DefaultRdbParserTest.java index e703e09b6f..0c336487a3 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/DefaultRdbParserTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/DefaultRdbParserTest.java @@ -146,6 +146,23 @@ public void testParseSet() { Assert.assertEquals("SADD set 13927438904093012", redisOps.get(3).toString()); Assert.assertEquals("SADD set v1", redisOps.get(4).toString()); } + @Test + public void testParseList() { + ByteBuf byteBuf = Unpooled.wrappedBuffer(listRdbBytes); + while (!parser.isFinish()) { + parser.read(byteBuf); + } + + Assert.assertEquals("SELECT 0", redisOps.get(0).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa0_0_530", redisOps.get(1).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa0_0_454", redisOps.get(2).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa1_1_39", redisOps.get(3).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa1_1_244", redisOps.get(4).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa2_2_497", redisOps.get(5).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa2_2_972", redisOps.get(6).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa3_3_187", redisOps.get(7).toString()); + Assert.assertEquals("RPUSH hailu_0_7743 aaa3_3_148", redisOps.get(8).toString()); + } @Test public void testParseZiplistZSet() { diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbDataBytes.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbDataBytes.java index ced6da0f5f..7bda965701 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbDataBytes.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/rdb/parser/RdbDataBytes.java @@ -116,6 +116,33 @@ public class RdbDataBytes { 0x33, 0x39, 0x32, 0x37, 0x34, 0x33, 0x38, 0x39, 0x30, 0x34, 0x30, 0x39, 0x33, 0x30, 0x31, 0x32, 0x02, 0x76, 0x31, (byte)0xff, (byte)0xb9, 0x52, 0x00, 0x16, (byte)0xaf, (byte)0xc6, 0x2d, (byte)0xae, 0x0a}; + public static final byte[] listRdbBytes = new byte[]{0x52, 0x45, 0x44, 0x49, 0x53, 0x30, 0x30, 0x30, 0x39, (byte) 0xfa, 0x09, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x76, 0x65, 0x72, 0x05, + 0x36, 0x2e, 0x32, 0x2e, 0x36, (byte)0xfa, 0x0a, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x62, 0x69, 0x74, 0x73, (byte)0xc0, 0x40, (byte)0xfa, 0x05, 0x63, + 0x74, 0x69, 0x6d, 0x65, (byte)0xc2, (byte)0x9b, (byte)0xc2, (byte)0xa7, 0x65, (byte)0xfa, 0x08, 0x75, 0x73, 0x65, 0x64, 0x2d, 0x6d, 0x65, 0x6d, (byte)0xc2, (byte)0x88, 0x3b, + (byte)0x87, 0x06, (byte)0xfa, 0x0e, 0x72, 0x65, 0x70, 0x6c, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2d, 0x64, 0x62, (byte)0xc0, 0x00, (byte)0xfa, 0x07, + 0x72, 0x65, 0x70, 0x6c, 0x2d, 0x69, 0x64, 0x28, 0x34, 0x37, 0x31, 0x34, 0x61, 0x66, 0x61, 0x66, 0x65, 0x39, 0x65, 0x33, 0x63, 0x61, + 0x37, 0x38, 0x65, 0x37, 0x63, 0x61, 0x64, 0x61, 0x66, 0x64, 0x65, 0x35, 0x33, 0x62, 0x62, 0x30, 0x66, 0x30, 0x66, 0x31, 0x31, 0x62, + 0x37, 0x63, 0x33, 0x39, (byte)0xfa, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x2d, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x0b, 0x33, 0x32, 0x36, 0x30, + 0x36, 0x30, 0x36, 0x32, 0x37, 0x37, 0x35, (byte)0xfa, 0x04, 0x67, 0x74, 0x69, 0x64, 0x40, (byte)0xe9, 0x38, 0x33, 0x31, 0x65, 0x34, 0x62, 0x35, + 0x66, 0x39, 0x38, 0x32, 0x38, 0x30, 0x34, 0x31, 0x32, 0x37, 0x31, 0x37, 0x65, 0x63, 0x66, 0x32, 0x39, 0x37, 0x35, 0x39, 0x32, 0x34, + 0x33, 0x30, 0x35, 0x30, 0x38, 0x37, 0x32, 0x36, 0x34, 0x36, 0x32, 0x3a, 0x30, 0x2c, 0x33, 0x66, 0x33, 0x33, 0x66, 0x35, 0x65, 0x64, + 0x61, 0x38, 0x62, 0x37, 0x63, 0x63, 0x66, 0x65, 0x65, 0x36, 0x66, 0x37, 0x33, 0x32, 0x33, 0x62, 0x61, 0x35, 0x64, 0x34, 0x37, 0x64, + 0x39, 0x38, 0x34, 0x62, 0x37, 0x38, 0x35, 0x66, 0x65, 0x39, 0x3a, 0x31, 0x2d, 0x31, 0x33, 0x31, 0x36, 0x32, 0x34, 0x33, 0x36, 0x2c, + 0x62, 0x39, 0x37, 0x39, 0x66, 0x61, 0x36, 0x35, 0x66, 0x31, 0x30, 0x64, 0x36, 0x33, 0x62, 0x34, 0x30, 0x37, 0x36, 0x37, 0x32, 0x34, + 0x31, 0x31, 0x63, 0x63, 0x33, 0x33, 0x31, 0x33, 0x33, 0x38, 0x30, 0x30, 0x65, 0x32, 0x33, 0x65, 0x32, 0x63, 0x3a, 0x31, 0x2d, 0x34, + 0x30, 0x2c, 0x65, 0x35, 0x32, 0x38, 0x62, 0x65, 0x35, 0x30, 0x61, 0x32, 0x64, 0x37, 0x32, 0x61, 0x65, 0x32, 0x33, 0x37, 0x63, 0x64, + 0x39, 0x36, 0x37, 0x32, 0x38, 0x36, 0x65, 0x38, 0x31, 0x39, 0x62, 0x38, 0x33, 0x39, 0x36, 0x32, 0x62, 0x36, 0x38, 0x32, 0x3a, 0x31, + 0x2d, 0x36, 0x34, 0x2c, 0x37, 0x35, 0x39, 0x33, 0x64, 0x64, 0x63, 0x63, 0x35, 0x31, 0x65, 0x38, 0x36, 0x36, 0x64, 0x61, 0x61, 0x64, + 0x33, 0x33, 0x37, 0x34, 0x35, 0x39, 0x31, 0x63, 0x35, 0x31, 0x35, 0x64, 0x39, 0x39, 0x62, 0x31, 0x64, 0x62, 0x36, 0x31, 0x36, 0x38, + 0x3a, 0x31, 0x2d, 0x32, 0x37, 0x34, (byte)0xfa, 0x0c, 0x61, 0x6f, 0x66, 0x2d, 0x70, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x6c, 0x65, (byte)0xc0, 0x00, + (byte)0xfe, 0x00, (byte)0xfb, 0x01, 0x00, (byte)0xf9, 0x06, 0x0e, 0x0c, 0x68, 0x61, 0x69, 0x6c, 0x75, 0x5f, 0x30, 0x5f, 0x37, 0x37, 0x34, 0x33, 0x01, + 0x40, 0x6a, 0x6a, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x0a, 0x61, 0x61, 0x61, 0x30, 0x5f, 0x30, 0x5f, 0x35, + 0x33, 0x30, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x30, 0x5f, 0x30, 0x5f, 0x34, 0x35, 0x34, 0x0c, 0x09, 0x61, 0x61, 0x61, 0x31, 0x5f, 0x31, + 0x5f, 0x33, 0x39, 0x0b, 0x0a, 0x61, 0x61, 0x61, 0x31, 0x5f, 0x31, 0x5f, 0x32, 0x34, 0x34, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x32, 0x5f, + 0x32, 0x5f, 0x34, 0x39, 0x37, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x32, 0x5f, 0x32, 0x5f, 0x39, 0x37, 0x32, 0x0c, 0x0a, 0x61, 0x61, 0x61, + 0x33, 0x5f, 0x33, 0x5f, 0x31, 0x38, 0x37, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x33, 0x5f, 0x33, 0x5f, 0x31, 0x34, 0x38, (byte)0xff, (byte)0xff, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + public static final byte[] ziplistZSetRdbBytes = new byte[] {0x52, 0x45, 0x44, 0x49, 0x53, 0x30, 0x30, 0x30, 0x39, (byte)0xfa, 0x09, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x76, 0x65, 0x72, 0x05, 0x36, 0x2e, 0x32, 0x2e, 0x36, (byte)0xfa, 0x0a, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x62, 0x69, 0x74, 0x73, (byte)0xc0, 0x40, (byte)0xfa, diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java index 2ee042e83d..81fca25077 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java @@ -289,7 +289,7 @@ protected boolean shouldFilter(RedisOp redisOp) { String channel; if (length == 3) { channel = new String(redisOp.buildRawOpArgs()[1]); - } else if(length >= 5) { + } else if (length >= 5) { channel = new String(redisOp.buildRawOpArgs()[4]); } else { logger.warn("publish command {} length={} unexpected, filtered", redisOp, length); @@ -301,7 +301,8 @@ protected boolean shouldFilter(RedisOp redisOp) { } } if (redisOp.getOpType().isSwallow()) { - logger.debug("[onRedisOp] filter unknown redisOp: {}", redisOp); + logger.info("[onRedisOp] swallow redisOp: {}", redisOp); + EventMonitor.DEFAULT.logEvent("APPLIER.SWALLOW.OP", String.format("swallow redisOp %s", redisOp.toString())); } return redisOp.getOpType().isSwallow(); } @@ -332,6 +333,7 @@ private void doOnRedisOp(RedisOp redisOp, long commandOffsetToAccumulate) { } if (shouldFilter(redisOp)) { + gtid_executed.get().add(redisOp.getOpGtid()); offsetRecorder.addAndGet(commandOffsetToAccumulate); return; }