Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a pg_background_discard_result() function. #41

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ MODULE_big = pg_background
OBJS = pg_background.o

EXTENSION = pg_background
DATA = pg_background--1.0.sql
DATA = pg_background--1.0.sql \
pg_background--1.0--1.1.sql \
pg_background--1.1.sql
REGRESS = pg_background

PG_CONFIG = pg_config
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This module comes with following SQL APIs:

1. ***pg_background_launch*** : This API takes SQL command, which user wants to execute, and size of queue buffer. This function returns the process id of background worker.
2. ***pg_background_result*** : This API takes the process id as input parameter and returns the result of command executed throught the background worker.
2. ***pg_background_discard_result*** : This API takes the process id as input parameter and discards the result of command executed throught the background worker, only emitted any error message.
3. ***pg_background_detach*** : This API takes the process id and detach the background process which is waiting for user to read its results.

## Installation steps
Expand Down Expand Up @@ -41,6 +42,11 @@ To fetch the result of command executed background worker, user can use followin
SELECT pg_background_result(pid)
```

Alternatively, to discard the result but still get error messages if any, user can use following command:
```sql
SELECT pg_background_discard_result(pid)
```

**pid is process id returned by pg_background_launch function**

## Example:
Expand Down Expand Up @@ -113,3 +119,13 @@ CPU 0.00s/0.00u sec elapsed 0.00 sec.
VACUUM
(1 row)
```

Or to only get the error data:
```sql
SELECT * FROM pg_background_discard_result(pg_background_launch('DROP TABLE IF EXISTS table_to_drop'));
NOTICE: table "table_to_drop" does not exist, skipping
pg_background_discard_result
------------------------------

(1 row)
```
7 changes: 7 additions & 0 deletions expected/pg_background.out
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ SELECT * FROM t;
1
(1 row)

SELECT * FROM pg_background_discard_result(pg_background_launch($$DROP TABLE IF EXISTS not_a_table; SELECT 1, 'val'$$));
NOTICE: table "not_a_table" does not exist, skipping
pg_background_discard_result
------------------------------

(1 row)

7 changes: 7 additions & 0 deletions pg_background--1.0--1.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
\echo Use "ALTER EXTENSION pg_background UPDATE TO '1.1'" to load this file. \quit


CREATE FUNCTION pg_background_discard_result(pid pg_catalog.int4)
RETURNS void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
25 changes: 25 additions & 0 deletions pg_background--1.1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION pg_background" to load this file. \quit

CREATE FUNCTION pg_background_launch(sql pg_catalog.text,
queue_size pg_catalog.int4 DEFAULT 65536)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

CREATE FUNCTION pg_background_discard_result(pid pg_catalog.int4)
RETURNS void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
RETURNS SETOF pg_catalog.record STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

CREATE FUNCTION pg_background_detach(pid pg_catalog.int4)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
REVOKE ALL ON FUNCTION pg_background_launch(pg_catalog.text, pg_catalog.int4)
FROM public;
REVOKE ALL ON FUNCTION pg_background_result(pg_catalog.int4)
FROM public;
REVOKE ALL ON FUNCTION pg_background_detach(pg_catalog.int4)
FROM public;
204 changes: 156 additions & 48 deletions pg_background.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ static void save_worker_info(pid_t pid, dsm_segment *seg,
shm_mq_handle *responseq);
static void pg_background_error_callback(void *arg);

static Datum pg_background_process_result(FunctionCallInfo fcinfo, int32 pid,
bool emit_rows);
static HeapTuple form_result_tuple(pg_background_result_state *state,
TupleDesc tupdesc, StringInfo msg);
static TupleDesc pg_background_setup_tupdesc(FunctionCallInfo fcinfo,
FuncCallContext *funcctx,
pg_background_result_state *state);

