load-from-near-lake -> load-from-source
implementation based on fast-near

should save on S3 download costs
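
(Presumably via the new default neardata source, which fetches blocks over HTTP instead of downloading per-block objects from the near-lake-data-mainnet S3 bucket.)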
vgrichina committed Sep 12, 2024
1 parent d46e8cf commit 624d491
Showing 3 changed files with 1,754 additions and 48 deletions.
1 change: 1 addition & 0 deletions package.json
@@ -28,6 +28,7 @@
     "@types/node": "^18.0.6",
     "debug": "^4.3.2",
     "fast-ipfs": "^0.2.0",
+    "fast-near": "^0.5.0",
     "file-type": "^18.5.0",
     "is-html": "^2.0.0",
     "koa": "^2.14.1",
87 changes: 54 additions & 33 deletions scripts/load-from-near-lake.js → scripts/load-from-source.js
@@ -1,37 +1,35 @@
 const minimatch = require('minimatch');
 
 const storage = require('../src/storage');
 const { computeHash } = require('../src/util/hash');
-const { blockStream } = require('../src/util/near-lake');
+const { withTimeCounter, getCounters, resetCounters } = require('fast-near/utils/counters');
 
 let totalMessages = 0;
 let timeStarted = Date.now();
 
+function formatDuration(milliseconds) {
+    let seconds = Math.floor((milliseconds / 1000) % 60);
+    let minutes = Math.floor((milliseconds / (1000 * 60)) % 60);
+    let hours = Math.floor((milliseconds / (1000 * 60 * 60)) % 24);
+    let days = Math.floor((milliseconds / (1000 * 60 * 60 * 24)));
+    return [days, hours, minutes, seconds].map(n => n.toString().padStart(2, '0')).join(':');
+}
+
 async function handleStreamerMessage(streamerMessage, options = {}) {
+    console.log('handleStreamerMessage.size', JSON.stringify(streamerMessage).length);
     const { height: blockHeight, timestamp } = streamerMessage.block.header;
     totalMessages++;
     const speed = totalMessages * 1000 / (Date.now() - timeStarted);
     const lagSeconds = (Date.now() - (timestamp / 1000000)) / 1000;
     const estimatedSyncSeconds = lagSeconds / speed;
     console.log(new Date(), `Block #${blockHeight} Shards: ${streamerMessage.shards.length}`,
         `Speed: ${speed.toFixed(2)} blocks/second`,
-        `Lag: ${lagSeconds.toFixed(2)} seconds`,
-        `Fully synced in: ${estimatedSyncSeconds.toFixed(2)} seconds`);
+        `Lag: ${formatDuration(lagSeconds * 1000)}`,
+        `Fully synced in: ${formatDuration(estimatedSyncSeconds * 1000)}`);
 
-    const pipeline = [
-        dumpBlockReceipts,
-    ].filter(Boolean);
+    await processBlockReceipts(streamerMessage, options);
 
-    if (pipeline.length === 0) {
-        console.warn('NOTE: No data output pipeline configured. Performing dry run.');
-    }
-
-    for (let fn of pipeline) {
-        await fn(streamerMessage, options);
-    }
-
-    await storage.writeLatestBlockHeight(blockHeight);
+    if (options.updateBlockHeight) {
+        await storage.writeLatestBlockHeight(blockHeight);
+    }
 }
 
 function parseRustEnum(enumObj) {
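
For reference, the new formatDuration renders a millisecond count as zero-padded DD:HH:MM:SS, so a lag of 1 day, 2 hours, 3 minutes and 4 seconds prints as follows (example values, not from the commit):

formatDuration(((24 + 2) * 3600 + 3 * 60 + 4) * 1000); // => '01:02:03:04'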
@@ -47,20 +45,19 @@ function parseRustEnum(enumObj) {
     }
 }
 
-// TODO: Should be possible to parse from transactions directly when listening to network?
-
-async function dumpBlockReceipts(streamerMessage, { include, exclude }) {
+async function processBlockReceipts(streamerMessage, { include, exclude }) {
+    console.time('processBlockReceipts');
     for (let shard of streamerMessage.shards) {
         let { chunk } = shard;
         if (!chunk) {
             continue;
         }
         for (let { receipt, receiver_id } of chunk.receipts) {
             if (include && include.find(pattern => !minimatch(receiver_id, pattern))) {
-                return;
+                continue;
             }
             if (exclude && exclude.find(pattern => minimatch(receiver_id, pattern))) {
-                return;
+                continue;
             }
 
             if (receipt.Action) {
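
Worth noting: the switch from return to continue fixes the filtering, since previously one non-matching receiver_id abandoned the rest of the block, while now only that receipt is skipped. A receiver_id must match every include pattern, and matching any exclude pattern drops it. A quick sketch of the include semantics (hypothetical patterns, not from the commit):

const minimatch = require('minimatch');
const include = ['*.near'];
const ids = ['app.near', 'alice.testnet'];
ids.filter(receiver_id => !include.find(pattern => !minimatch(receiver_id, pattern)));
// => ['app.near'] ('alice.testnet' fails the '*.near' pattern and is skipped)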
@@ -81,6 +78,7 @@ async function dumpBlockReceipts(streamerMessage, { include, exclude }) {
             }
         }
     }
+    console.timeEnd('processBlockReceipts');
 }
 
 async function loadStream(options) {
@@ -93,29 +91,42 @@ async function loadStream(options) {
         limit,
         include,
         exclude,
+        updateBlockHeight,
+        source,
     } = options;
 
+    const { readBlocks } = require(`fast-near/source/${source}`);
+
     const defaultStartBlockHeight = parseInt(process.env.NEARFS_DEFAULT_START_BLOCK_HEIGHT || '0');
+    const start = startBlockHeight || await storage.readLatestBlockHeight() || defaultStartBlockHeight;
 
     let blocksProcessed = 0;
-    for await (let streamerMessage of blockStream({
-        startAfter: startBlockHeight || await storage.readLatestBlockHeight() || defaultStartBlockHeight, // TODO: -1?
-        bucketName: bucketName || "near-lake-data-mainnet",
-        region: regionName || "eu-central-1",
-        endpoint,
-        pageSize: batchSize
+
+    for await (let streamerMessage of readBlocks({
+        startBlockHeight: start,
+        s3BucketName: bucketName || "near-lake-data-mainnet",
+        s3RegionName: regionName || "eu-central-1",
+        s3Endpoint: endpoint,
     })) {
-        await handleStreamerMessage(streamerMessage, {
-            batchSize,
-            include,
-            exclude,
+        await withTimeCounter('handleStreamerMessage', async () => {
+            await handleStreamerMessage(streamerMessage, {
+                batchSize,
+                include,
+                exclude,
+                updateBlockHeight,
+            });
         });
+
+        console.log('counters', getCounters());
+        resetCounters();
+
         blocksProcessed++;
        if (limit && blocksProcessed >= limit) {
             break;
         }
     }
+
+    await storage.closeDatabase();
 }
 
 module.exports = {
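
The require(`fast-near/source/${source}`) call makes the block source pluggable: each module under fast-near/source exposes readBlocks, consumed above as an async iterator of streamer messages. A minimal sketch of a compatible custom source, inferred only from how readBlocks is called in this loop (the file name and endpoint URL are hypothetical):

// my-source.js (hypothetical): shape inferred from the loop above, not from fast-near itself
async function* readBlocks({ startBlockHeight }) {
    for (let height = startBlockHeight; ; height++) {
        const res = await fetch(`https://example.com/block/${height}`); // placeholder endpoint
        if (!res.ok) break;
        // each yielded message must look like { block: { header: { height, timestamp } }, shards: [...] }
        yield await res.json();
    }
}
module.exports = { readBlocks };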
@@ -127,9 +138,14 @@ if (require.main === module) {
     const DEFAULT_BATCH_SIZE = 20;
     const yargs = require('yargs/yargs');
     yargs(process.argv.slice(2))
-        .command(['s3 [bucket-name] [start-block-height] [region-name] [endpoint]', '$0'],
-            'loads data from NEAR Lake S3 into other datastores',
+        .command(['[bucket-name] [start-block-height] [region-name] [endpoint]', '$0'],
+            'loads data from NEAR Lake S3 or other sources into other datastores',
             yargs => yargs
+                .option('source', {
+                    describe: 'Source of the data. Defaults to `neardata`.',
+                    choices: ['redis-blocks', 'lake', 's3-lake', 'neardata'],
+                    default: 'neardata'
+                })
                 .option('start-block-height', {
                     describe: 'block height to start loading from. By default starts from latest known block height or genesis.',
                     number: true
@@ -153,6 +169,11 @@ if (require.main === module) {
                 .option('limit', {
                     describe: 'How many blocks to fetch before stopping. Unlimited by default.',
                     number: true
+                })
+                .option('update-block-height', {
+                    describe: 'update block height in storage',
+                    boolean: true,
+                    default: true
                 }),
             loadStream)
             .parse();
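
Given these options, typical invocations of the renamed script might look like this (flag names as defined above; the --no- prefix is standard yargs negation for boolean flags):

node scripts/load-from-source.js --source neardata --limit 10 --no-update-block-height
node scripts/load-from-source.js --source s3-lake near-lake-data-mainnet --start-block-height 100000000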
