Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback interface for notifying user cluster changes. #11

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/main/java/com/github/zk1931/skipper/Skipper.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,26 @@ public class Skipper {

private final SkipperContext skipperCtx;

private final StateChangeCallback stateChangeCallback;

/**
* Constructs Skipper object by joining a Skipper server.
*
* @param serverId the server address of this server.
* @param joinPeer the server address you want to join in.
* @param logDir the log directory for Skipper.
* @param stateChangeCallback the callback that notifies user the changes of
* cluster membership.
* @throws InterruptedException in case of interruption.
*/
public Skipper(String serverId, String joinPeer, File logdir)
public Skipper(String serverId, String joinPeer, File logdir,
StateChangeCallback stateChangeCallback)
throws InterruptedException {
Properties prop = new Properties();
prop.setProperty("logdir", logdir.getPath());
prop.setProperty("serverId", serverId);
this.stateMachine = new SkipperStateMachine();
this.stateChangeCallback = stateChangeCallback;
this.zab = new QuorumZab(this.stateMachine, prop, joinPeer);
this.commandsPool= new CommandPool(this.zab);
this.serverId = this.zab.getServerId();
Expand All @@ -77,12 +83,16 @@ public Skipper(String serverId, String joinPeer, File logdir)
* Constructs Skipper object by recovering from a log directory.
*
* @param logDir the log directory for Skipper.
* @param stateChangeCallback the callback that notifies user the changes of
* cluster membership.
* @throws InterruptedException in case of interruption.
*/
public Skipper(File logdir) throws InterruptedException {
public Skipper(File logdir, StateChangeCallback stateChangeCallback)
throws InterruptedException {
Properties prop = new Properties();
prop.setProperty("logdir", logdir.getPath());
this.stateMachine = new SkipperStateMachine();
this.stateChangeCallback = stateChangeCallback;
this.zab = new QuorumZab(this.stateMachine, prop);
this.commandsPool= new CommandPool(this.zab);
this.serverId = this.zab.getServerId();
Expand Down Expand Up @@ -206,13 +216,19 @@ public void leading(Set<String> activeFollowers,
LOG.info("Leading : ");
LOG.info("- Active followers : {}", setToString(activeFollowers));
LOG.info("- Cluster members: {}", setToString(clusterMembers));
if (stateChangeCallback != null) {
stateChangeCallback.leading(activeFollowers, clusterMembers);
}
broadcasting.countDown();
}

@Override
public void following(String leader, Set<String> clusterMembers) {
LOG.info("Following {} : ", leader);
LOG.info("- Cluster members: {}", setToString(clusterMembers));
if (stateChangeCallback != null) {
stateChangeCallback.following(leader, clusterMembers);
}
broadcasting.countDown();
}
}
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/com/github/zk1931/skipper/StateChangeCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the zk1931 under one or more contributor license
* agreements. See the NOTICE file distributed with this work
* for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.zk1931.skipper;

import java.util.Set;

/**
* The callback interface for cluster state changes.
*/
public interface StateChangeCallback {
/**
* Upcall to notify the application who is running on the leader role of
* Skipper instance the membership changes of Zab cluster. The membership
* changes include the detection of recovered members or disconnected members
* in current configuration or new configuration after some one joined or be
* removed from current configuration.
*
* @param activeFollowers current alive followers.
* @param clusterMembers the members of new configuration.
*/
void leading(Set<String> activeFollowers, Set<String> clusterMembeers);

/**
* Upcall to notify the application who is running on the follower role of
* Skipper instance the membership changes of Zab cluster. The membership
* changes include the detection of the leader or the new cluster
* configuration after some servers are joined or removed.
*
* @param leader current leader.
* @param clusterMembers the members of new configuration.
*/
void following(String leader, Set<String> clusterMembers);
}
67 changes: 54 additions & 13 deletions src/test/java/com/github/zk1931/skipper/SkipperHashMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -44,8 +45,8 @@ public void testCluster() throws Exception {
String server2 = getUniqueHostPort();

// Creats two Skippers.
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk2 = new Skipper(server2, server1, getDir(server2));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
Skipper sk2 = new Skipper(server2, server1, getDir(server2), null);

// Gets one SkipperHashMap from each context with the same name.
SkipperHashMap<String, String> map1 =
Expand Down Expand Up @@ -78,7 +79,7 @@ public void testCluster() throws Exception {
@Test
public void testPut() throws Exception {
String server1 = getUniqueHostPort();
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
SkipperHashMap<String, String> map1 =
sk1.getHashMap("m1", String.class, String.class);
map1.putAsync("key1", "value1");
Expand All @@ -91,7 +92,7 @@ public void testPut() throws Exception {
@Test
public void testRemove() throws Exception {
String server1 = getUniqueHostPort();
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
SkipperHashMap<String, String> map1 =
sk1.getHashMap("m1", String.class, String.class);
map1.putAsync("key1", "value1");
Expand All @@ -109,8 +110,8 @@ public void testWrongType() throws Exception {
String server2 = getUniqueHostPort();

// Creats two Skippers.
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk2 = new Skipper(server2, server1, getDir(server2));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
Skipper sk2 = new Skipper(server2, server1, getDir(server2), null);

// Gets one SkipperHashMap from each context with the same name.
SkipperHashMap<String, String> map1 =
Expand All @@ -131,8 +132,8 @@ public void testClear() throws Exception {
String server2 = getUniqueHostPort();

// Creats two Skippers.
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk2 = new Skipper(server2, server1, getDir(server2));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
Skipper sk2 = new Skipper(server2, server1, getDir(server2), null);

// Gets one SkipperHashMap from each context with the same name.
SkipperHashMap<String, String> map1 =
Expand Down Expand Up @@ -170,8 +171,8 @@ public void testPutIfAbsent() throws Exception {
String server2 = getUniqueHostPort();

// Creats two Skippers.
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk2 = new Skipper(server2, server1, getDir(server2));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
Skipper sk2 = new Skipper(server2, server1, getDir(server2), null);

// Gets one SkipperHashMap from each context with the same name.
SkipperHashMap<String, String> map1 =
Expand Down Expand Up @@ -204,7 +205,7 @@ public void testPutIfAbsent() throws Exception {
@Test
public void testPutAll() throws Exception {
String server1 = getUniqueHostPort();
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
SkipperHashMap<String, String> map1 =
sk1.getHashMap("m1", String.class, String.class);
Map<String, String> hm = new HashMap<String, String>();
Expand All @@ -220,7 +221,7 @@ public void testPutAll() throws Exception {
@Test
public void testRemoveIf() throws Exception {
String server1 = getUniqueHostPort();
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
SkipperHashMap<String, String> map1 =
sk1.getHashMap("m1", String.class, String.class);
map1.put("key1", "value1");
Expand All @@ -238,7 +239,7 @@ public void testRemoveIf() throws Exception {
@Test
public void testReplace() throws Exception {
String server1 = getUniqueHostPort();
Skipper sk1 = new Skipper(server1, server1, getDir(server1));
Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
SkipperHashMap<String, String> map1 =
sk1.getHashMap("m1", String.class, String.class);
// Replace key-value pair key1 with value1.
Expand All @@ -262,4 +263,44 @@ public void testReplace() throws Exception {
Assert.assertEquals("value3", map1.get("key1"));
sk1.shutdown();
}

@Test
public void testStateChangeCallback() throws Exception {
String server1 = getUniqueHostPort();
String server2 = getUniqueHostPort();
String server3 = getUniqueHostPort();

class TestChangeCallback implements StateChangeCallback {
volatile String leader;
volatile Set<String> activeFollowers;
volatile Set<String> clusterMembers;

@Override
public void leading(Set<String> actives,
Set<String> members) {
this.activeFollowers = actives;
this.clusterMembers = members;
}

@Override
public void following(String ld, Set<String> members) {
this.leader = ld;
this.clusterMembers = members;
}
}

TestChangeCallback testCallback = new TestChangeCallback();

Skipper sk1 = new Skipper(server1, server1, getDir(server1), null);
Skipper sk2 = new Skipper(server2, server1, getDir(server2), null);
Skipper sk3 = new Skipper(server3, server1, getDir(server3), testCallback);

Assert.assertTrue(server1.equals(testCallback.leader) ||
server2.equals(testCallback.leader));
Assert.assertEquals(3, testCallback.clusterMembers.size());

sk1.shutdown();
sk2.shutdown();
sk3.shutdown();
}
}