Skip to content

Commit 5d9d01b

Browse files
committed
Merge branch 'merge_concurrent' of github.com:postgrespro/pg_pathman into merge_concurrent_locks
2 parents e1ba3af + 43b0b6e commit 5d9d01b

File tree

6 files changed

+81
-27
lines changed

6 files changed

+81
-27
lines changed

Diff for: src/hooks.c

+1
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ pathman_relcache_hook(Datum arg, Oid relid)
521521
case PPS_ENTRY_NOT_FOUND:
522522
{
523523
/* NOTE: Remove NOT_USED when it's time */
524+
delay_invalidation_parent_rel(partitioned_table);
524525
#ifdef NOT_USED
525526
elog(DEBUG2, "Invalidation message for relation %u [%u]",
526527
relid, MyProcPid);

Diff for: src/init.c

+11
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ init_main_pathman_toggle(void)
118118
NULL,
119119
pg_pathman_enable_assign_hook,
120120
NULL);
121+
122+
DefineCustomBoolVariable("pg_pathman.enable_auto_partition",
123+
"Enables auto partition propagation",
124+
NULL,
125+
&pg_pathman_init_state.auto_partition,
126+
true,
127+
PGC_USERSET,
128+
0,
129+
NULL,
130+
NULL,
131+
NULL);
121132
}
122133

123134
/*

Diff for: src/init.h

+16
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
typedef struct
2727
{
2828
bool pg_pathman_enable; /* GUC variable implementation */
29+
bool auto_partition; /* GUC variable for auto partition propagation */
2930
bool initialization_needed; /* do we need to perform init? */
3031
} PathmanInitState;
3132

@@ -52,6 +53,21 @@ extern PathmanInitState pg_pathman_init_state;
5253
*/
5354
#define IsPathmanReady() ( IsPathmanInitialized() && IsPathmanEnabled() )
5455

56+
/*
57+
* Check if auto partition propagation enabled
58+
*/
59+
#define IsAutoPartitionEnabled() ( pg_pathman_init_state.auto_partition )
60+
61+
/*
62+
* Enable/disable auto partition propagation. Note that this only works if
63+
* partitioned relation supports this. See enable_auto() and disable_auto()
64+
* functions.
65+
*/
66+
#define SetAutoPartitionEnabled(value) \
67+
do { \
68+
pg_pathman_init_state.auto_partition = value; \
69+
} while (0)
70+
5571
/*
5672
* Emergency disable mechanism.
5773
*/

Diff for: src/partition_filter.c

+6-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "partition_filter.h"
1212
#include "nodes_common.h"
1313
#include "utils.h"
14+
#include "init.h"
1415

