Skip to content

Commit 3a1512a

Browse files
author
sachin-maheshwari
authored
Merge pull request #233 from topcoder-platform/develop
migrate small chunks for v5 challenges (submissions) in the DB and change in ES accordingly.
2 parents 6a138fe + 1e15f23 commit 3a1512a

File tree

2 files changed

+115
-2
lines changed

2 files changed

+115
-2
lines changed

config/default.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ module.exports = {
3636
},
3737
PAGE_SIZE: process.env.PAGE_SIZE || 20,
3838
MAX_PAGE_SIZE: parseInt(process.env.MAX_PAGE_SIZE) || 100,
39-
ES_BATCH_SIZE: process.env.ES_BATCH_SIZE || 250,
39+
ES_BATCH_SIZE: process.env.ES_BATCH_SIZE || 1000,
4040
UPDATE_V5_CHALLENGE_BATCH_SIZE: process.env.UPDATE_V5_CHALLENGE_BATCH_SIZE || 100,
4141
SUBMISSION_TABLE_NAME: process.env.SUBMISSION_TABLE_NAME || 'Submission',
4242
AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL,
4343
FETCH_CREATED_DATE_START: process.env.FETCH_CREATED_DATE_START || '2021-01-01',
44-
FETCH_PAGE_SIZE: process.env.FETCH_PAGE_SIZE || 500
44+
FETCH_PAGE_SIZE: process.env.FETCH_PAGE_SIZE || 500,
45+
MIGRATE_CHALLENGES: process.env.MIGRATE_CHALLENGES || []
4546
}
+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* Store v5 challenge id for current records
3+
*/
4+
5+
const _ = require('lodash')
6+
const co = require('co')
7+
const config = require('config')
8+
const logger = require('../src/common/logger')
9+
const dbhelper = require('../src/common/dbhelper')
10+
const helper = require('../src/common/helper')
11+
12+
const esClient = helper.getEsClient()
13+
14+
/**
15+
* Update Submission's challenge id to v5
16+
* @param {Object} submission The submission record
17+
* @param {Array} failedContainer The failed records container
18+
* @returns {Promise}
19+
*/
20+
function* updateRecord(submission, failedContainer) {
21+
let v5challengeId
22+
try {
23+
v5challengeId = yield helper.getV5ChallengeId(submission.challengeId)
24+
} catch (err) {
25+
logger.error(`fetching the details of the challenge(${submission.challengeId}) failed, ${err.message}`)
26+
failedContainer.push(submission)
27+
return
28+
}
29+
const record = {
30+
TableName: 'Submission',
31+
Key: {
32+
id: submission.id
33+
},
34+
UpdateExpression: `set challengeId = :c, legacyChallengeId = :l`,
35+
ExpressionAttributeValues: {
36+
':c': v5challengeId,
37+
':l': submission.challengeId
38+
}
39+
}
40+
if (!v5challengeId) {
41+
logger.warn(`the challengeId: ${submission.challengeId} is not having a v5 challengeId`)
42+
failedContainer.push(submission)
43+
return
44+
} else if (v5challengeId === submission.challengeId) {
45+
logger.info(`the challengeId: ${submission.challengeId} is already a v5 challengeId`)
46+
}
47+
48+
yield dbhelper.updateRecord(record)
49+
try {
50+
const response = yield esClient.update({
51+
index: config.get('esConfig.ES_INDEX'),
52+
type: config.get('esConfig.ES_TYPE'),
53+
id: submission.id,
54+
body: { doc: { challengeId: v5challengeId, legacyChallengeId: submission.challengeId } }
55+
})
56+
logger.info(`updated ES for submission ${submission.id}, response: ${JSON.stringify(response)}`)
57+
} catch (error) {
58+
logger.error(error.message)
59+
}
60+
}
61+
62+
/*
63+
* Update all submission's challenge id to v5
64+
* @returns {Promise}
65+
*/
66+
function* updateRecords() {
67+
const tableName = config.SUBMISSION_TABLE_NAME
68+
const promises = []
69+
const failedRecords = []
70+
const legacyChallengeIds = JSON.parse(config.MIGRATE_CHALLENGES)
71+
const queryParams = _.fromPairs(_.map(legacyChallengeIds, (c, i) => [`:challengeId${i}`, c]))
72+
const params = {
73+
TableName: tableName,
74+
FilterExpression: `#challengeId IN (${_.join(_.keys(queryParams), ',')})`,
75+
ExpressionAttributeNames: {
76+
'#challengeId': 'challengeId'
77+
},
78+
ExpressionAttributeValues: queryParams
79+
}
80+
// Process until all the records from DB is fetched
81+
while (true) {
82+
const records = yield dbhelper.scanRecords(params)
83+
const totalRecords = records.Items.length
84+
logger.debug(`Number of ${tableName}s fetched from DB - ${totalRecords}. More fetch iterations may follow (pagination in progress)`)
85+
for (let i = 0; i < totalRecords; i++) {
86+
const record = records.Items[i]
87+
promises.push(updateRecord(record, failedRecords))
88+
}
89+
// Continue fetching the remaining records from Database
90+
if (typeof records.LastEvaluatedKey !== 'undefined') {
91+
params.ExclusiveStartKey = records.LastEvaluatedKey
92+
} else {
93+
break // If there are no more records to process, exit the loop
94+
}
95+
}
96+
logger.debug(`All records fetched. Proceeding to update them in batches of ${config.UPDATE_V5_CHALLENGE_BATCH_SIZE}`)
97+
const paraRecords = _.chunk(promises, config.UPDATE_V5_CHALLENGE_BATCH_SIZE)
98+
for (const rs of paraRecords) {
99+
yield rs
100+
}
101+
logger.info(`Processed ${promises.length - failedRecords.length} records successfully`)
102+
if (failedRecords.length > 0) {
103+
logger.warn(`Processing of ${failedRecords.length} records failed`)
104+
logger.info(`Failed records: ${_.join(_.map(failedRecords, f => JSON.stringify(_.pick(f, ['id', 'challengeId'])), ','))}`)
105+
}
106+
}
107+
108+
co(function* () {
109+
yield updateRecords()
110+
}).catch((err) => {
111+
logger.logFullError(err)
112+
})

0 commit comments

Comments
 (0)