Skip to content

Commit b12f81b

Browse files
ddragosdselfxp
authored andcommitted
Added Remote Cache Status class to obtain the first healthy redis node from an upstream (#7)
[#7] Added Remote Cache Status class to obtain the first healthy redis node from an upstream
1 parent 5c6e67a commit b12f81b

File tree

4 files changed

+534
-8
lines changed

4 files changed

+534
-8
lines changed

Makefile

+15-5
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ all: ;
1414

1515
install: all
1616
$(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/
17-
$(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/
1817
$(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/request/
18+
$(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/status/
19+
$(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/
1920
$(INSTALL) src/lua/api-gateway/cache/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/
20-
$(INSTALL) src/lua/api-gateway/cache/store/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/
2121
$(INSTALL) src/lua/api-gateway/cache/request/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/request/
22+
$(INSTALL) src/lua/api-gateway/cache/status/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/status/
23+
$(INSTALL) src/lua/api-gateway/cache/store/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/
2224

2325
test: redis
2426
echo "Starting redis server on default port"
@@ -46,6 +48,8 @@ redis: all
4648
.PHONY: pre-docker-test
4749
pre-docker-test:
4850
echo " pre-docker-test"
51+
echo " cleaning up any test_redis docker image"
52+
docker ps | grep test_redis | awk '{print $$1}' | xargs docker stop | xargs docker rm
4953
rm -rf $(BUILD_DIR)/*
5054
rm -rf ~/tmp/apiplatform/api-gateway-cachemanager/
5155
mkdir -p $(BUILD_DIR)
@@ -60,16 +64,22 @@ pre-docker-test:
6064
mkdir -p ~/tmp/apiplatform/api-gateway-cachemanager/target/test-logs
6165
ln -s ~/tmp/apiplatform/api-gateway-cachemanager/target/test-logs ./target/test-logs
6266

67+
.PHONY: get-redis-docker-ip
68+
get-redis-docker-ip:
69+
$(eval $@_IP := $(shell docker run --entrypoint=ifconfig alpine eth0 | grep "inet addr" | cut -d: -f2 | awk '{ print $$1}'))
70+
@echo "Assuming the next IP for the docker container is:" $($@_IP)
71+
sed -i '' 's/127\.0\.0\.1\:6379/$($@_IP)\:6379/g' ~/tmp/apiplatform/api-gateway-cachemanager/test/perl/api-gateway/cache/status/remoteCacheStatus.t
72+
6373
post-docker-test:
6474
echo " post-docker-test"
65-
# cp -r ~/tmp/apiplatform/api-gateway-cachemanager/target/ ./target
66-
# rm -rf ~/tmp/apiplatform/api-gateway-cachemanager
75+
cp -r ~/tmp/apiplatform/api-gateway-cachemanager/target/ ./target
76+
rm -rf ~/tmp/apiplatform/api-gateway-cachemanager
6777

6878
run-docker-test:
6979
echo " run-docker-test"
7080
- cd ./test && docker-compose up --force-recreate
7181

72-
test-docker: pre-docker-test run-docker-test post-docker-test
82+
test-docker: pre-docker-test get-redis-docker-ip run-docker-test post-docker-test
7383
echo "running tests with docker ..."
7484

7585
test-docker-manual: pre-docker-test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
--[[
2+
Copyright 2016 Adobe Systems Incorporated. All rights reserved.
3+
4+
This file is licensed to you under the Apache License, Version 2.0 (the
5+
"License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR RESPRESENTATIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
10+
]]
11+
--
12+
-- Module for selecting a healthy server from an upstream.
13+
-- It's best to be used with health-checks so that a peer is maked UP or DOWN
14+
-- User: nramaswa
15+
-- Date: 4/17/14
16+
-- Time: 7:38 PM
17+
18+
local _M = {}
19+
local DEFAULT_SHARED_DICT = "cachedkeys"
20+
21+
function _M:new(o)
22+
o = o or {}
23+
setmetatable(o, self)
24+
self.__index = self
25+
if ( o ~= nil ) then
26+
self.shared_dict = o.shared_dict or DEFAULT_SHARED_DICT
27+
end
28+
return o
29+
end
30+
31+
--- Reused from the "resty.upstream.healthcheck" module to get the
32+
-- status of the upstream servers
33+
local function gen_peers_status_info(peers, bits, idx)
34+
local npeers = #peers
35+
for i = 1, npeers do
36+
local peer = peers[i]
37+
bits[idx] = peer.name
38+
if peer.down then
39+
bits[idx + 1] = " DOWN\n"
40+
else
41+
bits[idx + 1] = " up\n"
42+
end
43+
idx = idx + 2
44+
end
45+
return idx
46+
end
47+
48+
--- Returns the results of the health checks for the provided upstream_name
49+
-- as found in the "resty.upstream.healthcheck" module.
50+
-- @param upstream_name
51+
--
52+
local function get_health_check_for_upstream(upstream_name)
53+
local ok, upstream = pcall(require, "ngx.upstream")
54+
if not ok then
55+
error("ngx_upstream_lua module required")
56+
end
57+
58+
local get_primary_peers = upstream.get_primary_peers
59+
local get_backup_peers = upstream.get_backup_peers
60+
61+
local ok, new_tab = pcall(require, "table.new")
62+
if not ok or type(new_tab) ~= "function" then
63+
new_tab = function (narr, nrec) return {} end
64+
end
65+
66+
local n = 1
67+
local bits = new_tab(n * 20, 0)
68+
local idx = 1
69+
70+
local peers, err = get_primary_peers(upstream_name)
71+
if not peers then
72+
return "failed to get primary peers in upstream " .. upstream_name .. ": "
73+
.. err
74+
end
75+
76+
idx = gen_peers_status_info(peers, bits, idx)
77+
78+
peers, err = get_backup_peers(upstream_name)
79+
if not peers then
80+
return "failed to get backup peers in upstream " .. upstream_name .. ": "
81+
.. err
82+
end
83+
84+
idx = gen_peers_status_info(peers, bits, idx)
85+
86+
return bits
87+
end
88+
89+
--- Returns a cached healthy upstream.
90+
-- @param dict_name shared dict name
91+
-- @param upstream_name the name of the upstream as defined in the config
92+
--
93+
local function get_healthy_upstream_from_cache(dict_name, upstream_name)
94+
local dict = ngx.shared[dict_name]
95+
local healthy_upstream
96+
local health_upstream_key = "healthy_upstream:" .. tostring(upstream_name)
97+
if (nil ~= dict) then
98+
healthy_upstream = dict:get(health_upstream_key)
99+
end
100+
return healthy_upstream
101+
end
102+
103+
local function update_healthy_upstream_in_cache(dict_name, upstream_name, healthy_upstream)
104+
local dict = ngx.shared[dict_name];
105+
if (nil ~= dict) then
106+
ngx.log(ngx.DEBUG, "Saving a healthy upstream:", healthy_upstream, " in cache:", dict_name, " for upstream:", upstream_name)
107+
local exp_time_in_seconds = 5
108+
local health_upstream_key = "healthy_upstream:" .. tostring(upstream_name)
109+
dict:set(health_upstream_key, healthy_upstream, exp_time_in_seconds)
110+
return
111+
end
112+
113+
ngx.log(ngx.WARN, "Dictionary ", dict_name, " doesn't seem to be set. Did you define one ? ")
114+
end
115+
116+
--- Returns the host and port from an upstream like host:port
117+
-- @param upstream_host
118+
--
119+
local function get_host_and_port_in_upstream(upstream_host)
120+
local p = {}
121+
p.host = upstream_host
122+
123+
local idx = string.find(upstream_host, ":", 1, true)
124+
if idx then
125+
p.host = string.sub(upstream_host, 1, idx - 1)
126+
p.port = tonumber(string.sub(upstream_host, idx + 1))
127+
end
128+
return p.host, p.port
129+
end
130+
131+
132+
function _M:getStatus(upstream_name)
133+
return get_health_check_for_upstream(upstream_name)
134+
end
135+
136+
--- Returns the first healthy server found in the upstream_name
137+
-- Returns 3 values: <upstreamName , host, port >
138+
-- The difference between upstream and <host,port> is that the upstream may be just a string containing host:port
139+
-- @param upstream_name
140+
--
141+
function _M:getHealthyServer(upstream_name)
142+
143+
-- get the host and port from the local cache first
144+
local healthy_host = get_healthy_upstream_from_cache(self.shared_dict, upstream_name)
145+
if ( nil ~= healthy_host) then
146+
local host, port = get_host_and_port_in_upstream(healthy_host)
147+
return healthy_host, host, port
148+
end
149+
150+
-- if the host is not in the local cache get it from the upstream configuration
151+
ngx.log(ngx.DEBUG, "Looking up for a healthy peer in upstream:", upstream_name)
152+
local upstream_health_result = get_health_check_for_upstream(upstream_name)
153+
154+
if(upstream_health_result == nil) then
155+
ngx.log(ngx.ERR, "\n No upstream results!")
156+
return nil
157+
end
158+
159+
for key,value in ipairs(upstream_health_result) do
160+
-- return the first peer found to be up.
161+
-- TODO: save all the peers that are up and return them using round-robin alg
162+
if(value == " up\n") then
163+
healthy_host = upstream_health_result[key-1]
164+
update_healthy_upstream_in_cache(self.shared_dict, upstream_name, healthy_host)
165+
local host, port = get_host_and_port_in_upstream(healthy_host)
166+
return healthy_host, host, port
167+
end
168+
if(value == " DOWN\n" and upstream_health_result[key-1] ~= nil ) then
169+
ngx.log(ngx.WARN, "Peer ", tostring(upstream_health_result[key-1]), " is down! Checking for backup peers.")
170+
end
171+
end
172+
173+
ngx.log(ngx.ERR, "All peers are down!")
174+
return nil -- No peers are up
175+
end
176+
177+
return _M

src/lua/api-gateway/cache/store/redisCache.lua

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
--
2222

2323
local redis = require "resty.redis"
24-
local RedisHealthCheck = require "api-gateway.redis.redisHealthCheck"
24+
local RedisStatus = require "api-gateway.cache.status.remoteCacheStatus"
2525
local cjson = require "cjson"
2626

2727
-- redis endpoints are assumed to be global per GW node and therefore are read here
@@ -37,13 +37,13 @@ local REDIS_RW_UPSTREAM = "api-gateway-redis"
3737
-- Shared dictionary used by RedisHealthCheck
3838
local SHARED_DICT_NAME = "cachedkeys"
3939

40-
local redisHealthCheck = RedisHealthCheck:new({
40+
local redisStatus = RedisStatus:new({
4141
shared_dict = SHARED_DICT_NAME
4242
})
4343

4444
local function getRedisUpstream(upstream_name)
4545
local n = upstream_name or REDIS_RO_UPSTREAM
46-
local upstream, host, port = redisHealthCheck:getHealthyRedisNode(n)
46+
local upstream, host, port = redisStatus:getHealthyServer(n)
4747
ngx.log(ngx.DEBUG, "Obtained Redis Host:" .. tostring(host) .. ":" .. tostring(port), " from upstream:", n)
4848
if (nil ~= host and nil ~= port) then
4949
return host, port

0 commit comments

Comments
 (0)