static void handle_sigterm(SIGNAL_ARGS);
static void execute_sql_string(const char *sql);
Expand All @@ -110,6 +115,7 @@ static bool exists_binary_recv_fn(Oid type);
PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(pg_background_launch);
PG_FUNCTION_INFO_V1(pg_background_discard_result);
PG_FUNCTION_INFO_V1(pg_background_result);
PG_FUNCTION_INFO_V1(pg_background_detach);

Expand Down Expand Up @@ -263,6 +269,18 @@ pg_background_launch(PG_FUNCTION_ARGS)
PG_RETURN_INT32(pid);
}

/*
* Retrieve the results of a background query previously launched in this
* session, discarding all except error data.
*/
Datum
pg_background_discard_result(PG_FUNCTION_ARGS)
{
int32 pid = PG_GETARG_INT32(0);

return pg_background_process_result(fcinfo, pid, false);
}

/*
* Retrieve the results of a background query previously launched in this
* session.
Expand All @@ -271,22 +289,38 @@ Datum
pg_background_result(PG_FUNCTION_ARGS)
{
int32 pid = PG_GETARG_INT32(0);

return pg_background_process_result(fcinfo, pid, true);
}

/*
* Internal processing of the message queue for the given worker.
* This will consume all the received message and emit data as wanted.
*
* If emit_rows is true, it will emit all the received messages using the
* ValuePerCall STF interface.

* Otherwise, it will only rethrow all error data and silently discard all data
* rows (while still consuming them). This mode is not an SRF and will simply
* returns void.
*/
static Datum
pg_background_process_result(FunctionCallInfo fcinfo, int32 pid, bool emit_rows)
{
shm_mq_result res;
pg_background_worker_info *info = NULL;
FuncCallContext *funcctx;
TupleDesc tupdesc;
StringInfoData msg;
pg_background_result_state *state;
pg_background_result_state *state = NULL;

/* First-time setup. */
if (SRF_IS_FIRSTCALL())
/*
* First, retrieve the worker info and do all underlying sanity checks.
*/
if (!emit_rows || SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;
pg_background_worker_info *info;
dsm_segment *seg;

funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

/* See if we have a connection to the specified PID. */
if ((info = find_worker_info(pid)) == NULL)
ereport(ERROR,
Expand All @@ -309,46 +343,47 @@ pg_background_result(PG_FUNCTION_ARGS)
seg = info->seg;

dsm_unpin_mapping(seg);
}

/* Set up tuple-descriptor based on colum definition list. */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record"),
errhint("Try calling the function in the FROM clause "
"using a column definition list.")));
funcctx->tuple_desc = BlessTupleDesc(tupdesc);

