From 72d431d7ae78f5363fda8056d9bc67211f064459 Mon Sep 17 00:00:00 2001 From: _AMD_ Date: Sun, 5 Jan 2025 20:32:15 +0300 Subject: [PATCH] =?UTF-8?q?=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=20=D0=BD?= =?UTF-8?q?=D0=B0=D0=B4=20=D1=81=D1=82=D1=80=D1=83=D0=BA=D1=82=D1=83=D1=80?= =?UTF-8?q?=D0=BE=D0=B9=203?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/.env-example | 12 -- application/Dockerfile | 18 -- application/README.MD | 84 --------- application/docker-compose.yml | 40 ----- application/init.lua | 123 ------------- lua/long-polling/client/README.MD | 38 ---- lua/long-polling/client/init.lua | 157 ----------------- lua/long-polling/server/README.MD | 7 - .../server/dataproviders/localtable.lua | 25 --- .../server/dataproviders/redis.lua | 86 ---------- lua/long-polling/server/init.lua | 69 -------- lua/long-polling/server/misc/pubsub.lua | 39 ----- lua/long-polling/server/misc/redis-safe.lua | 162 ------------------ 13 files changed, 860 deletions(-) delete mode 100644 application/.env-example delete mode 100644 application/Dockerfile delete mode 100644 application/README.MD delete mode 100644 application/docker-compose.yml delete mode 100644 application/init.lua delete mode 100644 lua/long-polling/client/README.MD delete mode 100644 lua/long-polling/client/init.lua delete mode 100644 lua/long-polling/server/README.MD delete mode 100644 lua/long-polling/server/dataproviders/localtable.lua delete mode 100644 lua/long-polling/server/dataproviders/redis.lua delete mode 100644 lua/long-polling/server/init.lua delete mode 100644 lua/long-polling/server/misc/pubsub.lua delete mode 100644 lua/long-polling/server/misc/redis-safe.lua diff --git a/application/.env-example b/application/.env-example deleted file mode 100644 index 749b044..0000000 --- a/application/.env-example +++ /dev/null @@ -1,12 +0,0 @@ -LUA_ENV=production -DATA_PROVIDER=redis - -REDIS_HOST=127.0.0.1 -REDIS_PORT=6379 -REDIS_PASS=secret -REDIS_PREFIX=lpolling: -REDIS_DATA_TTL=604800 -CHANNEL_STORAGE_MAXSIZE=30 - -RATE_LIMIT_FRAME=60 -RATE_LIMIT_LIMIT=60 diff --git a/application/Dockerfile b/application/Dockerfile deleted file mode 100644 index 7981fa3..0000000 --- a/application/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM ghcr.io/trigonim/lua-express:main - -RUN luarocks install copas \ - && luarocks install lua-cjson \ - && luarocks install lua-express-middlewares \ - && luarocks install redis-lua - -# redis is optional. Only for DATA_PROVIDER=redis - -EXPOSE 3000 - -COPY . /app -WORKDIR /app - -# you can set this by passing this to docker run command -# ENV TZ= - -CMD ["lua", "init.lua"] diff --git a/application/README.MD b/application/README.MD deleted file mode 100644 index c572112..0000000 --- a/application/README.MD +++ /dev/null @@ -1,84 +0,0 @@ -# Long Polling Application - -> 🇺🇸 English version not ready for now (I am lazy and don't want to translate it). You can use https://www.deepl.com/ to translate text on your own. - -В папке находится исходный код поллинг сервера, который работает по адресу `poll.def.pm`. Но ниже приведен пример более примитивного приложения, которое тоже использует эту библиотеку. - -## Run advanced application - -```bash -cd directory_with_this_file/ - -# run server via docker-compose.yml (will build fresh image) -docker-compose up --build polling - -# alternatively run server via docker run (will use image from registry) -docker run --env-file .env -p 3000:3000 ghcr.io/trigonim/lua-long-polling:latest -``` - -All available settings vars are listed in [`.env-example`](.env-example) - -## Application REST API: - -![api demo](https://file.def.pm/uV3R6f28.gif) - -- **GET** `http://your.app/anySecretPath?ts=0&sleep=10`, where `ts` is the last update id you received from polling server. Take it as an offset. The `sleep` parameter is the timeout during which the connection will be kept open until a new update is available. If an update arrives within this timeout, the connection will be immediately closed with the new update in the response in the following format: `{ts = 123, ok = true, updates = {your_update}}` -- **POST** with json data to `http://your.app/anySecretPath?merge=me`. The final update will looks like this: `{"a": "b", "merge": "me"}`. - Curl example: `curl --verbose -d '{"a": 1}' "http://localhost:3000/SECRET_UID?merge=me"` - -No authorization. With API features in mind, you can use your server as a public API, because no one client will be able to get the other's data without knowing its "secret path". It's like in blockchain. - -**Pro tip:** If you want to use Traefik as web proxy (nginx is fine without it), you may need to configure `maxIdleConnsPerHost=-1` as Traefik has a limit of active connections and disconnects the excess ones with error 500 - -**👀 Test server:** poll.def.pm. But please, don't use it in production because I can turn it off at any moment. - - - -## Another example of simplest application - -There is a simplest realization of long-polling server application. - -All data is stored in RAM, so it will be lost after server restart. - -There is no request logging or error handling features implemented, - but you can find it in advanced app file located somewhere in this repo 😅 - -
-Toggle me! - -```lua -local polling = require("long-polling.server").new("localtable") - -local express = require("express") -local app = express() - -local function push_updates(req, res) - local channel = req.params.channel - local payload = req.query.data - - polling:publish_new(channel, payload) - res:send("OK") -end - -local function get_updates(req, res) - local channel = req.params.channel - local last_id = tonumber(req.query.last_id) or 0 - local timeout = tonumber(req.query.timeout) or 0 - - local data, total = polling:get_news(channel, last_id, timeout) - res:set("X-Total-Messages", total):json(data) -end - -app:post("/:channel", push_updates) -app:get("/:channel", get_updates) - -app:listen(3000) - -``` - -### API: - -- `POST` example.com/any_string?data=any_data => 200 OK -- `GET ` example.com/any_string?last_id=0&timeout=60 => 200 OK {"any_data"} - -
diff --git a/application/docker-compose.yml b/application/docker-compose.yml deleted file mode 100644 index e44af09..0000000 --- a/application/docker-compose.yml +++ /dev/null @@ -1,40 +0,0 @@ -volumes: - lp-redis: - -services: - redis-stack: - image: redis/redis-stack:latest - volumes: - - lp-redis:/data - ports: - - 8001:8001 # GUI (!) - # - 6379:6379 - # environment: - # - REDIS_ARGS=--requirepass qweqwe - # - REDIS_PORT=6379 - # - REDIS_HOST=redis - - polling: - build: . - # user: '1000' - depends_on: - - redis-stack - # restart: always - # env_file: .env - environment: - - LUA_ENV=development - - DATA_PROVIDER=redis - - REDIS_HOST=redis-stack - # - REDIS_PORT=6379 - # - REDIS_PASS=qweqwe - - REDIS_PREFIX= - - REDIS_DATA_TTL=604800 - - CHANNEL_STORAGE_MAXSIZE=30 - - RATE_LIMIT_FRAME=30 - - RATE_LIMIT_LIMIT=120 - # volumes: # development - # - $PWD/:/project - init: true # to kill faster via ctrl + c - # command: lua -e 'while true do end' # if you need to exec bash - ports: - - 3000:3000 diff --git a/application/init.lua b/application/init.lua deleted file mode 100644 index ca086ee..0000000 --- a/application/init.lua +++ /dev/null @@ -1,123 +0,0 @@ -package.path = string.format("%s;%s", "../?.lua", package.path) -package.path = string.format("%s;%s", "../?/init.lua", package.path) - -local function exec(cmd) - local handle = io.popen(cmd) - local result = handle:read("*a") - handle:close() - return result -end - -print("pwd", exec("pwd")) -print("ls", exec("ls")) - - -io.stdout:setvbuf("no") -- faster stdout without buffering - --------- - -local APP_VERSION = "v1.3.4" -- #todo automate -print("LP Server Loaded. Version " .. APP_VERSION) - -local dataprovider = os.getenv("DATA_PROVIDER") or "localtable" -local longpolling = require("long-polling.server").new(dataprovider) - -local json_encode = require("cjson.safe").encode -local json_decode = require("cjson.safe").decode - -local return_result = function(res, result) - result.ok = true - res:json(result) -end - -local return_error = function(res, err_code, err_descr) - res:status(err_code):json({ - ok = false, - error_code = err_code, - error_description = err_descr - }) -end - -local express = require("express") -local app = express() - -app:set("trust proxy", {"uniquelocal"}) -- correct recognition of req:ip() inside the docker container - --- log requests -app:use(function(req, res, next) - local kvs = {} - for k, v in pairs(req.query) do - kvs[#kvs + 1] = k .. "=" .. v - end - local querystring = kvs[1] and ("?" .. table.concat(kvs, "&")) or "" - print(req.method, req:ip(), req.url .. querystring) - next() -end) - --- parse body as json -app:use(require("body-parser").json({type = "*/*"})) -- https://github.com/TRIGONIM/lua-express-middlewares - -app:use(function(_, res, next) - res:set("X-Powered-By", "lua-long-polling " .. APP_VERSION) - next() -end) - -local rate_limiter = require("rate-limiter-simple") -- https://github.com/TRIGONIM/lua-express-middlewares -app:use(rate_limiter({ - frame_time = tonumber(os.getenv("RATE_LIMIT_FRAME")) or 60, - limit_amount = tonumber(os.getenv("RATE_LIMIT_LIMIT")) or 60, -})) - -local function pushUpdates(req, res) - local channel = req.params.channel - local updateObj = req.body - for k, v in pairs(req.query) do -- merge additional params from uri - updateObj[k] = v - end - - local update_json = json_encode(updateObj) - local total = longpolling:publish_new(channel, update_json) - - res:set("X-Total-Updates", total):send("OK") -end - -local function getUpdates(req, res) - local channel = req.params.channel - local offset = tonumber(req.query.ts) or 0 - - -- clamping between 0 and 60 - local timeout = math.min(math.max(tonumber(req.query.sleep) or 0, 0), 60) - - -- locks the thread until update or timeout - local data, total = longpolling:get_news(channel, offset, timeout) - for k, v in ipairs(data) do data[k] = json_decode(v) end - - return_result(res, { - updates = data, - ts = total, - }) -end - -app:all("/", function(_, res) - res:redirect("https://github.com/TRIGONIM/lua-long-polling") -end) - -app:post("/:channel/pushUpdates", pushUpdates) -app:post("/:channel", pushUpdates) - -app:get("/:channel/getUpdates", getUpdates) -app:get("/:channel", getUpdates) - -app:use(function(err, _, res, next) - if type(err) == "table" and err.body then -- bodyparser middleware - return_error(res, 500, err.message) - return - end - - print("polling error: " .. tostring(err)) - print(debug.traceback("trace")) - if os.getenv("LUA_ENV") == "development" then next(err) end -- full info - return_error(res, 500, "Internal Server Error") -end) - -app:listen(3000) diff --git a/lua/long-polling/client/README.MD b/lua/long-polling/client/README.MD deleted file mode 100644 index 995e888..0000000 --- a/lua/long-polling/client/README.MD +++ /dev/null @@ -1,38 +0,0 @@ -# Lua Long Polling Client Example - -> 🇺🇸 English version not ready for now (I am lazy and don't want to translate it). You can use https://www.deepl.com/ to translate text on your own. - -Работает как в чистом Lua, так и на Garry's Mod сервере. - -На чистом Lua требует copas (а он luasocket), lua-requests-async (он тоже требует copas) и cjson. - -## Использование - -**В гмоде** закинуть по любому пути, потом получить доступ к библиотеке через `kupol = include("path/to/client.lua")`. Автоматически в глобальное пространство ничего не добавляется. - -В чистом Lua можно использовать `kupol = require("long-polling.client")`. - -## Пример - -```lua -local kupol = require("long-polling.client") -- also works in Garry's Mod with include() -local Client = kupol.new("https://poll.def.pm/SomeSecretWord") - - --- Получение данных -Client:subscribe(function(upd, last_id) - print("LP сервер за все время принял " .. tostring(last_id) .. " обновлений") - print("Последнее обновление: " .. kupol.json_encode(upd)) -end, nil, 30) -- 30 это timeout в секундах. --- nil – это последний id, который был получен (offset) - - --- Отправка данных -kupol.thread_new(function() - for i = 1, 10 do - local ok, err = Client:publish({any = "value", counter = i}) - print(ok and "published" or "pub failed: " .. tostring(err)) - kupol.thread_pause(.05) - end -end) -``` diff --git a/lua/long-polling/client/init.lua b/lua/long-polling/client/init.lua deleted file mode 100644 index a8188cc..0000000 --- a/lua/long-polling/client/init.lua +++ /dev/null @@ -1,157 +0,0 @@ --- source: https://github.com/TRIGONIM/lua-long-polling/blob/e28fff8141c7d1a22e030c9d6132ef853032da84/lua/long-polling/client.lua --- local kupol = require("long-polling.client") --- local Client = kupol.new("https://lp.example.com/channel") --- By default, client use copas and lua-cjson, but you can use your own functions --- from Garry's Mod for example - -local kupol = {} - -local MT = {} -MT.__index = MT - -function MT:publish_async(tData, callback) - kupol.thread_new(function() - local ok, code = self:publish(tData) - if callback then callback(ok, code) end - end) -end - -function MT:publish(tData) - local body = kupol.json_encode(tData) - local res, code = kupol.http_post(self.url, body) - return code == 201 or res == "OK", code -end - -function MT:get(offset, timeout) - local paramstr = "?ts=" .. (offset or "") .. "&sleep=" .. (timeout or "") - - local body, code_or_err = kupol.http_get(self.url .. paramstr) - if not body then return false, code_or_err end - - local tData = kupol.json_decode(body) - return tData, body -- tData may be nil -end - -function MT:log(...) - local prefix = ("Kupol (%s): "):format(self.url) - print(prefix .. string.format(...)) -end - -function MT:handle_error(err) - self:log("🆘 Error\n\t%s", err) -end - -function MT:subscribe(fHandler, requested_ts, timeout) - self.thread = kupol.thread_new(function() repeat - local tmt = requested_ts and timeout or 0 -- it's better if first request will be fast if requested_ts not provided - local tData, body = self:get(requested_ts, tmt) - if tData then - local ts_diff = tData.ts - (requested_ts or 0) - - local REM = (requested_ts or 0) > tData.ts -- REMOTE FUCKUP. e.g. remote ts 0, local ts 1000 - local LOC = ts_diff > #tData.updates -- LOCAL FUCKUP. e.g. remote ts 100, local ts 0, but 30 updates instead of 100 - if REM or LOC then - if REM then - self:log("🚧 ts on server is less than requested (%d < %d)", tData.ts, requested_ts) - end - - if LOC then - self:log("🚧 updates lost: %d (got %d, expected %d)", ts_diff - #tData.updates, #tData.updates, ts_diff) -- too long time haven't requested them - end - - requested_ts = tData.ts -- emerg reset - else - requested_ts = requested_ts and (requested_ts + #tData.updates) or tData.ts - -- requested_ts = tData.ts - end - - for _, update in ipairs(tData.updates) do - local pcallok, res = pcall(fHandler, update, tData.ts) - if not pcallok then - self:log(debug.traceback("🆘 Kupol Error In The Handler Callback\n\t%s"), res) - end - end - else -- no tData - self:handle_error(body) - kupol.thread_pause(10) - end - until (not self.thread) end) -end - -local IS_GARRYSMOD = (GM or GAMEMODE) and RunString and hook - -local copas_ok, copas = pcall(require, "copas") -- should be loaded before http_v2 -if copas_ok then - local http = require("http_v2") -- https://github.com/TRIGONIM/lua-requests-async/blob/main/lua/http_v2.lua - local async_request = http.copas_request - - function kupol.http_post(url, data) - local body, code = async_request("POST", url, data, {["content-type"] = "application/json"}) - return body, code - end - - function kupol.http_get(url) - local body, code = async_request("GET", url) - return body, code - end - - kupol.thread_new = copas.addthread - kupol.thread_pause = copas.sleep - -elseif IS_GARRYSMOD then - function kupol.thread_new(f, ...) - local co = coroutine.create(f) - local function cont(...) - local ok, callback = coroutine.resume(co, ...) - if not ok then error( debug.traceback(co, callback) ) end - if coroutine.status(co) ~= "dead" then callback(cont) end - end - cont(...) - return co - end - - function kupol.thread_pause(seconds) - coroutine.yield(function(cont) timer.Simple(seconds, cont) end) - end - - function kupol.http_get(url) - return coroutine.yield(function(cont) - http.Fetch(url, function(body, _, _, code) cont(body, code) end, - function(err) cont(false, err) end) - end) - end - - function kupol.http_post(url, data) - return coroutine.yield(function(cont) - local ok = HTTP({ url = url, method = "POST", - body = data, type = "application/json", - success = function(code, body) cont(body, code) end, - failed = function(err) cont(false, err) end - }) - if not ok then cont(false, "HTTP() failed") end - end) - end - -else - print("Kupol: looks like copas is not installed. So you should provide own kupol.http_* and kupol.thread_* functions") -end - -local cjson_ok, cjson = pcall(require, "cjson.safe") -if cjson_ok then - kupol.json_encode = cjson.encode - kupol.json_decode = cjson.decode - -elseif IS_GARRYSMOD then - kupol.json_encode = util.TableToJSON - kupol.json_decode = util.JSONToTable - -else - print("Kupol: looks like lua-cjson is not installed. So you should provide own kupol.json_encode and kupol.json_decode functions") -end - --- url example: https://lp.example.com/channel -function kupol.new(url) - return setmetatable({url = url}, MT) -end - -return kupol diff --git a/lua/long-polling/server/README.MD b/lua/long-polling/server/README.MD deleted file mode 100644 index dd11f22..0000000 --- a/lua/long-polling/server/README.MD +++ /dev/null @@ -1,7 +0,0 @@ -# Lua Long Polling Server - -> 🇺🇸 English version not ready for now (I am lazy and don't want to translate it). You can use https://www.deepl.com/ to translate text on your own. - -There is components required to create long polling server applications. - -👉 See [application/README.MD](/lua/long-polling/application) for examples of how to use this library. diff --git a/lua/long-polling/server/dataproviders/localtable.lua b/lua/long-polling/server/dataproviders/localtable.lua deleted file mode 100644 index 68ad10e..0000000 --- a/lua/long-polling/server/dataproviders/localtable.lua +++ /dev/null @@ -1,25 +0,0 @@ -local MT = {} -MT.__index = MT - -function MT:get_updates(channel, offset) - local storage = self.storage - local total = storage[channel] and #storage[channel] or 0 - local updates = {} - for i = offset + 1, total do - updates[#updates + 1] = storage[channel][i] - end - return updates, total -end - -function MT:add_update(channel, data) - local storage = self.storage - if not storage[channel] then storage[channel] = {} end - storage[channel][#storage[channel] + 1] = data - return #storage[channel] -end - -function MT.new() - return setmetatable({storage = {}}, MT) -end - -return MT diff --git a/lua/long-polling/server/dataproviders/redis.lua b/lua/long-polling/server/dataproviders/redis.lua deleted file mode 100644 index 03b37a0..0000000 --- a/lua/long-polling/server/dataproviders/redis.lua +++ /dev/null @@ -1,86 +0,0 @@ -local rds = require("long-polling.server.misc.redis-safe") - -local MT = {} -MT.__index = MT - -function MT:getcon() - local opts = self.opts.redis - return rds.create(opts.own and opts or { - host = opts.host, - port = tonumber(opts.port), - password = opts.pass, - copas_wrap = true, - -- tcp_nodelay = false - }) -end - -function MT:get_updates(channel, offset) - local prefix = self.opts.data_prefix - local redis = self:getcon() - - -- The reason why there is script instead of just commands: - -- https://chatgpt.com/share/67686def-c558-8004-893a-c372405c2a8f - local script = [[ - local total_key, updates_key = KEYS[1], KEYS[2] - local offset = tonumber(ARGV[1]) - - local total = tonumber(redis.call('get', total_key)) or 0 - if offset >= total then return {{}, total} end - - local need_elements = total - offset - local updates = redis.call('lrange', updates_key, -need_elements, -1) - - return {updates, total} - ]] - - local total_key = prefix .. "total:" .. channel - local updates_key = prefix .. "updates:" .. channel - local result = redis:eval(script, 2, total_key, updates_key, offset) - redis:quit() - - return result[1], result[2] -end - -function MT:add_update(channel, data) - local opts = self.opts - local prefix, ttl, max_updates = opts.data_prefix, opts.data_ttl, opts.max_updates - local redis = self:getcon() - - -- Используем Lua-скрипт для атомарного выполнения всех операций - -- multi/exec почему-то не справился с этим - local script = [[ - local total_key, updates_key = KEYS[1], KEYS[2] - local data, ttl, max_updates = ARGV[1], tonumber(ARGV[2]), tonumber(ARGV[3]) - - local new_total = redis.call('incr', total_key) - redis.call('expire', total_key, ttl) - redis.call('rpush', updates_key, data) - redis.call('ltrim', updates_key, -max_updates, -1) - redis.call('expire', updates_key, ttl) - - return new_total - ]] - - local total_key = prefix .. "total:" .. channel - local updates_key = prefix .. "updates:" .. channel - local result = redis:eval(script, 2, total_key, updates_key, data, ttl, max_updates) - redis:quit() - - return result -end - -function MT.new(opts) - opts = opts or {} - opts.data_prefix = opts.data_prefix or os.getenv("REDIS_PREFIX") or "lpolling:" - opts.data_ttl = opts.data_ttl or os.getenv("REDIS_DATA_TTL") or (60 * 60 * 24 * 7) -- 1 week - opts.max_updates = opts.max_updates or tonumber(os.getenv("CHANNEL_STORAGE_MAXSIZE")) or 30 - - opts.redis = opts.redis or {} - opts.redis.host = opts.redis.host or os.getenv("REDIS_HOST") - opts.redis.port = opts.redis.port or os.getenv("REDIS_PORT") - opts.redis.pass = opts.redis.pass or os.getenv("REDIS_PASS") - - return setmetatable({opts = opts}, MT) -end - -return MT diff --git a/lua/long-polling/server/init.lua b/lua/long-polling/server/init.lua deleted file mode 100644 index 2e934b4..0000000 --- a/lua/long-polling/server/init.lua +++ /dev/null @@ -1,69 +0,0 @@ -local pubsub = require("long-polling.server.misc.pubsub") -local copas = require("copas") - -local server = {} - -local MT = {} -MT.__index = MT - -function MT:get_updates(channel, offset) - return self.dataprovider:get_updates(channel, offset) -end - -function MT:add_update(channel, data) - return self.dataprovider:add_update(channel, data) -end - -function MT:publish_new(channel, data) - local total = self:add_update(channel, data) - self.pubsub:publish(channel, {update = data, total = total}) - return total -end - -function MT:get_news(channel, offset, timeout) - local data, total = self:get_updates(channel, offset) - if (#data > 0) or (offset > total) or (timeout <= 0) then return data, total end -- return immediately - - -- \/ #data == 0, so we need to wait for updates - - local release = false -- ugly hack to pause this thread until callback - local client_uid = self.pubsub:subscribe(channel, function(client_uid, upd) - -- print("new update in subscribed callback") - self.pubsub:unsubscribe(channel, client_uid) - - data, total = {upd.update}, upd.total - release = true - end) - - -- lock this thread until update or timeout - local start_time = os.time() - while not release and os.time() - start_time <= timeout do - copas.sleep(0.1) - end - - if not release then - self.pubsub:unsubscribe(channel, client_uid) - end - - return data, total -end - - -function server.new(dataprovider_obj) - local dp = dataprovider_obj - if type(dataprovider_obj) == "string" then - dp = require("long-polling.server.dataproviders." .. dataprovider_obj).new() - else - dp = dp or require("long-polling.server.dataproviders.localtable").new() - end - - assert(dp.get_updates and dp.add_update, - "dataprovider must have get_updates and add_update methods") - - return setmetatable({ - pubsub = pubsub.new(), - dataprovider = dp, - }, MT) -end - -return server diff --git a/lua/long-polling/server/misc/pubsub.lua b/lua/long-polling/server/misc/pubsub.lua deleted file mode 100644 index 9d5fe8e..0000000 --- a/lua/long-polling/server/misc/pubsub.lua +++ /dev/null @@ -1,39 +0,0 @@ -local MT = {} -MT.__index = MT - -function MT:publish(channel, data) - local clients = self.clients[channel] - if not clients then return end - for client_uid, callback in pairs(clients) do - local ok, err = pcall(callback, client_uid, data) - if not ok then - print("Error in callback for " .. client_uid .. ": " .. err) - end - end -end - -function MT:subscribe(channel, callback) - local client_uid = tostring({}):sub(10) - local cbs = self.clients[channel] or {} - cbs[client_uid] = callback - self.clients[channel] = cbs - return client_uid -end - -function MT:unsubscribe(channel, client_uid) - if not self.clients[channel] then return nil end - local cb = self.clients[channel][client_uid] - self.clients[channel][client_uid] = nil - if not next(self.clients[channel]) then -- gc - self.clients[channel] = nil - end - return cb -end - -function MT.new() - return setmetatable({ - clients = {}, -- channel => {cl_uid = cb} - }, MT) -end - -return MT diff --git a/lua/long-polling/server/misc/redis-safe.lua b/lua/long-polling/server/misc/redis-safe.lua deleted file mode 100644 index 8f90247..0000000 --- a/lua/long-polling/server/misc/redis-safe.lua +++ /dev/null @@ -1,162 +0,0 @@ -local socket = require("socket") -local redis = require("redis") -- https://github.com/nrk/redis-lua/blob/version-2.0/src/redis.lua - -local redq = {} - -local MT = {} -function MT.__index(wrapped_client, method_name) - local redis_client = wrapped_client.redis_client - - local has_method = redis.commands[method_name] - if has_method then - return function(_, ...) - return redq.run_method_safe(wrapped_client, method_name, ...) - end - else - -- print("redis-safe: Attempting to access a field instead of the method", method_name) - return redis_client[method_name] - end -end - -function redq.run_method_safe(wrapped_client, method_name, ...) - local redis_client = wrapped_client.redis_client - local method_func = redis_client[method_name] - - -- wrapped_client.opts.socket:settimeout(1) - -- local args = {...} - -- print("args", wrapped_client.opts.socket, args[1], args[2], args[3], args[4]) - local ok, res = pcall(method_func, redis_client, ...) - if ok then - wrapped_client._retry_attemps = 0 - return res - end - - print("Redis Wrapper. Error with command " .. method_name .. "(" .. table.concat({...}, " ") .. "):" .. res) - local opts = wrapped_client.opts - - local want_reconnect = opts.reconnect_condition == nil - or opts.reconnect_condition(res) - - if not want_reconnect then error(res) end - - local max_retries = opts.max_retries_per_request or 3 - local retry_attempt = wrapped_client._retry_attemps - if retry_attempt >= max_retries then - wrapped_client._retry_attemps = 0 - error(res) - end - wrapped_client._retry_attemps = retry_attempt + 1 - - local retry_delay = opts.retry_delay - if not retry_delay then - retry_delay = 1 - elseif type(retry_delay) == "function" then - retry_delay = retry_delay(retry_attempt) - end - - retry_delay = retry_delay or 3 - - if opts.copas_wrap then - require("copas").pause(retry_delay) -- only thread (coro) - else - socket.sleep(retry_delay) -- whole process - end - - local reconnect_ok, err = pcall(redq.reconnect, redis_client, opts) - print("Redis Wrapper: " .. (reconnect_ok and "reconnected" or "reconnect failed: " .. err) .. " after " .. wrapped_client._retry_attemps .. " attempts") - return redq.run_method_safe(wrapped_client, method_name, ...) -end - -function redq.wrap(redis_client, opts) - local wrapped_client = setmetatable({ - redis_client = redis_client, - opts = opts, - - _retry_attemps = 0, - }, MT) - - return wrapped_client -end - -function redq.copas_socket(host, port, timeout, tcp_nodelay) - local sock = socket.tcp() - sock = require("copas").wrap(sock) - sock:connect(host, port) -- #todo check for errors? - sock:setoption("tcp-nodelay", tcp_nodelay) - sock:settimeouts(timeout, nil, nil) -- conn, send, recv - return sock -end - -local auth_client = function(redis_client, password) - if password then - local auth_ok, err = pcall(redis_client.auth, redis_client, password) - local all_ok = auth_ok or err:find("called without any password configured") - if not all_ok then - print("redis auth error: " .. err) - error(err) - end - end -end - -function redq.reconnect(redis_client, opts) - local sock = redis_client.network.socket - sock:close() - - if opts.copas_wrap then - sock = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay) - redis_client.network.socket = sock - else - if opts.socket then error("we can't recreate custom socket") end - local new_redis_client = redis.connect(opts) - redis_client.network.socket = new_redis_client.network.socket - end - - auth_client(redis_client, opts.password) - if opts.db then redis_client:select(opts.db) end -end - -function redq.connect(opts) - if opts.copas_wrap then - opts.socket = redq.copas_socket(opts.host, opts.port, opts.timeout, opts.tcp_nodelay) - end - - local redis_client = redis.connect(opts) -- has .network.socket field - auth_client(redis_client, opts.password) - if opts.db then redis_client:select(opts.db) end - opts.socket = nil -- we don't need it anymore - return redis_client -end - -function redq.create(...) - local opts = { - host = "127.0.0.1", - port = 6379, - -- sock = "redis.sock", - tcp_nodelay = true, - copas_wrap = false, - -- socket = socket.tcp(), - - -- timeout = 10, -- connect timeout - -- password = "pass", - -- db = 0, - -- reconnect_condition = function(err) return err:match("timeout") end, - -- max_retries_per_request = 3, - -- retry_delay = function(times) return times * 1 end, -- in seconds - } - - local args = {...} - if type(args[1]) == "string" then - opts.sock = args[1] - elseif type(args[1]) == "number" then - opts.port, opts.host = args[1], args[2] - elseif type(args[1]) == "table" then - for k, v in pairs( args[1] ) do -- merge with overrides - opts[k] = v - end - end - - local redis_client = redq.connect(opts) - return redq.wrap(redis_client, opts) -end - -return redq