Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threadpool #3

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions wrtnode/uixo_client/src/client_console.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static int socketfd;

static void quit_signal_handler(int a)
{
send(socketfd, "exit", sizeof("exit"), 0);
// send(socketfd, "exit", sizeof("exit"), 0);
close(socketfd);
exit(0);
}
Expand Down Expand Up @@ -78,16 +78,16 @@ int main(int argc,char *argv[])
if(string_len > MAX_UIXO_MSG_LEN) {
printf("%s: input string too long. len=%d.\n", __func__, string_len);
}
sprintf(msg.head, "%04d", string_len);
if(send(socketfd, (char*)&msg, string_len+UIXO_HEAD_LEN, 0) < 0) {
printf("%s: send %s error\n", __func__, msg.head);
// sprintf(msg.head, "%04d", string_len);
if(send(socketfd, msg.data, string_len, 0) < 0) {
printf("%s: send %s error\n", __func__, msg.data);
}
}
{
if(rttimes == 0){
if(send(socketfd, "exit", sizeof("exit"), 0) < 0) {
printf("send error\n");
}
// if(send(socketfd, "exit", sizeof("exit"), 0) < 0) {
// printf("send error\n");
// }
close(socketfd);
}
else {
Expand All @@ -110,8 +110,8 @@ int main(int argc,char *argv[])

printf("%s",buff);
}
send(socketfd, "exit", sizeof("exit"), 0);
close(socketfd);
// send(socketfd, "exit", sizeof("exit"), 0);
close(socketfd);
}
}

Expand Down
167 changes: 48 additions & 119 deletions wrtnode/uixo_console/src/HandleMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int handle_msg_del_msglist(struct list_head* msg_head)
return 0;
}

