Skip to content

Commit 761c3f0

Browse files
committed
feat>(rpc): adapt ethereum event related interfaces
1 parent 1c03c93 commit 761c3f0

26 files changed

+1948
-5
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ ext {
2323
bcprovJDK18onVersion = '1.75'
2424
webankJavaCryptoVersion = "1.0.3"
2525
junitVersion = '4.13.2'
26+
rxjavaVersion = '2.2.2'
2627
commonsCollections4Version = "4.4"
2728
bcosSdkJniVersion = "3.7.0"
2829
slf4jApiVerison = '1.7.36'
@@ -140,6 +141,7 @@ dependencies {
140141
api("com.moandjiezana.toml:toml4j:${toml4jVersion}") {
141142
exclude group: "com.google.code.gson"
142143
}
144+
api("io.reactivex.rxjava2:rxjava:$rxjavaVersion")
143145

144146
integrationTestImplementation project
145147
integrationWasmTestImplementation project
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
package org.fisco.bcos.sdk.v3.test.filter;
2+
3+
import io.reactivex.Flowable;
4+
import io.reactivex.disposables.Disposable;
5+
import org.fisco.bcos.sdk.jni.common.JniException;
6+
import org.fisco.bcos.sdk.v3.BcosSDK;
7+
import org.fisco.bcos.sdk.v3.client.Client;
8+
import org.fisco.bcos.sdk.v3.client.protocol.request.DefaultBlockParameter;
9+
import org.fisco.bcos.sdk.v3.client.protocol.request.DefaultBlockParameterName;
10+
import org.fisco.bcos.sdk.v3.client.protocol.request.EthFilter;
11+
import org.fisco.bcos.sdk.v3.client.protocol.response.Log;
12+
import org.fisco.bcos.sdk.v3.codec.EventEncoder;
13+
import org.fisco.bcos.sdk.v3.config.exceptions.ConfigException;
14+
import org.fisco.bcos.sdk.v3.filter.FilterException;
15+
import org.fisco.bcos.sdk.v3.filter.FilterSystem;
16+
import org.fisco.bcos.sdk.v3.model.ConstantConfig;
17+
import org.fisco.bcos.sdk.v3.model.TransactionReceipt;
18+
import org.fisco.bcos.sdk.v3.test.contract.solidity.EventSubDemo;
19+
import org.fisco.bcos.sdk.v3.transaction.model.exception.ContractException;
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
import java.math.BigInteger;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.regex.Pattern;
27+
28+
public class FilterSystemTest {
29+
private static final String configFile =
30+
FilterSystemTest.class
31+
.getClassLoader()
32+
.getResource(ConstantConfig.CONFIG_FILE_NAME)
33+
.getPath();
34+
private static final String GROUP = "group0";
35+
private static final String INVALID_BLOCK_RANGE =
36+
"(?i)\\binvalid\\s+block\\s+range\\s+params\\b";
37+
private static final String EXCEED_MAX_TOPICS = "(?i)\\bexceed\\s+max\\s+topics\\b";
38+
39+
private static void checkLogEntry(
40+
Client client,
41+
TransactionReceipt receipt,
42+
String txHash,
43+
String blockHash,
44+
Log log,
45+
String address) {
46+
47+
Assert.assertEquals(receipt.getStatus(), 0);
48+
Assert.assertEquals(receipt.getTransactionHash(), txHash);
49+
String hash = client.getBlockHashByNumber(receipt.getBlockNumber()).getBlockHashByNumber();
50+
Assert.assertEquals(blockHash, hash);
51+
List<TransactionReceipt.Logs> txLogs = receipt.getLogEntries();
52+
int logIndex = log.getLogIndex().intValue();
53+
Assert.assertTrue(logIndex < txLogs.size());
54+
TransactionReceipt.Logs matched = txLogs.get(logIndex);
55+
Assert.assertEquals(log.getBlockHash(), blockHash);
56+
Assert.assertEquals(log.getTransactionHash(), txHash);
57+
Assert.assertEquals(matched.getData(), log.getData());
58+
Assert.assertEquals(matched.getTopics(), log.getTopics());
59+
Assert.assertEquals(address, log.getAddress());
60+
}
61+
62+
@Test
63+
public void normalTest()
64+
throws FilterException, ContractException, ConfigException, JniException,
65+
InterruptedException {
66+
BcosSDK sdk = BcosSDK.build(configFile);
67+
Client client = sdk.getClient(GROUP);
68+
FilterSystem filterSystem = sdk.getFilterSystem(client, 1, 1000);
69+
EventSubDemo eventSubDemo =
70+
EventSubDemo.deploy(client, client.getCryptoSuite().getCryptoKeyPair());
71+
EventEncoder encoder = new EventEncoder(client.getCryptoSuite().getHashImpl());
72+
73+
List<String> blockHashes = new ArrayList<>();
74+
List<String> txHashes = new ArrayList<>();
75+
List<Log> logs = new ArrayList<>();
76+
List<Log> logs1 = new ArrayList<>();
77+
List<Log> logs2 = new ArrayList<>();
78+
List<Log> logs3 = new ArrayList<>();
79+
80+
Disposable blockSub =
81+
filterSystem
82+
.blockHashFlowable()
83+
.subscribe(
84+
block -> {
85+
blockHashes.add(block);
86+
});
87+
88+
Disposable txSub =
89+
filterSystem
90+
.transactionHashFlowable()
91+
.subscribe(
92+
tx -> {
93+
txHashes.add(tx);
94+
});
95+
96+
EthFilter ethFilter =
97+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
98+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERACCOUNT_EVENT));
99+
100+
Disposable logSub =
101+
filterSystem
102+
.logFlowable(ethFilter, false)
103+
.subscribe(
104+
log -> {
105+
logs.add(log);
106+
});
107+
108+
EthFilter ethFilter1 =
109+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
110+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERACCOUNT_EVENT))
111+
.addSingleTopic(encoder.buildEventSignature("test1"))
112+
.addSingleTopic(encoder.buildEventSignature("test2"));
113+
114+
Disposable logSub1 =
115+
filterSystem
116+
.logFlowable(ethFilter1, false)
117+
.subscribe(
118+
log -> {
119+
logs1.add(log);
120+
});
121+
122+
EthFilter ethFilter2 =
123+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
124+
.addNullTopic()
125+
.addSingleTopic(encoder.buildEventSignature("test1"))
126+
.addNullTopic();
127+
128+
Disposable logSub2 =
129+
filterSystem
130+
.logFlowable(ethFilter2, false)
131+
.subscribe(
132+
log -> {
133+
logs2.add(log);
134+
});
135+
136+
EthFilter ethFilter3 =
137+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
138+
.addNullTopic()
139+
.addOptionalTopics(encoder.buildEventSignature("test1"), encoder.buildEventSignature("test2"))
140+
.addNullTopic();
141+
142+
Disposable logSub3 =
143+
filterSystem
144+
.logFlowable(ethFilter3, false)
145+
.subscribe(
146+
log -> {
147+
logs3.add(log);
148+
});
149+
150+
List<TransactionReceipt> receipts = new ArrayList<>();
151+
receipts.add(eventSubDemo.transfer("test1", "test2", BigInteger.valueOf(100)));
152+
receipts.add(eventSubDemo.transfer("test1", "test4", BigInteger.valueOf(100)));
153+
receipts.add(eventSubDemo.transfer("test2", "test6", BigInteger.valueOf(100)));
154+
155+
Thread.sleep(2 * 1000);
156+
157+
Assert.assertTrue(receipts.size() == txHashes.size());
158+
Assert.assertTrue(receipts.size() == blockHashes.size());
159+
Assert.assertTrue(receipts.size() == logs.size());
160+
161+
for (int i = 0; i < logs.size(); i++) {
162+
checkLogEntry(
163+
client,
164+
receipts.get(i),
165+
txHashes.get(i),
166+
blockHashes.get(i),
167+
logs.get(i),
168+
eventSubDemo.getContractAddress());
169+
}
170+
171+
Assert.assertTrue("logs1.size = " + logs1.size(),1 == logs1.size());
172+
for (int i = 0; i < logs1.size(); i++) {
173+
checkLogEntry(
174+
client,
175+
receipts.get(i),
176+
txHashes.get(i),
177+
blockHashes.get(i),
178+
logs1.get(i),
179+
eventSubDemo.getContractAddress());
180+
}
181+
182+
Assert.assertTrue("logs2.size = " + logs2.size(),2 == logs2.size());
183+
for (int i = 0; i < logs2.size(); i++) {
184+
checkLogEntry(
185+
client,
186+
receipts.get(i),
187+
txHashes.get(i),
188+
blockHashes.get(i),
189+
logs2.get(i),
190+
eventSubDemo.getContractAddress());
191+
}
192+
193+
Assert.assertTrue("logs3.size = " + logs3.size(),3 == logs3.size());
194+
for (int i = 0; i < logs3.size(); i++) {
195+
checkLogEntry(
196+
client,
197+
receipts.get(i),
198+
txHashes.get(i),
199+
blockHashes.get(i),
200+
logs3.get(i),
201+
eventSubDemo.getContractAddress());
202+
}
203+
204+
blockSub.dispose();
205+
txSub.dispose();
206+
logSub.dispose();
207+
logSub1.dispose();
208+
logSub2.dispose();
209+
logSub3.dispose();
210+
filterSystem.stop();
211+
}
212+
213+
@Test
214+
public void failedTest()
215+
throws FilterException, ContractException, ConfigException, JniException,
216+
InterruptedException {
217+
BcosSDK sdk = BcosSDK.build(configFile);
218+
Client client = sdk.getClient(GROUP);
219+
FilterSystem filterSystem = sdk.getFilterSystem(client, 1, 1000);
220+
EventSubDemo eventSubDemo =
221+
EventSubDemo.deploy(client, client.getCryptoSuite().getCryptoKeyPair());
222+
EventEncoder encoder = new EventEncoder(client.getCryptoSuite().getHashImpl());
223+
224+
List<Log> logs1 = new ArrayList<>();
225+
List<Log> logs2 = new ArrayList<>();
226+
List<Log> logs3 = new ArrayList<>();
227+
228+
EthFilter ethFilter1 =
229+
new EthFilter(DefaultBlockParameter.valueOf(4), DefaultBlockParameter.valueOf(3))
230+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERACCOUNT_EVENT));
231+
232+
EthFilter ethFilter2 =
233+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameter.valueOf(3))
234+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERACCOUNT_EVENT));
235+
236+
EthFilter ethFilter3 =
237+
new EthFilter(DefaultBlockParameterName.LATEST, DefaultBlockParameter.valueOf(3))
238+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERACCOUNT_EVENT))
239+
.addSingleTopic(encoder.encode(EventSubDemo.ECHOBYTES_EVENT))
240+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERAMOUNT_EVENT))
241+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFER_EVENT))
242+
.addSingleTopic(encoder.encode(EventSubDemo.TRANSFERDATA_EVENT));
243+
244+
Disposable logSub1 =
245+
filterSystem
246+
.logFlowable(ethFilter1, false)
247+
.doOnError(
248+
e -> {
249+
Assert.assertTrue(
250+
Pattern.compile(INVALID_BLOCK_RANGE)
251+
.matcher(e.getMessage())
252+
.find());
253+
})
254+
.onErrorResumeNext(Flowable.empty())
255+
.subscribe(
256+
log -> {
257+
logs1.add(log);
258+
});
259+
260+
Disposable logSub2 =
261+
filterSystem
262+
.logFlowable(ethFilter2, false)
263+
.doOnError(
264+
e -> {
265+
Assert.assertTrue(
266+
Pattern.compile(INVALID_BLOCK_RANGE)
267+
.matcher(e.getMessage())
268+
.find());
269+
})
270+
.onErrorResumeNext(Flowable.empty())
271+
.subscribe(
272+
log -> {
273+
logs2.add(log);
274+
});
275+
276+
Disposable logSub3 =
277+
filterSystem
278+
.logFlowable(ethFilter3, false)
279+
.doOnError(
280+
e -> {
281+
Assert.assertTrue(
282+
Pattern.compile(EXCEED_MAX_TOPICS)
283+
.matcher(e.getMessage())
284+
.find());
285+
})
286+
.onErrorResumeNext(Flowable.empty())
287+
.subscribe(
288+
log -> {
289+
logs3.add(log);
290+
});
291+
292+
List<TransactionReceipt> receipts = new ArrayList<>();
293+
receipts.add(eventSubDemo.transfer("test1", "test2", BigInteger.valueOf(100)));
294+
receipts.add(eventSubDemo.transfer("test3", "test4", BigInteger.valueOf(100)));
295+
receipts.add(eventSubDemo.transfer("test5", "test6", BigInteger.valueOf(100)));
296+
297+
Thread.sleep(2 * 1000);
298+
299+
Assert.assertTrue("logs1.size = " + logs1.size(), logs1.size() == 0);
300+
Assert.assertTrue("logs2.size = " + logs2.size(), logs2.size() == 0);
301+
Assert.assertTrue("logs3.size = " + logs3.size(), logs3.size() == 0);
302+
303+
Assert.assertTrue(logSub1.isDisposed());
304+
Assert.assertTrue(logSub2.isDisposed());
305+
Assert.assertTrue(logSub3.isDisposed());
306+
filterSystem.stop();
307+
}
308+
}

