Skip to content

Commit

Permalink
wire/socketfiles/repl wasi loop
Browse files Browse the repository at this point in the history
  • Loading branch information
pmp-p committed Jan 16, 2025
1 parent 5b80f3a commit 25532b3
Showing 1 changed file with 37 additions and 69 deletions.
106 changes: 37 additions & 69 deletions patches/interactive_one_wasi.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ use_wire(int state) {
if (state>0) {
puts("180: wire mode, repl off, echo on");
force_echo=true;
is_wire = 1;
is_wire = true;
is_repl = false;
} else {
//puts("184: repl mode, echo off");
Expand Down Expand Up @@ -264,6 +264,7 @@ EMSCRIPTEN_KEEPALIVE void
interactive_one() {
int peek = -1; /* preview of firstchar with no pos change */
int firstchar = 0; /* character read from getc() */
bool pipelining = true;
StringInfoData input_message;
StringInfoData *inBuf;
FILE *stream ;
Expand Down Expand Up @@ -367,7 +368,6 @@ interactive_one() {
if (packetlen) {
// always.
is_wire = true;

sockfiles = true;
whereToSendOutput = DestRemote;
resetStringInfo(inBuf);
Expand Down Expand Up @@ -397,16 +397,16 @@ interactive_one() {
if (packetlen) {
// it was startup/auth , write and return fast.
if (peek<0) {
PDEBUG("# 394: handshake/auth/pass skip");
PDEBUG("# 399: handshake/auth/pass skip");
goto wire_flush;
}

/* else it was wire msg */
#if PGDEBUG
printf("# 395 : node+repl is_wire -> true : %c\n", peek);
printf("# 405: node+repl is_wire -> true : %c\n", peek);
force_echo = true;
#endif
whereToSendOutput = DestRemote;
firstchar = peek;
goto incoming;
} // wire msg

Expand All @@ -416,8 +416,9 @@ interactive_one() {
if (!peek)
return;

puts("# 410 : defaulting to REPL mode");
puts("# 418 : defaulting to REPL mode");

firstchar = peek ;
is_repl = true;
is_wire = false;
whereToSendOutput = DestNone;
Expand All @@ -428,7 +429,7 @@ interactive_one() {
} // !cma_rsize -> socketfiles -> repl

#if PGDEBUG
printf("# 418: fd %s: %s fd=%d is_embed=%d is_repl=%d is_wire=%d peek=%d len=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed, is_repl, is_wire, peek, packetlen);
printf("# 429: fd %s: %s fd=%d is_embed=%d is_repl=%d is_wire=%d peek=%d len=%d\n", PGS_OLOCK, IO, MyProcPort->sock, is_embed, is_repl, is_wire, peek, packetlen);
#endif

// buffer query TODO: direct access ?
Expand All @@ -440,7 +441,7 @@ interactive_one() {
}

if (packetlen<2) {
puts("# 477: WARNING: empty packet");
puts("# 441: WARNING: empty packet");
cma_rsize= 0;
pg_prompt();
// always free cma buffer !!!
Expand All @@ -450,96 +451,65 @@ interactive_one() {

incoming:
#if defined(__wasi__) //PGDEBUG
PDEBUG("# 486: sjlj exception handler off");
PDEBUG("# 451: sjlj exception handler off");
#else
#error "sigsetjmp unsupported"
#endif // wasi


/*
if (is_wire) {
if (cma_rsize) {
if (startup_auth(peek)) {
PDEBUG("# 453: handshake/auth skip");
peek = -1;
goto wire_flush;
}
}
firstchar = SocketBackend(inBuf);
} else {
if (peek == EOF && inBuf->len == 0) {
firstchar = EOF;
} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}
if (is_repl)
while (pipelining) {
if (is_repl) {
// TODO: are we sure repl could not pipeline ?
pipelining = false;
/* stdio node repl */
whereToSendOutput = DestDebug;
}
*/


bool pipelining = true;

if (!is_wire) {
/* nowire */
if (firstchar == EOF && inBuf->len == 0) {
firstchar = EOF;
pipelining=false;
} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}

/* stdio node repl */
if (is_repl)
whereToSendOutput = DestDebug;

// won't have auth
peek = -1;
}

while (pipelining) {
// TODO: are we sure ?
pipelining = !is_repl;

if (is_wire) {
/* wire on a socket or cma may auth, not handled by pg_proto block */
if (peek==0) {
PDEBUG("# 507: handshake/auth");
PDEBUG("# 470: handshake/auth");
startup_auth();
PDEBUG("# 509: auth request");
PDEBUG("# 472: auth request");
break;
}

if (peek==112) {
PDEBUG("# 529: password");
PDEBUG("# 477: password");
startup_pass(true);
break;
}

firstchar = SocketBackend(inBuf);

#if PGDEBUG
if (force_echo) {
printf("# 517: wire=%d 1stchar=%c Q: %s", is_wire, firstchar, inBuf->data);
printf("# 486: wire=%d 1stchar=%c Q: %s\n", is_wire, firstchar, inBuf->data);
force_echo = false;
} else {
printf("520: PIPELINING [%c]!\n", firstchar);
printf("# 489: PIPELINING [%c]!\n", firstchar);
}
#endif
pipelining = pq_buffer_has_data();
if (!pipelining) {
printf("# 524: end of wire, rfq=%d\n", send_ready_for_query);
printf("# 494: end of wire, rfq=%d\n", send_ready_for_query);
} else {
printf("# 496: no end of wire -> pipelining, rfq=%d\n", send_ready_for_query);
}
} else {
/* nowire */
if (firstchar == EOF && inBuf->len == 0) {
firstchar = EOF;
} else {
appendStringInfoChar(inBuf, (char) '\0');
firstchar = 'Q';
}
}

if (!ignore_till_sync)
send_ready_for_query = true;

if (ignore_till_sync && firstchar != EOF) {
puts("@@@@@@@@@@@@@ 476 TODO: postgres.c 4684 : continue");
puts("@@@@@@@@@@@@@ 512 TODO: postgres.c 4684 : continue");
} else {
/* process notifications */
ProcessClientReadInterrupt(true);
Expand All @@ -558,10 +528,10 @@ interactive_one() {
ReadyForQuery(DestRemote);
//done at postgres.c 4623 send_ready_for_query = false;
} else {
PDEBUG("# 560: end packet - with no rfq");
PDEBUG("# 531: end packet - with no rfq");
}
} else {
PDEBUG("# 563: end packet (ClientAuthInProgress - no rfq) ");
PDEBUG("# 534: end packet (ClientAuthInProgress - no rfq) ");
}

if (SOCKET_DATA>0) {
Expand All @@ -578,10 +548,10 @@ interactive_one() {
SOCKET_FILE = NULL;
SOCKET_DATA = 0;
if (cma_wsize)
PDEBUG("# 580: cma and sockfile ???");
PDEBUG("# 551: cma and sockfile ???");
if (sockfiles) {
#if PGDEBUG
printf("# 583: client:ready -> read(%d) " PGS_OLOCK "->" PGS_OUT"\n", outb);
printf("# 554: client:ready -> read(%d) " PGS_OLOCK "->" PGS_OUT"\n", outb);
#endif
rename(PGS_OLOCK, PGS_OUT);
}
Expand All @@ -599,5 +569,3 @@ interactive_one() {
#undef IO
}



0 comments on commit 25532b3

Please sign in to comment.