Skip to content

Commit

Permalink
add psync for applier
Browse files Browse the repository at this point in the history
  • Loading branch information
hailu committed May 22, 2024
1 parent 7f9a48b commit a0143df
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ public ApplierPsync(SimpleObjectPool<NettyClient> clientPool, AtomicReference<St
@Override
// psync runID offset
public ByteBuf getRequest() {
if (replId == null) {
replId = new AtomicReference<>("?");
}
long offsetRequest = offsetRecorder.get() == -1 || "?".equalsIgnoreCase(replId.get()) ? -1 : offsetRecorder.get() + 1;
long offsetRequest = "?".equalsIgnoreCase(replId.get()) ? -1 : offsetRecorder.get() + 1;
RequestStringParser requestString = new RequestStringParser(getName(), replId.get(),
String.valueOf(offsetRequest));
if (getLogger().isDebugEnabled()) {
Expand Down Expand Up @@ -81,7 +78,7 @@ protected void handleRedisResponse(Channel channel, String psync) throws IOExcep
replId.set(newReplId);
rdbOffset = Long.parseLong(split[2]);
// reset offset recorder
offsetRecorder.set(0);
offsetRecorder.set(-1);
getLogger().debug("[readRedisResponse]{}, {}, {}, {}", ChannelUtil.getDesc(channel), this, replId, rdbOffset);
syncState = READING_RDB;
doOnFullSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public DefaultApplierServer(String clusterName, ClusterId clusterId, ShardId sha
this.sequenceController = new DefaultSequenceController(qpsThreshold, bytesPerSecondThreshold, memoryThreshold, concurrencyThreshold);
this.lwmManager = new DefaultLwmManager();
this.dispatcher = new DefaultCommandDispatcher();
this.offsetRecorder = new AtomicLong(0);
this.replication = new DefaultPsyncReplication(this);
this.offsetRecorder = new AtomicLong(-1);
this.replId = new AtomicReference<>("?");

this.parser = parser;
Expand Down Expand Up @@ -256,8 +257,11 @@ public void setStateActive(Endpoint endpoint, GtidSet gtidSet, boolean useXsync)
}

private void createReplication(boolean useXsync) {
if (!Objects.isNull(replication)) {
throw new IllegalStateException("can't change protocol");
if (Objects.nonNull(replication)) {
if ((replication instanceof DefaultXsyncReplication) != useXsync) {
throw new IllegalStateException("can't change protocol");
}
return;
}
logger.info("[setStateActive] useXsync:{}", useXsync);
try {
Expand All @@ -267,10 +271,10 @@ private void createReplication(boolean useXsync) {
} else {
syncReplication = new DefaultPsyncReplication(this);
}
dependencies.put("replication", syncReplication);
dependencies.put("replication",syncReplication);
syncReplication.inject(dependencies);
ComponentRegistryHolder.getComponentRegistry().add(syncReplication);
this.replication = syncReplication;
this.replication= syncReplication;
} catch (Exception e) {
logger.error("[setStateActive] inject syncReplication error", e);
throw new RuntimeException("create replication error");
Expand Down

0 comments on commit a0143df

Please sign in to comment.