src/main/java/org/fisco/bcos/sdk/v3/BcosSDK.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.fisco.bcos.sdk.v3.config.ConfigOption;
2323
import org.fisco.bcos.sdk.v3.config.exceptions.ConfigException;
2424
import org.fisco.bcos.sdk.v3.eventsub.EventSubscribe;
25+
import org.fisco.bcos.sdk.v3.filter.FilterSystem;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

@@ -150,6 +151,38 @@ public EventSubscribe getEventSubscribe(String groupId) throws BcosSDKException
150151
}
151152
}
152153

154+
/**
155+
* Get an event subscribe instance of a specific group
156+
*
157+
* @param client
158+
* @param poolSize the size of scheduledExecutorService
159+
* @return FilterSystem
160+
*/
161+
public FilterSystem getFilterSystem(Client client, int poolSize) throws BcosSDKException {
162+
try {
163+
return new FilterSystem(client, poolSize);
164+
} catch (Exception e) {
165+
throw new BcosSDKException("get filter system failed, e: " + e.getMessage());
166+
}
167+
}
168+
169+
/**
170+
* Get an event subscribe instance of a specific group
171+
*
172+
* @param client
173+
* @param poolSize the size of scheduledExecutorService
174+
* @param pollingInterval The time interval for polling getFilterChange
175+
* @return FilterSystem
176+
*/
177+
public FilterSystem getFilterSystem(Client client, int poolSize, long pollingInterval)
178+
throws BcosSDKException {
179+
try {
180+
return new FilterSystem(client, poolSize, pollingInterval);
181+
} catch (Exception e) {
182+
throw new BcosSDKException("get filter system failed, e: " + e.getMessage());
183+
}
184+
}
185+
153186
/** Stop all module of BcosSDK */
154187
public void stopAll() {}
155188
}

0 commit comments

Comments
 (0)