/* Cache state that will be needed on every call. */
/* First-time setup. */
if (!emit_rows)
{
/*
* If we don't emit the rows, we can allocate the state in current
* context
*/
state = palloc0(sizeof(pg_background_result_state));
state->info = info;
if (funcctx->tuple_desc->natts > 0)
{
int natts = funcctx->tuple_desc->natts;
int i;
}
else if (emit_rows && SRF_IS_FIRSTCALL())
{
MemoryContext oldcontext;

state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
state->typioparams = palloc(sizeof(Oid) * natts);
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

for (i = 0; i < natts; ++i)
{
Oid receive_function_id;
/* Cache state that will be needed on every call. */
Assert(!state);
state = palloc0(sizeof(pg_background_result_state));

getTypeBinaryInputInfo(TupleDescAttr(funcctx->tuple_desc, i)->atttypid,
&receive_function_id,
&state->typioparams[i]);
/* Set up tuple-descriptor based on colum definition list, if needed. */
if (emit_rows)
tupdesc = pg_background_setup_tupdesc(fcinfo, funcctx, state);

fmgr_info(receive_function_id, &state->receive_functions[i]);
}
}
Assert(info);
state->info = info;
funcctx->user_fctx = state;

MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
tupdesc = funcctx->tuple_desc;
state = funcctx->user_fctx;

/* In SRF mode, we need to retrieve the previously initialized state. */
if (emit_rows)
{
funcctx = SRF_PERCALL_SETUP();
tupdesc = funcctx->tuple_desc;
state = funcctx->user_fctx;
}

/* Initialize message buffer. */
initStringInfo(&msg);
Expand Down Expand Up @@ -419,14 +454,21 @@ pg_background_result(PG_FUNCTION_ARGS)
int16 natts = pq_getmsgint(&msg, 2);
int16 i;

if (state->has_row_description)
elog(ERROR, "multiple RowDescription messages");
state->has_row_description = true;
if (natts != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
/*
* We only need to do the sanity checks on the tupledesc if
* we emit the rows.
*/
if (emit_rows)
{
if (state->has_row_description)
elog(ERROR, "multiple RowDescription messages");
state->has_row_description = true;
if (natts != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
}

for (i = 0; i < natts; ++i)
{
Expand All @@ -440,6 +482,13 @@ pg_background_result(PG_FUNCTION_ARGS)
(void) pq_getmsgint(&msg, 4); /* typmod */
(void) pq_getmsgint(&msg, 2); /* format code */

/*
* Ignore the type sanity check after consuming the
* data if caller doesn't want to emit the rows.
*/
if (!emit_rows)
continue;

if (exists_binary_recv_fn(type_id))
{
if (type_id != TupleDescAttr(tupdesc, i)->atttypid)
Expand Down Expand Up @@ -469,6 +518,13 @@ pg_background_result(PG_FUNCTION_ARGS)
/* Handle DataRow message. */
HeapTuple result;

/*
* Ignore the message if caller doesn't want to emit the
* rows.
*/
if (!emit_rows)
break;

result = form_result_tuple(state, tupdesc, &msg);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
}
Expand All @@ -478,6 +534,13 @@ pg_background_result(PG_FUNCTION_ARGS)
MemoryContext oldcontext;
const char *tag = pq_getmsgstring(&msg);

/*
* Ignore the message after consuming it caller doesn't
* want to emit data.
*/
if (!emit_rows)
break;

oldcontext =
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
state->command_tags = lappend(state->command_tags,
Expand Down Expand Up @@ -514,8 +577,11 @@ pg_background_result(PG_FUNCTION_ARGS)
errmsg("lost connection to worker process with PID %d",
pid)));

/* If no data rows, return the command tags instead. */
if (!state->has_row_description)
/*
* If no data rows, return the command tags instead, if callers want to
* output rows.
*/
if (emit_rows && !state->has_row_description)
{
if (tupdesc->natts != 1 || TupleDescAttr(tupdesc, 0)->atttypid != TEXTOID)
{
Expand All @@ -541,7 +607,10 @@ pg_background_result(PG_FUNCTION_ARGS)

/* We're done! */
dsm_detach(state->info->seg);
SRF_RETURN_DONE(funcctx);
if (emit_rows)
SRF_RETURN_DONE(funcctx);
else
PG_RETURN_VOID();
}

/*
Expand Down Expand Up @@ -598,6 +667,45 @@ form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
return heap_form_tuple(tupdesc, values, isnull);
}

static TupleDesc
pg_background_setup_tupdesc(FunctionCallInfo fcinfo,
FuncCallContext *funcctx,
pg_background_result_state *state)
{
TupleDesc tupdesc;

if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record"),
errhint("Try calling the function in the FROM clause "
"using a column definition list.")));
funcctx->tuple_desc = BlessTupleDesc(tupdesc);

if (funcctx->tuple_desc->natts > 0)
{
int natts = funcctx->tuple_desc->natts;
int i;

state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
state->typioparams = palloc(sizeof(Oid) * natts);

for (i = 0; i < natts; ++i)
{
Oid receive_function_id;

getTypeBinaryInputInfo(TupleDescAttr(funcctx->tuple_desc, i)->atttypid,
&receive_function_id,
&state->typioparams[i]);

fmgr_info(receive_function_id, &state->receive_functions[i]);
}
}

return tupdesc;
}

/*
* Detach from the dynamic shared memory segment used for communication with
* a background worker. This prevents the worker from stalling waiting for
Expand Down
Loading