1516
#include "utils/guc.h"
1617
#include "utils/memutils.h"
@@ -204,7 +205,11 @@ partition_filter_exec(CustomScanState *node)
204205
elog(ERROR, "PartitionFilter selected more than one partition");
205206
else if (nparts == 0)
206207
{
207-
if (prel->auto_partition)
208+
/*
209+
* If auto partition propagation is enabled then try to create
210+
* new partitions for the key
211+
*/
212+
if (prel->auto_partition && IsAutoPartitionEnabled())
208213
{
209214
selected_partid = create_partitions(state->partitioned_table,
210215
state->temp_const.constvalue,

Diff for: src/pathman_workers.c

+6-3
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,9 @@ bgw_main_concurrent_part(Datum main_arg)
417417
part_slot = &concurrent_part_slots[DatumGetInt32(main_arg)];
418418
part_slot->pid = MyProcPid;
419419

420+
/* Disable auto partition propagation */
421+
SetAutoPartitionEnabled(false);
422+
420423
/* Establish connection and start transaction */
421424
BackgroundWorkerInitializeConnectionByOid(part_slot->dbid, part_slot->userid);
422425

@@ -494,10 +497,11 @@ bgw_main_concurrent_part(Datum main_arg)
494497
/* Print messsage for this BGWorker to server log */
495498
sleep_time_str = datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
496499
FLOAT8OID);
500+
failures_count++;
497501
ereport(LOG,
498502
(errmsg("%s: %s", concurrent_part_bgw, error->message),
499503
errdetail("Attempt: %d/%d, sleep time: %s",
500-
failures_count + 1,
504+
failures_count,
501505
PART_WORKER_MAX_ATTEMPTS,
502506
sleep_time_str)));
503507
pfree(sleep_time_str); /* free the time string */
@@ -509,7 +513,7 @@ bgw_main_concurrent_part(Datum main_arg)
509513
* concurrent user queries. Check that attempts count doesn't exceed
510514
* some reasonable value
511515
*/
512-
if (failures_count++ >= PART_WORKER_MAX_ATTEMPTS)
516+
if (failures_count >= PART_WORKER_MAX_ATTEMPTS)
513517
{
514518
/* Mark slot as FREE */
515519
cps_set_status(part_slot, CPS_FREE);
@@ -593,7 +597,6 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
593597
get_pathman_relation_info_after_lock(relid, true),
594598
/* Partitioning type does not matter here */
595599
PT_INDIFFERENT);
596-
597600
/*
598601
* Look for an empty slot and also check that a concurrent
599602
* partitioning operation for this table hasn't been started yet

Diff for: src/pg_pathman.c

+41-23
Original file line numberDiff line numberDiff line change
@@ -446,43 +446,61 @@ append_child_relation(PlannerInfo *root, RelOptInfo *rel, Index rti,
446446
memcpy(childrel->attr_widths, rel->attr_widths,
447447
(rel->max_attr - rel->min_attr + 1) * sizeof(int32));
448448

449-
450-
/* Copy restrictions */
449+
/*
450+
* Copy restrictions. If it's not the parent table then copy only those
451+
* restrictions that reference to this partition
452+
*/
451453
childrel->baserestrictinfo = NIL;
452-
forboth(lc, wrappers, lc2, rel->baserestrictinfo)
454+
if (rte->relid != childOid)
453455
{
454-
bool alwaysTrue;
455-
WrapperNode *wrap = (WrapperNode *) lfirst(lc);
456-
Node *new_clause = wrapper_make_expression(wrap, index, &alwaysTrue);
457-
RestrictInfo *old_rinfo = (RestrictInfo *) lfirst(lc2);
458-
459-
if (alwaysTrue)
456+
forboth(lc, wrappers, lc2, rel->baserestrictinfo)
460457
{
461-
continue;
462-
}
463-
Assert(new_clause);
458+
bool alwaysTrue;
459+
WrapperNode *wrap = (WrapperNode *) lfirst(lc);
460+
Node *new_clause = wrapper_make_expression(wrap, index, &alwaysTrue);
461+
RestrictInfo *old_rinfo = (RestrictInfo *) lfirst(lc2);
464462

465-
if (and_clause((Node *) new_clause))
466-
{
467-
ListCell *alc;
463+
if (alwaysTrue)
464+
{
465+
continue;
466+
}
467+
Assert(new_clause);
468468

469-
foreach(alc, ((BoolExpr *) new_clause)->args)
469+
if (and_clause((Node *) new_clause))
470470
{
471-
Node *arg = (Node *) lfirst(alc);
472-
RestrictInfo *new_rinfo = rebuild_restrictinfo(arg, old_rinfo);
471+
ListCell *alc;
472+
473+
foreach(alc, ((BoolExpr *) new_clause)->args)
474+
{
475+
Node *arg = (Node *) lfirst(alc);
476+
RestrictInfo *new_rinfo = rebuild_restrictinfo(arg, old_rinfo);
473477

478+
change_varnos((Node *)new_rinfo, rel->relid, childrel->relid);
479+
childrel->baserestrictinfo = lappend(childrel->baserestrictinfo,
480+
new_rinfo);
481+
}
482+
}
483+
else
484+
{
485+
RestrictInfo *new_rinfo = rebuild_restrictinfo(new_clause, old_rinfo);
486+
487+
/* Replace old relids with new ones */
474488
change_varnos((Node *)new_rinfo, rel->relid, childrel->relid);
489+
475490
childrel->baserestrictinfo = lappend(childrel->baserestrictinfo,
476-
new_rinfo);
491+
(void *) new_rinfo);
477492
}
478493
}
479-
else
494+
}
495+
/* If it's the parent table then copy all restrictions */
496+
else
497+
{
498+
foreach(lc, rel->baserestrictinfo)
480499
{
481-
RestrictInfo *new_rinfo = rebuild_restrictinfo(new_clause, old_rinfo);
500+
RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
501+
RestrictInfo *new_rinfo = (RestrictInfo *) copyObject(rinfo);
482502

483-
/* Replace old relids with new ones */
484503
change_varnos((Node *)new_rinfo, rel->relid, childrel->relid);
485-
486504
childrel->baserestrictinfo = lappend(childrel->baserestrictinfo,
487505
(void *) new_rinfo);
488506
}

0 commit comments

Comments
 (0)