From d48c439dfb179328a566cf36e2acb9630dcfbc36 Mon Sep 17 00:00:00 2001
From: Igor Lins e Silva
Date: Sat, 23 May 2020 23:54:43 -0300
Subject: [PATCH] 3.0.1-beta.1

---
 chains/example.config.json   |  31 +++++----
 helpers/common_functions.ts  |   9 ++-
 interfaces/hyperionConfig.ts |  10 ++-
 modules/loader.ts            |   4 ++
 modules/master.ts            | 110 +++++++++++++++++++++++++-----
 package-lock.json            |  14 ++--
 package.json                 |   6 +-
 run.sh                       |   2 +-
 workers/deserializer.ts      | 128 +++++++++++++++++++++++++++++------
 workers/state-reader.ts      |  30 +++++++-
 10 files changed, 276 insertions(+), 68 deletions(-)

diff --git a/chains/example.config.json b/chains/example.config.json
index 0af2690a..241653a3 100644
--- a/chains/example.config.json
+++ b/chains/example.config.json
@@ -45,17 +45,20 @@
     "deltas": []
   },
   "scaling": {
-    "batch_size": 10000,
-    "queue_limit": 50000,
-    "readers": 1,
-    "ds_queues": 1,
-    "ds_threads": 1,
-    "ds_pool_size": 1,
-    "indexing_queues": 1,
-    "ad_idx_queues": 1,
-    "max_autoscale": 4,
+    "readers": 1,
+    "ds_queues": 1,
+    "ds_threads": 1,
+    "ds_pool_size": 1,
+    "indexing_queues": 1,
+    "ad_idx_queues": 1,
+    "max_autoscale": 4,
+    "batch_size": 5000,
+    "resume_trigger": 5000,
     "auto_scale_trigger": 20000,
-    "routing_mode": "heatmap"
+    "block_queue_limit": 10000,
+    "max_queue_limit": 100000,
+    "routing_mode": "heatmap",
+    "polling_interval": 10000
   },
   "indexer": {
     "start_on": 0,
@@ -84,8 +87,12 @@
       "voters": true
     },
     "index_deltas": true,
-    "index_transfer_memo": true,
-    "index_all_deltas": true
+    "index_transfer_memo": false,
+    "index_all_deltas": true,
+    "deferred_trx": false,
+    "failed_trx": true,
+    "resource_limits": false,
+    "resource_usage": false
   },
   "prefetch": {
     "read": 50,
diff --git a/helpers/common_functions.ts b/helpers/common_functions.ts
index 97d5b7ca..69fcee1e 100644
--- a/helpers/common_functions.ts
+++ b/helpers/common_functions.ts
@@ -1,7 +1,14 @@
 import {ApiResponse, Client} from "@elastic/elasticsearch";
 import {Serialize} from "../addons/eosjs-native";

-const config = require(`../${process.env.CONFIG_JSON}`);
+let config;
+try {
+    config = require(`../${process.env.CONFIG_JSON}`);
+} catch (e) {
+    console.log(`Configuration not found: ${process.env.CONFIG_JSON}`);
+    process.exit(1);
+}
+
 const CHAIN = config.settings.chain;

 function getLastResult(results: ApiResponse) {
diff --git a/interfaces/hyperionConfig.ts b/interfaces/hyperionConfig.ts
index 65cb38e5..4bd0a1ed 100644
--- a/interfaces/hyperionConfig.ts
+++ b/interfaces/hyperionConfig.ts
@@ -1,4 +1,8 @@
 export interface ScalingConfigs {
+    polling_interval: number;
+    resume_trigger: number;
+    max_queue_limit: number;
+    block_queue_limit: number;
     routing_mode: string;
     batch_size: number;
     queue_limit: number;
@@ -110,7 +114,11 @@
         },
         index_deltas: boolean,
         index_transfer_memo: boolean,
-        index_all_deltas: boolean
+        index_all_deltas: boolean,
+        deferred_trx: boolean,
+        failed_trx: boolean,
+        resource_usage: boolean,
+        resource_limits: boolean,
     };

     prefetch: {
diff --git a/modules/loader.ts b/modules/loader.ts
index 3df581ff..beafc851 100644
--- a/modules/loader.ts
+++ b/modules/loader.ts
@@ -18,6 +18,10 @@ export class HyperionModuleLoader {
     constructor(private cm: ConfigurationModule) {
         this.conn = cm.connections;
         this.config = cm.config;
+        if (!this.conn.chains[this.config.settings.chain]) {
+            console.log('Chain ' + this.config.settings.chain + ' not defined on connections.json!');
+            process.exit(0);
+        }
         this.chainID = this.conn.chains[this.config.settings.chain].chain_id;
         this.loadActionHandlers();
         this.loadParser().catch((err) => {
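Note on the configuration changes above: the new scaling keys (resume_trigger, block_queue_limit, max_queue_limit, polling_interval) and the new features flags (deferred_trx, failed_trx, resource_limits, resource_usage) are consumed by the backpressure and feature-gating hunks below, and runMaster() now exits if the three queue limits are missing. A minimal illustrative sketch of the new scaling keys, not part of the patch (values copied from example.config.json above, not tuning advice):

    // Illustrative only - mirrors the keys added to ScalingConfigs in this patch.
    const scaling = {
        batch_size: 5000,          // blocks fetched per read batch
        polling_interval: 10000,   // ms between the master's queue-size checks (falls back to 20000 if unset)
        block_queue_limit: 10000,  // per-reader limit enforced inside workers/state-reader.ts
        max_queue_limit: 100000,   // master pauses all readers above this queue size
        resume_trigger: 5000       // master lifts the pause once queues drain below this
    };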
diff --git a/modules/master.ts b/modules/master.ts
index d00fab3a..36711e90 100644
--- a/modules/master.ts
+++ b/modules/master.ts
@@ -34,10 +34,10 @@
 import * as cluster from "cluster";
 import {Worker} from "cluster";
 import {HyperionWorkerDef} from "../interfaces/hyperionWorkerDef";
 import {HyperionConfig} from "../interfaces/hyperionConfig";
-import moment = require("moment");
-import Timeout = NodeJS.Timeout;
 import {AsyncQueue, queue} from "async";
+import moment = require("moment");
+import Timeout = NodeJS.Timeout;


 export class HyperionMaster {
@@ -86,7 +86,6 @@
     private lastProducer: string = null;
     private handoffCounter: number = 0;
     private missedRounds: object = {};
-    private blockMsgQueue: any[] = [];

     // IPC Messaging
     private totalMessages = 0;
@@ -134,6 +133,7 @@
     private proposedSchedule: any;
     private wsRouterWorker: cluster.Worker;
     private liveBlockQueue: AsyncQueue;
+    private readingPaused = false;

     constructor() {

@@ -1140,17 +1140,65 @@
         }, 5000);
     }

-    private monitorIndexingQueues() {
-        const limit = this.conf.scaling.auto_scale_trigger;
-        const autoscaleConsumers = {};
-        setInterval(async () => {
-            const testedQueues = new Set();
-            for (const worker of this.workerMap) {
-                if (worker.worker_role === 'ingestor') {
-                    const queue = worker.queue;
-                    if (!testedQueues.has(queue)) {
-                        testedQueues.add(queue);
-                        const size = await this.manager.checkQueueSize(queue);
+    private async checkQueues(autoscaleConsumers, limit) {
+        const testedQueues = new Set();
+        for (const worker of this.workerMap) {
+            let queue = worker.queue;
+
+            if (worker.worker_role === 'ds_pool_worker') {
+                queue = `${this.chain}:ds_pool:${worker.local_id}`;
+            }
+
+            if (queue) {
+                if (!testedQueues.has(queue)) {
+                    testedQueues.add(queue);
+                    const size = await this.manager.checkQueueSize(queue);
+
+
+                    // pause readers if queues are above the max_limit
+                    if (size >= this.conf.scaling.max_queue_limit) {
+                        this.readingPaused = true;
+                        for (const worker of this.workerMap) {
+                            if (worker.worker_role === 'reader') {
+                                worker.wref.send({event: 'pause'});
+                            }
+                        }
+                    }
+
+                    // resume readers if the queues are below the trigger point
+                    if ((this.readingPaused && size <= this.conf.scaling.resume_trigger)) {
+                        this.readingPaused = false;
+                        for (const worker of this.workerMap) {
+                            if (worker.worker_role === 'reader') {
+                                worker.wref.send({event: 'pause'});
+                                worker.wref.send({
+                                    event: 'set_delay',
+                                    data: {
+                                        state: false,
+                                        delay: 0
+                                    }
+                                });
+                            }
+                        }
+                    }
+
+                    // apply block processing delay if 20% below max
+                    if (size >= this.conf.scaling.max_queue_limit * 0.8) {
+                        for (const worker of this.workerMap) {
+                            if (worker.worker_role === 'reader') {
+                                worker.wref.send({
+                                    event: 'set_delay',
+                                    data: {
+                                        state: true,
+                                        delay: 500
+                                    }
+                                });
+                            }
+                        }
+                    }
+
+
+                    if (worker.worker_role === 'ingestor') {
                         if (size > limit) {
                             if (!autoscaleConsumers[queue]) {
                                 autoscaleConsumers[queue] = 0;
@@ -1164,14 +1212,24 @@
                                 });
                                 this.launchWorkers();
                                 autoscaleConsumers[queue]++;
-                            } else {
-                                // hLog(`WARN: Max consumer limit reached on ${queue}!`);
                             }
                         }
                     }
                 }
             }
-        }, 20000);
+        }
+    }
+
+    private monitorIndexingQueues() {
+        const limit = this.conf.scaling.auto_scale_trigger;
+        const autoscaleConsumers = {};
+        this.checkQueues(autoscaleConsumers, limit).catch(console.log);
+        if (!this.conf.scaling.polling_interval) {
+            this.conf.scaling.polling_interval = 20000;
+        }
+        setInterval(async () => {
+            await this.checkQueues(autoscaleConsumers, limit);
+        }, this.conf.scaling.polling_interval);
     }

     private onPm2Stop() {
@@ -1350,6 +1408,22 @@

     async runMaster() {

+        // config checks
+        if (!this.conf.scaling.max_queue_limit) {
+            hLog(`scaling.max_queue_limit is not defined!`);
+            process.exit(1);
+        }
+
+        if (!this.conf.scaling.resume_trigger) {
+            hLog(`scaling.resume_trigger is not defined!`);
+            process.exit(1);
+        }
+
+        if (!this.conf.scaling.block_queue_limit) {
+            hLog(`scaling.block_queue_limit is not defined!`);
+            process.exit(1);
+        }
+
         this.printMode();

         // Preview mode - prints only the proposed worker map
@@ -1365,7 +1439,7 @@
             this.printActiveProds();
         }

-        // ELasticsearch
+        // Elasticsearch
         this.client = this.manager.elasticsearchClient;
         try {
             const esInfo = await this.client.info();
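The checkQueues() hunk above steers readers from three thresholds: at max_queue_limit readers are paused, at 80% of it a 500 ms processing delay is requested, and once a paused run drains below resume_trigger the delay is cleared (the 'resume' handler added on the reader side below suggests that branch is also meant to resume readers). A condensed, hypothetical sketch of that decision logic, not part of the patch:

    // Hypothetical condensation of HyperionMaster.checkQueues(): given one queue's
    // size and the scaling thresholds, compute the IPC events sent to reader workers.
    type ReaderEvent =
        | { event: 'pause' }
        | { event: 'resume' }
        | { event: 'set_delay', data: { state: boolean, delay: number } };

    function readerControl(size: number, readingPaused: boolean, scaling: {
        max_queue_limit: number,
        resume_trigger: number
    }): ReaderEvent[] {
        const events: ReaderEvent[] = [];
        if (size >= scaling.max_queue_limit) {
            // queues above the hard ceiling: stop requesting block ranges
            events.push({event: 'pause'});
        } else if (readingPaused && size <= scaling.resume_trigger) {
            // drained below the trigger: lift the pause and clear the delay
            // (the patch sends 'pause' again at this point; 'resume' is shown here
            // to match the handler added in workers/state-reader.ts)
            events.push({event: 'resume'});
            events.push({event: 'set_delay', data: {state: false, delay: 0}});
        }
        if (size >= scaling.max_queue_limit * 0.8) {
            // within 20% of the ceiling: throttle block processing by 500 ms
            events.push({event: 'set_delay', data: {state: true, delay: 500}});
        }
        return events;
    }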
diff --git a/package-lock.json b/package-lock.json
index dd5f18db..ce2f9cc8 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
 {
   "name": "hyperion-history",
-  "version": "3.0.0-beta.2",
+  "version": "3.0.1-beta.1",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {
@@ -288,9 +288,9 @@
       }
     },
     "@types/lodash": {
-      "version": "4.14.151",
-      "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.151.tgz",
-      "integrity": "sha512-Zst90IcBX5wnwSu7CAS0vvJkTjTELY4ssKbHiTnGcJgi170uiS8yQDdc3v6S77bRqYQIN1App5a1Pc2lceE5/g==",
+      "version": "4.14.152",
+      "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.152.tgz",
+      "integrity": "sha512-Vwf9YF2x1GE3WNeUMjT5bTHa2DqgUo87ocdgTScupY2JclZ5Nn7W2RLM/N0+oreexUk8uaVugR81NnTY/jNNXg==",
       "dev": true
     },
     "@types/node": {
@@ -3171,9 +3171,9 @@
       }
     },
     "typescript": {
-      "version": "3.9.2",
-      "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.2.tgz",
-      "integrity": "sha512-q2ktq4n/uLuNNShyayit+DTobV2ApPEo/6so68JaD5ojvc/6GClBipedB9zNWYxRSAlZXAe405Rlijzl6qDiSw=="
+      "version": "3.9.3",
+      "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.9.3.tgz",
+      "integrity": "sha512-D/wqnB2xzNFIcoBG9FG8cXRDjiqSTbG2wd8DMZeQyJlP1vfTkIxH4GKveWaEBYySKIg+USu+E+EDIR47SqnaMQ=="
     },
     "uid2": {
       "version": "0.0.3",
diff --git a/package.json b/package.json
index 1e276e92..fa884364 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "hyperion-history",
-  "version": "3.0.0-beta.4",
+  "version": "3.0.1-beta.1",
   "description": "Scalable Full History API Solution for EOSIO based blockchains",
   "main": "launcher.js",
   "scripts": {
@@ -55,11 +55,11 @@
     "socket.io-redis": "5.2.0",
     "ws": "7.3.0",
     "yargs": "15.3.1",
-    "typescript": "3.9.2"
+    "typescript": "3.9.3"
   },
   "devDependencies": {
     "@types/ioredis": "4.16.2",
-    "@types/lodash": "4.14.151",
+    "@types/lodash": "4.14.152",
     "@types/amqplib": "0.5.13",
     "@types/async": "3.2.3",
     "@types/got": "9.6.11",
diff --git a/run.sh b/run.sh
index 58b93a1d..bf9a0eba 100644
--- a/run.sh
+++ b/run.sh
@@ -10,4 +10,4 @@ echo -e "\n-->> Starting $1..."
 echo -e "\n-->> Saving pm2 state..."
 (set -x; pm2 save)
 echo -e "\n-->> Reading $1 logs..."
-(set -x; pm2 logs --raw --lines 0 "$@")
+(set -x; pm2 logs --raw --lines 10 "$@")
diff --git a/workers/deserializer.ts b/workers/deserializer.ts
index 725002b1..7bf20c66 100644
--- a/workers/deserializer.ts
+++ b/workers/deserializer.ts
@@ -93,6 +93,7 @@ export default class MainDSWorker extends HyperionWorker {
         });

         this.populateTableHandlers();
+
     }

     async run(): Promise<void> {
@@ -131,11 +132,6 @@ export default class MainDSWorker extends HyperionWorker {
                 this.dsPoolMap = msg.data;
                 break;
             }
-            default: {
-                hLog('-----------> IPC Message <--------------');
-                hLog(msg);
-                hLog('----------------------------------------');
-            }
         }
     }

@@ -246,13 +242,47 @@
         const failedTrx = [];

         block.transactions.forEach((trx) => {
+
             total_cpu += trx['cpu_usage_us'];
             total_net += trx['net_usage_words'];
-            if (trx.status !== 0) {
-                failedTrx.push({
-                    id: trx.trx[1],
-                    status: trx.status
-                });
+
+            if (this.conf.features.failed_trx) {
+                switch (trx.status) {
+
+                    // soft_fail: objectively failed (not executed), error handler executed
+                    case 1: {
+                        failedTrx.push({
+                            id: trx.trx[1],
+                            status: trx.status
+                        });
+                        break;
+                    }
+
+                    // hard_fail: objectively failed and error handler objectively failed thus no state change
+                    case 2: {
+                        hLog('hard_fail', block_num);
+                        console.log(trx);
+                        break;
+                    }
+
+                    // delayed: transaction delayed/deferred/scheduled for future execution
+                    // case 3: {
+                    //     hLog('delayed', block_num);
+                    //     console.log(trx);
+                    //     const unpackedTrx = this.api.deserializeTransaction(Buffer.from(trx.trx[1].packed_trx, 'hex'));
+                    //     console.log(unpackedTrx);
+                    //     break;
+                    // }
+
+                    // expired: transaction expired and storage space refunded to user
+                    case 4: {
+                        failedTrx.push({
+                            id: trx.trx[1],
+                            status: trx.status
+                        });
+                        break;
+                    }
+                }
             }
         });

@@ -260,12 +290,13 @@
         if (failedTrx.length > 0) {
             for (const tx of failedTrx) {
                 if (typeof tx.id === 'string') {
-                    this.pushToIndexQueue({
+                    const payload = {
                         "@timestamp": ts,
                         "block_num": block_num,
                         trx_id: tx.id,
                         status: tx.status
-                    }, 'trx_error');
+                    };
+                    this.pushToIndexQueue(payload, 'trx_error');
                 }
             }
         }
@@ -849,6 +880,22 @@
         }
     }

+    private anyFromSender(gen_trx: any) {
+        return this.chain + '::' + gen_trx.sender + '::*';
+    }
+
+    checkDeltaBlacklistForGenTrx(gen_trx) {
+        if (this.filters.delta_blacklist.has(this.anyFromSender(gen_trx))) {
+            return true;
+        }
+    }
+
+    checkDeltaWhitelistForGenTrx(gen_trx) {
+        if (this.filters.delta_whitelist.has(this.anyFromSender(gen_trx))) {
+            return true;
+        }
+    }
+
     deltaStructHandlers = {

         "contract_row": async (payload, block_num, block_ts, row) => {
@@ -976,7 +1023,20 @@

         // Deferred Transactions
         "generated_transaction": async (generated_transaction: any, block_num, block_ts) => {
-            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas) {
+            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas && this.conf.features.deferred_trx) {
+
+                // check delta blacklist chain::code::table
+                if (this.checkDeltaBlacklistForGenTrx(generated_transaction)) {
+                    return false;
+                }
+
+                // check delta whitelist chain::code::table
+                if (this.filters.delta_whitelist.size > 0) {
+                    if (!this.checkDeltaWhitelistForGenTrx(generated_transaction)) {
+                        return false;
+                    }
+                }
+
                 const unpackedTrx = this.api.deserializeTransaction(Buffer.from(generated_transaction.packed_trx, 'hex'));
                 for (const action of unpackedTrx.actions) {
                     const act_data = await this.deserializeActionAtBlockNative(action, block_num);
@@ -984,7 +1044,8 @@
                         action.data = act_data;
                     }
                 }
-                this.pushToIndexQueue({
+
+                const genTxPayload = {
                     '@timestamp': block_ts,
                     block_num: block_num,
                     sender: generated_transaction.sender,
@@ -993,14 +1054,16 @@
                     trx_id: generated_transaction.trx_id.toLowerCase(),
                     actions: unpackedTrx.actions,
                     packed_trx: generated_transaction.packed_trx
-                }, 'generated_transaction');
+                };
+
+                this.pushToIndexQueue(genTxPayload, 'generated_transaction');
             }
         },


         // Account resource updates
         "resource_limits": async (resource_limits, block_num, block_ts) => {
-            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas) {
+            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas && this.conf.features.resource_limits) {
                 const cpu = parseInt(resource_limits.cpu_weight);
                 const net = parseInt(resource_limits.net_weight);
                 this.pushToIndexQueue({
@@ -1024,7 +1087,7 @@
         // },

         "resource_usage": async (resource_usage, block_num, block_ts, row) => {
-            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas) {
+            if (!this.conf.indexer.abi_scan_mode && this.conf.indexer.process_deltas && this.conf.features.resource_usage) {
                 const net_used = parseInt(resource_usage.net_usage[1].consumed);
                 const net_total = parseInt(resource_usage.net_usage[1].value_ex);
                 let net_pct = 0.0;
@@ -1106,11 +1169,32 @@

                     // const tRef = process.hrtime.bigint();
                     for (const row of deltaStruct[key]) {
-                        const data = this.deserializeNative(key, row.data);
-                        try {
-                            await this.deltaStructHandlers[key](data[1], block_num, block_ts, row);
-                        } catch (e) {
-                            hLog(`Delta struct deserialization error: ${e.message}`);
+                        let data = this.deserializeNative(key, row.data);
+
+                        if (!data) {
+                            try {
+                                data = this.types.get(key).deserialize(
+                                    new Serialize.SerialBuffer({
+                                        textEncoder: this.txEnc,
+                                        textDecoder: this.txDec,
+                                        array: Buffer.from(row.data, 'hex')
+                                    }),
+                                    new Serialize.SerializerState({
+                                        bytesAsUint8Array: true
+                                    }));
+                            } catch (e) {
+                                hLog(`Delta struct [${key}] deserialization error: ${e.message}`);
+                                hLog(row.data);
+                            }
+                        }
+
+                        if (data) {
+                            try {
+                                await this.deltaStructHandlers[key](data[1], block_num, block_ts, row);
+                            } catch (e) {
+                                hLog(`Delta struct [${key}] processing error: ${e.message}`);
+                                hLog(data);
+                            }
                         }
                     }

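For reference, the numeric cases in the failed_trx switch above are the standard EOSIO transaction receipt status codes: with features.failed_trx enabled, soft_fail and expired receipts are indexed as trx_error documents, hard_fail receipts are only logged, and the delayed case is left commented out. A small illustrative mapping, not part of the patch:

    // EOSIO transaction receipt statuses referenced by the switch above.
    enum TrxStatus {
        executed = 0,   // succeeded, no error handler executed
        soft_fail = 1,  // objectively failed (not executed), error handler executed
        hard_fail = 2,  // objectively failed and error handler objectively failed
        delayed = 3,    // transaction delayed/deferred/scheduled for future execution
        expired = 4     // transaction expired and storage space refunded to user
    }

    // Hypothetical helper mirroring which statuses end up in the trx_error index.
    function indexedAsFailed(status: TrxStatus): boolean {
        return status === TrxStatus.soft_fail || status === TrxStatus.expired;
    }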
diff --git a/workers/state-reader.ts b/workers/state-reader.ts
index c343c0a1..d0487c8f 100644
--- a/workers/state-reader.ts
+++ b/workers/state-reader.ts
@@ -31,6 +31,8 @@
     private currentIdx = 1;
     private receivedFirstBlock = false;
     private local_lib = 0;
+    private delay_block_processing = false;
+    private block_processing_delay = 100;

     constructor() {
         super();
@@ -43,9 +45,15 @@
             this.distribute(tasks, callback);
         }, this.conf.prefetch.read);

-        this.blockReadingQueue = cargo((tasks, callback) => {
+        this.blockReadingQueue = cargo((tasks, next) => {
             this.processIncomingBlocks(tasks).then(() => {
-                callback();
+                if (this.delay_block_processing) {
+                    setTimeout(() => {
+                        next();
+                    }, this.block_processing_delay);
+                } else {
+                    next();
+                }
             }).catch((err) => {
                 console.log('FATAL ERROR READING BLOCKS', err);
                 process.exit(1);
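The blockReadingQueue change above is the reader-side half of the set_delay mechanism: the async.cargo drain callback is simply deferred while the delay flag is set. A standalone sketch of the same pattern, with a hypothetical processBlocks() standing in for StateReader.processIncomingBlocks():

    import { cargo } from "async";

    let delayBlockProcessing = false;
    let blockProcessingDelay = 100;

    // Throttled cargo drain: hold the next batch back while a delay is in effect.
    const blockReadingQueue = cargo<Buffer>((tasks, next) => {
        processBlocks(tasks).then(() => {
            if (delayBlockProcessing) {
                setTimeout(() => next(), blockProcessingDelay);
            } else {
                next();
            }
        }).catch((err) => {
            console.log('FATAL ERROR READING BLOCKS', err);
            process.exit(1);
        });
    });

    async function processBlocks(blocks: Buffer[]): Promise<void> {
        // placeholder: deserialize and forward the incoming blocks
    }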
@@ -176,6 +184,22 @@ export default class StateReader extends HyperionWorker {
                 }
                 break;
             }
+            case 'pause': {
+                this.allowRequests = false;
+                break;
+            }
+            case 'resume': {
+                this.allowRequests = true;
+                if (this.pendingRequest) {
+                    this.processPending();
+                }
+                break;
+            }
+            case 'set_delay': {
+                this.delay_block_processing = msg.data.state;
+                this.block_processing_delay = msg.data.delay;
+                break;
+            }
             case 'stop': {
                 if (this.isLiveReader) {
                     console.log('[LIVE READER] Closing Websocket');
@@ -471,7 +495,7 @@
             checkArr.push(this.manager.checkQueueSize(q));
         }
         Promise.all(checkArr).then(data => {
-            if (data.some(el => el > this.conf.scaling.queue_limit)) {
+            if (data.some(el => el > this.conf.scaling.block_queue_limit)) {
                 this.allowRequests = false;
             } else {
                 this.allowRequests = true;
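Taken together, backpressure is now split across two layers: each reader checks its own block queues against scaling.block_queue_limit (the hunk above), while the master polls every queue and steers readers through the pause / resume / set_delay messages handled in the cases added above. A compact, hypothetical mirror of that reader-side message handling, not part of the patch:

    // Hypothetical standalone mirror of the IPC cases added to workers/state-reader.ts.
    type MasterMessage =
        | { event: 'pause' }
        | { event: 'resume' }
        | { event: 'set_delay', data: { state: boolean, delay: number } };

    class ReaderControlState {
        allowRequests = true;          // gate for requesting new block ranges
        delayBlockProcessing = false;  // mirrors delay_block_processing
        blockProcessingDelay = 100;    // mirrors block_processing_delay (ms)

        handle(msg: MasterMessage) {
            switch (msg.event) {
                case 'pause':
                    // stop asking the state-history endpoint for more blocks
                    this.allowRequests = false;
                    break;
                case 'resume':
                    // allow requests again (the worker also flushes any pending request)
                    this.allowRequests = true;
                    break;
                case 'set_delay':
                    // throttle or un-throttle the block-reading cargo queue
                    this.delayBlockProcessing = msg.data.state;
                    this.blockProcessingDelay = msg.data.delay;
                    break;
            }
        }
    }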