From 19fbd28ffffce397e5041558fbe89276839cfd0c Mon Sep 17 00:00:00 2001 From: EasonLiao Date: Mon, 29 Sep 2014 11:21:58 -0700 Subject: [PATCH] Add callback interface for notifying user cluster changes. --- .../com/github/zk1931/skipper/Skipper.java | 20 +++++- .../zk1931/skipper/StateChangeCallback.java | 49 ++++++++++++++ .../zk1931/skipper/SkipperHashMapTest.java | 67 +++++++++++++++---- 3 files changed, 121 insertions(+), 15 deletions(-) create mode 100644 src/main/java/com/github/zk1931/skipper/StateChangeCallback.java diff --git a/src/main/java/com/github/zk1931/skipper/Skipper.java b/src/main/java/com/github/zk1931/skipper/Skipper.java index ae0a884..1250da4 100644 --- a/src/main/java/com/github/zk1931/skipper/Skipper.java +++ b/src/main/java/com/github/zk1931/skipper/Skipper.java @@ -51,20 +51,26 @@ public class Skipper { private final SkipperContext skipperCtx; + private final StateChangeCallback stateChangeCallback; + /** * Constructs Skipper object by joining a Skipper server. * * @param serverId the server address of this server. * @param joinPeer the server address you want to join in. * @param logDir the log directory for Skipper. + * @param stateChangeCallback the callback that notifies user the changes of + * cluster membership. * @throws InterruptedException in case of interruption. */ - public Skipper(String serverId, String joinPeer, File logdir) + public Skipper(String serverId, String joinPeer, File logdir, + StateChangeCallback stateChangeCallback) throws InterruptedException { Properties prop = new Properties(); prop.setProperty("logdir", logdir.getPath()); prop.setProperty("serverId", serverId); this.stateMachine = new SkipperStateMachine(); + this.stateChangeCallback = stateChangeCallback; this.zab = new QuorumZab(this.stateMachine, prop, joinPeer); this.commandsPool= new CommandPool(this.zab); this.serverId = this.zab.getServerId(); @@ -77,12 +83,16 @@ public Skipper(String serverId, String joinPeer, File logdir) * Constructs Skipper object by recovering from a log directory. * * @param logDir the log directory for Skipper. + * @param stateChangeCallback the callback that notifies user the changes of + * cluster membership. * @throws InterruptedException in case of interruption. */ - public Skipper(File logdir) throws InterruptedException { + public Skipper(File logdir, StateChangeCallback stateChangeCallback) + throws InterruptedException { Properties prop = new Properties(); prop.setProperty("logdir", logdir.getPath()); this.stateMachine = new SkipperStateMachine(); + this.stateChangeCallback = stateChangeCallback; this.zab = new QuorumZab(this.stateMachine, prop); this.commandsPool= new CommandPool(this.zab); this.serverId = this.zab.getServerId(); @@ -206,6 +216,9 @@ public void leading(Set activeFollowers, LOG.info("Leading : "); LOG.info("- Active followers : {}", setToString(activeFollowers)); LOG.info("- Cluster members: {}", setToString(clusterMembers)); + if (stateChangeCallback != null) { + stateChangeCallback.leading(activeFollowers, clusterMembers); + } broadcasting.countDown(); } @@ -213,6 +226,9 @@ public void leading(Set activeFollowers, public void following(String leader, Set clusterMembers) { LOG.info("Following {} : ", leader); LOG.info("- Cluster members: {}", setToString(clusterMembers)); + if (stateChangeCallback != null) { + stateChangeCallback.following(leader, clusterMembers); + } broadcasting.countDown(); } } diff --git a/src/main/java/com/github/zk1931/skipper/StateChangeCallback.java b/src/main/java/com/github/zk1931/skipper/StateChangeCallback.java new file mode 100644 index 0000000..2d11d51 --- /dev/null +++ b/src/main/java/com/github/zk1931/skipper/StateChangeCallback.java @@ -0,0 +1,49 @@ +/** + * Licensed to the zk1931 under one or more contributor license + * agreements. See the NOTICE file distributed with this work + * for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.zk1931.skipper; + +import java.util.Set; + +/** + * The callback interface for cluster state changes. + */ +public interface StateChangeCallback { + /** + * Upcall to notify the application who is running on the leader role of + * Skipper instance the membership changes of Zab cluster. The membership + * changes include the detection of recovered members or disconnected members + * in current configuration or new configuration after some one joined or be + * removed from current configuration. + * + * @param activeFollowers current alive followers. + * @param clusterMembers the members of new configuration. + */ + void leading(Set activeFollowers, Set clusterMembeers); + + /** + * Upcall to notify the application who is running on the follower role of + * Skipper instance the membership changes of Zab cluster. The membership + * changes include the detection of the leader or the new cluster + * configuration after some servers are joined or removed. + * + * @param leader current leader. + * @param clusterMembers the members of new configuration. + */ + void following(String leader, Set clusterMembers); +} diff --git a/src/test/java/com/github/zk1931/skipper/SkipperHashMapTest.java b/src/test/java/com/github/zk1931/skipper/SkipperHashMapTest.java index 0ec53bc..ff93172 100644 --- a/src/test/java/com/github/zk1931/skipper/SkipperHashMapTest.java +++ b/src/test/java/com/github/zk1931/skipper/SkipperHashMapTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -44,8 +45,8 @@ public void testCluster() throws Exception { String server2 = getUniqueHostPort(); // Creats two Skippers. - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); - Skipper sk2 = new Skipper(server2, server1, getDir(server2)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); + Skipper sk2 = new Skipper(server2, server1, getDir(server2), null); // Gets one SkipperHashMap from each context with the same name. SkipperHashMap map1 = @@ -78,7 +79,7 @@ public void testCluster() throws Exception { @Test public void testPut() throws Exception { String server1 = getUniqueHostPort(); - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); SkipperHashMap map1 = sk1.getHashMap("m1", String.class, String.class); map1.putAsync("key1", "value1"); @@ -91,7 +92,7 @@ public void testPut() throws Exception { @Test public void testRemove() throws Exception { String server1 = getUniqueHostPort(); - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); SkipperHashMap map1 = sk1.getHashMap("m1", String.class, String.class); map1.putAsync("key1", "value1"); @@ -109,8 +110,8 @@ public void testWrongType() throws Exception { String server2 = getUniqueHostPort(); // Creats two Skippers. - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); - Skipper sk2 = new Skipper(server2, server1, getDir(server2)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); + Skipper sk2 = new Skipper(server2, server1, getDir(server2), null); // Gets one SkipperHashMap from each context with the same name. SkipperHashMap map1 = @@ -131,8 +132,8 @@ public void testClear() throws Exception { String server2 = getUniqueHostPort(); // Creats two Skippers. - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); - Skipper sk2 = new Skipper(server2, server1, getDir(server2)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); + Skipper sk2 = new Skipper(server2, server1, getDir(server2), null); // Gets one SkipperHashMap from each context with the same name. SkipperHashMap map1 = @@ -170,8 +171,8 @@ public void testPutIfAbsent() throws Exception { String server2 = getUniqueHostPort(); // Creats two Skippers. - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); - Skipper sk2 = new Skipper(server2, server1, getDir(server2)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); + Skipper sk2 = new Skipper(server2, server1, getDir(server2), null); // Gets one SkipperHashMap from each context with the same name. SkipperHashMap map1 = @@ -204,7 +205,7 @@ public void testPutIfAbsent() throws Exception { @Test public void testPutAll() throws Exception { String server1 = getUniqueHostPort(); - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); SkipperHashMap map1 = sk1.getHashMap("m1", String.class, String.class); Map hm = new HashMap(); @@ -220,7 +221,7 @@ public void testPutAll() throws Exception { @Test public void testRemoveIf() throws Exception { String server1 = getUniqueHostPort(); - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); SkipperHashMap map1 = sk1.getHashMap("m1", String.class, String.class); map1.put("key1", "value1"); @@ -238,7 +239,7 @@ public void testRemoveIf() throws Exception { @Test public void testReplace() throws Exception { String server1 = getUniqueHostPort(); - Skipper sk1 = new Skipper(server1, server1, getDir(server1)); + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); SkipperHashMap map1 = sk1.getHashMap("m1", String.class, String.class); // Replace key-value pair key1 with value1. @@ -262,4 +263,44 @@ public void testReplace() throws Exception { Assert.assertEquals("value3", map1.get("key1")); sk1.shutdown(); } + + @Test + public void testStateChangeCallback() throws Exception { + String server1 = getUniqueHostPort(); + String server2 = getUniqueHostPort(); + String server3 = getUniqueHostPort(); + + class TestChangeCallback implements StateChangeCallback { + volatile String leader; + volatile Set activeFollowers; + volatile Set clusterMembers; + + @Override + public void leading(Set actives, + Set members) { + this.activeFollowers = actives; + this.clusterMembers = members; + } + + @Override + public void following(String ld, Set members) { + this.leader = ld; + this.clusterMembers = members; + } + } + + TestChangeCallback testCallback = new TestChangeCallback(); + + Skipper sk1 = new Skipper(server1, server1, getDir(server1), null); + Skipper sk2 = new Skipper(server2, server1, getDir(server2), null); + Skipper sk3 = new Skipper(server3, server1, getDir(server3), testCallback); + + Assert.assertTrue(server1.equals(testCallback.leader) || + server2.equals(testCallback.leader)); + Assert.assertEquals(3, testCallback.clusterMembers.size()); + + sk1.shutdown(); + sk2.shutdown(); + sk3.shutdown(); + } }