Skip to content

Commit de68393

Browse files
committed
High availablity. Performing round robin between nodes .issue #3
1 parent 8d5f003 commit de68393

File tree

12 files changed

+178
-41
lines changed

12 files changed

+178
-41
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<topq.repository.rootUrl>http://maven.top-q.co.il</topq.repository.rootUrl>
2626
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2727
<java.version>1.8</java.version>
28-
<elastic.version>5.2.0</elastic.version>
28+
<elastic.version>6.0.0-alpha2</elastic.version>
2929
</properties>
3030
<dependencies>
3131
<dependency>

src/main/java/il/co/topq/elastic/ESClient.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.io.Closeable;
44
import java.io.IOException;
5+
import java.util.ArrayList;
6+
import java.util.List;
57
import java.util.Map;
68

79
import org.apache.http.HttpHost;
@@ -15,7 +17,17 @@ public class ESClient implements Closeable {
1517
private final ESRest rest;
1618

1719
public ESClient(String host, int port) {
18-
rest = new ESRest(RestClient.builder(new HttpHost(host, port, "http")).build());
20+
final List<RestClient> clients = new ArrayList<RestClient>();
21+
clients.add(RestClient.builder(new HttpHost(host, port, "http")).build());
22+
rest = new ESRest(clients);
23+
}
24+
25+
public static Builder builder() {
26+
return new Builder();
27+
}
28+
29+
private ESClient(List<RestClient> clients) {
30+
rest = new ESRest(clients);
1931
}
2032

2133
@Override
@@ -42,4 +54,26 @@ public GenericResponseHandler stats() throws IOException{
4254
return new GenericResponseHandler(response);
4355
}
4456

57+
public static class Builder {
58+
59+
private List<RestClient> clients;
60+
61+
private Builder() {
62+
clients = new ArrayList<RestClient>();
63+
}
64+
65+
public Builder addClient(String host, int port) {
66+
RestClient client = RestClient.builder(new HttpHost(host, port, "http")).build();
67+
clients.add(client);
68+
return this;
69+
}
70+
71+
public ESClient build() {
72+
if (clients.isEmpty()) {
73+
throw new IllegalArgumentException("Clients can't be null");
74+
}
75+
return new ESClient(clients);
76+
}
77+
78+
}
4579
}

src/main/java/il/co/topq/elastic/ESRest.java

Lines changed: 78 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
import java.io.Closeable;
44
import java.io.IOException;
5+
import java.util.ArrayList;
56
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Map;
9+
import java.util.Objects;
610

711
import org.apache.commons.io.IOUtils;
812
import org.apache.http.HttpEntity;
@@ -13,34 +17,37 @@
1317

1418
import com.fasterxml.jackson.databind.ObjectMapper;
1519

