Skip to content

Commit

Permalink
added lru cache for tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu- committed May 24, 2018
1 parent d895b05 commit 39ee33a
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 54 deletions.
18 changes: 10 additions & 8 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include<list>
#include <memory>
#include <vector>
#include <deque>
//#include <pair>
using namespace std;

Expand Down Expand Up @@ -114,29 +115,30 @@ struct tcp_info_t:not_copy_able_t
{
fd64_t fd64;
epoll_event ev;
char * data;
//char data[max_data_len_tcp+200];//use a larger buffer than udp
//char * data;
char data[max_data_len_tcp+200];//use a larger buffer than udp
char * begin;
int data_len;
tcp_info_t()
{
data=(char*)malloc(max_data_len_tcp+200);
//data=(char*)malloc(max_data_len_tcp+200);

begin=data;
data_len=0;

}
~tcp_info_t()
{
if(data)
free(data);
//if(data)
//free(data);
}
/*
void free_memory()
{
free(data);
data=0;
begin=0;
}
}*/
};

u32_t djb2(unsigned char *str,int len);
Expand Down Expand Up @@ -222,10 +224,10 @@ struct tcp_pair_t:not_copy_able_t
{
tcp_info_t local;
tcp_info_t remote;
u64_t last_active_time;
//u64_t last_active_time;
list<tcp_pair_t>::iterator it;
char addr_s[max_addr_len];
int not_used=0;
//int not_used=0;
tcp_pair_t()
{
addr_s[0]=0;
Expand Down
166 changes: 120 additions & 46 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,89 @@ address_t local_addr,remote_addr;

int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV;

//template <class key_t>
struct lru_collector_t
{
typedef void* key_t;
//#define key_t void*
struct lru_pair_t
{
key_t key;
my_time_t ts;
};
unordered_map<key_t,list<lru_pair_t>::iterator> mp;
list<lru_pair_t> q;
int update(key_t key)
{
assert(mp.find(key)!=mp.end());
auto it=mp[key];
q.erase(it);

my_time_t value=get_current_time();
if(!q.empty())
{
assert(value >=q.front().ts);
}
lru_pair_t tmp; tmp.key=key; tmp.ts=value;
q.push_front( tmp);
mp[key]=q.begin();

return 0;
}
int new_key(key_t key)
{
assert(mp.find(key)==mp.end());

my_time_t value=get_current_time();
if(!q.empty())
{
assert(value >=q.front().ts);
}
lru_pair_t tmp; tmp.key=key; tmp.ts=value;
q.push_front( tmp);
mp[key]=q.begin();

return 0;
}
int size()
{
return q.size();
}
int empty()
{
return q.empty();
}
void clear()
{
mp.clear(); q.clear();
}
my_time_t ts_of(key_t key)
{
assert(mp.find(key)!=mp.end());
return mp[key]->ts;
}

my_time_t peek_back(key_t &key)
{
assert(!q.empty());
auto it=q.end(); it--;
key=it->key;
return it->ts;
}
void erase(key_t key)
{
assert(mp.find(key)!=mp.end());
q.erase(mp[key]);
mp.erase(key);
}
void erase_back()
{
assert(!q.empty());
auto it=q.end(); it--;
key_t key=it->key;
erase(key);
}
};

struct conn_manager_udp_t
{
Expand Down Expand Up @@ -102,37 +185,26 @@ struct conn_manager_tcp_t
{
list<tcp_pair_t> tcp_pair_list;
long long last_clear_time;
list<tcp_pair_t>::iterator clear_it;
lru_collector_t lru;
conn_manager_tcp_t()
{
last_clear_time=0;
clear_it=tcp_pair_list.begin();
}
int delayed_erase(list<tcp_pair_t>::iterator &it)
int erase(list<tcp_pair_t>::iterator &it)
{
fd_manager.fd64_close( it->local.fd64);
fd_manager.fd64_close( it->remote.fd64);
it->not_used=1;
it->local.free_memory();
it->remote.free_memory();
mylog(log_info,"[tcp]inactive connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1));
lru.erase(&*it);
tcp_pair_list.erase(it);
return 0;
}
int erase(list<tcp_pair_t>::iterator &it)
int erase_closed(list<tcp_pair_t>::iterator &it)
{
/*if(clear_it==it)
{
clear_it++;
}*/
if(!it->not_used)
{
fd_manager.fd64_close( it->local.fd64);
fd_manager.fd64_close( it->remote.fd64);
mylog(log_info,"[tcp]inactive connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1));
}
else
{
mylog(log_info,"[tcp]closed connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1));
}
fd_manager.fd64_close( it->local.fd64);
fd_manager.fd64_close( it->remote.fd64);
mylog(log_info,"[tcp]closed connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1));
lru.erase(&*it);
tcp_pair_list.erase(it);
return 0;
}
Expand All @@ -147,10 +219,11 @@ struct conn_manager_tcp_t
}
int clear_inactive0()
{

if(disable_conn_clear) return 0;

int cnt=0;
list<tcp_pair_t>::iterator it=clear_it,old_it;
//list<tcp_pair_t>::iterator it=clear_it,old_it;
int size=tcp_pair_list.size();
int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 2% each time,to avoid latency glitch

Expand All @@ -159,26 +232,18 @@ struct conn_manager_tcp_t
for(;;)
{
if(cnt>=num_to_clean) break;
if(tcp_pair_list.begin()==tcp_pair_list.end()) break;
//if(tcp_pair_list.begin()==tcp_pair_list.end()) break;
if(lru.empty()) break;
lru_collector_t::key_t key;
my_time_t ts=lru.peek_back(key);
if(current_time- ts < conn_timeout_tcp) break;

if(it==tcp_pair_list.end())
{
it=tcp_pair_list.begin();
}
erase( ((tcp_pair_t *) key)->it );

if( it->not_used||current_time - it->last_active_time >conn_timeout_tcp)
{
old_it=it;
it++;
erase(old_it);
}
else
{
it++;
}
cnt++;
}
clear_it=it;
//clear_it=it;

return 0;
}
}conn_manager_tcp;
Expand Down Expand Up @@ -346,6 +411,9 @@ int event_loop()
conn_manager_tcp.tcp_pair_list.emplace_back();
auto it=conn_manager_tcp.tcp_pair_list.end();
it--;

conn_manager_tcp.lru.new_key(&(*it));

tcp_pair_t &tcp_pair=*it;
strcpy(tcp_pair.addr_s,ip_addr);

Expand All @@ -363,7 +431,8 @@ int event_loop()
tcp_pair.remote.ev.events=EPOLLIN;
tcp_pair.remote.ev.data.u64=tcp_pair.remote.fd64;

tcp_pair.last_active_time=get_current_time();
conn_manager_tcp.lru.update(&(*it));
//tcp_pair.last_active_time=get_current_time();
tcp_pair.it=it;

epoll_event ev;
Expand Down Expand Up @@ -508,7 +577,7 @@ int event_loop()
if((events[idx].events & EPOLLERR) !=0 ||(events[idx].events & EPOLLHUP) !=0)
{
mylog(log_info,"[tcp]connection closed, events[idx].events=%x \n",events[idx].events);
conn_manager_tcp.delayed_erase(tcp_pair.it);
conn_manager_tcp.erase(tcp_pair.it);
continue;
}

Expand Down Expand Up @@ -550,16 +619,17 @@ int event_loop()
if(recv_len==0)
{
mylog(log_info,"[tcp]recv_len=%d,connection {%s} closed bc of EOF\n",recv_len,tcp_pair.addr_s);
conn_manager_tcp.delayed_erase(tcp_pair.it);
conn_manager_tcp.erase_closed(tcp_pair.it);
continue;
}
if(recv_len<0)
{
mylog(log_info,"[tcp]recv_len=%d,connection {%s} closed bc of %s\n",recv_len,tcp_pair.addr_s,strerror(errno));
conn_manager_tcp.delayed_erase(tcp_pair.it);
conn_manager_tcp.erase_closed(tcp_pair.it);
continue;
}
tcp_pair.last_active_time=get_current_time();
conn_manager_tcp.lru.update(&(*tcp_pair.it));
//tcp_pair.last_active_time=get_current_time();

my_info.data_len=recv_len;
my_info.begin=my_info.data;
Expand Down Expand Up @@ -608,16 +678,18 @@ int event_loop()
if(send_len==0)
{
mylog(log_warn,"[tcp]send_len=%d,connection {%s} closed bc of send_len==0\n",send_len,tcp_pair.addr_s);
conn_manager_tcp.delayed_erase(tcp_pair.it);
conn_manager_tcp.erase(tcp_pair.it);
continue;
}
if(send_len<0)
{
mylog(log_info,"[tcp]send_len=%d,connection {%s} closed bc of %s\n",send_len,tcp_pair.addr_s,strerror(errno));
conn_manager_tcp.delayed_erase(tcp_pair.it);
conn_manager_tcp.erase(tcp_pair.it);
continue;
}
tcp_pair.last_active_time=get_current_time();
conn_manager_tcp.lru.update(&(*tcp_pair.it));

//tcp_pair.last_active_time=get_current_time();

mylog(log_trace,"[tcp]fd=%d send len=%d\n",my_fd,send_len);
other_info.data_len-=send_len;
Expand Down Expand Up @@ -873,6 +945,8 @@ void process_arg(int argc, char *argv[])

int unit_test()
{
//lru_cache_t<string,u64_t> cache;

address_t::hash_function hash;
address_t test;
test.from_str((char*)"[2001:19f0:7001:1111:00:ff:11:22]:443");
Expand Down

0 comments on commit 39ee33a

Please sign in to comment.