diff --git a/common.h b/common.h index bf32b45..7058fe7 100644 --- a/common.h +++ b/common.h @@ -54,6 +54,7 @@ #include #include #include +#include //#include using namespace std; @@ -114,13 +115,13 @@ struct tcp_info_t:not_copy_able_t { fd64_t fd64; epoll_event ev; - char * data; - //char data[max_data_len_tcp+200];//use a larger buffer than udp + //char * data; + char data[max_data_len_tcp+200];//use a larger buffer than udp char * begin; int data_len; tcp_info_t() { - data=(char*)malloc(max_data_len_tcp+200); + //data=(char*)malloc(max_data_len_tcp+200); begin=data; data_len=0; @@ -128,15 +129,16 @@ struct tcp_info_t:not_copy_able_t } ~tcp_info_t() { - if(data) - free(data); + //if(data) + //free(data); } + /* void free_memory() { free(data); data=0; begin=0; - } + }*/ }; u32_t djb2(unsigned char *str,int len); @@ -222,10 +224,10 @@ struct tcp_pair_t:not_copy_able_t { tcp_info_t local; tcp_info_t remote; - u64_t last_active_time; + //u64_t last_active_time; list::iterator it; char addr_s[max_addr_len]; - int not_used=0; + //int not_used=0; tcp_pair_t() { addr_s[0]=0; diff --git a/main.cpp b/main.cpp index b559f55..121af98 100644 --- a/main.cpp +++ b/main.cpp @@ -21,6 +21,89 @@ address_t local_addr,remote_addr; int VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV; +//template +struct lru_collector_t +{ + typedef void* key_t; +//#define key_t void* + struct lru_pair_t + { + key_t key; + my_time_t ts; + }; + unordered_map::iterator> mp; + list q; + int update(key_t key) + { + assert(mp.find(key)!=mp.end()); + auto it=mp[key]; + q.erase(it); + + my_time_t value=get_current_time(); + if(!q.empty()) + { + assert(value >=q.front().ts); + } + lru_pair_t tmp; tmp.key=key; tmp.ts=value; + q.push_front( tmp); + mp[key]=q.begin(); + + return 0; + } + int new_key(key_t key) + { + assert(mp.find(key)==mp.end()); + + my_time_t value=get_current_time(); + if(!q.empty()) + { + assert(value >=q.front().ts); + } + lru_pair_t tmp; tmp.key=key; tmp.ts=value; + q.push_front( tmp); + mp[key]=q.begin(); + + return 0; + } + int size() + { + return q.size(); + } + int empty() + { + return q.empty(); + } + void clear() + { + mp.clear(); q.clear(); + } + my_time_t ts_of(key_t key) + { + assert(mp.find(key)!=mp.end()); + return mp[key]->ts; + } + + my_time_t peek_back(key_t &key) + { + assert(!q.empty()); + auto it=q.end(); it--; + key=it->key; + return it->ts; + } + void erase(key_t key) + { + assert(mp.find(key)!=mp.end()); + q.erase(mp[key]); + mp.erase(key); + } + void erase_back() + { + assert(!q.empty()); + auto it=q.end(); it--; + key_t key=it->key; + erase(key); + } +}; struct conn_manager_udp_t { @@ -102,37 +185,26 @@ struct conn_manager_tcp_t { list tcp_pair_list; long long last_clear_time; - list::iterator clear_it; + lru_collector_t lru; conn_manager_tcp_t() { last_clear_time=0; - clear_it=tcp_pair_list.begin(); } - int delayed_erase(list::iterator &it) + int erase(list::iterator &it) { fd_manager.fd64_close( it->local.fd64); fd_manager.fd64_close( it->remote.fd64); - it->not_used=1; - it->local.free_memory(); - it->remote.free_memory(); + mylog(log_info,"[tcp]inactive connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1)); + lru.erase(&*it); + tcp_pair_list.erase(it); return 0; } - int erase(list::iterator &it) + int erase_closed(list::iterator &it) { - /*if(clear_it==it) - { - clear_it++; - }*/ - if(!it->not_used) - { - fd_manager.fd64_close( it->local.fd64); - fd_manager.fd64_close( it->remote.fd64); - mylog(log_info,"[tcp]inactive connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1)); - } - else - { - mylog(log_info,"[tcp]closed connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1)); - } + fd_manager.fd64_close( it->local.fd64); + fd_manager.fd64_close( it->remote.fd64); + mylog(log_info,"[tcp]closed connection {%s} cleared, tcp connections=%d\n",it->addr_s,(int)(tcp_pair_list.size()-1)); + lru.erase(&*it); tcp_pair_list.erase(it); return 0; } @@ -147,10 +219,11 @@ struct conn_manager_tcp_t } int clear_inactive0() { + if(disable_conn_clear) return 0; int cnt=0; - list::iterator it=clear_it,old_it; + //list::iterator it=clear_it,old_it; int size=tcp_pair_list.size(); int num_to_clean=size/conn_clear_ratio+conn_clear_min; //clear 2% each time,to avoid latency glitch @@ -159,26 +232,18 @@ struct conn_manager_tcp_t for(;;) { if(cnt>=num_to_clean) break; - if(tcp_pair_list.begin()==tcp_pair_list.end()) break; + //if(tcp_pair_list.begin()==tcp_pair_list.end()) break; + if(lru.empty()) break; + lru_collector_t::key_t key; + my_time_t ts=lru.peek_back(key); + if(current_time- ts < conn_timeout_tcp) break; - if(it==tcp_pair_list.end()) - { - it=tcp_pair_list.begin(); - } + erase( ((tcp_pair_t *) key)->it ); - if( it->not_used||current_time - it->last_active_time >conn_timeout_tcp) - { - old_it=it; - it++; - erase(old_it); - } - else - { - it++; - } cnt++; } - clear_it=it; + //clear_it=it; + return 0; } }conn_manager_tcp; @@ -346,6 +411,9 @@ int event_loop() conn_manager_tcp.tcp_pair_list.emplace_back(); auto it=conn_manager_tcp.tcp_pair_list.end(); it--; + + conn_manager_tcp.lru.new_key(&(*it)); + tcp_pair_t &tcp_pair=*it; strcpy(tcp_pair.addr_s,ip_addr); @@ -363,7 +431,8 @@ int event_loop() tcp_pair.remote.ev.events=EPOLLIN; tcp_pair.remote.ev.data.u64=tcp_pair.remote.fd64; - tcp_pair.last_active_time=get_current_time(); + conn_manager_tcp.lru.update(&(*it)); + //tcp_pair.last_active_time=get_current_time(); tcp_pair.it=it; epoll_event ev; @@ -508,7 +577,7 @@ int event_loop() if((events[idx].events & EPOLLERR) !=0 ||(events[idx].events & EPOLLHUP) !=0) { mylog(log_info,"[tcp]connection closed, events[idx].events=%x \n",events[idx].events); - conn_manager_tcp.delayed_erase(tcp_pair.it); + conn_manager_tcp.erase(tcp_pair.it); continue; } @@ -550,16 +619,17 @@ int event_loop() if(recv_len==0) { mylog(log_info,"[tcp]recv_len=%d,connection {%s} closed bc of EOF\n",recv_len,tcp_pair.addr_s); - conn_manager_tcp.delayed_erase(tcp_pair.it); + conn_manager_tcp.erase_closed(tcp_pair.it); continue; } if(recv_len<0) { mylog(log_info,"[tcp]recv_len=%d,connection {%s} closed bc of %s\n",recv_len,tcp_pair.addr_s,strerror(errno)); - conn_manager_tcp.delayed_erase(tcp_pair.it); + conn_manager_tcp.erase_closed(tcp_pair.it); continue; } - tcp_pair.last_active_time=get_current_time(); + conn_manager_tcp.lru.update(&(*tcp_pair.it)); + //tcp_pair.last_active_time=get_current_time(); my_info.data_len=recv_len; my_info.begin=my_info.data; @@ -608,16 +678,18 @@ int event_loop() if(send_len==0) { mylog(log_warn,"[tcp]send_len=%d,connection {%s} closed bc of send_len==0\n",send_len,tcp_pair.addr_s); - conn_manager_tcp.delayed_erase(tcp_pair.it); + conn_manager_tcp.erase(tcp_pair.it); continue; } if(send_len<0) { mylog(log_info,"[tcp]send_len=%d,connection {%s} closed bc of %s\n",send_len,tcp_pair.addr_s,strerror(errno)); - conn_manager_tcp.delayed_erase(tcp_pair.it); + conn_manager_tcp.erase(tcp_pair.it); continue; } - tcp_pair.last_active_time=get_current_time(); + conn_manager_tcp.lru.update(&(*tcp_pair.it)); + + //tcp_pair.last_active_time=get_current_time(); mylog(log_trace,"[tcp]fd=%d send len=%d\n",my_fd,send_len); other_info.data_len-=send_len; @@ -873,6 +945,8 @@ void process_arg(int argc, char *argv[]) int unit_test() { + //lru_cache_t cache; + address_t::hash_function hash; address_t test; test.from_str((char*)"[2001:19f0:7001:1111:00:ff:11:22]:443");