Skip to content

Commit

Permalink
Remove dependency on near-lake-framework
Browse files Browse the repository at this point in the history
  • Loading branch information
vgrichina committed Mar 17, 2024
1 parent 6457619 commit c2f437d
Show file tree
Hide file tree
Showing 4 changed files with 1,156 additions and 284 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"@aws-sdk/client-s3": "^3.535.0",
"@aws-sdk/credential-providers": "^3.231.0",
"@koa/cors": "^4.0.0",
"@types/node": "^18.0.6",
Expand All @@ -34,7 +35,6 @@
"minio": "^7.1.3",
"multibase": "^4.0.6",
"near-api-js": "^0.45.1",
"near-lake-framework": "^1.0.6",
"node-fetch": "^3.2.6",
"timeout-signal": "^1.1.0",
"write-file-atomic": "^5.0.0",
Expand Down
24 changes: 12 additions & 12 deletions scripts/load-from-near-lake.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
const { stream } = require('near-lake-framework');
const minimatch = require('minimatch');

const storage = require('../src/storage');
const { computeHash } = require('../src/util/hash');
const { blockStream } = require('../src/util/near-lake');

let totalMessages = 0;
let timeStarted = Date.now();

async function handleStreamerMessage(streamerMessage, options = {}) {
console.log('handleStreamerMessage.size', JSON.stringify(streamerMessage).length);
const { height: blockHeight, timestamp } = streamerMessage.block.header;
totalMessages++;
const speed = totalMessages * 1000 / (Date.now() - timeStarted);
Expand Down Expand Up @@ -54,19 +55,19 @@ async function dumpBlockReceipts(streamerMessage, { include, exclude }) {
if (!chunk) {
continue;
}
for (let { predecessorId, receipt, receiptId, receiverId } of chunk.receipts) {
if (include && include.find(pattern => !minimatch(accountId, pattern))) {
for (let { receipt, receiver_id } of chunk.receipts) {
if (include && include.find(pattern => !minimatch(receiver_id, pattern))) {
return;
}
if (exclude && exclude.find(pattern => minimatch(accountId, pattern))) {
if (exclude && exclude.find(pattern => minimatch(receiver_id, pattern))) {
return;
}

if (receipt.Action) {
for (let action of receipt.Action.actions) {
const [, actionArgs] = parseRustEnum(action);

if (actionArgs.methodName === 'fs_store') {
if (actionArgs.method_name === 'fs_store') {
const data = Buffer.from(actionArgs.args, 'base64');
try {
const hash = await computeHash(data);
Expand Down Expand Up @@ -98,13 +99,12 @@ async function loadStream(options) {

const { fromEnv } = require("@aws-sdk/credential-providers");
let blocksProcessed = 0;
for await (let streamerMessage of stream({
credentials: fromEnv(),
startBlockHeight: startBlockHeight || await storage.readLatestBlockHeight() || defaultStartBlockHeight,
s3BucketName: bucketName || "near-lake-data-mainnet",
s3RegionName: regionName || "eu-central-1",
s3Endpoint: endpoint,
blocksPreloadPoolSize: batchSize
for await (let streamerMessage of blockStream({
startAfter: startBlockHeight || await storage.readLatestBlockHeight() || defaultStartBlockHeight, // TODO: -1?
bucketName: bucketName || "near-lake-data-mainnet",
region: regionName || "eu-central-1",
endpoint,
pageSize: batchSize
})) {
await handleStreamerMessage(streamerMessage, {
batchSize,
Expand Down
102 changes: 102 additions & 0 deletions src/util/near-lake.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
const {
S3Client,
ListObjectsV2Command,
GetObjectCommand,
} = require('@aws-sdk/client-s3');
const { fromEnv } = require('@aws-sdk/credential-providers');

// Setup keep-alive agents for AWS
const { NodeHttpHandler } = require('@smithy/node-http-handler');
const { Agent: HttpAgent } = require('http');
const { Agent: HttpsAgent } = require('https');
const httpAgent = new HttpAgent({ keepAlive: true });
const httpsAgent = new HttpsAgent({ keepAlive: true });

// List the lake bucket's top-level prefixes (one folder per block height).
// Delimiter '/' makes S3 group keys into CommonPrefixes instead of flat keys;
// the lake buckets are 'requester pays', hence RequestPayer.
async function listObjects(client, { bucketName, startAfter, maxKeys }) {
    const command = new ListObjectsV2Command({
        Bucket: bucketName,
        MaxKeys: maxKeys,
        Delimiter: '/',
        StartAfter: startAfter,
        RequestPayer: 'requester',
    });
    return client.send(command);
}

// Fetch a single object from the lake bucket ('requester pays' transfer).
async function getObject(client, { bucketName, key }) {
    const command = new GetObjectCommand({
        Bucket: bucketName,
        Key: key,
        RequestPayer: 'requester',
    });
    return client.send(command);
}

// Left-pads a block height to the fixed-width 12-digit key prefix
// used by the NEAR Lake bucket layout (e.g. 42 -> '000000000042').
function normalizeBlockHeight(number) {
    return String(number).padStart(12, '0');
}

// Drain an async-iterable stream (e.g. an S3 response body) into one Buffer.
async function asBuffer(readable) {
    const collected = [];
    for await (const piece of readable) collected.push(piece);
    return Buffer.concat(collected);
}

// Yields block heights present in the lake bucket, starting at `startAfter`
// (S3 StartAfter on the height prefix includes the height itself), stopping
// after `limit` blocks when given, or when the listing is exhausted.
// Pages through ListObjectsV2 results `pageSize` prefixes at a time.
async function* blockNumbersStream(client, bucketName, startAfter, limit, pageSize = 250) {
    // No limit means "stream until the bucket runs out"; the original relied on
    // NaN comparisons being false to get this behavior — make it explicit.
    const endAt = limit == null ? Infinity : startAfter + limit;
    let listObjectsResult;
    do {
        listObjectsResult = await listObjects(client, { bucketName, startAfter: normalizeBlockHeight(startAfter), maxKeys: pageSize });
        const blockNumbers = (listObjectsResult.CommonPrefixes || []).map((p) => parseInt(p.Prefix.split('/')[0], 10));

        // Defensive: an empty page would otherwise make the next startAfter NaN
        // and could spin forever on a truncated listing.
        if (blockNumbers.length === 0) {
            return;
        }

        for (const blockNumber of blockNumbers) {
            if (blockNumber >= endAt) {
                return;
            }

            yield blockNumber;
        }

        // Resume the listing just past the last height seen on this page.
        startAfter = blockNumbers[blockNumbers.length - 1] + 1;
    } while (listObjectsResult.IsTruncated);
}

// Streams NEAR Lake blocks from S3, yielding `{ block, shards }` messages in
// block-height order. Replaces near-lake-framework's `stream`.
// Options: bucketName/region/endpoint select the lake bucket; startAfter is the
// first block height to read; limit caps the number of blocks (optional);
// pageSize controls ListObjectsV2 paging. AWS credentials come from the
// environment (fromEnv). The S3 client is always destroyed when the consumer
// stops iterating (break/return/throw) thanks to the finally block.
async function* blockStream({ bucketName, region, endpoint, startAfter, limit, pageSize }) {
    const client = new S3Client({
        credentials: fromEnv(),
        region,
        endpoint,
        // Reuse keep-alive agents so sequential block fetches share connections.
        requestHandler: new NodeHttpHandler({
            httpAgent,
            httpsAgent,
        }),
        maxAttempts: 3,
    });

    // Fetch one file belonging to the given block and parse it as JSON.
    async function getJSON(fileName, blockNumber) {
        const blockHeight = normalizeBlockHeight(blockNumber);
        const response = await getObject(client, { bucketName, key: `${blockHeight}/${fileName}` });
        const data = await asBuffer(response.Body);
        return JSON.parse(data.toString('utf8'));
    }

    try {
        for await (const blockNumber of blockNumbersStream(client, bucketName, startAfter, limit, pageSize)) {
            const block = await getJSON('block.json', blockNumber);
            // One shard file per chunk; they are independent, so fetch in parallel.
            const shards = await Promise.all(
                block.chunks.map((_, shard) => getJSON(`shard_${shard}.json`, blockNumber)));
            yield { block, shards };
        }
    } finally {
        client.destroy();
    }
}

module.exports = { blockStream };
Loading

0 comments on commit c2f437d

Please sign in to comment.