Skip to content

Commit

Permalink
pass gtid.lwm & add unknow command parse
Browse files Browse the repository at this point in the history
  • Loading branch information
hailu committed Jan 18, 2024
1 parent b903aab commit 750abd6
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public enum RedisOpType {
MSETNX(true, -3),

// ctrip
GTID_LWM(false, 3, true),
GTID_LWM(false, 3, false),
CTRIP_MERGE_START(false, -1, true),
CTRIP_MERGE_END(false, -2, true),

Expand All @@ -98,7 +98,7 @@ public enum RedisOpType {
EXEC(false, 1),
SCRIPT(false, -2),
MOVE(false, 3),
UNKNOWN(false, -1);
UNKNOWN(false, -1, true);

// Support multi key or not
private boolean supportMultiKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public enum RedisOpNoneKeyEnum {
PING(RedisOpType.PING),
MULT(RedisOpType.MULTI),
EXEC(RedisOpType.EXEC),
SCRIPT(RedisOpType.SCRIPT);
SCRIPT(RedisOpType.SCRIPT),

UNKNOW(RedisOpType.UNKNOWN);

private RedisOpType redisOpType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ public void testCtripGtidLwmParse() {
RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
Assert.assertArrayEquals("24d9e2513182d156cbd999df5ebedf24e7634140".getBytes(), redisSingleKeyOp.getKey().get());
Assert.assertArrayEquals("1494763841".getBytes(), redisSingleKeyOp.getValue());
Assert.assertFalse(redisOp.getOpType().isSwallow());
}
@Test
public void testUnknowParse() {
byte[][] rawOpArgs = {"unknow".getBytes(), "unknow_key".getBytes(), "unknow_value".getBytes()};
RedisOp redisOp = parser.parse(rawOpArgs);
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
Assert.assertNull(redisOp.getOpGtid());
Assert.assertArrayEquals(rawOpArgs, redisOp.buildRawOpArgs());

RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
Assert.assertNull(redisSingleKeyOp.getKey());
Assert.assertNull(redisSingleKeyOp.getValue());
Assert.assertTrue(redisOp.getOpType().isSwallow());
}

Expand Down Expand Up @@ -138,10 +151,11 @@ public void testPingParse() {
Assert.assertFalse(redisOp.getOpType().isSwallow());
}

@Test(expected = UnsupportedOperationException.class)
@Test
public void testNoneExistsCmdParse() {
RedisOp redisOp = parser.parse(Arrays.asList("EMPTY", "0").toArray());
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
Assert.assertTrue(redisOp.getOpType().isSwallow());
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ protected void doExecute() throws Throwable {

Object rc = resource != null ? resource : client.select(key().get());
Object[] rawArgs = redisOp.buildRawOpArgs();

if (getLogger().isDebugEnabled()) {
getLogger().debug("[command] write key {} start", redisOp() instanceof RedisSingleKeyOp ? ((RedisSingleKeyOp) redisOp()).getKey() : (redisOp() instanceof RedisMultiKeyOp ? keys() : "none"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ctrip.xpipe.redis.keeper.applier.xsync;

import com.ctrip.xpipe.api.monitor.EventMonitor;
import com.ctrip.xpipe.client.redis.AsyncRedisClient;
import com.ctrip.xpipe.exception.XpipeRuntimeException;
import com.ctrip.xpipe.gtid.GtidSet;
Expand All @@ -8,6 +9,7 @@
import com.ctrip.xpipe.redis.core.redis.operation.RedisOp;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParser;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpLwm;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpMergeEnd;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpMergeStart;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParser;
Expand Down Expand Up @@ -143,7 +145,7 @@ public void endReadRdb(EofType eofType, GtidSet rdbGtidSet, long rdbOffset) {
return;
}

//ctrip.merge_start [gtid_set]
//ctrip.merge_end [gtid_set]
sequenceController.submit(new DefaultBroadcastCommand(client, new RedisOpMergeEnd(rdbGtidSet.toString())), 0);
}

Expand Down Expand Up @@ -298,6 +300,9 @@ protected boolean shouldFilter(RedisOp redisOp) {
return true;
}
}
if (redisOp.getOpType().isSwallow()) {
logger.debug("[onRedisOp] filter unknown redisOp: {}", redisOp);
}
return redisOp.getOpType().isSwallow();
}

Expand Down

0 comments on commit 750abd6

Please sign in to comment.