Skip to content

Commit c554b52

Browse files
author
egor
committed
https://chronobank.atlassian.net/browse/MD-973 - middleware eth / move to web3 1.0 / block processor
1 parent 5636f0c commit c554b52

9 files changed

+69
-47
lines changed

index.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,17 @@ const init = async () => {
132132
blockWatchingService.events.on('block', blockEventCallback);
133133
blockWatchingService.events.on('tx', txEventCallback);
134134

135-
await blockWatchingService.startSync();
135+
//await blockWatchingService.startSync();
136136
};
137137

138+
139+
/*
140+
process.on('unhandledRejection', (reason, promise) => {//todo remove
141+
console.log('Unhandled Rejection at:', reason.stack || reason)
142+
process.exit(0)
143+
})
144+
*/
145+
138146
module.exports = init().catch(err => {
139147
log.error(err);
140148
process.exit(0);

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"require-all": "^2.2.0",
1717
"semaphore": "^1.1.0",
1818
"uniqid": "^4.1.1",
19-
"web3": "^0.19.0"
19+
"web3": "^1.0.0-beta.36"
2020
},
2121
"scripts": {
2222
"lint": "node ./node_modules/eslint/bin/eslint.js -c .eslintrc.js --ext .js --fix --ignore-path .eslintignore .",

services/blockWatchingService.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class BlockWatchingService {
4545
this.isSyncing = true;
4646
let web3 = await providerService.get();
4747

48-
const pendingBlock = await Promise.promisify(web3.eth.getBlock)('pending').timeout(5000);
48+
const pendingBlock = await web3.eth.getBlock('pending');
4949

5050
if (!pendingBlock)
5151
await removeUnconfirmedTxs();
@@ -106,7 +106,7 @@ class BlockWatchingService {
106106
async unconfirmedTxEvent (hash) {
107107

108108
let web3 = await providerService.get();
109-
let tx = await Promise.promisify(web3.eth.getTransaction)(hash);
109+
let tx = await web3.eth.getTransaction(hash);
110110

111111
if (!_.has(tx, 'hash'))
112112
return;
@@ -137,13 +137,13 @@ class BlockWatchingService {
137137
async processBlock () {
138138

139139
let web3 = await providerService.get();
140-
const block = await Promise.promisify(web3.eth.getBlockNumber)().timeout(2000).catch(() => 0);
140+
const block = await web3.eth.getBlockNumber().catch(() => 0);
141141

142142
if (block === this.currentHeight - 1)
143143
return Promise.reject({code: 0});
144144

145145
const lastBlock = this.currentHeight === 0 ? null :
146-
await Promise.promisify(web3.eth.getBlock)(this.currentHeight - 1, false).timeout(60000).catch(() => null);
146+
await web3.eth.getBlock(this.currentHeight - 1, false);
147147

148148

149149
if (_.get(lastBlock, 'hash')) {

services/providerService.js

+42-28
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,29 @@ class providerService {
4242
*/
4343
makeWeb3FromProviderURI (providerURI) {
4444

45-
const provider = /^http/.test(providerURI) ?
46-
new Web3.providers.HttpProvider(providerURI) :
47-
new Web3.providers.IpcProvider(`${/^win/.test(process.platform) ? '\\\\.\\pipe\\' : ''}${providerURI}`, net);
45+
if (/^http/.test(providerURI) || /^ws/.test(providerURI))
46+
return new Web3(providerURI);
4847

49-
const web3 = new Web3();
50-
web3.setProvider(provider);
51-
return web3;
48+
providerURI = `${/^win/.test(process.platform) ? '\\\\.\\pipe\\' : ''}${providerURI}`;
49+
return new Web3(providerURI, net);
5250
}
5351

5452
/** @function
5553
* @description reset the current connection
5654
* @return {Promise<void>}
5755
*/
5856
async resetConnector () {
59-
await this.connector.reset();
57+
58+
if (this.filter) {
59+
//await Promise.promisify(this.filter.unsubscribe.bind(this.connector))();
60+
await new Promise(res => this.filter.unsubscribe(res));
61+
this.filter = null;
62+
}
63+
64+
if (_.has(this.connector, 'currentProvider.connection.close'))
65+
this.connector.currentProvider.connection.close();
6066
this.switchConnector();
61-
this.events.emit('disconnected');
67+
6268
}
6369

6470
/**
@@ -68,16 +74,20 @@ class providerService {
6874
*/
6975
async switchConnector () {
7076

77+
console.log('switching connector')
7178
const providerURI = await Promise.any(config.web3.providers.map(async providerURI => {
7279
const web3 = this.makeWeb3FromProviderURI(providerURI);
73-
await Promise.promisify(web3.eth.getBlockNumber)().timeout(5000);
74-
web3.reset();
80+
await web3.eth.getBlockNumber();
81+
if (_.has(web3, 'currentProvider.connection.close'))
82+
web3.currentProvider.connection.close();
7583
return providerURI;
7684
})).catch(() => {
7785
log.error('no available connection!');
78-
process.exit(0);
86+
process.exit(0); //todo move to root
7987
});
8088

89+
console.log('switched connector')
90+
8191
const fullProviderURI = !/^http/.test(providerURI) ? `${/^win/.test(process.platform) ? '\\\\.\\pipe\\' : ''}${providerURI}` : providerURI;
8292
const currentProviderURI = this.connector ? this.connector.currentProvider.path || this.connector.currentProvider.host : '';
8393

@@ -87,31 +97,28 @@ class providerService {
8797
this.connector = this.makeWeb3FromProviderURI(providerURI);
8898

8999
if (_.get(this.connector.currentProvider, 'connection')) {
90-
this.connector.currentProvider.connection.on('end', () => this.resetConnector());
91-
this.connector.currentProvider.connection.on('error', () => this.resetConnector());
92-
} else
100+
101+
/* this.connector.currentProvider.connection.on('end', () => this.resetConnector());
102+
this.connector.currentProvider.connection.on('error', () => this.resetConnector());*/
103+
this.connector.currentProvider.connection.onerror(()=>this.resetConnector());
104+
this.connector.currentProvider.connection.onclose(()=>this.resetConnector());
105+
106+
107+
} else
93108
this.pingIntervalId = setInterval(async () => {
94109

95-
const isConnected = await new Promise((res, rej) => {
96-
this.connector.currentProvider.sendAsync({
97-
id: 9999999999,
98-
jsonrpc: '2.0',
99-
method: 'net_listening',
100-
params: []
101-
}, (err, result) => err ? rej(err) : res(result.result));
102-
});
110+
const isConnected = await this.connector.eth.getProtocolVersion().catch(() => null);
103111

104112
if (!isConnected) {
105113
clearInterval(this.pingIntervalId);
106114
this.resetConnector();
107115
}
108116
}, 5000);
109-
110117

111-
this.filter = this.connector.eth.filter('pending');
112-
this.filter.watch((err, result) => {
113-
if (!err)
114-
this.events.emit('unconfirmedTx', result);
118+
119+
this.filter = this.connector.eth.subscribe('pendingTransactions');
120+
this.filter.on('data', (transaction) => {
121+
this.events.emit('unconfirmedTx', transaction);
115122
});
116123

117124
this.events.emit('provider_set');
@@ -126,6 +133,8 @@ class providerService {
126133
*/
127134
async switchConnectorSafe () {
128135

136+
console.log('going to switch connector')
137+
129138
return new Promise(res => {
130139
sem.take(async () => {
131140
await this.switchConnector();
@@ -141,7 +150,12 @@ class providerService {
141150
* @return {Promise<*|bluebird>}
142151
*/
143152
async get () {
144-
return this.connector && this.connector.isConnected() ? this.connector : await this.switchConnectorSafe();
153+
154+
if(this.connector){
155+
console.log('is listening: ', await this.connector.eth.getProtocolVersion().catch(()=>null))
156+
}
157+
158+
return this.connector && await this.connector.eth.getProtocolVersion().catch(() => false) ? this.connector : await this.switchConnectorSafe();
145159
}
146160

147161
}

services/syncCacheService.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,26 @@ class SyncCacheService {
9393
*/
9494
async runPeer(bucket) {
9595

96+
console.log('getting web3 instance')
9697
let web3 = await providerService.get();
9798

98-
const lastBlock = await Promise.promisify(web3.eth.getBlock)(_.last(bucket), false).timeout(60000);
99-
99+
const lastBlock = await web3.eth.getBlock(_.last(bucket), false);
100100

101101
if (!lastBlock || (_.last(bucket) !== 0 && !lastBlock.number))
102102
return await Promise.delay(10000);
103103

104104
log.info(`web3 provider took chuck of blocks ${bucket[0]} - ${_.last(bucket)}`);
105105

106106
await Promise.mapSeries(bucket, async (blockNumber) => {
107+
console.log('getting block')
107108
const block = await getBlock(blockNumber);
109+
console.log('adding block')
108110
await addBlock(block);
109111

112+
console.log('remove block from bucket')
110113
_.pull(bucket, blockNumber);
114+
115+
console.log('emitting event...')
111116
this.events.emit('block', block);
112117
});
113118

utils/blocks/addBlock.js

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ const bunyan = require('bunyan'),
99
removeUnconfirmedTxs = require('../txs/removeUnconfirmedTxs'),
1010
crypto = require('crypto'),
1111
sem = require('semaphore')(3),
12-
Promise = require('bluebird'),
1312
config = require('../../config'),
1413
models = require('../../models'),
1514
BigNumber = require('bignumber.js'),

utils/blocks/allocateBlockBuckets.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ module.exports = async function () {
6060

6161
let web3 = await providerService.get();
6262

63-
let currentNodeHeight = await Promise.promisify(web3.eth.getBlockNumber)().timeout(10000).catch(() => -1);
63+
let currentNodeHeight = await web3.eth.getBlockNumber().catch(() => -1);
6464
currentNodeHeight = parseInt(currentNodeHeight);
6565

6666
if (currentNodeHeight === -1)

utils/blocks/getBlock.js

+4-8
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
* @author Egor Zuev <[email protected]>
55
*/
66

7-
const Promise = require('bluebird'),
8-
providerService = require('../../services/providerService'),
7+
const providerService = require('../../services/providerService'),
98
_ = require('lodash');
109

1110
/**
@@ -23,9 +22,9 @@ module.exports = async (blockNumber) => {
2322

2423
let web3 = await providerService.get();
2524

26-
let rawBlock = await Promise.promisify(web3.eth.getBlock)(blockNumber, true).timeout(10000);
25+
let rawBlock = await web3.eth.getBlock(blockNumber, true);
2726

28-
if(!rawBlock)
27+
if (!rawBlock)
2928
return Promise.reject({code: 2});
3029

3130
rawBlock.uncleAmount = rawBlock.uncles.length;
@@ -35,10 +34,7 @@ module.exports = async (blockNumber) => {
3534
return rawBlock;
3635
}
3736

38-
let logs = await new Promise((res, rej) =>
39-
web3.eth.filter({fromBlock: blockNumber, toBlock: blockNumber})
40-
.get((err, result) => err ? rej(err) : res(result))
41-
).timeout(30000);
37+
let logs = await web3.eth.getPastLogs({fromBlock: blockNumber, toBlock: blockNumber});
4238

4339
rawBlock.transactions = rawBlock.transactions.map(tx => {
4440
tx.logs = _.chain(logs)

utils/txs/removeUnconfirmedTxs.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module.exports = async () => {
1212

1313
let web3 = await providerService.get();
1414

15-
const pendingBlock = await Promise.promisify(web3.eth.getBlock)('pending').timeout(5000);
15+
const pendingBlock = await web3.eth.getBlock('pending');
1616

1717
if (!_.get(pendingBlock, 'transactions', []).length)
1818
return;

0 commit comments

Comments
 (0)