From 227d4e60d803be327550bc5f5175c0f51c572641 Mon Sep 17 00:00:00 2001 From: Reggie Cushing Date: Thu, 30 May 2024 12:00:29 +0200 Subject: [PATCH] fix race condition: user queues while waiting in agreement --- UserDb.js | 39 +++++++--- agreement.js | 2 +- local-queue.js | 31 ++++++++ package-lock.json | 12 ++- package.json | 1 + schedulers/gat-scheduler.js | 19 ++++- server.js | 85 +++++++++++++------- server_test/test_virtual_user.js | 43 ----------- server_test/virtual-user-ws.js | 129 ++++++++++++++++++++----------- user.js | 16 ++-- 10 files changed, 239 insertions(+), 138 deletions(-) delete mode 100644 server_test/test_virtual_user.js diff --git a/UserDb.js b/UserDb.js index fad6558..5b56b2c 100644 --- a/UserDb.js +++ b/UserDb.js @@ -3,7 +3,7 @@ const murmurhash = require("murmurhash") const fs = require("fs").promises const User = require("./user.js") -let writeFlag = false +const { SHA1 } = require("crypto-js") class UserDb extends Map { constructor(file) { @@ -11,6 +11,8 @@ class UserDb extends Map { this.seed = 42 this.file = file this.lastHash = 0 + this.writeCounter = 0 + this.forcedSave = 0 } load() { let data = [] @@ -56,7 +58,31 @@ class UserDb extends Map { return JSON.stringify(data) } - save() { + save(){ + this.writeCounter += 1 + const currentCounter = this.writeCounter + setTimeout(() => { + if ( (currentCounter != this.writeCounter) || (this.writeCounter === 0) ){ + return + } + this.#save() + }, 500) + } + forceSave(){ + if (this.forcedSave > 0) { + return + } + this.forcedSave = 1 + const data = [] + this.forEach((v, k) => { + data.push(v.serialize()) + }) + const dump = JSON.stringify(data, null, 2) + + console.log("Saving DB: ", dump) + return fs.writeFile(this.file, dump) + } + #save() { const data = [] this.forEach((v, k) => { data.push(v.serialize()) @@ -64,19 +90,14 @@ class UserDb extends Map { const dump = JSON.stringify(data, null, 2) const h = murmurhash(dump, this.seed) - console.log(`${h}:${this.lastHash}:${dump}`) + //console.log(`${h}:${this.lastHash}:${dump}`) if (this.lastHash !== h) { - if (writeFlag) { - console.log('WARN FILe STILL WRITING') - } - writeFlag = true fs.writeFile(this.file, dump) .then(() => { - writeFlag = false this.lastHash = h + this.writeCounter = 0 }) .catch((err) => { - writeFlag = false console.error("[ERROR] Writing UserDb to file.") }) } diff --git a/agreement.js b/agreement.js index eed116d..2ba6536 100644 --- a/agreement.js +++ b/agreement.js @@ -7,7 +7,7 @@ class Agreement { this.urls = urls this.server = server this.state = "new" - this.timeout = timeout || 30 + this.timeout = timeout || 3 } startTimeout(fn) { diff --git a/local-queue.js b/local-queue.js index 041e7e2..9db4ac6 100644 --- a/local-queue.js +++ b/local-queue.js @@ -1,8 +1,11 @@ +var Mutex = require('async-mutex').Mutex // A LocalQueue implementation class LocalQueue { constructor(queueName) { this.name = queueName this.queue = [] + this.mutex = new Mutex() + this.simple_lock = 0 } push(item) { @@ -16,6 +19,34 @@ class LocalQueue { this.queue.length = 0 } + lock() { + this.simple_lock = 1 + // return this.mutex.acquire() + } + + unlock() { + this.simple_lock = 0 + // this.mutex.release() + } + + wait() { + return new Promise((resolve, reject) => { + // this.mutex.waitForUnlock().then(() => { + // this.lock().then(() => { + // resolve() + // }) + // }) + const testLockInterval = setInterval(() => { + if (this.simple_lock > 0) { + return + } + this.lock() + clearInterval(testLockInterval) + resolve() + }, 200) + }) + } + pushAndGetQueue(item) { this.push(item) return this.queue diff --git a/package-lock.json b/package-lock.json index 0154917..be5d6d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.1.0", "license": "ISC", "dependencies": { + "async-mutex": "^0.5.0", "axios": "^1.5.0", "commander": "^12.0.0", "crypto-js": "^4.2.0", @@ -4088,6 +4089,14 @@ "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", "dev": true }, + "node_modules/async-mutex": { + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -9921,8 +9930,7 @@ "node_modules/tslib": { "version": "2.6.2", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", - "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==", - "dev": true + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" }, "node_modules/tsutils": { "version": "3.21.0", diff --git a/package.json b/package.json index f26f97a..858bfbd 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "author": "", "license": "ISC", "dependencies": { + "async-mutex": "^0.5.0", "axios": "^1.5.0", "commander": "^12.0.0", "crypto-js": "^4.2.0", diff --git a/schedulers/gat-scheduler.js b/schedulers/gat-scheduler.js index 1e5f864..91cf0df 100644 --- a/schedulers/gat-scheduler.js +++ b/schedulers/gat-scheduler.js @@ -17,11 +17,13 @@ class GatScheduler { /* * @type user {User} */ - queueUser(user) { + async queueUser(user) { + await this.queue.wait() // Queue user object which allows schedulers to decide on additional parameters // passed on in token like age and gender. //console.log(`GAT queued user: ${user.userId} params ${JSON.stringify(user.tokenParams)}`) this.queue.push(user) + this.queue.unlock() } resetQueue() { @@ -29,6 +31,12 @@ class GatScheduler { this.queue.reset() } + getQueuedUserIds() { + return this.queue.getQueue().map((u) => { + return u.userId + }) + } + playersToWaitFor() { return Math.max(0, this.min - this.queue.size()) } @@ -64,7 +72,8 @@ class GatScheduler { return Object.values(experiment.servers).some((s) => s.length >= this.min) } - checkConditionAndReturnUsers(experiments, usedUrls) { + async checkConditionAndReturnUsers(experiments, usedUrls) { + await this.queue.wait() const queueSize = this.queue.size() const falseCondition = { condition: false, @@ -80,6 +89,7 @@ class GatScheduler { experiments[this.experimentName].servers ) ) { + this.queue.unlock() return falseCondition } @@ -100,18 +110,22 @@ class GatScheduler { continue } + console.log(`before: ${JSON.stringify(this.getQueuedUserIds())}`) const users = this.queue.pop(this.min) for (const user of users) { while (true) { const url = serverUrls.pop() if (!usedUrls.has(url)) { user.redirectedUrl = url + user.changeState("waitAgreement") usedUrls.add(url) break } serverUrls.unshift(url) } } + console.log(`after: ${JSON.stringify(this.getQueuedUserIds())}`) + this.queue.unlock() return { condition: true, users: users, @@ -120,6 +134,7 @@ class GatScheduler { } } + this.queue.unlock() return falseCondition } } diff --git a/server.js b/server.js index 7a56049..f5c22a4 100644 --- a/server.js +++ b/server.js @@ -41,10 +41,6 @@ const port = options.port || "8060" const userDbFile = options.dbFile || "./data/userdb.json" //const publicKey = fs.readFileSync("./public-key.pem", "utf8") // -process.on("SIGINT", function () { - console.log("\nGracefully shutting down from SIGINT (Ctrl-C)") - process.exit(0) -}) /** * * @type {Set} @@ -60,6 +56,14 @@ if (options.resetDb && fs.existsSync(userDbFile)) { */ const usersDb = new UserDb(userDbFile) +process.on("SIGINT", async () => { + console.log("\nGracefully shutting down from SIGINT (Ctrl-C)") + await usersDb.forceSave() + setTimeout(() => { + process.exit(0) + }, 1000) +}) + function getOrSetValue(obj, key, defaultValue) { if (!(key in obj)) { obj[key] = defaultValue @@ -121,6 +125,8 @@ function getOtreeUrls(otreeIPs, otreeRestKey) { // Put back experiment urls function revertUrls(exp, urls, serverKey) { + // console.log(`reverting ${urls}`) + // console.log(`before ${JSON.stringify(Array.from(usedUrls))} ${exp.servers[serverKey].length}`) urls.forEach((url) => { usedUrls.delete(url) if (exp.servers[serverKey].includes(url)) { @@ -128,6 +134,7 @@ function revertUrls(exp, urls, serverKey) { } exp.servers[serverKey].push(url) }) + // console.log(`after ${JSON.stringify(Array.from(usedUrls))} ${exp.servers[serverKey].length}`) } function lastElement(arr) { @@ -242,14 +249,14 @@ function agreeGame(users, uuid, agreement, usersDb) { const user = usersDb.get(compoundKey) const sock = user.webSocket if (!sock) { - console.error(`User ${userId} has not socket!`) + console.error(`User ${user.userId} has not socket!`) return } sock.emit("agree", { uuid: uuid, timeout: agreement.timeout }) } } -function startReadyGames(experiments, agreementIds, usersDb) { +async function startReadyGames(experiments, agreementIds, usersDb) { for (const [experimentId, _] of Object.entries(experiments)) { const experiment = experiments[experimentId] const scheduler = experiment.scheduler @@ -265,7 +272,7 @@ function startReadyGames(experiments, agreementIds, usersDb) { let canMaybeScheduleNext = true while (canMaybeScheduleNext) { canMaybeScheduleNext = false - const conditionObject = scheduler.checkConditionAndReturnUsers( + const conditionObject = await scheduler.checkConditionAndReturnUsers( experiments, usedUrls, ) @@ -273,7 +280,7 @@ function startReadyGames(experiments, agreementIds, usersDb) { if (conditionObject.condition) { //console.log(conditionObject) canMaybeScheduleNext = true - console.log("Enough users; waiting for agreement.") + //console.log("Enough users; waiting for agreement.") // Generate an agreement object which // waits for all users to 'agree' and // proceed to the game together @@ -288,6 +295,7 @@ function startReadyGames(experiments, agreementIds, usersDb) { conditionObject.users.map((u) => u.redirectedUrl), conditionObject.server, ) + console.log(`New agreement: ${agreement.agreementId} ${JSON.stringify(gameUsersIds)}.`) agreementIds[uuid] = agreement agreeGame(gameUsersIds, uuid, agreement, usersDb) // Agreement timeout function @@ -309,7 +317,7 @@ function startReadyGames(experiments, agreementIds, usersDb) { //agreedUsersIds.forEach((userId) => { agreedUsersIds.forEach((compoundKey) => { // const compoundKey = `${userId}:${agreement.experimentId}` - console.log(`agree: ${compoundKey}`) + //console.log(`agree: ${compoundKey}`) const user = usersDb.get(compoundKey) user.changeState("queued") }) @@ -317,9 +325,9 @@ function startReadyGames(experiments, agreementIds, usersDb) { nonAgreedUsersIds.forEach((compoundKey) => { // usersDb.dump() // const compoundKey = `${userId}:${agreement.experimentId}` - console.log(`Non agree: ${compoundKey}`) + //console.log(`Non agree: ${compoundKey}`) const user = usersDb.get(compoundKey) - user.reset() + user.reset(2) }) } }, @@ -332,10 +340,10 @@ function startReadyGames(experiments, agreementIds, usersDb) { conditionObject.waitForCount === 0 && conditionObject.server === null ) { - console.warn(`[WARNING] Experiment ${experimentId} ran out of slots!`) + //console.warn(`[WARNING] Experiment ${experimentId} ran out of slots!`) scheduler.resetQueue() conditionObject.users.forEach((user) => { - user.reset() + user.reset(3) }) } } @@ -393,7 +401,7 @@ async function main() { process.exit(1) } - // Maybe only check for every new user queued instead on interval. + // // Maybe only check for every new user queued instead on interval. // setInterval(async () => { // await getExperimentUrls(experiments) // startReadyGames(experiments, agreementIds, usersDb) @@ -511,8 +519,7 @@ async function main() { ) app.get("/api/experiments", validateSignature, async (req, res) => { - console.log("HEADERS: ", req.headers) - + //console.log("HEADERS: ", req.headers) res.status(201).json(experiments) }) @@ -539,7 +546,7 @@ async function main() { usersDb.get(compoundKey) || new User(userId, params.experimentId) user.tokenParams = params usersDb.set(compoundKey, user) - console.log(`Token params: ${JSON.stringify(user.tokenParams)}`) + //console.log(`Token params: ${JSON.stringify(user.tokenParams)}`) if ( fs.existsSync( __dirname + "/webpage_templates/" + params.experimentId + ".html", @@ -566,19 +573,23 @@ async function main() { // queued events. switch (user.state) { case "queued": - user.changeState("queued") + //user.changeState("queued") break case "inoTreePages": - console.log(`RE-REDIRECT ${userId}.`) - const expUrl = user.redirectedUrl - socket.emit("gameStart", { room: expUrl.toString() }) + //console.log(`RE-REDIRECT ${userId}.`) + //const expUrl = user.redirectedUrl + //socket.emit("gameStart", { room: expUrl.toString() }) + break + case "agreed": + break + case "waitAgreement": break default: user.changeState("startedPage") - user.reset() + user.reset(1) usersDb.set(compoundKey, user) console.log( - `User ${userId} connected for experiment ${experimentId}.`, + `User ${userId} in state ${user.state} connected for experiment ${experimentId}.`, ) } }) @@ -594,10 +605,24 @@ async function main() { } user.webSocket = socket if (user.state === "queued") { - console.log(`User ${userId} already queued`) + //console.log(`User ${userId} already queued`) + return + } + if (user.state === "waitAgreement") { + // User is waiting in an agreement + return + } + if (user.state === "agreed") { + // User is waiting in an agreement + return + } + if (user.state === "inoTreePages") { + //console.log(`User ${userId} already already redirected`) + socket.emit("gameStart", { room: user.redirectedUrl.toString() }) return } + console.log(`NEw user: ${user.userId} ${user.state}`) user.addListenerForState("queued", async (user, state) => { const userId = user.userId const experimentId = user.experimentId @@ -608,14 +633,14 @@ async function main() { ) } const scheduler = experiment.scheduler - scheduler.queueUser(user) + await scheduler.queueUser(user) // Condition object returns // { // condition: true|false // users: [] of type Users // waitForCount: int // } - console.log(`User ${userId} in event listener in state ${state}`) + //console.log(`User ${userId} in event listener in state ${state}`) user.webSocket.emit("wait", { playersToWaitFor: scheduler.playersToWaitFor(), @@ -643,7 +668,7 @@ async function main() { usersDb.get(compoundKey).changeState("agreed") // If everyone agrees, start game if (agreement.agree(compoundKey)) { - console.log("Start Game!") + console.log(`Start Game! agreement ${agreement.agreementId}.`) startGame(agreement.agreedUsers, agreement.urls, agreement.experimentId, agreement.agreementId) } }) @@ -685,12 +710,12 @@ async function main() { // Emit a custom event with the game room URL user.experimentUrl = expUrl user.groupId = agreementId - user.redirectedUrl = `${expUrl}?participant_label=${user.userId}&group_id=${user.groupId}` - sock.emit("gameStart", { room: user.redirectedUrl }) + user.redirectedUrl = `${expUrl}?participant_label=${user.userId}` user.changeState("inoTreePages") + sock.emit("gameStart", { room: user.redirectedUrl }) console.log(`Redirecting user ${user.userId} to ${user.redirectedUrl}`) - //We do not need to update vars on oTree anymore since they are not comming + //We do not need to update vars on oTree anymore since they are not coming //through the url encoding // // const apiUrl = `http://${expUrl.host}/api/participant_vars/${participantCode}` diff --git a/server_test/test_virtual_user.js b/server_test/test_virtual_user.js deleted file mode 100644 index dd029c7..0000000 --- a/server_test/test_virtual_user.js +++ /dev/null @@ -1,43 +0,0 @@ -const VirtualUser = require('./virtual-user-ws') -const fs = require('fs') - -function randomBetween(min, max) { - return Math.floor( - Math.random() * (max - min) + min - ) -} - -const maxUsers = 1000 -const experimentId = "public_goods_game" -const url = "http://localhost:8060" -const virtUsers = {} - -for (let i = 0; i < maxUsers; i++) { - const id = 1000 + i - //const id = randomBetween(1, 9999999) - vu = new VirtualUser(id, experimentId, url) - virtUsers[id] = vu -} - -Object.values(virtUsers).forEach(vu => { - vu.connect().then(() => { - vu.attemptNormalQueueFlow() - }) -}) - -setTimeout(() => { - const ids = Object.values(virtUsers).filter(vu => { - if (vu.state == "redirected"){ - return true - } - }).map(vu => { - return vu.userId - }) - const idsString = JSON.stringify(ids, null, 2) - fs.writeFile('redirected_users.json', idsString, (err) => { - if (err) { - console.log('Error ', err) - } - }) -}, 5000) - diff --git a/server_test/virtual-user-ws.js b/server_test/virtual-user-ws.js index 15fd14c..eb0c9e3 100644 --- a/server_test/virtual-user-ws.js +++ b/server_test/virtual-user-ws.js @@ -1,88 +1,127 @@ +const { weightSrvRecords } = require('ioredis/built/cluster/util') const io = require('socket.io-client') -// let socket = null -// -// module.exports = { -// init: async (serverUrl, userId) => { -// socket = io('http://localhost:8060') -// socket.on('connect', () => { -// console.log(`User ${userId} connected to ${serverUrl}`) -// }) -// }, -// queue: async (experimentId) => { -// socket.on('wait', (data) => { -// console.log(`User ${userId} in wait state.`) -// }) -// socket.on("reset", (data) => { -// console.log(`User ${userId} in reset.`) -// }) -// socket.emit("landingPage", { -// experimentId: experimentId, -// userId: userId, -// }) -// } -// } -// class VirtualUser { constructor(userId, experimentId, serverUrl){ this.userId = userId this.experimentId = experimentId this.serverUrl = serverUrl + this.redirectUrl = null this.state = "new" + this.flag = 0 } connect(serverUrl) { return new Promise((resolve,reject) => { this.serverUrl = this.serverUrl || serverUrl this.socket = io(this.serverUrl) this.socket.on('connect', () => { + this.flag = 0 console.log(`[${this.userId}] connected to ${this.serverUrl}`) resolve() }) }) } - attemptNormalQueueFlow(){ + #setupSocketEvents(){ + if (this.state == "redirected") { + return + } if (!this.socket) { console.error(`[${this.userId}] socket null!!`) return } this.socket.on("wait", (data) => { + this.flag = 0 this.state = "queued" }) - this.socket.on("reset", (data) => { - this.state = "startedPage" - }) this.socket.on("queueUpdate" ,(data) => { + this.flag = 0 // Nothing to do }) - this.socket.on("agree", (data) => { - const uuid = data.uuid - // Choose to agree or not - this.socket.emit("userAgreed", { - experimentId: this.experimentId, - userId: this.userId, - uuid: uuid, - }) - this.state = "agreed" - }) this.socket.on("gameStart", (data) => { + this.flag = 0 console.log(`[${this.userId}] redirected to ${data.room}.`) this.state = "redirected" + this.redirectUrl = data.room + this.socket.close() }) this.socket.on("disconnect", () => { + this.flag = 0 console.log(`[${this.userId}] disconnected from ${this.serverUrl}`) this.socket = null this.state = "disconnected" }) - this.socket.emit("landingPage", { - experimentId: this.experimentId, - userId: this.userId + + } + #goToLandingPage(){ + if (this.state == "redirected") { + return + } + console.log(`${this.userId} emmiting landingPage`) + this.socket.emit("landingPage", { + experimentId: this.experimentId, + userId: this.userId + }) + console.log(`${this.userId} emmiting newUser`) + this.socket.emit("newUser", { + experimentId: this.experimentId, + userId: this.userId + }) + const that = this + const intervatl = setInterval(() => { + if ((that.state == "redirected") || (!that.socket)) { + clearInterval(intervatl) + return + } + if (that.flag > 0) { + that.socket.emit("newUser", { + experimentId: that.experimentId, + userId: that.userId + }) + } else { + that.flag += 1 + } + }, 1000) + + } + + #flipCoin() { + // Generate a random number between 0 and 1 + const randomNumber = Math.random(); + // Return "Heads" if the number is less than 0.5, otherwise return "Tails" + return (randomNumber < 0.5) + } + + attemptQueueFlow(random){ + this.#setupSocketEvents() + + this.socket.on("agree", (data) => { + this.flag = 0 + if (random && this.#flipCoin()) { + console.log(`${this.userId} ignoring agreement`) + return + } + console.log(`${this.userId} accepting agreement`) + const uuid = data.uuid + // Choose to agree or not + this.socket.emit("userAgreed", { + experimentId: this.experimentId, + userId: this.userId, + uuid: uuid, + }) + this.state = "agreed" }) - this.state = "startedPage" - this.socket.emit("newUser", { - experimentId: this.experimentId, - userId: this.userId + + this.socket.on("reset", (data) => { + this.flag = 0 + this.state = "startedPage" + console.log(`${this.userId} emmiting newUser`) + this.socket.emit("newUser", { + experimentId: this.experimentId, + userId: this.userId + }) }) - } + this.#goToLandingPage() + }//attemptQueueFlow } module.exports = VirtualUser diff --git a/user.js b/user.js index 37602f1..32c1fcf 100644 --- a/user.js +++ b/user.js @@ -30,9 +30,10 @@ class User { // DFA transition table transitionTable = { new: ["startedPage"], - startedPage: ["queued", "droppedOut"], - queued: ["queued", "agreed", "droppedOut"], - agreed: ["redirected", "timedOut", "queued", "droppedOut"], + startedPage: ["startedPage", "queued", "droppedOut"], + queued: ["queued", "agreed", "droppedOut", "waitAgreement"], + waitAgreement: ["waitAgreement", "queued", "agreed"], + agreed: ["agreed", "redirected", "timedOut", "queued", "droppedOut"], redirected: ["inoTreePages", "droppedOut"], inoTreePages: ["inoTreePages", "oTreeCompleted", "oTreeDroppedOut"], oTreeCompleted: ["final", "allowedBack", "droppedOut"], @@ -41,7 +42,10 @@ class User { droppedOut: ["allowedBack", "nonAllowedBack"], } - reset() { + reset(x) { + if (this.state === "inoTreePages") { + console.log(`${x} WARNING resetting ${JSON.stringify(this.serialize(), null, 2)}`) + } this.state = "startedPage" this.timestamp = new Date().toISOString() this.listeners = [] @@ -86,10 +90,10 @@ class User { // Valid transition this.state = action this.timestamp = new Date().toISOString() - console.log(`${this.userId}'s state has been changed to ${this.state}`) + //console.log(`${this.userId}'s state has been changed to ${this.state}`) this.notifyListeners(this.state) // Notify listeners about the state change } else { - console.log("Invalid state transition. State not changed.") + console.log(`[${this.userId}] Invalid state transition. ${this.state} -> ${action}. State not changed.`) } } }