diff --git a/.eslintrc.yml b/.eslintrc.yml index 4638d8c..4778248 100644 --- a/.eslintrc.yml +++ b/.eslintrc.yml @@ -2,6 +2,8 @@ env: es6: true node: true extends: 'eslint:recommended' +parserOptions: + ecmaVersion: 2020 rules: indent: - error diff --git a/lib/client.js b/lib/client.js index 574b57a..61c31ce 100644 --- a/lib/client.js +++ b/lib/client.js @@ -107,6 +107,10 @@ class LimitdRedis extends EventEmitter { this.handler('takeElevated', type, key, opts, cb); } + takeExponential(type, key, opts, cb) { + this.handler('takeExponential', type, key, opts, cb); + } + wait(type, key, opts, cb) { this.handler('wait', type, key, opts, cb); } diff --git a/lib/db.js b/lib/db.js index fdcd689..0ae9f13 100644 --- a/lib/db.js +++ b/lib/db.js @@ -9,11 +9,13 @@ const { validateParams, validateERLParams } = require('./validation'); const { calculateQuotaExpiration, resolveElevatedParams, isFixedWindowEnabled, removeHashtag } = require('./utils'); const EventEmitter = require('events').EventEmitter; -const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, "utf8"); -const TAKE_ELEVATED_LUA = fs.readFileSync(`${__dirname}/take_elevated.lua`, "utf8"); -const PUT_LUA = fs.readFileSync(`${__dirname}/put.lua`, "utf8"); +const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, 'utf8'); +const TAKE_ELEVATED_LUA = fs.readFileSync(`${__dirname}/take_elevated.lua`, 'utf8'); +const TAKE_EXPONENTIAL_LUA = fs.readFileSync(`${__dirname}/take_exponential.lua`, 'utf8'); +const PUT_LUA = fs.readFileSync(`${__dirname}/put.lua`, 'utf8'); -const DEFAULT_COMMAND_TIMEOUT = 125; // Milliseconds +const BACKOFF_FACTOR_DEFAULT = 2; +const MULTIPLE_UNIT_DEFAULT = 1000; const DEFAULT_KEEPALIVE = 10000; // Milliseconds class LimitDBRedis extends EventEmitter { @@ -78,6 +80,11 @@ class LimitDBRedis extends EventEmitter { lua: TAKE_LUA }); + this.redis.defineCommand('takeExponential', { + numberOfKeys: 1, + lua: TAKE_EXPONENTIAL_LUA + }); + this.redis.defineCommand('takeElevated', { numberOfKeys: 3, lua: TAKE_ELEVATED_LUA @@ -200,7 +207,7 @@ class LimitDBRedis extends EventEmitter { const prevCall = this.callCounts.get(key); if (prevCall) { - const shouldGoToRedis = prevCall?.count >= bucketKeyConfig.skip_n_calls + const shouldGoToRedis = prevCall?.count >= bucketKeyConfig.skip_n_calls; if (!shouldGoToRedis) { @@ -220,7 +227,7 @@ class LimitDBRedis extends EventEmitter { } } - takeFunc(key, bucketKeyConfig, count) + takeFunc(key, bucketKeyConfig, count); } /** @@ -260,7 +267,49 @@ class LimitDBRedis extends EventEmitter { } return callback(null, res); }); - }) + }); + } + + takeExponential(params, callback) { + this._doTake(params, callback, (key, bucketKeyConfig, count) => { + const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window); + this.redis.takeExponential(key, + bucketKeyConfig.ms_per_interval || 0, + bucketKeyConfig.size, + count, + Math.ceil(bucketKeyConfig.ttl || this.globalTTL), + bucketKeyConfig.drip_interval || 0, + bucketKeyConfig.backoff_factor || BACKOFF_FACTOR_DEFAULT, + bucketKeyConfig.multiple_unit || MULTIPLE_UNIT_DEFAULT, + useFixedWindow ? bucketKeyConfig.interval : 0, + (err, results) => { + if (err) { + return callback(err); + } + const remaining = parseInt(results[0], 10); + const conformant = parseInt(results[1], 10) ? true : false; + const current_timestamp_ms = parseInt(results[2], 10); + const reset = parseInt(results[3], 10); + const backoff_factor = parseInt(results[4], 10); + const backoff_time = parseInt(results[5], 10); + const next_token_ms = parseInt(results[6], 10); + const res = { + conformant, + remaining, + reset: Math.ceil(reset / 1000), + limit: bucketKeyConfig.size, + delayed: false, + backoff_factor, + backoff_time, + delta_reset_ms: Math.max(reset - current_timestamp_ms, 0), + delta_backoff_time: next_token_ms - current_timestamp_ms, + }; + if (bucketKeyConfig.skip_n_calls > 0) { + this.callCounts.set(key, { res, count: 0 }); + } + return callback(null, res); + }); + }); } takeElevated(params, callback) { @@ -270,7 +319,7 @@ class LimitDBRedis extends EventEmitter { erlParams = utils.getERLParams(params.elevated_limits); const valError = validateERLParams(erlParams); if (valError) { - return callback(valError) + return callback(valError); } } @@ -325,7 +374,7 @@ class LimitDBRedis extends EventEmitter { } return callback(null, res); }); - }) + }); } /** diff --git a/lib/take_exponential.lua b/lib/take_exponential.lua new file mode 100644 index 0000000..a3b0e4f --- /dev/null +++ b/lib/take_exponential.lua @@ -0,0 +1,82 @@ +local tokens_per_ms = tonumber(ARGV[1]) +local bucket_size = tonumber(ARGV[2]) +local new_content = tonumber(ARGV[2]) +local tokens_to_take = tonumber(ARGV[3]) +local ttl = tonumber(ARGV[4]) +local drip_interval = tonumber(ARGV[5]) +local backoff_factor = tonumber(ARGV[6]) +local mult_unit = tonumber(ARGV[7]) +local fixed_window = tonumber(ARGV[8]) + +local current_time = redis.call('TIME') +local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000 + +local current = redis.pcall('HMGET', KEYS[1], 'd', 'r') + +if current.err ~= nil then + current = {} +end + +-- calculate the time of next available token +local last_token_ms = current[1] or 0 +local remaining_tokens = 0 +if current[2] then + remaining_tokens = tonumber(current[2]) +else + remaining_tokens = bucket_size +end + +local backoff_step = bucket_size - remaining_tokens +local backoff_time = math.ceil(backoff_factor ^ backoff_step) * mult_unit +local next_token_ms = last_token_ms + backoff_time +local is_passed_wait_time = current_timestamp_ms >= next_token_ms + +if current[1] and tokens_per_ms then + -- drip bucket + + if fixed_window > 0 then + -- fixed window for granting new tokens + local interval_correction = (current_timestamp_ms - last_token_ms) % fixed_window + current_timestamp_ms = current_timestamp_ms - interval_correction + end + + is_passed_wait_time = current_timestamp_ms >= next_token_ms + + if not is_passed_wait_time then + new_content = tonumber(current[2]) + last_token_ms = current[1] + else + local last_drip = current[1] + local content = current[2] + local delta_ms = math.max(current_timestamp_ms - last_drip, 0) + local drip_amount = delta_ms * tokens_per_ms + new_content = math.min(content + drip_amount, bucket_size) + end +elseif current[1] and tokens_per_ms == 0 and is_passed_wait_time then + -- fixed bucket + new_content = current[2] +end + +local enough_tokens = (new_content >= tokens_to_take) and is_passed_wait_time + +if enough_tokens then + new_content = new_content - 1 + last_token_ms = current_timestamp_ms +end + +-- https://redis.io/commands/EVAL#replicating-commands-instead-of-scripts +redis.replicate_commands() + +redis.call('HMSET', KEYS[1], + 'd', last_token_ms, + 'r', new_content) +redis.call('EXPIRE', KEYS[1], ttl) + +local reset_ms = 0 +if fixed_window > 0 then + reset_ms = current_timestamp_ms + fixed_window +elseif drip_interval > 0 then + reset_ms = math.ceil(current_timestamp_ms + (bucket_size - new_content) * drip_interval) +end + +return { new_content, enough_tokens, current_timestamp_ms, reset_ms, backoff_factor, backoff_time, next_token_ms } diff --git a/lib/utils.js b/lib/utils.js index 4c393ed..3e4714c 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -11,6 +11,11 @@ const INTERVAL_TO_MS = { const INTERVAL_SHORTCUTS = Object.keys(INTERVAL_TO_MS); +const EXPONENTIAL_BACKOFF_DEFAULTS = { + backoff_factor: 0, + multiple_unit: 1 +}; + const ERL_QUOTA_INTERVAL_PER_CALENDAR_MONTH = 'quota_per_calendar_month'; const ERL_QUOTA_INTERVALS = { [ERL_QUOTA_INTERVAL_PER_CALENDAR_MONTH]: () => endOfMonthTimestamp() @@ -24,7 +29,8 @@ function normalizeTemporals(params) { 'size', 'unlimited', 'skip_n_calls', - 'fixed_window' + 'fixed_window', + 'exponential_backoff', ]); INTERVAL_SHORTCUTS.forEach(intervalShortcut => { @@ -39,6 +45,12 @@ function normalizeTemporals(params) { type.size = type.per_interval; } + if(type.exponential_backoff) { + type.backoff_factor= type.exponential_backoff.backoff_factor || EXPONENTIAL_BACKOFF_DEFAULTS.backoff_factor; + type.multiple_unit = type.exponential_backoff.multiple_unit || EXPONENTIAL_BACKOFF_DEFAULTS.multiple_unit; + delete type.exponential_backoff; + } + if (type.per_interval) { type.ttl = ((type.size * type.interval) / type.per_interval) / 1000; type.ms_per_interval = type.per_interval / type.interval; @@ -100,9 +112,9 @@ function normalizeType(params) { function normalizeElevatedOverrides(type, override) { // If the override doesn't provide elevated_limits use the ones defined in the base type (if any) - const normalizedOverride = {} + const normalizedOverride = {}; if (!override.elevated_limits) { - Object.assign(normalizedOverride, override, { elevated_limits: type.elevated_limits }) + Object.assign(normalizedOverride, override, { elevated_limits: type.elevated_limits }); return normalizedOverride; } @@ -111,11 +123,11 @@ function normalizeElevatedOverrides(type, override) { if (typeof override.unlimited === 'undefined' && typeof override.size === 'undefined' && typeof override.per_interval === 'undefined') { - Object.assign(normalizedOverride, - override, - _.omit(type, 'overrides', 'overridesMatch'), - {elevated_limits: override.elevated_limits} - ); + Object.assign(normalizedOverride, + override, + _.omit(type, 'overrides', 'overridesMatch'), + {elevated_limits: override.elevated_limits} + ); } return normalizedOverride; } @@ -138,7 +150,7 @@ function buildBucket(bucket) { } function functionOrFalse(fun) { - return !!(fun && fun.constructor && fun.call && fun.apply) + return fun && fun.constructor && fun.call && fun.apply ? fun : false; } @@ -194,20 +206,20 @@ function resolveElevatedParams(erlParams, bucketKeyConfig, key, prefix) { erl_configured_for_bucket: !!(erlParams && bucketKeyConfig.elevated_limits?.erl_configured_for_bucket), }; - elevatedLimits.erl_is_active_key = replicateHashtag(key, prefix, elevatedLimits.erl_is_active_key) - elevatedLimits.erl_quota_key = replicateHashtag(key, prefix, elevatedLimits.erl_quota_key) + elevatedLimits.erl_is_active_key = replicateHashtag(key, prefix, elevatedLimits.erl_is_active_key); + elevatedLimits.erl_quota_key = replicateHashtag(key, prefix, elevatedLimits.erl_quota_key); return elevatedLimits; } function replicateHashtag(baseKey, prefix, key) { const prefixedBaseKey = key + `:{${prefix}${baseKey}}`; - const idxOpenBrace = baseKey.indexOf('{') + const idxOpenBrace = baseKey.indexOf('{'); if (idxOpenBrace < 0) { return prefixedBaseKey; } - const idxCloseBrace = baseKey.indexOf('}', idxOpenBrace) + const idxCloseBrace = baseKey.indexOf('}', idxOpenBrace); if ( idxCloseBrace <= idxOpenBrace ) { return prefixedBaseKey; } diff --git a/test/cb.tests.js b/test/cb.tests.js index 41b4a33..9ca4a4f 100644 --- a/test/cb.tests.js +++ b/test/cb.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ var assert = require('assert'), cb = require('../lib/cb'); diff --git a/test/client.clustermode.tests.js b/test/client.clustermode.tests.js index d1d37fc..9feaa0f 100644 --- a/test/client.clustermode.tests.js +++ b/test/client.clustermode.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ const _ = require('lodash'); const assert = require('chai').assert; diff --git a/test/client.standalonemode.tests.js b/test/client.standalonemode.tests.js index 012d6b1..5b68834 100644 --- a/test/client.standalonemode.tests.js +++ b/test/client.standalonemode.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ const _ = require('lodash'); const assert = require('chai').assert; diff --git a/test/client.tests.js b/test/client.tests.js index 6806820..f503c15 100644 --- a/test/client.tests.js +++ b/test/client.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ const _ = require('lodash'); const assert = require('chai').assert; diff --git a/test/db.clustermode.tests.js b/test/db.clustermode.tests.js index 34c7a89..cd65f99 100644 --- a/test/db.clustermode.tests.js +++ b/test/db.clustermode.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ const LimitDB = require('../lib/db'); const _ = require('lodash'); const { tests: dbTests } = require('./db.tests'); diff --git a/test/db.standalonemode.tests.js b/test/db.standalonemode.tests.js index ce032f0..ee126ef 100644 --- a/test/db.standalonemode.tests.js +++ b/test/db.standalonemode.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ const LimitDB = require('../lib/db'); const _ = require('lodash'); const { tests: dbTests, buckets} = require('./db.tests'); diff --git a/test/db.tests.js b/test/db.tests.js index 163aaea..e8ba520 100644 --- a/test/db.tests.js +++ b/test/db.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ const ms = require('ms'); const async = require('async'); @@ -157,7 +158,6 @@ module.exports.tests = (clientCreator) => { buckets: undefined }), /Buckets must be specified for Limitd/); }); - }); describe('#configurateBucketKey', () => { @@ -165,7 +165,6 @@ module.exports.tests = (clientCreator) => { db.configurateBucket('test', { size: 5 }); assert.containsAllKeys(db.buckets, ['ip', 'test']); }); - it('should replace configuration of existing type', () => { db.configurateBucket('ip', { size: 1 }); assert.equal(db.buckets.ip.size, 1); @@ -173,6 +172,82 @@ module.exports.tests = (clientCreator) => { }); }); + describe('TAKE_EXPONENTIAL', () => { + const testRuns = [ + { + bucket: { + ipExponential: { + size: 10, + per_second: 0, + exponential_backoff: { + backoff_factor: 2, + multiple_unit: 10, + }, + }, + }, + backoff_times: [1, 2, 4, 8], + }, + { + bucket: { + ipExponential: { + size: 10, + per_second: 0, + exponential_backoff: { + backoff_factor: 3, + multiple_unit: 10, + }, + }, + }, + backoff_times: [1, 3, 9, 27], + }, + { + bucket: { + ipExponential: { + size: 10, + per_second: 0, + exponential_backoff: { + backoff_factor: 4, + multiple_unit: 10, + }, + }, + }, + backoff_times: [1, 4, 16, 64], + }, + ]; + const testParams = { + name: `test 1`, + init: (test_buckets) => db.configurateBuckets(test_buckets), + take: (params, callback) => db.takeExponential(params, callback), + }; + + describe('simple backoff', () => { + testRuns.forEach(run => { + it(`should backoff exponentially according to 'backoff_factor = ${run.bucket.ipExponential.exponential_backoff.backoff_factor}'`, async function(){ + this.timeout(20000) + testParams.init(run.bucket); + const tryTake = async (exp) => { + return new Promise((resolve) => { + testParams.take({ type: 'ipExponential', key: '21.17.65.41'}, (err, res) => { + if (res?.conformant) { + assert.equal(res.backoff_time, exp); + resolve(); + } else { + setTimeout(() => { + tryTake(exp).then(resolve); + }, 1000); + } + }); + }) + }; + + for (let c = 0; c < run.backoff_times.length; c++) { + await tryTake(run.backoff_times[c]*run.bucket.ipExponential.exponential_backoff.multiple_unit); + } + }, 20000); + }); + }); + }); + describe('TAKE', () => { const testsParams = [ { diff --git a/test/utils.tests.js b/test/utils.tests.js index 6e1ebf3..8491af2 100644 --- a/test/utils.tests.js +++ b/test/utils.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ //const assert = require('chai').assert; const chai = require('chai'); @@ -42,6 +43,21 @@ describe('utils', () => { }); }); + it('should return a normalized bucket with exponential backoff', () => { + const bucket= { + exponential_backoff: { + backoff_factor: 5, + multiple_unit: 6 + } + } + const response = normalizeType(bucket); + const {size, elevated_limits, overrides, overridesMatch, overridesCache, ...rest } = response; + expect(rest).to.deep.equal({ + backoff_factor: 5, + multiple_unit: 6 + }); + }); + it('should return normalized bucket without ERL', () => { const bucket = { size: 100, diff --git a/test/validation.tests.js b/test/validation.tests.js index 17de5fe..b81a752 100644 --- a/test/validation.tests.js +++ b/test/validation.tests.js @@ -1,3 +1,4 @@ +/* eslint-disable */ /* eslint-env node, mocha */ const assert = require('chai').assert;