diff --git a/README.md b/README.md index bb9b413..a797175 100644 --- a/README.md +++ b/README.md @@ -118,18 +118,22 @@ Takes an optional id parameter, this *must* be unique if multiple instances of u Initialises the background thread, should be called in `init_worker_by_lua` ### connect -`syntax: ok, err = upstream:connect(client?)` +`syntax: ok, err = upstream:connect(client?, key?)` Attempts to connect to a host in the defined pools in priority order using the selected load balancing method. Returns a connected socket and a table containing the connected `host`, `poolid` and `pool` or nil and an error message. When passed a [socket](https://github.com/openresty/lua-nginx-module#ngxsockettcp) or resty module it will return the same object after successful connection or nil. +Additionally, hash methods may take an optional `key` to define how to hash the connection to determine the host. By default `ngx.var.remote_addr` is used. This value is ignored when the pool's method is round robin. + ```lua resty_redis = require('resty.redis') local redis = resty_redis.new() -local redis, err = upstream:connect(redis) +local key = ngx.req.get_headers()["X-Forwarded-For"] + +local redis, err = upstream:connect(redis, key) if not redis then ngx.log(ngx.ERR, err) @@ -250,7 +254,7 @@ Returns a new api object using the provided upstream object. `syntax: ok, err = api:set_method(poolid, method)` Sets the load balancing method for the specified pool. -Currently only randomised round robin is supported. +Currently randomised round robin and hashing methods are supported. ### create_pool `syntax: ok, err = api:create_pool(pool)` @@ -259,7 +263,7 @@ Creates a new pool from a table of options, `pool` must contain at least 1 key ` Other valid options are -* `method` Balancing method, currently only `round_robin` is supported +* `method` Balancing method * `timeout` Connection timeout in ms * `priority` Higher priority pools are used later * `read_timeout` diff --git a/lib/resty/upstream/socket.lua b/lib/resty/upstream/socket.lua index 79a8516..0a0b5a2 100644 --- a/lib/resty/upstream/socket.lua +++ b/lib/resty/upstream/socket.lua @@ -515,6 +515,111 @@ function _M.connect_failed(self, host, poolid, failed_hosts) end +local function get_hash_host(vars) + local h = vars.hash + local hosts = vars.available_hosts + local weight_sum = vars.weight_sum + local hostcount = #hosts + + if hostcount == 0 then return end + + local cur_idx = 1 + + -- figure where we should go + local cur_weight = hosts[cur_idx].weight + + while (h >= cur_weight) do + h = h - cur_weight + + if (h < 0) then + h = maxweight + h + end + + cur_idx = cur_idx + 1 + + if (cur_idx > hostcount) then + cur_idx = 1 + end + + cur_weight = hosts[cur_idx].weight + end + + -- now cur_idx points us to where we should go + return hosts[cur_idx] +end + +function get_hash_vars(hosts, failed_hosts, key) + local available_hosts = {} -- new tab needed here + local n = 0 + local weight_sum = 0 + + for i=1, #hosts do + local host = hosts[i] + + if (host.up and not failed_hosts[host.id]) then + n = n + 1 + available_hosts[n] = host + weight_sum = weight_sum + host.weight + end + end + + local hash = ngx.crc32_short(key) % weight_sum + + return { + available_hosts = available_hosts, + weight_sum = weight_sum, + hash = hash, + } +end + +_M.available_methods.hash = function(self, pool, sock, key) + local hosts = pool.hosts + local poolid = pool.id + local hash_key = key or ngx.var.remote_addr + + local failed_hosts = self:get_failed_hosts(poolid) + + -- Attempt a connection + if #hosts == 1 then + -- Don't bother trying to balance between 1 host + local host = hosts[1] + if host.up == false or failed_hosts[host.id] then + return nil, sock, {}, nil + end + local connected, err = sock:connect(host.host, host.port) + if not connected then + self:connect_failed(host, poolid, failed_hosts) + end + return connected, sock, host, err + end + + local hash_vars = get_hash_vars(hosts, failed_hosts, hash_key) + + local connected, err + repeat + local host = get_hash_host(hash_vars) + if not host then + -- Ran out of hosts, break out of the loop (go to next pool) + break + end + + -- Try connecting to the selected host + connected, err = sock:connect(host.host, host.port) + + if connected then + return connected, sock, host, err + else + -- Mark the host bad and retry + self:connect_failed(host, poolid, failed_hosts) + + -- rehash + hash_vars = get_hash_vars(hosts, failed_hosts, hash_key) + end + until connected + -- All hosts have failed + return nil, sock, {}, err +end + local function select_weighted_rr_host(hosts, failed_hosts, round_robin_vars) local idx = round_robin_vars.idx local cw = round_robin_vars.cw @@ -569,7 +674,6 @@ local function get_round_robin_vars(self, pool) return round_robin_vars end - _M.available_methods.round_robin = function(self, pool, sock) local hosts = pool.hosts local poolid = pool.id @@ -616,7 +720,7 @@ _M.available_methods.round_robin = function(self, pool, sock) end -function _M.connect(self, sock) +function _M.connect(self, sock, key) -- Get pool data local priority_index, err = self:get_priority_index() if not priority_index then @@ -652,7 +756,7 @@ function _M.connect(self, sock) set_timeout(sock, pool.timeout) -- Load balance between available hosts using specified method - connected, sock, host, err = available_methods[pool.method](self, pool, sock) + connected, sock, host, err = available_methods[pool.method](self, pool, sock, key) if connected then -- Return connected socket! diff --git a/t/11_hash.t b/t/11_hash.t new file mode 100644 index 0000000..2907569 --- /dev/null +++ b/t/11_hash.t @@ -0,0 +1,468 @@ +# vim:set ft= ts=4 sw=4 et: + +use Test::Nginx::Socket; +use Cwd qw(cwd); + +plan tests => repeat_each() * 44; + +my $pwd = cwd(); + +our $HttpConfig = qq{ + lua_package_path "$pwd/lib/?.lua;;"; + error_log logs/error.log debug; + + lua_shared_dict test_upstream 1m; + + +}; + +our $InitConfig = qq{ + init_by_lua ' + cjson = require "cjson" + upstream_socket = require("resty.upstream.socket") + upstream_api = require("resty.upstream.api") + + upstream, configured = upstream_socket:new("test_upstream") + test_api = upstream_api:new(upstream) + + test_api:create_pool({id = "primary", timeout = 100}) + test_api:set_method("primary", "hash") + +}; + +$ENV{TEST_NGINX_RESOLVER} = '8.8.8.8'; + +no_long_string(); +#no_diff(); + +run_tests(); + +__DATA__ + +=== TEST 1: Hash method, single host +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +1 + +=== TEST 2: Hash between multiple hosts, default settings +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock, info = upstream:connect(nil) + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +1 + +=== TEST 2b: Hash between multiple hosts, provide a user-defined key +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock = nil + local key = "1.2.3.4" -- i know this will hash to 2 from trial and error. hooray unit tests! + + local sock, info = upstream:connect(sock, key) + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +2 + +=== TEST 3: Hash is consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 1111111111 + +=== TEST 4: Hash with user provided weights +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 1111111111 + +=== TEST 5: Weighted hash is consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 2 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 2222222222 + +=== TEST 5b: Weighted hash is consistent, odd number of hosts +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 10 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 20 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 30 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 40 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 50 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 20 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 33333333333333333333 + +=== TEST 5c: Weighted hash is consistent, last host has highest weight +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 2 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 2222222222 + +=== TEST 6: Weighted hash is consistent, divisable weights +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 20 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 10 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 1111111111 + +=== TEST 7: Hash is consistent, re-keyed, and consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + + test_api:down_host("primary", 1) + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + '; + } + +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 11111111112222222222 + +=== TEST 7b: Hash is consistent, re-keyed, consistent, re-keyed again, and consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + test_api:down_host("primary", 1) + + upstream:process_failed_hosts() + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + + test_api:up_host("primary", 1) + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + '; + } + +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 111111111144444444441111111111 +