From 9bc985c0673374d1167f0149c7345c7f6549ff25 Mon Sep 17 00:00:00 2001 From: Linary Date: Wed, 25 Nov 2020 19:08:18 +0800 Subject: [PATCH] Fix leader doesn't update cache for request forwarded by follower (#1279) * also add snapshot operation in security check white list * clear cache when truncate or clear backend Change-Id: Ibb6b1e0966d1df3a77b96aa8f48c30cd29c1132a --- .../backend/cache/AbstractCache.java | 3 + .../backend/cache/CachedGraphTransaction.java | 7 +- .../cache/CachedSchemaTransaction.java | 7 +- .../backend/store/raft/RaftBackendStore.java | 1 + .../backend/store/raft/RaftSharedContext.java | 12 +- .../backend/store/raft/StoreCommand.java | 11 ++ .../backend/store/raft/StoreStateMachine.java | 120 +++++++++--------- .../store/raft/rpc/StoreCommandProcessor.java | 2 +- .../security/HugeSecurityManager.java | 32 +++-- 9 files changed, 117 insertions(+), 78 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java index 8243b0360b..14d251e515 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java @@ -34,6 +34,9 @@ public abstract class AbstractCache implements Cache { public static final int DEFAULT_SIZE = 1 * MB; public static final int MAX_INIT_CAP = 100 * MB; + public static final String ACTION_INVALID = "invalid"; + public static final String ACTION_CLEAR = "clear"; + protected static final Logger LOG = Log.logger(Cache.class); private volatile long hits = 0L; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java index 056f2c3fce..1d1a5bcdc3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java @@ -19,6 +19,9 @@ package com.baidu.hugegraph.backend.cache; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -138,7 +141,7 @@ private void listenChanges() { this.graph(), event); event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if ("invalid".equals(args[0])) { + if (ACTION_INVALID.equals(args[0])) { HugeType type = (HugeType) args[1]; Id id = (Id) args[2]; if (type.isVertex()) { @@ -153,7 +156,7 @@ private void listenChanges() { this.edgesCache.clear(); } return true; - } else if ("clear".equals(args[0])) { + } else if (ACTION_CLEAR.equals(args[0])) { this.verticesCache.clear(); this.edgesCache.clear(); return true; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index 20032c539c..91926b5d41 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -19,6 +19,9 @@ package com.baidu.hugegraph.backend.cache; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; + import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -104,7 +107,7 @@ private void listenChanges() { this.graph(), event); event.checkArgs(String.class, HugeType.class, Id.class); Object[] args = event.args(); - if ("invalid".equals(args[0])) { + if (ACTION_INVALID.equals(args[0])) { HugeType type = (HugeType) args[1]; Id id = (Id) args[2]; this.arrayCaches.remove(type, id); @@ -122,7 +125,7 @@ private void listenChanges() { this.nameCache.invalidate(prefixedName); } return true; - } else if ("clear".equals(args[0])) { + } else if (ACTION_CLEAR.equals(args[0])) { this.clearCache(); return true; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java index 7085bf4d66..3515f12b9e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java @@ -145,6 +145,7 @@ public Number queryNumber(Query query) { @Override public void beginTx() { + // Don't write raft log, commitTx(in statemachine) will call beginTx } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 45d7dc93d3..d76cb41c97 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.backend.store.raft; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR; + import java.io.File; import java.io.IOException; import java.nio.file.Paths; @@ -255,7 +257,13 @@ public NodeOptions nodeOptions() throws IOException { return nodeOptions; } - public void notifyCache(HugeType type, Id id) { + public void clearCache() { + // Just choose two representatives used to represent schema and graph + this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null); + this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null); + } + + public void notifyCache(String action, HugeType type, Id id) { EventHub eventHub; if (type.isGraph()) { eventHub = this.params.graphEventHub(); @@ -266,7 +274,7 @@ public void notifyCache(HugeType type, Id id) { } try { // How to avoid update cache from server info - eventHub.notify(Events.CACHE, "invalid", type, id); + eventHub.notify(Events.CACHE, action, type, id); } catch (RejectedExecutionException e) { LOG.warn("Can't update cache due to EventHub is too busy"); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java index 3a7a8992c7..f239d0f103 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java @@ -30,8 +30,14 @@ public class StoreCommand { private final StoreType type; private final StoreAction action; private final byte[] data; + private final boolean forwarded; public StoreCommand(StoreType type, StoreAction action, byte[] data) { + this(type, action, data, false); + } + + public StoreCommand(StoreType type, StoreAction action, + byte[] data, boolean forwarded) { this.type = type; this.action = action; if (data == null) { @@ -42,6 +48,7 @@ public StoreCommand(StoreType type, StoreAction action, byte[] data) { } this.data[0] = (byte) this.type.getNumber(); this.data[1] = (byte) this.action.getNumber(); + this.forwarded = forwarded; } public StoreType type() { @@ -56,6 +63,10 @@ public byte[] data() { return this.data; } + public boolean forwarded() { + return this.forwarded; + } + public static void writeHeader(BytesBuffer buffer) { buffer.write((byte) 0); buffer.write((byte) 0); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index f51048b7ae..670cd6a627 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -19,10 +19,9 @@ package com.baidu.hugegraph.backend.store.raft; -import java.util.EnumMap; +import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID; + import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; import org.slf4j.Logger; @@ -56,13 +55,10 @@ public class StoreStateMachine extends StateMachineAdapter { private final RaftSharedContext context; private final StoreSnapshotFile snapshotFile; - private final Map> funcs; public StoreStateMachine(RaftSharedContext context) { this.context = context; this.snapshotFile = new StoreSnapshotFile(context.stores()); - this.funcs = new EnumMap<>(StoreAction.class); - this.registerCommands(); } private BackendStore store(StoreType type) { @@ -73,45 +69,18 @@ private RaftNode node() { return this.context.node(); } - private void registerCommands() { - // clear - this.register(StoreAction.CLEAR, (store, buffer) -> { - boolean clearSpace = buffer.read() > 0; - store.clear(clearSpace); - }); - this.register(StoreAction.TRUNCATE, (store, buffer) -> { - store.truncate(); - }); - this.register(StoreAction.SNAPSHOT, (store, buffer) -> { - assert store == null; - this.node().snapshot(); - }); - this.register(StoreAction.BEGIN_TX, (store, buffer) -> store.beginTx()); - this.register(StoreAction.COMMIT_TX, (store, buffer) -> { - List ms = StoreSerializer.readMutations(buffer); - store.beginTx(); - for (BackendMutation mutation : ms) { - store.mutate(mutation); - // update cache on follower when graph run in general mode - if (this.context.graphMode() == GraphMode.NONE) { - this.updateCacheIfNeeded(mutation); - } - } - store.commitTx(); - }); - this.register(StoreAction.ROLLBACK_TX, (store, buffer) -> { - store.rollbackTx(); - }); - // increase counter - this.register(StoreAction.INCR_COUNTER, (store, buffer) -> { - IncrCounter counter = StoreSerializer.readIncrCounter(buffer); - store.increaseCounter(counter.type(), counter.increment()); - }); - } - - private void updateCacheIfNeeded(BackendMutation mutation) { - // Only follower need to update cache from store to tx - if (this.node().selfIsLeader()) { + private void updateCacheIfNeeded(BackendMutation mutation, + boolean forwarded) { + // Update cache only when graph run in general mode + if (this.context.graphMode() != GraphMode.NONE) { + return; + } + /* + * 1. Follower need to update cache from store to tx + * 2. If request come from leader, cache will be updated by upper layer + * 3. If request is forwarded by follower, need to update cache + */ + if (!forwarded && this.node().selfIsLeader()) { return; } for (HugeType type : mutation.types()) { @@ -121,16 +90,11 @@ private void updateCacheIfNeeded(BackendMutation mutation) { for (java.util.Iterator it = mutation.mutation(type); it.hasNext();) { BackendEntry entry = it.next().entry(); - this.context.notifyCache(type, entry.originId()); + this.context.notifyCache(ACTION_INVALID, type, entry.originId()); } } } - private void register(StoreAction action, - BiConsumer func) { - this.funcs.put(action, func); - } - @Override public void onApply(Iterator iter) { LOG.debug("Node role: {}", this.node().selfIsLeader() ? @@ -141,13 +105,16 @@ public void onApply(Iterator iter) { closure = (StoreClosure) iter.done(); if (closure != null) { // Leader just take it out from the closure - BytesBuffer buffer = BytesBuffer.wrap(closure.command().data()); + StoreCommand command = closure.command(); + BytesBuffer buffer = BytesBuffer.wrap(command.data()); // The first two bytes are StoreType and StoreAction StoreType type = StoreType.valueOf(buffer.read()); StoreAction action = StoreAction.valueOf(buffer.read()); + boolean forwarded = command.forwarded(); // Let the producer thread to handle it closure.complete(Status.OK(), () -> { - return this.applyCommand(type, action, buffer); + this.applyCommand(type, action, buffer, forwarded); + return null; }); } else { // Follower need readMutation data @@ -161,7 +128,7 @@ public void onApply(Iterator iter) { StoreType type = StoreType.valueOf(buffer.read()); StoreAction action = StoreAction.valueOf(buffer.read()); try { - this.applyCommand(type, action, buffer); + this.applyCommand(type, action, buffer, false); } catch (Throwable e) { LOG.error("Failed to execute backend command: {}", action, e); @@ -184,12 +151,47 @@ public void onApply(Iterator iter) { } } - private Object applyCommand(StoreType type, StoreAction action, - BytesBuffer buffer) { + private void applyCommand(StoreType type, StoreAction action, + BytesBuffer buffer, boolean forwarded) { BackendStore store = type != StoreType.ALL ? this.store(type) : null; - BiConsumer func = this.funcs.get(action); - func.accept(store, buffer); - return null; + switch (action) { + case CLEAR: + boolean clearSpace = buffer.read() > 0; + store.clear(clearSpace); + this.context.clearCache(); + break; + case TRUNCATE: + store.truncate(); + this.context.clearCache(); + break; + case SNAPSHOT: + assert store == null; + this.node().snapshot(); + break; + case BEGIN_TX: + store.beginTx(); + break; + case COMMIT_TX: + List ms = StoreSerializer.readMutations(buffer); + // RaftBackendStore doesn't write raft log for beginTx + store.beginTx(); + for (BackendMutation mutation : ms) { + store.mutate(mutation); + this.updateCacheIfNeeded(mutation, forwarded); + } + store.commitTx(); + break; + case ROLLBACK_TX: + store.rollbackTx(); + break; + // increase counter + case INCR_COUNTER: + IncrCounter counter = StoreSerializer.readIncrCounter(buffer); + store.increaseCounter(counter.type(), counter.increment()); + break; + default: + throw new IllegalArgumentException("Invalid action " + action); + } } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java index 167fc68987..50964d49b6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java @@ -76,6 +76,6 @@ private StoreCommand parseStoreCommand(StoreCommandRequest request) { StoreType type = request.getType(); StoreAction action = request.getAction(); byte[] data = request.getData().toByteArray(); - return new StoreCommand(type, action, data); + return new StoreCommand(type, action, data, true); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java index 55ea2e405a..4ccf9e9eb4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java @@ -102,6 +102,12 @@ public class HugeSecurityManager extends SecurityManager { "com.baidu.hugegraph.backend.store.hbase.HbaseSessions$RowIterator" ); + private static final Set RAFT_CLASSES = ImmutableSet.of( + "com.baidu.hugegraph.backend.store.raft.RaftNode", + "com.baidu.hugegraph.backend.store.raft.StoreStateMachine", + "com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder" + ); + @Override public void checkPermission(Permission permission) { if (DENIED_PERMISSIONS.contains(permission.getName()) && @@ -143,7 +149,7 @@ public void checkAccess(Thread thread) { if (callFromGremlin() && !callFromCaffeine() && !callFromAsyncTasks() && !callFromEventHubNotify() && !callFromBackendThread() && !callFromBackendHbase() && - !callFromRaftMethods()) { + !callFromRaft()) { throw newSecurityException( "Not allowed to access thread via Gremlin"); } @@ -155,7 +161,7 @@ public void checkAccess(ThreadGroup threadGroup) { if (callFromGremlin() && !callFromCaffeine() && !callFromAsyncTasks() && !callFromEventHubNotify() && !callFromBackendThread() && !callFromBackendHbase() && - !callFromRaftMethods()) { + !callFromRaft()) { throw newSecurityException( "Not allowed to access thread group via Gremlin"); } @@ -182,7 +188,8 @@ public void checkExec(String cmd) { @Override public void checkRead(FileDescriptor fd) { - if (callFromGremlin() && !callFromBackendSocket()) { + if (callFromGremlin() && !callFromBackendSocket() && + !callFromRaft()) { throw newSecurityException("Not allowed to read fd via Gremlin"); } super.checkRead(fd); @@ -191,7 +198,8 @@ public void checkRead(FileDescriptor fd) { @Override public void checkRead(String file) { if (callFromGremlin() && !callFromCaffeine() && - !readGroovyInCurrentDir(file) && !callFromBackendHbase()) { + !readGroovyInCurrentDir(file) && !callFromBackendHbase() && + !callFromRaft()) { throw newSecurityException( "Not allowed to read file via Gremlin: %s", file); } @@ -200,7 +208,7 @@ public void checkRead(String file) { @Override public void checkRead(String file, Object context) { - if (callFromGremlin()) { + if (callFromGremlin() && !callFromRaft()) { throw newSecurityException( "Not allowed to read file via Gremlin: %s", file); } @@ -209,7 +217,8 @@ public void checkRead(String file, Object context) { @Override public void checkWrite(FileDescriptor fd) { - if (callFromGremlin() && !callFromBackendSocket()) { + if (callFromGremlin() && !callFromBackendSocket() && + !callFromRaft()) { throw newSecurityException("Not allowed to write fd via Gremlin"); } super.checkWrite(fd); @@ -217,7 +226,7 @@ public void checkWrite(FileDescriptor fd) { @Override public void checkWrite(String file) { - if (callFromGremlin()) { + if (callFromGremlin() && !callFromRaft()) { throw newSecurityException("Not allowed to write file via Gremlin"); } super.checkWrite(file); @@ -253,7 +262,7 @@ public void checkAccept(String host, int port) { @Override public void checkConnect(String host, int port) { if (callFromGremlin() && !callFromBackendSocket() && - !callFromBackendHbase()) { + !callFromBackendHbase() && !callFromRaft()) { throw newSecurityException( "Not allowed to connect socket via Gremlin"); } @@ -308,7 +317,7 @@ public void checkPropertiesAccess() { public void checkPropertyAccess(String key) { if (!callFromAcceptClassLoaders() && callFromGremlin() && !WHITE_SYSTEM_PROPERTYS.contains(key) && !callFromBackendHbase() && - !callFromRaftMethods()) { + !callFromRaft()) { throw newSecurityException( "Not allowed to access system property(%s) via Gremlin", key); } @@ -428,9 +437,8 @@ private static boolean callFromBackendHbase() { return callFromWorkerWithClass(HBASE_CLASSES); } - private static boolean callFromRaftMethods() { - return callFromMethod("com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder", - "forwardToLeader"); + private static boolean callFromRaft() { + return callFromWorkerWithClass(RAFT_CLASSES); } private static boolean callFromWorkerWithClass(Set classes) {