Skip to content

Commit

Permalink
fix race condition: user queues while waiting in agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
recap committed May 30, 2024
1 parent 1403ffc commit 227d4e6
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 138 deletions.
39 changes: 30 additions & 9 deletions UserDb.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
const murmurhash = require("murmurhash")
const fs = require("fs").promises
const User = require("./user.js")
let writeFlag = false
const { SHA1 } = require("crypto-js")

class UserDb extends Map {
constructor(file) {
super()
this.seed = 42
this.file = file
this.lastHash = 0
this.writeCounter = 0
this.forcedSave = 0
}
load() {
let data = []
Expand Down Expand Up @@ -56,27 +58,46 @@ class UserDb extends Map {

return JSON.stringify(data)
}
save() {
save(){
this.writeCounter += 1
const currentCounter = this.writeCounter
setTimeout(() => {
if ( (currentCounter != this.writeCounter) || (this.writeCounter === 0) ){
return
}
this.#save()
}, 500)
}
forceSave(){
if (this.forcedSave > 0) {
return
}
this.forcedSave = 1
const data = []
this.forEach((v, k) => {
data.push(v.serialize())
})
const dump = JSON.stringify(data, null, 2)

console.log("Saving DB: ", dump)
return fs.writeFile(this.file, dump)
}
#save() {
const data = []
this.forEach((v, k) => {
data.push(v.serialize())
})

const dump = JSON.stringify(data, null, 2)
const h = murmurhash(dump, this.seed)
console.log(`${h}:${this.lastHash}:${dump}`)
//console.log(`${h}:${this.lastHash}:${dump}`)
if (this.lastHash !== h) {
if (writeFlag) {
console.log('WARN FILe STILL WRITING')
}
writeFlag = true
fs.writeFile(this.file, dump)
.then(() => {
writeFlag = false
this.lastHash = h
this.writeCounter = 0
})
.catch((err) => {
writeFlag = false
console.error("[ERROR] Writing UserDb to file.")
})
}
Expand Down
2 changes: 1 addition & 1 deletion agreement.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Agreement {
this.urls = urls
this.server = server
this.state = "new"
this.timeout = timeout || 30
this.timeout = timeout || 3
}

startTimeout(fn) {
Expand Down
31 changes: 31 additions & 0 deletions local-queue.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
var Mutex = require('async-mutex').Mutex
// A LocalQueue implementation
class LocalQueue {
constructor(queueName) {
this.name = queueName
this.queue = []
this.mutex = new Mutex()
this.simple_lock = 0
}

push(item) {
Expand All @@ -16,6 +19,34 @@ class LocalQueue {
this.queue.length = 0
}

lock() {
this.simple_lock = 1
// return this.mutex.acquire()
}

unlock() {
this.simple_lock = 0
// this.mutex.release()
}

wait() {
return new Promise((resolve, reject) => {
// this.mutex.waitForUnlock().then(() => {
// this.lock().then(() => {
// resolve()
// })
// })
const testLockInterval = setInterval(() => {
if (this.simple_lock > 0) {
return
}
this.lock()
clearInterval(testLockInterval)
resolve()
}, 200)
})
}

pushAndGetQueue(item) {
this.push(item)
return this.queue
Expand Down
12 changes: 10 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"async-mutex": "^0.5.0",
"axios": "^1.5.0",
"commander": "^12.0.0",
"crypto-js": "^4.2.0",
Expand Down
19 changes: 17 additions & 2 deletions schedulers/gat-scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@ class GatScheduler {
/*
* @type user {User}
*/
queueUser(user) {
async queueUser(user) {
await this.queue.wait()
// Queue user object which allows schedulers to decide on additional parameters
// passed on in token like age and gender.
//console.log(`GAT queued user: ${user.userId} params ${JSON.stringify(user.tokenParams)}`)
this.queue.push(user)
this.queue.unlock()
}

resetQueue() {
// Empties the Queue
this.queue.reset()
}

getQueuedUserIds() {
return this.queue.getQueue().map((u) => {
return u.userId
})
}

playersToWaitFor() {
return Math.max(0, this.min - this.queue.size())
}
Expand Down Expand Up @@ -64,7 +72,8 @@ class GatScheduler {
return Object.values(experiment.servers).some((s) => s.length >= this.min)
}

checkConditionAndReturnUsers(experiments, usedUrls) {
async checkConditionAndReturnUsers(experiments, usedUrls) {
await this.queue.wait()
const queueSize = this.queue.size()
const falseCondition = {
condition: false,
Expand All @@ -80,6 +89,7 @@ class GatScheduler {
experiments[this.experimentName].servers
)
) {
this.queue.unlock()
return falseCondition
}

Expand All @@ -100,18 +110,22 @@ class GatScheduler {
continue
}

console.log(`before: ${JSON.stringify(this.getQueuedUserIds())}`)
const users = this.queue.pop(this.min)
for (const user of users) {
while (true) {
const url = serverUrls.pop()
if (!usedUrls.has(url)) {
user.redirectedUrl = url
user.changeState("waitAgreement")
usedUrls.add(url)
break
}
serverUrls.unshift(url)
}
}
console.log(`after: ${JSON.stringify(this.getQueuedUserIds())}`)
this.queue.unlock()
return {
condition: true,
users: users,
Expand All @@ -120,6 +134,7 @@ class GatScheduler {
}
}

this.queue.unlock()
return falseCondition
}
}
Expand Down
Loading

0 comments on commit 227d4e6

Please sign in to comment.