Skip to content

Commit

Permalink
Implement hash load balancing method
Browse files Browse the repository at this point in the history
Add a new hash load balancing method to resty.upstream.socket. This
hash is a CRC32 of the given key modulo the sum of weights of
available hosts. 'Available hosts' are determined before a connection
attempt is made, so that host weights can be taken into account when
deriving the hash. This stands in contrast to the existing round
robin implementation, which walks the entire list of host and then
checks its availability after making a weighted decision.

Hash keys can be provided by an optional param to upstream:connect().
It would make more sense to provide a function to derive the key as
part of defining the pool, but because pools are serialized as JSON,
this is a no-go.

See issue hamishforbes#12 for the initial discussion regarding additional load
balancing schemes.
  • Loading branch information
p0pr0ck5 committed Jul 1, 2016
1 parent f97c08f commit 7e3c372
Show file tree
Hide file tree
Showing 3 changed files with 583 additions and 7 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,22 @@ Takes an optional id parameter, this *must* be unique if multiple instances of u
Initialises the background thread, should be called in `init_worker_by_lua`

### connect
`syntax: ok, err = upstream:connect(client?)`
`syntax: ok, err = upstream:connect(client?, key?)`

Attempts to connect to a host in the defined pools in priority order using the selected load balancing method.
Returns a connected socket and a table containing the connected `host`, `poolid` and `pool` or nil and an error message.

When passed a [socket](https://github.com/openresty/lua-nginx-module#ngxsockettcp) or resty module it will return the same object after successful connection or nil.

Additionally, hash methods may take an optional `key` to define how to hash the connection to determine the host. By default `ngx.var.remote_addr` is used. This value is ignored when the pool's method is round robin.

```lua
resty_redis = require('resty.redis')
local redis = resty_redis.new()

local redis, err = upstream:connect(redis)
local key = ngx.req.get_headers()["X-Forwarded-For"]

local redis, err = upstream:connect(redis, key)

if not redis then
ngx.log(ngx.ERR, err)
Expand Down Expand Up @@ -250,7 +254,7 @@ Returns a new api object using the provided upstream object.
`syntax: ok, err = api:set_method(poolid, method)`

Sets the load balancing method for the specified pool.
Currently only randomised round robin is supported.
Currently randomised round robin and hashing methods are supported.

### create_pool
`syntax: ok, err = api:create_pool(pool)`
Expand All @@ -259,7 +263,7 @@ Creates a new pool from a table of options, `pool` must contain at least 1 key `

Other valid options are

* `method` Balancing method, currently only `round_robin` is supported
* `method` Balancing method
* `timeout` Connection timeout in ms
* `priority` Higher priority pools are used later
* `read_timeout`
Expand Down
110 changes: 107 additions & 3 deletions lib/resty/upstream/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,111 @@ function _M.connect_failed(self, host, poolid, failed_hosts)
end


local function get_hash_host(vars)
local h = vars.hash
local hosts = vars.available_hosts
local weight_sum = vars.weight_sum
local hostcount = #hosts

if hostcount == 0 then return end

local cur_idx = 1

-- figure where we should go
local cur_weight = hosts[cur_idx].weight

while (h >= cur_weight) do
h = h - cur_weight

if (h < 0) then
h = maxweight + h
end

cur_idx = cur_idx + 1

if (cur_idx > hostcount) then
cur_idx = 1
end

cur_weight = hosts[cur_idx].weight
end

-- now cur_idx points us to where we should go
return hosts[cur_idx]
end

function get_hash_vars(hosts, failed_hosts, key)
local available_hosts = {} -- new tab needed here
local n = 0
local weight_sum = 0

for i=1, #hosts do
local host = hosts[i]

if (host.up and not failed_hosts[host.id]) then
n = n + 1
available_hosts[n] = host
weight_sum = weight_sum + host.weight
end
end

local hash = ngx.crc32_short(key) % weight_sum

return {
available_hosts = available_hosts,
weight_sum = weight_sum,
hash = hash,
}
end

_M.available_methods.hash = function(self, pool, sock, key)
local hosts = pool.hosts
local poolid = pool.id
local hash_key = key or ngx.var.remote_addr

local failed_hosts = self:get_failed_hosts(poolid)

-- Attempt a connection
if #hosts == 1 then
-- Don't bother trying to balance between 1 host
local host = hosts[1]
if host.up == false or failed_hosts[host.id] then
return nil, sock, {}, nil
end
local connected, err = sock:connect(host.host, host.port)
if not connected then
self:connect_failed(host, poolid, failed_hosts)
end
return connected, sock, host, err
end

local hash_vars = get_hash_vars(hosts, failed_hosts, hash_key)

local connected, err
repeat
local host = get_hash_host(hash_vars)
if not host then
-- Ran out of hosts, break out of the loop (go to next pool)
break
end

-- Try connecting to the selected host
connected, err = sock:connect(host.host, host.port)

if connected then
return connected, sock, host, err
else
-- Mark the host bad and retry
self:connect_failed(host, poolid, failed_hosts)

-- rehash
hash_vars = get_hash_vars(hosts, failed_hosts, hash_key)
end
until connected
-- All hosts have failed
return nil, sock, {}, err
end

local function select_weighted_rr_host(hosts, failed_hosts, round_robin_vars)
local idx = round_robin_vars.idx
local cw = round_robin_vars.cw
Expand Down Expand Up @@ -569,7 +674,6 @@ local function get_round_robin_vars(self, pool)
return round_robin_vars
end


_M.available_methods.round_robin = function(self, pool, sock)
local hosts = pool.hosts
local poolid = pool.id
Expand Down Expand Up @@ -616,7 +720,7 @@ _M.available_methods.round_robin = function(self, pool, sock)
end


function _M.connect(self, sock)
function _M.connect(self, sock, key)
-- Get pool data
local priority_index, err = self:get_priority_index()
if not priority_index then
Expand Down Expand Up @@ -652,7 +756,7 @@ function _M.connect(self, sock)
set_timeout(sock, pool.timeout)

-- Load balance between available hosts using specified method
connected, sock, host, err = available_methods[pool.method](self, pool, sock)
connected, sock, host, err = available_methods[pool.method](self, pool, sock, key)

if connected then
-- Return connected socket!
Expand Down
Loading

0 comments on commit 7e3c372

Please sign in to comment.