static int handle_msg_format_data(char* dest, const char* src)
int handle_msg_format_data(char* dest, const char* src)
{
int len = 0;
if((NULL == dest) || (NULL == src)) {
Expand Down Expand Up @@ -97,27 +97,13 @@ static int handle_msg_format_data(char* dest, const char* src)
return len;
}

int handle_msg_transmit_data(uixo_port_t* port, uixo_message_t* msg)
int handle_msg_transmit_data(uixo_port_t* port, const char* data, int len)
{
char tx_data[msg->len+1];
int data_len = 0;

if((NULL == port) || (NULL == port->port) || (NULL == msg)) {
printf("%s: input data is NULL\n", __func__);
return -1;
}
data_len = handle_msg_format_data(tx_data, msg->data);
if(data_len <= 0) {
printf("%s: data len = %d\n", __func__, data_len);
return -1;
}
PR_DEBUG("%s: TX=%s, LEN=%d\n", __func__, tx_data, data_len);

if(strncmp(port->name, "/dev/spiS", strlen("/dev/spiS")) == 0) {
struct spi_mt7688* sm = (struct spi_mt7688*)(port->port);
int writen = 0;
PR_DEBUG("%s: send to port data = %s and len = %d\n", __func__, tx_data, data_len);
writen = sm->write(sm, tx_data, data_len);
PR_DEBUG("%s: send to port data = %s and len = %d\n", __func__, data, len);
writen = sm->write(sm, data, len);
if(writen < 0) {
printf("%s: send message failed\n", __func__);
return -1;
Expand All @@ -126,8 +112,8 @@ int handle_msg_transmit_data(uixo_port_t* port, uixo_message_t* msg)
else {
struct posix_serial* ps = (struct posix_serial*)port->port;
int writen = 0;
PR_DEBUG("%s: send to port data = %s and len = %d\n", __func__, tx_data, data_len);
writen = ps->write(ps, tx_data, data_len);
PR_DEBUG("%s: send to port data = %s and len = %d\n", __func__, data, len);
writen = ps->write(ps, data, len);
if(writen < 0) {
printf("%s: send message failed\n", __func__);
return -1;
Expand All @@ -136,78 +122,41 @@ int handle_msg_transmit_data(uixo_port_t* port, uixo_message_t* msg)
return 0;
}

static void* _handle_msg_receive_data_thread(void* arg)
int handle_msg_receive_data(uixo_port_t* port, uixo_message_t* msg)
{
uixo_port_t* port = (uixo_port_t*)arg;
uixo_message_t* msg = NULL;

pthread_mutex_lock(&port->port_mutex);
PR_DEBUG("%s: take port lock.\n", __func__);
port->rx_thread_is_run = 1;
pthread_mutex_unlock(&port->port_mutex);
PR_DEBUG("%s: release port lock.\n", __func__);
PR_DEBUG("%s: thread(%d) running.\n", __func__, (int)port->rx_msg_thread);
PR_DEBUG("%s: got port(%s) in receive thread.\n", __func__, port->name);

while(!(pthread_mutex_lock(&port->port_mutex) ||
list_empty(&port->msghead))) {
PR_DEBUG("%s: take port lock.\n", __func__);
msg = list_first_entry(&port->msghead, typeof(*msg), list);
list_del(&msg->list);
pthread_mutex_unlock(&port->port_mutex);
PR_DEBUG("%s: release port lock.\n", __func__);
char rx_data[MAX_UIXO_MSG_LEN];

PR_DEBUG("%s: got a message from list, rttimes=%d\n", __func__, msg->rttimes);
if(UIXO_MSG_ALWAYS_WAIT_MSG == msg->rttimes) {
while(1) {
int len = 0;
len = handle_port_read_line(port, rx_data, MAX_UIXO_MSG_LEN*sizeof(*rx_data));
if(0 != len) {
PR_DEBUG("%s", rx_data);
if(send(msg->socketfd, rx_data, len, 0) < 0) {
printf("%s: send to client error.\n", __func__);
}
PR_DEBUG("%s: send %s to client(fd=%d).\n", __func__, rx_data, msg->socketfd);
char rx_data[MAX_UIXO_MSG_LEN];

if(UIXO_MSG_ALWAYS_WAIT_MSG == msg->rttimes) {
while(1) {
int len = 0;
len = handle_port_read_line(port, rx_data, MAX_UIXO_MSG_LEN*sizeof(*rx_data));
if(0 != len) {
PR_DEBUG("%s", rx_data);
if(send(msg->socketfd, rx_data, len, 0) < 0) {
printf("%s: send to client error.\n", __func__);
return -1;
}
PR_DEBUG("%s: send %s to client(fd=%d) in always wait.\n", __func__, rx_data, msg->socketfd);
}
}
else if (0 < msg->rttimes) {
while(msg->rttimes--) {
int len = 0;
len = handle_port_read_line(port, rx_data, MAX_UIXO_MSG_LEN*sizeof(*rx_data));
if(0 != len) {
PR_DEBUG("%s", rx_data);
if(send(msg->socketfd, rx_data, len, 0) < 0) {
printf("%s: send to client error.\n", __func__);
}
}
else if (0 < msg->rttimes) {
while(msg->rttimes--) {
int len = 0;
len = handle_port_read_line(port, rx_data, MAX_UIXO_MSG_LEN*sizeof(*rx_data));
if(0 != len) {
PR_DEBUG("%s", rx_data);
if(send(msg->socketfd, rx_data, len, 0) < 0) {
printf("%s: send to client error.\n", __func__);
return -1;
}
PR_DEBUG("%s: send %s to client(fd=%d).\n", __func__, rx_data, msg->socketfd);
}
handle_msg_free_msg(msg);
}
else {
printf("%s: WARNING: got rttimes=0 in rx thread. port(%s), client(%d)\n",
__func__, port->name, msg->socketfd);
handle_msg_free_msg(msg);
}
}
if(list_empty(&port->msghead)) {
port->rx_thread_is_run = 0;
pthread_mutex_unlock(&port->port_mutex);
PR_DEBUG("%s: release port lock.\n", __func__);
PR_DEBUG("%s: WARNING: port(%s) message list is empty.\n", __func__, port->name);
}
else {
printf("%s: take port(%s) lock error.\n", __func__, port->name);
}
return 0;
}

int handle_msg_receive_data(uixo_port_t* port)
{
if(pthread_create(&port->rx_msg_thread, NULL, _handle_msg_receive_data_thread, port) < 0) {
printf("%s: create port(%s) rx message thread failed.\n", __func__, port->name);
return -1;
printf("%s: WARNING: got rttimes=0 in rx thread. port(%s), client(%d)\n",
__func__, port->name, msg->socketfd);
}
return 0;
}
Expand Down Expand Up @@ -274,11 +223,12 @@ static int handle_msg_parse_msg(const char* data, const ssize_t len, uixo_messag
{
char* word = NULL;
char* sep = ":";
char* last = NULL;
ret = 0;

for(word = strtok(valid_data, sep);
for(word = strtok_r(valid_data, sep, &last);
word;
word = strtok(NULL, sep)) {
word = strtok_r(NULL, sep, &last)) {
PR_DEBUG("%s: strtok word = %s\n", __func__, word);
switch(ret) {
case 0: ret += sscanf(word, "%ld", &msg->time); break;
Expand Down Expand Up @@ -313,49 +263,28 @@ static int handle_msg_parse_msg(const char* data, const ssize_t len, uixo_messag
int handle_msg_resolve_msg(const int fd)
{
uixo_message_t msg;
char head[UIXO_HEAD_LEN] = {0};
char read_buf[MAX_UIXO_MSG_LEN];
ssize_t readn = 0;
int ret = 0;

PR_DEBUG("%s: client(fd = %d) send data in.\n", __func__, fd);

msg.socketfd = fd;
readn = read(fd, head, UIXO_HEAD_LEN);
PR_DEBUG("%s: got message head = %s, len = %ld.\n", __func__, head, readn);
if(readn != UIXO_HEAD_LEN) {
printf("%s: read client head error. data = %s return = %ld\n", __func__, head, readn);
readn = read(fd, read_buf, MAX_UIXO_MSG_LEN);
if(readn < 0) {
printf("%s: read client fd error. return = %ld\n", __func__, readn);
return -1;
}
read_buf[readn] = '\0';
PR_DEBUG("%s: read data = %s, length = %ld\n", __func__, read_buf, readn);

if(0 == strcmp(head, "exit")) {
PR_DEBUG("%s: read client exit message.\n", __func__);
return UIXO_MSG_CLIENT_EXIT_MSG;
if(handle_msg_parse_msg(read_buf, readn, &msg) != UIXO_ERR_OK) {
printf("%s: uixo message parse err.\n", __func__);
return -1;
}
else {
int buf_len = atoi(head);
char read_buf[buf_len+1];

if(0 == buf_len) {
printf("%s: client send data length is 0.\n", __func__);
return -1;
}
readn = read(fd, read_buf, buf_len);
if((readn != buf_len)||(readn == -1)) {
printf("%s: read client fd error. return = %ld\n", __func__, readn);
return -1;
}
read_buf[readn] = '\0';
PR_DEBUG("%s: read data = %s, length = %ld\n", __func__, read_buf, readn);

if(handle_msg_parse_msg(read_buf, readn, &msg) != UIXO_ERR_OK) {
printf("%s: uixo message parse err.\n", __func__);
return -1;
}
if(handle_port_fun_types(&msg) < 0) {
printf("%s: parse message error.\n", __func__);
return -1;
}
if(handle_port_fun_types(&msg) < 0) {
printf("%s: parse message error.\n", __func__);
return -1;
}

return ret;
return 0;
}
Loading