16-
public class ESRest implements Closeable{
20+
public class ESRest implements Closeable {
1721

1822
protected static final ObjectMapper mapper = new ObjectMapper();
1923

20-
protected final RestClient client;
24+
protected List<RestClient> clients;
2125

22-
public ESRest(RestClient client) {
23-
this.client = client;
26+
public ESRest(List<RestClient> clients) {
27+
Objects.requireNonNull(clients,"Clients can't be null");
28+
if (clients.isEmpty()) {
29+
throw new IllegalArgumentException("Client list can't be empty");
30+
}
31+
this.clients = Collections.synchronizedList(clients);
2432
}
25-
26-
public <T> T get(String resource, Class<T> responseClass, boolean assertSuccess) throws IOException{
27-
final Response response = client.performRequest("GET", resource, Collections.singletonMap("pretty", "true"));
28-
if (assertSuccess){
33+
34+
public <T> T get(String resource, Class<T> responseClass, boolean assertSuccess) throws IOException {
35+
final Response response = performRequest("GET", resource, Collections.singletonMap("pretty", "true"));
36+
if (assertSuccess) {
2937
assertSuccess(response);
3038
}
3139
return mapper.readValue(IOUtils.toString(response.getEntity().getContent(), "UTF-8"), responseClass);
3240
}
3341

34-
public <T> T post(String resource, String body, Class<T> responseClass, boolean assertSuccess)
35-
throws IOException {
36-
final Response response = client.performRequest("POST", resource, Collections.singletonMap("pretty", "true"),
42+
public <T> T post(String resource, String body, Class<T> responseClass, boolean assertSuccess) throws IOException {
43+
final Response response = performRequest("POST", resource, Collections.singletonMap("pretty", "true"),
3744
new NStringEntity(body, ContentType.APPLICATION_JSON));
3845
if (assertSuccess) {
3946
assertSuccess(response);
4047
}
4148
return mapper.readValue(IOUtils.toString(response.getEntity().getContent(), "UTF-8"), responseClass);
4249
}
43-
50+
4451
/**
4552
*
4653
* @param resource
@@ -49,40 +56,87 @@ public <T> T post(String resource, String body, Class<T> responseClass, boolean
4956
* @throws IOException
5057
*/
5158
public int head(String resource, boolean assertSuccess) throws IOException {
52-
Response response = client.performRequest("HEAD", resource, Collections.singletonMap("pretty", "true"));
59+
Response response = performRequest("HEAD", resource, Collections.singletonMap("pretty", "true"));
5360
return response.getStatusLine().getStatusCode();
5461
}
5562

56-
public <T> T put(String resource, String body ,Class<T> responseClass, boolean assertSuccess) throws IOException {
63+
public <T> T put(String resource, String body, Class<T> responseClass, boolean assertSuccess) throws IOException {
5764
final HttpEntity entity = new NStringEntity(body, ContentType.APPLICATION_JSON);
58-
final Response response = client.performRequest("PUT", resource, Collections.<String, String>emptyMap(),
59-
entity);
65+
final Response response = performRequest("PUT", resource, Collections.<String, String>emptyMap(), entity);
6066
if (assertSuccess) {
6167
assertSuccess(response);
6268
}
6369
return mapper.readValue(IOUtils.toString(response.getEntity().getContent(), "UTF-8"), responseClass);
6470
}
6571

66-
public <T> T delete(String resource,Class<T> responseClass, boolean assertSuccess) throws IOException {
67-
final Response response = client.performRequest("DELETE", resource, Collections.<String, String>emptyMap());
72+
private Response performRequest(String method, String endpoint, Map<String, String> headers) throws IOException {
73+
return performRequest(method, endpoint, headers, null);
74+
}
75+
76+
private Response performRequest(String method, String endpoint, Map<String, String> headers, HttpEntity entity)
77+
throws IOException {
78+
IOException savedException = null;
79+
int clientIndex = 0;
80+
for (; clientIndex < clients.size(); clientIndex++) {
81+
try {
82+
final Response response = clients.get(clientIndex).performRequest(method, endpoint, headers, entity);
83+
if (clientIndex != 0) {
84+
synchronized (clients) {
85+
// If the client is not at the top of the list, we should move
86+
// it since it is the successful one.
87+
clients = moveToTop(clients, clients.get(clientIndex));
88+
}
89+
}
90+
return response;
91+
} catch (IOException e) {
92+
// TODO: Save all the exceptions and throw them like in TestNG
93+
// soft assert:
94+
// https://github.com/cbeust/testng/blob/master/src/main/java/org/testng/asserts/SoftAssert.java
95+
savedException = e;
96+
}
97+
98+
}
99+
throw savedException;
100+
}
101+
102+
private static <T> List<T> moveToTop(List<T> items, T input) {
103+
int index = items.indexOf(input);
104+
List<T> copy;
105+
if (index >= 0) {
106+
copy = new ArrayList<T>(items.size());
107+
copy.add(items.get(index));
108+
copy.addAll(items.subList(0, index));
109+
copy.addAll(items.subList(index + 1, items.size()));
110+
} else {
111+
return items;
112+
}
113+
return copy;
114+
}
115+
116+
public <T> T delete(String resource, Class<T> responseClass, boolean assertSuccess) throws IOException {
117+
final Response response = performRequest("DELETE", resource, Collections.<String, String>emptyMap());
68118
if (assertSuccess) {
69119
assertSuccess(response);
70120
}
71121
return mapper.readValue(IOUtils.toString(response.getEntity().getContent(), "UTF-8"), responseClass);
72122
}
73-
123+
74124
private void assertSuccess(Response response) throws IOException {
75125
if (response.getStatusLine().getStatusCode() != 200) {
76126
throw new IOException("Return status is " + response.getStatusLine().getStatusCode());
77127
}
78128
}
79-
129+
80130
@Override
81-
public void close() throws IOException{
82-
if (client != null){
83-
client.close();
131+
public void close() throws IOException {
132+
if (null == clients || clients.isEmpty()) {
133+
return;
134+
}
135+
for (RestClient client : clients) {
136+
if (client != null) {
137+
client.close();
138+
}
84139
}
85140
}
86141

87-
88142
}

src/test/java/il/co/topq/elastic/AbstractCreateRemoveIndexTestCase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@
55
import org.junit.After;
66
import org.junit.Before;
77

8-
public abstract class AbstractCreateRemoveIndexTestCase extends AbstractTestCase{
9-
8+
public abstract class AbstractCreateRemoveIndexTestCase extends AbstractTestCase {
9+
1010
@Before
11-
public void setUp() throws IOException{
12-
if (!client.index(INDEX).isExists()){
11+
public void setUp() throws IOException {
12+
if (!client.index(INDEX).isExists()) {
1313
client.index(INDEX).create(SETTINGS);
1414
}
1515
}
16-
16+
1717
@After
18-
public void tearDown() throws IOException{
19-
if (client.index(INDEX).isExists()){
18+
public void tearDown() throws IOException {
19+
if (client.index(INDEX).isExists()) {
2020
client.index(INDEX).delete();
2121
}
2222
}
23-
23+
2424
}

src/test/java/il/co/topq/elastic/AbstractTestCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ protected void sleep(int timeInSeconds) {
2323

2424
@Before
2525
public void setup() {
26-
client = new ESClient("localhost", 9200);
26+
client = initClient();
27+
}
28+
29+
protected ESClient initClient() {
30+
return ESClient.builder().addClient("localhost", 9200).build();
2731
}
2832

2933
@After

src/test/java/il/co/topq/elastic/TestAggregations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import il.co.topq.elastic.model.Post;
1010

11-
@Ignore
11+
//@Ignore
1212
public class TestAggregations extends AbstractCreateRemoveIndexTestCase {
1313

1414
@Test

src/test/java/il/co/topq/elastic/TestDocument.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
import il.co.topq.elastic.model.Post;
1313

14-
@Ignore
14+
//@Ignore
1515
public class TestDocument extends AbstractCreateRemoveIndexTestCase {
1616

1717

src/test/java/il/co/topq/elastic/TestElasticStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.junit.Ignore;
88
import org.junit.Test;
99

10-
@Ignore
10+
//@Ignore
1111
public class TestElasticStatus extends AbstractCreateRemoveIndexTestCase {
1212

1313
@Test

src/test/java/il/co/topq/elastic/TestIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import org.junit.Ignore;
77
import org.junit.Test;
88

9-
@Ignore
9+
//@Ignore
1010
public class TestIndex extends AbstractTestCase{
1111

1212
@Test

src/test/java/il/co/topq/elastic/TestIndexStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.junit.Ignore;
88
import org.junit.Test;
99

10-
@Ignore
10+
//@Ignore
1111
public class TestIndexStatus extends AbstractCreateRemoveIndexTestCase {
1212

1313
@Test
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package il.co.topq.elastic;
2+
3+
import java.io.IOException;
4+
import java.util.List;
5+
6+
import org.junit.Assert;
7+
import org.junit.Test;
8+
9+
import il.co.topq.elastic.model.Post;
10+
11+
public class TestMutipleNodes extends AbstractCreateRemoveIndexTestCase {
12+
13+
@Override
14+
protected ESClient initClient() {
15+
return ESClient.builder().addClient("localhost", 9200).addClient("localhost", 9201).build();
16+
}
17+
18+
@Test
19+
public void testHighAvailiablity() throws IOException, InterruptedException {
20+
addPostAndVerify(555);
21+
System.out.println("Stop the main Elasticsearch node");
22+
// sleep(10);
23+
addPostAndVerify(666);
24+
System.out.println("Restart the main Elasticsearch node and take down the secondery");
25+
// sleep(10);
26+
addPostAndVerify(777);
27+
}
28+
29+
private void addPostAndVerify(int id) throws IOException {
30+
if (!client.index(INDEX).isExists()) {
31+
client.index(INDEX).create(SETTINGS);
32+
}
33+
Post post0 = new Post();
34+
post0.setId(id);
35+
post0.setOp("Itai");
36+
post0.setPoints(100);
37+
post0.setSubreddit("all");
38+
client.index(INDEX).document(DOC).add().single(id + "", post0);
39+
sleep(1);
40+
List<Post> posts = client.index(INDEX).document(DOC).search().byTerm("id", id + "").asClass(Post.class);
41+
Assert.assertNotNull(posts);
42+
Assert.assertEquals(1, posts.size());
43+
}
44+
45+
}

src/test/java/il/co/topq/elastic/TestSearch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import il.co.topq.elastic.model.Post;
1111

12-
@Ignore
12+
//@Ignore
1313
public class TestSearch extends AbstractCreateRemoveIndexTestCase {
1414

1515
@Test

0 commit comments

Comments
 (0)