Skip to content

Commit c163ed9

Browse files
committed
add zk
1 parent 02c958a commit c163ed9

File tree

10 files changed

+115
-106
lines changed

10 files changed

+115
-106
lines changed

07rpc/rpc01/rpcfx-core/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,31 @@
2424
<version>1.2.70</version>
2525
</dependency>
2626

27+
<dependency>
28+
<groupId>org.projectlombok</groupId>
29+
<artifactId>lombok</artifactId>
30+
<version>1.18.16</version>
31+
</dependency>
32+
2733
<dependency>
2834
<groupId>com.squareup.okhttp3</groupId>
2935
<artifactId>okhttp</artifactId>
3036
<version>3.12.2</version>
3137
</dependency>
3238

39+
40+
<dependency>
41+
<groupId>org.apache.curator</groupId>
42+
<artifactId>curator-client</artifactId>
43+
<version>5.1.0</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.apache.curator</groupId>
48+
<artifactId>curator-framework</artifactId>
49+
<version>5.1.0</version>
50+
</dependency>
51+
3352
<dependency>
3453
<groupId>org.springframework.boot</groupId>
3554
<artifactId>spring-boot-starter</artifactId>

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/api/Filter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ public interface Filter {
44

55
boolean filter(RpcfxRequest request);
66

7-
Filter next();
7+
// Filter next();
88

99
}
Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,10 @@
11
package io.kimmking.rpcfx.api;
22

