Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make(under Linux) & async IO (send, recv timeout, and reconnect to mongod after SIGPIPE) #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
all: src test
@make -C src
@make -C test

clean:
@make -C src clean
@make -C test clean
14 changes: 14 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
all: libmongodb.a

SOURCES = $(wildcard *.c)
OBJS = $(patsubst %.c, %.o, $(SOURCES))

libmongodb.a: $(OBJS)
@rm -rf $@
ar cr $@ $(OBJS)

%.o:%.c
gcc --std=c99 -fPIC -D_GNU_SOURCE -c $< -o $@

clean:
@rm -rf $(OBJS) *.a
6 changes: 3 additions & 3 deletions src/bson.c
Original file line number Diff line number Diff line change
Expand Up @@ -633,10 +633,10 @@ void bson_fatal_msg( int ok , const char* msg){

if (err_handler){
err_handler(msg);
} else {
fprintf( stderr , "error: %s\n" , msg );
exit(-5);
}

fprintf( stderr , "error: %s\n" , msg );
exit(-5);
}

extern const char bson_numstrs[1000][4];
Expand Down
55 changes: 40 additions & 15 deletions src/mongo.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,42 +36,60 @@ static const int one = 1;
message stuff
------------------------------ */

static void looping_write(mongo_connection * conn, const void* buf, int len){
static int looping_write(mongo_connection * conn, const void* buf, int len){
const char* cbuf = buf;
int flags = 0;

if (conn->left_opts->ignore_sigpipe)
flags = flags | MSG_NOSIGNAL;
if (conn->left_opts->timeout.tv_sec == 0 && conn->left_opts->timeout.tv_usec == 0)
flags = flags | MSG_DONTWAIT;

while (len){
int sent = send(conn->sock, cbuf, len, 0);
if (sent == -1) MONGO_THROW(MONGO_EXCEPT_NETWORK);
int sent = send(conn->sock, cbuf, len, flags);
if (sent == -1) {
if (errno == EPIPE)
return -1;
return 1;
}
cbuf += sent;
len -= sent;
}

return 0;
}

static void looping_read(mongo_connection * conn, void* buf, int len){
char* cbuf = buf;
int flags = 0;

if (conn->left_opts->ignore_sigpipe)
flags = flags | MSG_NOSIGNAL;
if (conn->left_opts->timeout.tv_sec == 0 && conn->left_opts->timeout.tv_usec == 0)
flags = flags | MSG_DONTWAIT;

while (len){
int sent = recv(conn->sock, cbuf, len, 0);
int sent = recv(conn->sock, cbuf, len, flags);
if (sent == 0 || sent == -1) MONGO_THROW(MONGO_EXCEPT_NETWORK);
cbuf += sent;
len -= sent;
}
}

/* Always calls free(mm) */
void mongo_message_send(mongo_connection * conn, mongo_message* mm){
int mongo_message_send(mongo_connection * conn, mongo_message* mm){
mongo_header head; /* little endian */
bson_little_endian32(&head.len, &mm->head.len);
bson_little_endian32(&head.id, &mm->head.id);
bson_little_endian32(&head.responseTo, &mm->head.responseTo);
bson_little_endian32(&head.op, &mm->head.op);

MONGO_TRY{
looping_write(conn, &head, sizeof(head));
looping_write(conn, &mm->data, mm->head.len - sizeof(head));
}MONGO_CATCH{
free(mm);
MONGO_RETHROW();
}

int ret = looping_write(conn, &head, sizeof(head));
if (ret == 0)
ret = looping_write(conn, &mm->data, mm->head.len - sizeof(head));
free(mm);

return ret;
}

char * mongo_data_append( char * start , const void * data , int len ){
Expand Down Expand Up @@ -131,7 +149,14 @@ static int mongo_connect_helper( mongo_connection * conn ){
/* nagle */
setsockopt( conn->sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one) );

struct timeval *tv = &(conn->left_opts->timeout);
if (tv->tv_sec != 0 || tv->tv_usec != 0) {
setsockopt(conn->sock, SOL_SOCKET, SO_RCVTIMEO, (char*)tv, sizeof(*tv));
setsockopt(conn->sock, SOL_SOCKET, SO_SNDTIMEO, (char*)tv, sizeof(*tv));
}

/* TODO signals */
/* signal need set on every send() and recv(). why? */

conn->connected = 1;
return 0;
Expand Down Expand Up @@ -229,7 +254,7 @@ void mongo_insert_batch( mongo_connection * conn , const char * ns , bson ** bso
mongo_message_send(conn, mm);
}

void mongo_insert( mongo_connection * conn , const char * ns , bson * bson ){
int mongo_insert( mongo_connection * conn , const char * ns , bson * bson ){
char * data;
mongo_message * mm = mongo_message_create( 16 /* header */
+ 4 /* ZERO */
Expand All @@ -242,7 +267,7 @@ void mongo_insert( mongo_connection * conn , const char * ns , bson * bson ){
data = mongo_data_append(data, ns, strlen(ns) + 1);
data = mongo_data_append(data, bson->data, bson_size(bson));

mongo_message_send(conn, mm);
return mongo_message_send(conn, mm);
}

void mongo_update(mongo_connection* conn, const char* ns, const bson* cond, const bson* op, int flags){
Expand Down
7 changes: 6 additions & 1 deletion src/mongo.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#ifndef _MONGO_H_
#define _MONGO_H_

#include <sys/time.h>
#include <errno.h>

#include "mongo_except.h"
#include "bson.h"

Expand All @@ -38,6 +41,8 @@ MONGO_EXTERN_C_START
typedef struct mongo_connection_options {
char host[255];
int port;
int ignore_sigpipe;
struct timeval timeout;
} mongo_connection_options;

typedef struct {
Expand Down Expand Up @@ -122,7 +127,7 @@ bson_bool_t mongo_destroy( mongo_connection * conn ); /* you must call this even
CORE METHODS - insert update remove query getmore
------------------------------ */

void mongo_insert( mongo_connection * conn , const char * ns , bson * data );
int mongo_insert( mongo_connection * conn , const char * ns , bson * data );
void mongo_insert_batch( mongo_connection * conn , const char * ns , bson ** data , int num );

static const int MONGO_UPDATE_UPSERT = 0x1;
Expand Down
1 change: 1 addition & 0 deletions test/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
simple
12 changes: 12 additions & 0 deletions test/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
bin = simple

all: $(bin)

INC_MG = -I ../src
LIB_MG = -L ../src -lmongodb

%:%.c
gcc --std=c99 -fPIC -D_GNU_SOURCE -o $@ $< $(INC_MG) $(LIB_MG)

clean:
@rm -rf $(bin)
1 change: 1 addition & 0 deletions test/test.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <stdlib.h>

#define TEST_SERVER "127.0.0.1"
#define ASSERT(x) \
do{ \
if(!(x)){ \
Expand Down