Skip to content

Commit

Permalink
Merge pull request #13 from p0pr0ck5/master
Browse files Browse the repository at this point in the history
Implement hash load balancing method
  • Loading branch information
hamishforbes authored Jul 4, 2016
2 parents f97c08f + 7e3c372 commit 10048a8
Show file tree
Hide file tree
Showing 3 changed files with 583 additions and 7 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,22 @@ Takes an optional id parameter, this *must* be unique if multiple instances of u
Initialises the background thread, should be called in `init_worker_by_lua`

### connect
`syntax: ok, err = upstream:connect(client?)`
`syntax: ok, err = upstream:connect(client?, key?)`

Attempts to connect to a host in the defined pools in priority order using the selected load balancing method.
Returns a connected socket and a table containing the connected `host`, `poolid` and `pool` or nil and an error message.

When passed a [socket](https://github.com/openresty/lua-nginx-module#ngxsockettcp) or resty module it will return the same object after successful connection or nil.

Additionally, hash methods may take an optional `key` to define how to hash the connection to determine the host. By default `ngx.var.remote_addr` is used. This value is ignored when the pool's method is round robin.

```lua
resty_redis = require('resty.redis')
local redis = resty_redis.new()

local redis, err = upstream:connect(redis)
local key = ngx.req.get_headers()["X-Forwarded-For"]

local redis, err = upstream:connect(redis, key)

if not redis then
ngx.log(ngx.ERR, err)
Expand Down Expand Up @@ -250,7 +254,7 @@ Returns a new api object using the provided upstream object.
`syntax: ok, err = api:set_method(poolid, method)`

Sets the load balancing method for the specified pool.
Currently only randomised round robin is supported.
Currently randomised round robin and hashing methods are supported.

### create_pool
`syntax: ok, err = api:create_pool(pool)`
Expand All @@ -259,7 +263,7 @@ Creates a new pool from a table of options, `pool` must contain at least 1 key `

Other valid options are

* `method` Balancing method, currently only `round_robin` is supported
* `method` Balancing method
* `timeout` Connection timeout in ms
* `priority` Higher priority pools are used later
* `read_timeout`
Expand Down
110 changes: 107 additions & 3 deletions lib/resty/upstream/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,111 @@ function _M.connect_failed(self, host, poolid, failed_hosts)
end


local function get_hash_host(vars)
local h = vars.hash
local hosts = vars.available_hosts
local weight_sum = vars.weight_sum
local hostcount = #hosts

if hostcount == 0 then return end

local cur_idx = 1

-- figure where we should go
local cur_weight = hosts[cur_idx].weight

while (h >= cur_weight) do
h = h - cur_weight

if (h < 0) then
h = maxweight + h
end

cur_idx = cur_idx + 1

if (cur_idx > hostcount) then
cur_idx = 1
end

cur_weight = hosts[cur_idx].weight
end

-- now cur_idx points us to where we should go
return hosts[cur_idx]
end

function get_hash_vars(hosts, failed_hosts, key)
local available_hosts = {} -- new tab needed here
local n = 0
local weight_sum = 0

for i=1, #hosts do
local host = hosts[i]

if (host.up and not failed_hosts[host.id]) then
n = n + 1
available_hosts[n] = host
weight_sum = weight_sum + host.weight
end
end

local hash = ngx.crc32_short(key) % weight_sum

return {
available_hosts = available_hosts,
weight_sum = weight_sum,
hash = hash,
}
end

_M.available_methods.hash = function(self, pool, sock, key)
local hosts = pool.hosts
local poolid = pool.id
local hash_key = key or ngx.var.remote_addr

local failed_hosts = self:get_failed_hosts(poolid)

-- Attempt a connection
if #hosts == 1 then
-- Don't bother trying to balance between 1 host
local host = hosts[1]
if host.up == false or failed_hosts[host.id] then
return nil, sock, {}, nil
end
local connected, err = sock:connect(host.host, host.port)
if not connected then
self:connect_failed(host, poolid, failed_hosts)
end
return connected, sock, host, err
end

local hash_vars = get_hash_vars(hosts, failed_hosts, hash_key)

local connected, err
repeat
local host = get_hash_host(hash_vars)
if not host then
-- Ran out of hosts, break out of the loop (go to next pool)
break
end

-- Try connecting to the selected host
connected, err = sock:connect(host.host, host.port)

if connected then
return connected, sock, host, err
else
-- Mark the host bad and retry
self:connect_failed(host, poolid, failed_hosts)

-- rehash
hash_vars = get_hash_vars(hosts, failed_hosts, hash_key)
end
until connected
-- All hosts have failed
return nil, sock, {}, err
end

local function select_weighted_rr_host(hosts, failed_hosts, round_robin_vars)
local idx = round_robin_vars.idx
local cw = round_robin_vars.cw
Expand Down Expand Up @@ -569,7 +674,6 @@ local function get_round_robin_vars(self, pool)
return round_robin_vars
end


_M.available_methods.round_robin = function(self, pool, sock)
local hosts = pool.hosts
local poolid = pool.id
Expand Down Expand Up @@ -616,7 +720,7 @@ _M.available_methods.round_robin = function(self, pool, sock)
end


function _M.connect(self, sock)
function _M.connect(self, sock, key)
-- Get pool data
local priority_index, err = self:get_priority_index()
if not priority_index then
Expand Down Expand Up @@ -652,7 +756,7 @@ function _M.connect(self, sock)
set_timeout(sock, pool.timeout)

-- Load balance between available hosts using specified method
connected, sock, host, err = available_methods[pool.method](self, pool, sock)
connected, sock, host, err = available_methods[pool.method](self, pool, sock, key)

if connected then
-- Return connected socket!
Expand Down
Loading

0 comments on commit 10048a8

Please sign in to comment.