Skip to content

Commit 03077ea

Browse files
author
egor
committed
https://chronobank.atlassian.net/browse/MD-1185 - middleware eth / move to web3 1.0 / block processor / update tests
https://chronobank.atlassian.net/browse/MD-1186 - middleware eth / move to web3 1.0 / block processor / bugs and features
1 parent c554b52 commit 03077ea

18 files changed

+244
-219
lines changed

index.js

+23-18
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
const mongoose = require('mongoose'),
1212
config = require('./config'),
1313
models = require('./models'),
14+
RMQBlockModel = require('middleware-common-components/models/rmq/eth/blockModel'),
15+
RMQTxModel = require('middleware-common-components/models/rmq/eth/txModel'),
1416
MasterNodeService = require('middleware-common-components/services/blockProcessor/MasterNodeService'),
1517
Promise = require('bluebird'),
1618
_ = require('lodash'),
1719
providerService = require('./services/providerService'),
18-
1920
AmqpService = require('middleware_common_infrastructure/AmqpService'),
2021
InfrastructureInfo = require('middleware_common_infrastructure/InfrastructureInfo'),
2122
InfrastructureService = require('middleware_common_infrastructure/InfrastructureService'),
@@ -79,7 +80,7 @@ const init = async () => {
7980
const masterNodeService = new MasterNodeService(channel, config.rabbit.serviceName);
8081
await masterNodeService.start();
8182

82-
providerService.events.on('provider_set', providerURI => {
83+
providerService.on('provider_set', providerURI => {
8384
let providerIndex = _.findIndex(config.web3.providers, providerURI);
8485
if (providerIndex !== -1)
8586
channel.publish('internal', `${config.rabbit.serviceName}_current_provider.set`, new Buffer(JSON.stringify({index: providerIndex})));
@@ -96,52 +97,56 @@ const init = async () => {
9697

9798
let blockEventCallback = async block => {
9899
log.info(`${block.hash} (${block.number}) added to cache.`);
99-
await channel.publish('events', `${config.rabbit.serviceName}_block`, new Buffer(JSON.stringify({block: block.number})));
100+
101+
const blockModel = new RMQBlockModel({block: block.number});
102+
103+
await channel.publish('events', `${config.rabbit.serviceName}_block`, new Buffer(blockModel.toString()));
100104
const filteredTxs = await filterTxsByAccountService(block.transactions);
101105

102106
for (let item of filteredTxs)
103-
for (let tx of item.txs)
104-
await channel.publish('events', `${config.rabbit.serviceName}_transaction.${item.address}`, new Buffer(JSON.stringify(tx)));
107+
for (let tx of item.txs) {
108+
109+
const txModel = new RMQTxModel(tx);
110+
await channel.publish('events', `${config.rabbit.serviceName}_transaction.${item.address}`, new Buffer(txModel.toString()));
111+
}
105112
};
106113
let txEventCallback = async tx => {
107114
const filteredTxs = await filterTxsByAccountService([tx]);
108115
for (let item of filteredTxs)
109116
for (let tx of item.txs) {
110-
tx = _.omit(tx, ['blockHash', 'transactionIndex']);
111117
tx.blockNumber = -1;
112-
await channel.publish('events', `${config.rabbit.serviceName}_transaction.${item.address}`, new Buffer(JSON.stringify(tx)));
118+
const txModel = new RMQTxModel(tx);
119+
await channel.publish('events', `${config.rabbit.serviceName}_transaction.${item.address}`, new Buffer(txModel.toString()));
113120
}
114121
};
115122

116-
syncCacheService.events.on('block', blockEventCallback);
123+
syncCacheService.on('block', blockEventCallback);
117124

118125
let endBlock = await syncCacheService.start();
119126

120127
await new Promise((res) => {
121128
if (config.sync.shadow)
122129
return res();
123130

124-
syncCacheService.events.on('end', () => {
131+
syncCacheService.on('end', () => {
125132
log.info(`cached the whole blockchain up to block: ${endBlock}`);
126133
res();
127134
});
128135
});
129136

130137
let blockWatchingService = new BlockWatchingService(endBlock);
131138

132-
blockWatchingService.events.on('block', blockEventCallback);
133-
blockWatchingService.events.on('tx', txEventCallback);
139+
blockWatchingService.on('block', blockEventCallback);
140+
blockWatchingService.on('tx', txEventCallback);
134141

135-
//await blockWatchingService.startSync();
142+
await blockWatchingService.startSync();
136143
};
137144

138145

139-
/*
140-
process.on('unhandledRejection', (reason, promise) => {//todo remove
141-
console.log('Unhandled Rejection at:', reason.stack || reason)
142-
process.exit(0)
143-
})
144-
*/
146+
providerService.on('connection_error', err => {
147+
log.error(err);
148+
process.exit(1);
149+
});
145150

146151
module.exports = init().catch(err => {
147152
log.error(err);

ipcConverter.js

+20-11
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const net = require('net'),
1919
_ = require('lodash'),
2020
log = bunyan.createLogger({name: 'ipcConverter', level: config.logs.level}),
2121
dbPath = path.join(__dirname, 'testrpc_db'),
22+
{URL} = require('url'),
2223
TestRPC = require('ganache-cli');
2324

2425
const accounts = [
@@ -28,13 +29,26 @@ const accounts = [
2829
'51cd20e24463a0e86c540f074a5f083c334659353eec43bb0bd9297b5929bd35',
2930
'7af5f0d70d97f282dfd20a9b611a2e4bd40572c038a89c0ee171a3c93bd6a17a',
3031
'cfc6d3fa2b579e3023ff0085b09d7a1cf13f6b6c995199454b739d24f2cf23a5'
31-
].map(privKey => ({secretKey: Buffer.from(privKey, 'hex'), balance: web3.toWei(500, 'ether')}));
32+
].map(privKey => ({secretKey: Buffer.from(privKey, 'hex'), balance: web3.utils.toWei('500', 'ether')}));
3233

3334
if (!fs.existsSync(dbPath))
3435
fs.mkdirSync(dbPath);
3536

36-
let RPCServer = TestRPC.server({accounts: accounts, default_balance_ether: 500, db_path: dbPath, network_id: 86});
37-
RPCServer.listen(parseInt(process.env.RPC_PORT || 8545));
37+
let RPCServer = TestRPC.server({
38+
accounts: accounts,
39+
default_balance_ether: 500,
40+
db_path: dbPath,
41+
network_id: 86,
42+
ws: true
43+
});
44+
45+
let port = (/^http/.test(config.web3.providers[0]) || /^ws/.test(config.web3.providers[0])) ?
46+
new URL(config.web3.providers[0]).port : 8545;
47+
48+
49+
50+
RPCServer.listen(port);
51+
3852
const web3ProviderUri = `${/^win/.test(process.platform) ? '\\\\.\\pipe\\' : ''}${config.web3.providers[0]}`;
3953

4054
let addresses = _.chain(RPCServer.provider.manager.state.accounts)
@@ -48,6 +62,7 @@ let addresses = _.chain(RPCServer.provider.manager.state.accounts)
4862

4963
console.log(addresses);
5064

65+
5166
// create RPC server
5267
const server = net.createServer(stream => {
5368
stream.on('data', async c => {
@@ -76,11 +91,7 @@ const server = net.createServer(stream => {
7691
process.exit(1);
7792
});
7893

79-
/**
80-
* Remove pipe file
81-
* @param {string} filename Path to pipe file
82-
* @return {boolean} Whether file removed or not
83-
*/
94+
8495
const removePipeFile = filename => {
8596
try {
8697
fs.accessSync(filename, fs.F_OK | fs.W_OK) || fs.unlinkSync(filename);
@@ -101,9 +112,6 @@ if (!/^win/.test(process.platform)) {
101112

102113
// Clean up pipe file after shutdown process
103114

104-
/**
105-
* Stub for windows. Emulate SIGINT for Win32
106-
*/
107115
if (process.platform === 'win32') {
108116
const rl = require('readline').createInterface({
109117
input: process.stdin,
@@ -127,3 +135,4 @@ process.on('SIGINT', function () {
127135
server.listen(web3ProviderUri, () => {
128136
log.info(`Server: on listening for network - ${config.web3.network}`);
129137
});
138+

models/blockModel.js

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const Block = new mongoose.Schema({
1919
timestamp: {type: Number, required: true},
2020
uncleAmount: {type: Number, required: true},
2121
totalTxFee: {type: String, required: true},
22+
miner: {type: String, required: true},
2223
created: {type: Date, required: true, default: Date.now}
2324
}, {_id: false});
2425

models/txLogModel.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@ const mongoose = require('mongoose'),
2424
* @return {*}
2525
*/
2626
const setArgs = function (topics) {
27-
_.pullAt(topics, 0);
28-
return topics.map((topic, index) => {
27+
28+
let clonedTopics = _.cloneDeep(topics);
29+
30+
_.pullAt(clonedTopics, 0);
31+
return clonedTopics.map((topic, index) => {
2932
let bn = BigNumber(topic, 16);
3033
return {
3134
e: bn.e,
3235
c: bn.c,
3336
index: index
34-
}
37+
};
3538
});
3639
};
3740

models/txModel.js

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const TX = new mongoose.Schema({
2323
nonce: {type: Number},
2424
gasPrice: {type: String},
2525
gas: {type: String},
26+
gasUsed: {type: String},
2627
from: {type: String, index: true}
2728
}, {_id: false});
2829

package.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
"eslint-plugin-chronobank": "github:chronobank/eslint-plugin-chronobank",
3434
"fs-extra": "^7.0.0",
3535
"ganache-cli": "^6.1.0",
36-
"mocha": "^3.5.0",
37-
"truffle-contract": "^3.0.6"
36+
"mocha": "^3.5.0"
3837
}
3938
}

services/blockWatchingService.js

+18-9
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ const config = require('../config'),
2626
* @returns Object<BlockWatchingService>
2727
*/
2828

29-
class BlockWatchingService {
29+
class BlockWatchingService extends EventEmitter {
3030

3131
constructor (currentHeight) {
32-
this.events = new EventEmitter();
32+
super();
3333
this.currentHeight = currentHeight;
3434
this.isSyncing = false;
3535
}
@@ -54,7 +54,7 @@ class BlockWatchingService {
5454
this.doJob();
5555

5656
this.unconfirmedTxEventCallback = result=> this.unconfirmedTxEvent(result).catch();
57-
providerService.events.on('unconfirmedTx', this.unconfirmedTxEventCallback);
57+
providerService.on('unconfirmedTx', this.unconfirmedTxEventCallback);
5858

5959
}
6060

@@ -69,7 +69,7 @@ class BlockWatchingService {
6969
const block = await this.processBlock();
7070
await addBlock(block, true);
7171
this.currentHeight++;
72-
this.events.emit('block', block);
72+
this.emit('block', block);
7373
} catch (err) {
7474

7575
if (_.get(err, 'code') === 0) {
@@ -100,21 +100,30 @@ class BlockWatchingService {
100100
/**
101101
* @function
102102
* @description process unconfirmed tx
103-
* @param hash - the hash of transaction
103+
* @param transactionReceipt - the receipt of transaction
104104
* @return {Promise<void>}
105105
*/
106-
async unconfirmedTxEvent (hash) {
106+
async unconfirmedTxEvent (transactionReceipt) {
107107

108108
let web3 = await providerService.get();
109-
let tx = await web3.eth.getTransaction(hash);
109+
let tx = await web3.eth.getTransaction(transactionReceipt.transactionHash);
110110

111111
if (!_.has(tx, 'hash'))
112112
return;
113113

114+
115+
if (tx.from)
116+
tx.from = tx.from.toLowerCase();
117+
118+
if (tx.to)
119+
tx.to = tx.to.toLowerCase();
120+
121+
tx.gasUsed = transactionReceipt.gasUsed;
122+
114123
tx.logs = [];
115124

116125
await addUnconfirmedTx(tx);
117-
this.events.emit('tx', tx);
126+
this.emit('tx', tx);
118127

119128
}
120129

@@ -125,7 +134,7 @@ class BlockWatchingService {
125134
*/
126135
async stopSync () {
127136
this.isSyncing = false;
128-
providerService.events.removeListener('unconfirmedTx', this.unconfirmedTxEventCallback);
137+
providerService.removeListener('unconfirmedTx', this.unconfirmedTxEventCallback);
129138

130139
}
131140

services/filterTxsByAccountService.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ module.exports = async (txs) => {
4646
.flattenDeep()
4747
.uniq()
4848
.map(address => ({
49-
address: address,
50-
txs: _.filter(txs, tx =>
51-
_.union(tx.logs.map(log => log.address), [tx.to, tx.from]).includes(address)
52-
)
53-
})
49+
address: address,
50+
txs: _.filter(txs, tx =>
51+
_.union(tx.logs.map(log => log.address), [tx.to, tx.from]).includes(address)
52+
)
53+
})
5454
)
5555
.value();
5656
};

0 commit comments

Comments
 (0)