3-
public class RpcfxRequest {
3+
import lombok.Data;
44

5+
@Data
6+
public class RpcfxRequest {
57
private String serviceClass;
6-
78
private String method;
8-
99
private Object[] params;
10-
11-
public String getServiceClass() {
12-
return serviceClass;
13-
}
14-
15-
public void setServiceClass(String serviceClass) {
16-
this.serviceClass = serviceClass;
17-
}
18-
19-
public String getMethod() {
20-
return method;
21-
}
22-
23-
public void setMethod(String method) {
24-
this.method = method;
25-
}
26-
27-
public Object[] getParams() {
28-
return params;
29-
}
30-
31-
public void setParams(Object[] params) {
32-
this.params = params;
33-
}
3410
}
Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,10 @@
11
package io.kimmking.rpcfx.api;
22

3-
public class RpcfxResponse {
3+
import lombok.Data;
44

5+
@Data
6+
public class RpcfxResponse {
57
private Object result;
6-
78
private boolean status;
8-
99
private Exception exception;
10-
11-
public Object getResult() {
12-
return result;
13-
}
14-
15-
public void setResult(Object result) {
16-
this.result = result;
17-
}
18-
19-
public boolean isStatus() {
20-
return status;
21-
}
22-
23-
public void setStatus(boolean status) {
24-
this.status = status;
25-
}
26-
27-
public Exception getException() {
28-
return exception;
29-
}
30-
31-
public void setException(Exception exception) {
32-
this.exception = exception;
33-
}
3410
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.kimmking.rpcfx.api;
2+
3+
import lombok.Builder;
4+
import lombok.Data;
5+
6+
@Data
7+
@Builder
8+
public class ServiceProviderDesc {
9+
10+
private String host;
11+
private Integer port;
12+
private String serviceClass;
13+
14+
// group
15+
// version
16+
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public static <T, filters> T createFromRegistry(final Class<T> serviceClass, fin
3737

3838
}
3939

40-
public static <T> T create(final Class<T> serviceClass, final String url, Filter filter) {
40+
public static <T> T create(final Class<T> serviceClass, final String url, Filter... filters) {
4141

4242
// 0. 替换动态代理 -> AOP
43-
return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url, filter));
43+
return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url, filters));
4444

4545
}
4646

@@ -73,9 +73,11 @@ public Object invoke(Object proxy, Method method, Object[] params) throws Throwa
7373
request.setMethod(method.getName());
7474
request.setParams(params);
7575

76-
for (Filter filter : filters) {
77-
if(!filter.filter(request)) {
78-
return null;
76+
if (null!=filters) {
77+
for (Filter filter : filters) {
78+
if (!filter.filter(request)) {
79+
return null;
80+
}
7981
}
8082
}
8183

07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package io.kimmking.rpcfx.demo.consumer;
22

3+
import io.kimmking.rpcfx.api.Filter;
4+
import io.kimmking.rpcfx.api.LoadBalancer;
5+
import io.kimmking.rpcfx.api.Router;
6+
import io.kimmking.rpcfx.api.RpcfxRequest;
37
import io.kimmking.rpcfx.client.Rpcfx;
48
import io.kimmking.rpcfx.demo.api.Order;
59
import io.kimmking.rpcfx.demo.api.OrderService;
610
import io.kimmking.rpcfx.demo.api.User;
711
import io.kimmking.rpcfx.demo.api.UserService;
12+
import lombok.extern.slf4j.Slf4j;
813
import org.springframework.boot.autoconfigure.SpringBootApplication;
914

15+
import java.util.List;
1016
import java.util.Random;
1117

1218
@SpringBootApplication
@@ -36,6 +42,28 @@ public static void main(String[] args) {
3642
// SpringApplication.run(RpcfxClientApplication.class, args);
3743
}
3844

45+
private static class TagRouter implements Router {
46+
@Override
47+
public List<String> route(List<String> urls) {
48+
return urls;
49+
}
50+
}
51+
52+
private static class RandomLoadBalancer implements LoadBalancer {
53+
@Override
54+
public String select(List<String> urls) {
55+
return urls.get(0);
56+
}
57+
}
58+
59+
@Slf4j
60+
private static class CuicuiFilter implements Filter {
61+
@Override
62+
public boolean filter(RpcfxRequest request) {
63+
log.info("filter {} -> {}", this.getClass().getName(), request.toString());
64+
return true;
65+
}
66+
}
3967
}
4068

4169

07rpc/rpc01/rpcfx-demo-provider/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,6 @@
3838
<artifactId>spring-boot-starter-web</artifactId>
3939
</dependency>
4040

41-
<dependency>
42-
<groupId>org.apache.curator</groupId>
43-
<artifactId>curator-client</artifactId>
44-
<version>5.1.0</version>
45-
</dependency>
46-
4741
<dependency>
4842
<groupId>org.springframework.boot</groupId>
4943
<artifactId>spring-boot-starter-test</artifactId>

07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/RpcfxServerApplication.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
package io.kimmking.rpcfx.demo.provider;
22

3+
import com.alibaba.fastjson.JSON;
34
import io.kimmking.rpcfx.api.RpcfxRequest;
45
import io.kimmking.rpcfx.api.RpcfxResolver;
56
import io.kimmking.rpcfx.api.RpcfxResponse;
7+
import io.kimmking.rpcfx.api.ServiceProviderDesc;
68
import io.kimmking.rpcfx.demo.api.OrderService;
79
import io.kimmking.rpcfx.demo.api.UserService;
810
import io.kimmking.rpcfx.server.RpcfxInvoker;
11+
import org.apache.curator.RetryPolicy;
12+
import org.apache.curator.framework.CuratorFramework;
13+
import org.apache.curator.framework.CuratorFrameworkFactory;
14+
import org.apache.curator.retry.ExponentialBackoffRetry;
15+
import org.apache.zookeeper.CreateMode;
916
import org.springframework.beans.factory.annotation.Autowired;
1017
import org.springframework.boot.SpringApplication;
1118
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -24,18 +31,44 @@ public class RpcfxServerApplication {
2431

2532
public static void main(String[] args) throws Exception {
2633

34+
// start zk client
35+
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
36+
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181").namespace("rpcfx").retryPolicy(retryPolicy).build();
37+
client.start();
38+
39+
40+
// register service
2741
// xxx "io.kimmking.rpcfx.demo.api.UserService"
2842

29-
ServiceProviderDesc desc = new ServiceProviderDesc();
30-
desc.setHost(InetAddress.getLocalHost().getHostAddress());
31-
desc.setPort(8080);
32-
desc.setServiceClass("io.kimmking.rpcfx.demo.api.UserService");
43+
String userService = "io.kimking.rpcfx.demo.api.UserService";
44+
registerService(client, userService);
45+
String orderService = "io.kimking.rpcfx.demo.api.OrderService";
46+
registerService(client, orderService);
47+
3348

34-
// Curator.
49+
// 进一步的优化,是在spring加载完成后,从里面拿到特定注解的bean,自动注册到zk
3550

3651
SpringApplication.run(RpcfxServerApplication.class, args);
3752
}
3853

54+
private static void registerService(CuratorFramework client, String service) throws Exception {
55+
ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
56+
.host(InetAddress.getLocalHost().getHostAddress())
57+
.port(8080).serviceClass(service).build();
58+
// String userServiceSescJson = JSON.toJSONString(userServiceSesc);
59+
60+
try {
61+
if ( null == client.checkExists().forPath("/" + service)) {
62+
client.create().withMode(CreateMode.PERSISTENT).forPath("/" + service, "service".getBytes());
63+
}
64+
} catch (Exception ex) {
65+
ex.printStackTrace();
66+
}
67+
68+
client.create().withMode(CreateMode.EPHEMERAL).
69+
forPath( "/" + service + "/" + userServiceSesc.getHost() + "_" + userServiceSesc.getPort(), "provider".getBytes());
70+
}
71+
3972
@Autowired
4073
RpcfxInvoker invoker;
4174

07rpc/rpc01/rpcfx-demo-provider/src/main/java/io/kimmking/rpcfx/demo/provider/ServiceProviderDesc.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)