Skip to content

Commit 3c05fac

Browse files
AMOP module add topic manager and block notify subscription. (FISCO-BCOS#47)
Co-authored-by: cyjseagull <[email protected]>
1 parent 2a134d8 commit 3c05fac

File tree

8 files changed

+191
-17
lines changed

8 files changed

+191
-17
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2014-2020 [fisco-dev]
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*
14+
*/
15+
16+
package org.fisco.bcos.sdk.amop;
17+
18+
import org.fisco.bcos.sdk.BcosSDK;
19+
import org.fisco.bcos.sdk.BcosSDKTest;
20+
import org.fisco.bcos.sdk.config.ConfigException;
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
24+
import java.util.List;
25+
26+
public class blockNotifyTest {
27+
private static final String configFile = BcosSDKTest.class.getClassLoader().getResource("config-example.yaml").getPath();
28+
29+
@Test
30+
public void testBlockNotify() throws ConfigException, InterruptedException {
31+
BcosSDK sdk = new BcosSDK(configFile);
32+
Assert.assertTrue(sdk.getChannel().getAvailablePeer().size() >= 1);
33+
Amop amop = Amop.build(sdk.getGroupManagerService(),null);
34+
}
35+
}
36+

src/main/java/org/fisco/bcos/sdk/amop/Amop.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package org.fisco.bcos.sdk.amop;
1717

1818
import java.util.List;
19-
import org.fisco.bcos.sdk.channel.Channel;
2019
import org.fisco.bcos.sdk.config.ConfigOption;
2120
import org.fisco.bcos.sdk.crypto.keystore.KeyManager;
21+
import org.fisco.bcos.sdk.service.GroupManagerService;
2222

2323
/**
2424
* AMOP module interface.
@@ -29,12 +29,12 @@ public interface Amop {
2929
/**
3030
* Create a Amop object.
3131
*
32-
* @param channel
32+
* @param groupManager
3333
* @param config
3434
* @return Amop instance
3535
*/
36-
static Amop build(Channel channel, ConfigOption config) {
37-
return null;
36+
static Amop build(GroupManagerService groupManager, ConfigOption config) {
37+
return new AmopImp(groupManager, config);
3838
}
3939

4040
/**

src/main/java/org/fisco/bcos/sdk/amop/AmopImp.java

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,20 @@
1515

1616
package org.fisco.bcos.sdk.amop;
1717

18+
import com.fasterxml.jackson.core.JsonProcessingException;
1819
import java.util.List;
19-
import org.fisco.bcos.sdk.amop.exception.AmopException;
20-
import org.fisco.bcos.sdk.channel.Channel;
20+
import java.util.Set;
21+
import java.util.UUID;
22+
import org.fisco.bcos.sdk.amop.topic.TopicManager;
23+
import org.fisco.bcos.sdk.channel.ResponseCallback;
24+
import org.fisco.bcos.sdk.channel.model.Options;
2125
import org.fisco.bcos.sdk.config.ConfigOption;
2226
import org.fisco.bcos.sdk.crypto.keystore.KeyManager;
27+
import org.fisco.bcos.sdk.model.Message;
28+
import org.fisco.bcos.sdk.model.MsgType;
29+
import org.fisco.bcos.sdk.model.Response;
30+
import org.fisco.bcos.sdk.service.GroupManagerService;
31+
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
2332
import org.slf4j.Logger;
2433
import org.slf4j.LoggerFactory;
2534

@@ -30,11 +39,19 @@
3039
*/
3140
public class AmopImp implements Amop {
3241
private static Logger logger = LoggerFactory.getLogger(AmopImp.class);
33-
private Channel ch;
42+
private GroupManagerService groupManager;
43+
private TopicManager topicManager;
3444

35-
public AmopImp(Channel channel, ConfigOption config) throws AmopException {
36-
this.ch = channel;
45+
public AmopImp(GroupManagerService groupManager, ConfigOption config) {
46+
this.groupManager = groupManager;
47+
topicManager = new TopicManager();
48+
List<String> peers = groupManager.getChannel().getAvailablePeer();
49+
for (String peer : peers) {
50+
List<String> groupInfo = groupManager.getGroupInfoByNodeInfo(peer);
51+
topicManager.addBlockNotify(peer, groupInfo);
52+
}
3753
// todo load topics ConfigOption
54+
sendSubscribe();
3855
}
3956

4057
@Override
@@ -59,8 +76,64 @@ public List<String> getSubTopics() {
5976
}
6077

6178
@Override
62-
public void start() {}
79+
public void start() {};
6380

6481
@Override
6582
public void stop() {}
83+
84+
private void sendSubscribe() {
85+
List<String> peers = groupManager.getChannel().getAvailablePeer();
86+
logger.debug("send subscribe to {} peers", peers.size());
87+
for (String peer : peers) {
88+
try {
89+
updateSubscribeToPeer(peer);
90+
} catch (JsonProcessingException e) {
91+
logger.error(
92+
"update amop subscription to node {}, json processed error, error message: {}",
93+
peer,
94+
e.getMessage());
95+
}
96+
}
97+
}
98+
99+
private void updateSubscribeToPeer(String peer) throws JsonProcessingException {
100+
Message msg = new Message();
101+
msg.setType((short) MsgType.AMOP_CLIENT_TOPICS.getType());
102+
msg.setResult(0);
103+
msg.setSeq(newSeq());
104+
msg.setData(getSubData(topicManager.getSubByPeer(peer)));
105+
ResponseCallback callback =
106+
new ResponseCallback() {
107+
@Override
108+
public void onResponse(Response response) {
109+
logger.info(
110+
"amop response, seq : {}, error: {}, content: {}",
111+
response.getMessageID(),
112+
response.getErrorCode(),
113+
response.getContent());
114+
// todo
115+
}
116+
};
117+
Options opt = new Options();
118+
groupManager.getChannel().asyncSendToPeer(msg, peer, callback, opt);
119+
logger.info(
120+
" send update topic message request, seq: {}, content: {}",
121+
msg.getSeq(),
122+
new String(msg.getData()));
123+
}
124+
125+
private String newSeq() {
126+
return UUID.randomUUID().toString().replaceAll("-", "");
127+
}
128+
129+
private byte[] getSubData(Set<String> topics) throws JsonProcessingException {
130+
byte[] topicBytes =
131+
ObjectMapperFactory.getObjectMapper().writeValueAsBytes(topics.toArray());
132+
/*int b = 1 + topicBytes.length;
133+
byte length = (byte)b;
134+
byte[] content = new byte[1+topicBytes.length];
135+
content[0] = length;
136+
System.arraycopy(topicBytes, 0, content, 1, topicBytes.length);*/
137+
return topicBytes;
138+
}
66139
}

src/main/java/org/fisco/bcos/sdk/amop/topic/AmopTopic.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515

1616
package org.fisco.bcos.sdk.amop.topic;
1717

18+
import java.security.KeyStore;
19+
import java.util.List;
1820
import org.fisco.bcos.sdk.amop.AmopCallback;
1921

2022
public class AmopTopic {
2123
private String topicName;
22-
private String type;
23-
private String topicString;
24+
private TopicType type;
2425
private AmopCallback callback;
26+
private KeyStore privateKey;
27+
private List<KeyStore> publicKeys;
2528
}

src/main/java/org/fisco/bcos/sdk/amop/topic/TopicManager.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@
1616
package org.fisco.bcos.sdk.amop.topic;
1717

1818
import java.security.KeyStore;
19+
import java.util.HashMap;
20+
import java.util.HashSet;
1921
import java.util.List;
2022
import java.util.Map;
23+
import java.util.Set;
2124
import org.fisco.bcos.sdk.amop.AmopCallback;
2225

2326
public class TopicManager {
2427
Map<String, AmopCallback> seq2Callback;
2528
Map<String, KeyStore> topic2PrivateKey;
2629
Map<String, List<KeyStore>> topic2PublicKey;
30+
Set<String> topics = new HashSet<>();
31+
Map<String, Set<String>> peer2BlockNotify = new HashMap<>();
2732

28-
public void addTopic(String topicName, AmopCallback callback) {
29-
return;
33+
public void addTopic(String topicString, AmopCallback callback) {
34+
topics.add(topicString);
35+
if (callback == null) {
36+
return;
37+
}
3038
}
3139

3240
public void addPrivateTopic(String topicName, KeyStore privateKeyStore, AmopCallback callback) {
@@ -40,4 +48,33 @@ public void removeTopic(String topicName) {
4048
public AmopCallback getCallback(String seq) {
4149
return seq2Callback.get(seq);
4250
}
51+
52+
public Set<String> getSubByPeer(String peerIpPort) {
53+
Set<String> notify = peer2BlockNotify.get(peerIpPort);
54+
if (notify != null && topics != null) {
55+
Set<String> peerSub = new HashSet<>();
56+
peerSub.addAll(topics);
57+
peerSub.addAll(notify);
58+
return peerSub;
59+
} else if (notify != null && topics == null) {
60+
return notify;
61+
} else {
62+
return topics;
63+
}
64+
}
65+
66+
public void addBlockNotify(String peerIpPort, List<String> groupInfo) {
67+
Set<String> pnf = peer2BlockNotify.get(peerIpPort);
68+
if (null == pnf) {
69+
pnf = new HashSet<>();
70+
for (String group : groupInfo) {
71+
pnf.add("_block_notify_" + group);
72+
}
73+
peer2BlockNotify.put(peerIpPort, pnf);
74+
} else {
75+
for (String group : groupInfo) {
76+
pnf.add("_block_notify_" + group);
77+
}
78+
}
79+
}
4380
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2014-2020 [fisco-dev]
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5+
* in compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License
10+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11+
* or implied. See the License for the specific language governing permissions and limitations under
12+
* the License.
13+
*
14+
*/
15+
16+
package org.fisco.bcos.sdk.amop.topic;
17+
18+
public enum TopicType {
19+
/** Type of AMOP topic */
20+
PUBLIC_TOPIC(0),
21+
PRIVATE_TOPIC(1),
22+
;
23+
24+
TopicType(int i) {}
25+
}

src/main/java/org/fisco/bcos/sdk/channel/ChannelImp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,9 @@ public void run(Timeout timeout) {
333333
TimeUnit.MILLISECONDS));
334334
}
335335
ctx.writeAndFlush(out);
336-
logger.debug("send message to {} success ", peerIpPort);
336+
logger.debug("send message {} to {} success ", out.getSeq(), peerIpPort);
337337
} else {
338-
logger.error("send message to {} failed ", peerIpPort);
338+
logger.debug("send message {} to {} failed ", out.getSeq(), peerIpPort);
339339
Response response = new Response();
340340
response.setErrorCode(ChannelMessageError.CONNECTION_INVALID.getError());
341341
response.setErrorMessage(

src/main/java/org/fisco/bcos/sdk/network/NetworkImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public List<ConnectionInfo> getConnectionInfo() {
6868
@Override
6969
public void start() throws NetworkException {
7070
connManager.startConnect();
71-
connManager.startReconnectSchedule();
71+
// connManager.startReconnectSchedule();
7272
}
7373

7474
@Override

0 commit comments

Comments
 (0)