From 7e3c372dc4dea011db12c9f5c8ded2e71db66ea5 Mon Sep 17 00:00:00 2001 From: Robert Paprocki Date: Fri, 1 Jul 2016 09:28:48 -0700 Subject: [PATCH] Implement hash load balancing method Add a new hash load balancing method to resty.upstream.socket. This hash is a CRC32 of the given key modulo the sum of weights of available hosts. 'Available hosts' are determined before a connection attempt is made, so that host weights can be taken into account when deriving the hash. This stands in contrast to the existing round robin implementation, which walks the entire list of hosts and then checks a host's availability after making a weighted decision. Hash keys can be provided by an optional param to upstream:connect(). It would make more sense to provide a function to derive the key as part of defining the pool, but because pools are serialized as JSON, this is a no-go. See issue #12 for the initial discussion regarding additional load balancing schemes. --- README.md | 12 +- lib/resty/upstream/socket.lua | 110 +++++++- t/11_hash.t | 468 ++++++++++++++++++++++++++++++++++ 3 files changed, 583 insertions(+), 7 deletions(-) create mode 100644 t/11_hash.t diff --git a/README.md b/README.md index bb9b413..a797175 100644 --- a/README.md +++ b/README.md @@ -118,18 +118,22 @@ Takes an optional id parameter, this *must* be unique if multiple instances of u Initialises the background thread, should be called in `init_worker_by_lua` ### connect -`syntax: ok, err = upstream:connect(client?)` +`syntax: ok, err = upstream:connect(client?, key?)` Attempts to connect to a host in the defined pools in priority order using the selected load balancing method. Returns a connected socket and a table containing the connected `host`, `poolid` and `pool` or nil and an error message. When passed a [socket](https://github.com/openresty/lua-nginx-module#ngxsockettcp) or resty module it will return the same object after successful connection or nil. 
+Additionally, hash methods may take an optional `key` to define how to hash the connection to determine the host. By default `ngx.var.remote_addr` is used. This value is ignored when the pool's method is round robin. + ```lua resty_redis = require('resty.redis') local redis = resty_redis.new() -local redis, err = upstream:connect(redis) +local key = ngx.req.get_headers()["X-Forwarded-For"] + +local redis, err = upstream:connect(redis, key) if not redis then ngx.log(ngx.ERR, err) @@ -250,7 +254,7 @@ Returns a new api object using the provided upstream object. `syntax: ok, err = api:set_method(poolid, method)` Sets the load balancing method for the specified pool. -Currently only randomised round robin is supported. +Currently randomised round robin and hashing methods are supported. ### create_pool `syntax: ok, err = api:create_pool(pool)` @@ -259,7 +263,7 @@ Creates a new pool from a table of options, `pool` must contain at least 1 key ` Other valid options are -* `method` Balancing method, currently only `round_robin` is supported +* `method` Balancing method * `timeout` Connection timeout in ms * `priority` Higher priority pools are used later * `read_timeout` diff --git a/lib/resty/upstream/socket.lua b/lib/resty/upstream/socket.lua index 79a8516..0a0b5a2 100644 --- a/lib/resty/upstream/socket.lua +++ b/lib/resty/upstream/socket.lua @@ -515,6 +515,111 @@ function _M.connect_failed(self, host, poolid, failed_hosts) end +local function get_hash_host(vars) + local h = vars.hash + local hosts = vars.available_hosts + local weight_sum = vars.weight_sum + local hostcount = #hosts + + if hostcount == 0 then return end + + local cur_idx = 1 + + -- figure where we should go + local cur_weight = hosts[cur_idx].weight + + while (h >= cur_weight) do + h = h - cur_weight + + if (h < 0) then + h = maxweight + h + end + + cur_idx = cur_idx + 1 + + if (cur_idx > hostcount) then + cur_idx = 1 + end + + cur_weight = hosts[cur_idx].weight + end + + -- now cur_idx points us to 
where we should go + return hosts[cur_idx] +end + +function get_hash_vars(hosts, failed_hosts, key) + local available_hosts = {} -- new tab needed here + local n = 0 + local weight_sum = 0 + + for i=1, #hosts do + local host = hosts[i] + + if (host.up and not failed_hosts[host.id]) then + n = n + 1 + available_hosts[n] = host + weight_sum = weight_sum + host.weight + end + end + + local hash = ngx.crc32_short(key) % weight_sum + + return { + available_hosts = available_hosts, + weight_sum = weight_sum, + hash = hash, + } +end + +_M.available_methods.hash = function(self, pool, sock, key) + local hosts = pool.hosts + local poolid = pool.id + local hash_key = key or ngx.var.remote_addr + + local failed_hosts = self:get_failed_hosts(poolid) + + -- Attempt a connection + if #hosts == 1 then + -- Don't bother trying to balance between 1 host + local host = hosts[1] + if host.up == false or failed_hosts[host.id] then + return nil, sock, {}, nil + end + local connected, err = sock:connect(host.host, host.port) + if not connected then + self:connect_failed(host, poolid, failed_hosts) + end + return connected, sock, host, err + end + + local hash_vars = get_hash_vars(hosts, failed_hosts, hash_key) + + local connected, err + repeat + local host = get_hash_host(hash_vars) + if not host then + -- Ran out of hosts, break out of the loop (go to next pool) + break + end + + -- Try connecting to the selected host + connected, err = sock:connect(host.host, host.port) + + if connected then + return connected, sock, host, err + else + -- Mark the host bad and retry + self:connect_failed(host, poolid, failed_hosts) + + -- rehash + hash_vars = get_hash_vars(hosts, failed_hosts, hash_key) + end + until connected + -- All hosts have failed + return nil, sock, {}, err +end + local function select_weighted_rr_host(hosts, failed_hosts, round_robin_vars) local idx = round_robin_vars.idx local cw = round_robin_vars.cw @@ -569,7 +674,6 @@ local function get_round_robin_vars(self, pool) return 
round_robin_vars end - _M.available_methods.round_robin = function(self, pool, sock) local hosts = pool.hosts local poolid = pool.id @@ -616,7 +720,7 @@ _M.available_methods.round_robin = function(self, pool, sock) end -function _M.connect(self, sock) +function _M.connect(self, sock, key) -- Get pool data local priority_index, err = self:get_priority_index() if not priority_index then @@ -652,7 +756,7 @@ function _M.connect(self, sock) set_timeout(sock, pool.timeout) -- Load balance between available hosts using specified method - connected, sock, host, err = available_methods[pool.method](self, pool, sock) + connected, sock, host, err = available_methods[pool.method](self, pool, sock, key) if connected then -- Return connected socket! diff --git a/t/11_hash.t b/t/11_hash.t new file mode 100644 index 0000000..2907569 --- /dev/null +++ b/t/11_hash.t @@ -0,0 +1,468 @@ +# vim:set ft= ts=4 sw=4 et: + +use Test::Nginx::Socket; +use Cwd qw(cwd); + +plan tests => repeat_each() * 44; + +my $pwd = cwd(); + +our $HttpConfig = qq{ + lua_package_path "$pwd/lib/?.lua;;"; + error_log logs/error.log debug; + + lua_shared_dict test_upstream 1m; + + +}; + +our $InitConfig = qq{ + init_by_lua ' + cjson = require "cjson" + upstream_socket = require("resty.upstream.socket") + upstream_api = require("resty.upstream.api") + + upstream, configured = upstream_socket:new("test_upstream") + test_api = upstream_api:new(upstream) + + test_api:create_pool({id = "primary", timeout = 100}) + test_api:set_method("primary", "hash") + +}; + +$ENV{TEST_NGINX_RESOLVER} = '8.8.8.8'; + +no_long_string(); +#no_diff(); + +run_tests(); + +__DATA__ + +=== TEST 1: Hash method, single host +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. 
q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +1 + +=== TEST 2: Hash between multiple hosts, default settings +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock, info = upstream:connect(nil) + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +1 + +=== TEST 2b: Hash between multiple hosts, provide a user-defined key +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + local sock = nil + local key = "1.2.3.4" -- i know this will hash to 2 from trial and error. hooray unit tests! + + local sock, info = upstream:connect(sock, key) + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.say(info.host.id) + sock:setkeepalive() + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body +2 + +=== TEST 3: Hash is consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. 
q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 1111111111 + +=== TEST 4: Hash with user provided weights +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 1111111111 + +=== TEST 5: Weighted hash is consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. 
q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 2 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 2222222222 + +=== TEST 5b: Weighted hash is consistent, odd number of hosts +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 10 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 20 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 30 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 40 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 50 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 20 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 33333333333333333333 + +=== TEST 5c: Weighted hash is consistent, last host has highest weight +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. 
q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 1 }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT, weight = 2 }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + '; + } +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 2222222222 + +=== TEST 6: Weighted hash is consistent, divisible weights +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. 
q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + + test_api:down_host("primary", 1) + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + '; + } + +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 11111111112222222222 + +=== TEST 7b: Hash is consistent, re-keyed, consistent, re-keyed again, and consistent +--- http_config eval +"$::HttpConfig" +."$::InitConfig" +. q{ + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + test_api:add_host("primary", { host = "127.0.0.1", port = $TEST_NGINX_SERVER_PORT }) + '; +} +--- log_level: debug +--- config + location = / { + content_by_lua ' + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + test_api:down_host("primary", 1) + + upstream:process_failed_hosts() + + local count = 10 + for i=1,count do + local sock, info = upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + + upstream:process_failed_hosts() + + test_api:up_host("primary", 1) + + local count = 10 + for i=1,count do + local sock, info = 
upstream:connect() + if not sock then + ngx.log(ngx.ERR, info) + else + ngx.print(info.host.id) + sock:setkeepalive() + end + end + '; + } + +--- request +GET / +--- no_error_log +[error] +[warn] +--- response_body: 111111111144444444441111111111 +