/*-------------------------------------------------------------------------
*
* adaptive_executor.c
*
* The adaptive executor executes a list of tasks (queries on shards) over
* a connection pool per worker node. The results of the queries, if any,
* are written to a tuple store.
*
* The concepts in the executor are modelled in a set of structs:
*
* - DistributedExecution:
* Execution of a Task list over a set of WorkerPools.
* - WorkerPool
* Pool of WorkerSessions for the same worker which opportunistically
* executes "unassigned" tasks from a queue.
* - WorkerSession:
* Connection to a worker that is used to execute "assigned" tasks
* from a queue and may execute unassigned tasks from the WorkerPool.
* - ShardCommandExecution:
* Execution of a Task across a list of placements.
* - TaskPlacementExecution:
* Execution of a Task on a specific placement.
* Used in the WorkerPool and WorkerSession queues.
*
* Every connection pool (WorkerPool) and every connection (WorkerSession)
* have a queue of tasks that are ready to execute (readyTaskQueue) and a
* queue/set of pending tasks that may become ready later in the execution
* (pendingTaskQueue). The tasks are wrapped in a ShardCommandExecution,
* which keeps track of the state of execution and is referenced from a
* TaskPlacementExecution, which is the data structure that is actually
* added to the queues and describes the state of the execution of a task
* on a particular worker node.
*
* When the task list is part of a bigger distributed transaction, the
* shards that are accessed or modified by the task may have already been
* accessed earlier in the transaction. We need to make sure we use the
* same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding
* it to the task queue of a specific connection (in
* AssignTasksToConnectionsOrWorkerPool). Otherwise we consider the task
* unassigned and add it to the task queue of a worker pool, which means
* that it can be executed over any connection in the pool.
*
* A task may be executed on multiple placements in case of a reference
* table or a replicated distributed table. Depending on the type of
* task, it may not be ready to be executed on a worker node immediately.
* For instance, INSERTs on a reference table are executed serially across
* placements to avoid deadlocks when concurrent INSERTs take conflicting
* locks. At the beginning, only the "first" placement is ready to execute
* and therefore added to the readyTaskQueue in the pool or connection.
* The remaining placements are added to the pendingTaskQueue. Once
* execution on the first placement is done the second placement moves
* from pendingTaskQueue to readyTaskQueue. The same approach is used to
* fail over read-only tasks to another placement.
*
* Once all the tasks are added to a queue, the main loop in
* RunDistributedExecution repeatedly does the following:
*
* For each pool:
* - ManageWorkerPool evaluates whether to open additional connections
* based on the number of unassigned tasks that are ready to execute
* and the targetPoolSize of the execution.
*
* Poll all connections:
* - We use a WaitEventSet that contains all (non-failed) connections
* and is rebuilt whenever the set of active connections or any of
* their wait flags change.
*
* We almost always check for WL_SOCKET_READABLE because a session
* can emit notices at any time during execution, but it will only
* wake up WaitEventSetWait when there are actual bytes to read.
*
* We check for WL_SOCKET_WRITEABLE just after sending bytes in case
* there is not enough space in the TCP buffer. Since a socket is
* almost always writable we also use WL_SOCKET_WRITEABLE as a
* mechanism to wake up WaitEventSetWait for non-I/O events, e.g.
* when a task moves from pending to ready.
*
* For each connection that is ready:
* - ConnectionStateMachine handles connection establishment and failure
* as well as command execution via TransactionStateMachine.
*
* When a connection is ready to execute a new task, it first checks its
* own readyTaskQueue and otherwise takes a task from the worker pool's
* readyTaskQueue (on a first-come, first-served basis).
*
* In cases where the tasks finish quickly (e.g. <1ms), a single
* connection will often be sufficient to finish all tasks. It is
* therefore not necessary that all connections are established
* successfully or open a transaction (which may be blocked by an
* intermediate pgbouncer in transaction pooling mode). It is therefore
* essential that we take a task from the queue only after opening a
* transaction block.
*
* When a command on a worker finishes or the connection is lost, we call
* PlacementExecutionDone, which then updates the state of the task
* based on whether we need to run it on other placements. When a
* connection fails or all connections to a worker fail, we also call
* PlacementExecutionDone for all queued tasks to try the next placement
* and, if necessary, mark shard placements as inactive. If a task fails
* to execute on all placements, the execution fails and the distributed
* transaction rolls back.
*
* For multi-row INSERTs, tasks are executed sequentially by
* SequentialRunDistributedExecution instead of in parallel, which allows
* a high degree of concurrency without high risk of deadlocks.
* Conversely, multi-row UPDATE/DELETE/DDL commands take aggressive locks
* which forbids concurrency, but allows parallelism without high risk
* of deadlocks. Note that this is unrelated to SEQUENTIAL_CONNECTION,
* which indicates that we should use at most one connection per node, but
* can run tasks in parallel across nodes. This is used when there are
* writes to a reference table that has foreign keys from a distributed
* table.
*
* Execution finishes when all tasks are done, the query errors out, or
* the user cancels the query.
*
*-------------------------------------------------------------------------
*/
#include <math.h>
#include <sys/stat.h>
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "access/htup_details.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/schemacmds.h"
#include "lib/ilist.h"
#include "portability/instr_time.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/timestamp.h"
#include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/cancel_utils.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/executor_util.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/param_utils.h"
#include "distributed/placement_access.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/repartition_join_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h"
#include "distributed/tuple_destination.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#define SLOW_START_DISABLED 0
/*
* DistributedExecution represents the execution of a distributed query
* plan.
*/
typedef struct DistributedExecution
{
/* the corresponding distributed plan's modLevel */
RowModifyLevel modLevel;
/*
* remoteAndLocalTaskList contains all the tasks required to finish the
* execution. remoteTaskList contains all the tasks required to
* finish the remote execution. localTaskList contains all the
* local tasks required to finish the local execution.
*
* remoteAndLocalTaskList is the union of remoteTaskList and localTaskList.
*/
List *remoteAndLocalTaskList;
List *remoteTaskList;
List *localTaskList;
/*
* If a task-specific destination is not provided for a task, then use
* defaultTupleDest.
*/
TupleDestination *defaultTupleDest;
/* Parameters for parameterized plans. Can be NULL. */
ParamListInfo paramListInfo;
/* list of workers involved in the execution */
List *workerList;
/* list of all connections used for distributed execution */
List *sessionList;
/*
* Flag to indicate that the set of connections we are interested
* in has changed and waitEventSet needs to be rebuilt.
*/
bool rebuildWaitEventSet;
/*
* Flag to indicate that the set of wait events we are interested
* in might have changed and waitEventSet needs to be updated.
*
* Note that we set this flag whenever we assign a value to waitFlags,
* but we don't check that the waitFlags is actually different from the
* previous value. So we might have some false positives for this flag,
* which is OK, because in this case ModifyWaitEvent() is a noop.
*/
bool waitFlagsChanged;
/*
* WaitEventSet used for waiting for I/O events.
*
* This could also be local to RunDistributedExecution(), but in that case
* we would have to mark it as "volatile" to avoid PG_TRY()/PG_CATCH()
* issues, and cast it to non-volatile when doing WaitEventSetFree().
* We thought that would make the code harder to read than making this
* non-local, so we moved it here. See comments for PG_TRY() in
* postgres/src/include/elog.h and "man 3 siglongjmp" for more context.
*
* Another reason for keeping these here is to cache a single
* WaitEventSet/WaitEvent within the execution pair until we
* need to rebuild the waitEvents.
*/
WaitEventSet *waitEventSet;
WaitEvent *events;
int eventSetSize;
/*
* The number of connections we aim to open per worker.
*
* If there are no more tasks to be assigned, the actual number may be lower.
* If there are already more connections, the actual number may be higher.
*/
int targetPoolSize;
/* total number of tasks to execute */
int totalTaskCount;
/* number of tasks that still need to be executed */
int unfinishedTaskCount;
/*
* Flag to indicate whether throwing errors on cancellation is
* allowed.
*/
bool raiseInterrupts;
/* transactional properties of the current execution */
TransactionProperties *transactionProperties;
/* indicates whether distributed execution has failed */
bool failed;
/*
* For SELECT commands or INSERT/UPDATE/DELETE commands with RETURNING,
* the total number of rows received from the workers. For
* INSERT/UPDATE/DELETE commands without RETURNING, the total number of
* tuples modified.
*
* Note that for replicated tables (e.g., reference tables), we only
* count the rows processed on a single replica.
*/
uint64 rowsProcessed;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
*
* The columnArray field is reset/calculated per row, so it might be useless for
* other contexts. The benefit of keeping it here is to avoid allocating
* the array over and over again.
*/
uint32 allocatedColumnCount;
void **columnArray;
StringInfoData *stringInfoDataArray;
/*
* jobIdList contains all jobs in the job tree; this is used to
* do cleanup for repartition queries.
*/
List *jobIdList;
/*
* Indicates whether we can execute tasks locally during distributed
* execution. In other words, this flag must be set to false when
* executing a command for which we know local execution would fail,
* such as CREATE INDEX CONCURRENTLY.
*/
bool localExecutionSupported;
} DistributedExecution;
/*
* WorkerPoolFailureState indicates the current state of the
* pool.
*/
typedef enum WorkerPoolFailureState
{
/* safe to continue execution */
WORKER_POOL_NOT_FAILED,
/* if a pool fails, the execution fails */
WORKER_POOL_FAILED,
/*
* The remote execution over the pool failed, but we failed over
* to local execution and can still finish the execution.
*/
WORKER_POOL_FAILED_OVER_TO_LOCAL
} WorkerPoolFailureState;
/*
* WorkerPool represents a pool of sessions on the same worker.
*
* A WorkerPool has two queues containing the TaskPlacementExecutions that need
* to be executed on the worker.
*
* TaskPlacementExecutions that are ready to execute are in readyTaskQueue.
* TaskPlacementExecutions that may need to be executed once execution on
* another worker finishes or fails are in pendingTaskQueue.
*
* In TransactionStateMachine, the sessions opportunistically take
* TaskPlacementExecutions from the readyQueue when they are ready and have no
* assigned tasks.
*
* We track connection timeouts per WorkerPool. When the first connection is
* established we set the poolStartTime and if no connection can be established
* before NodeConnectionTimeout, the WorkerPool fails. There is some specialised
* logic in case citus.force_max_query_parallelization is enabled because we
* may fail to establish a connection per placement after already establishing
* some connections earlier in the execution.
*
* A WorkerPool fails if all connection attempts failed or all connections
* are lost. In that case, all TaskPlacementExecutions in the queues are
* marked as failed in PlacementExecutionDone, which typically causes the
* task and therefore the distributed execution to fail. In case of a
* replicated table or a SELECT on a reference table, the remaining placements
* will be tried by moving them from a pendingTaskQueue to a readyTaskQueue.
*/
typedef struct WorkerPool
{
/* distributed execution in which the worker participates */
DistributedExecution *distributedExecution;
/* worker node on which we have a pool of sessions */
char *nodeName;
int nodePort;
/* all sessions on the worker that are part of the current execution */
List *sessionList;
/* number of connections that were established */
int activeConnectionCount;
/*
* Keep track of how many connections are ready for execution, in
* order to (efficiently) know whether more connections to the worker
* are needed.
*/
int idleConnectionCount;
/* number of connections that did not send a command */
int unusedConnectionCount;
/* number of failed connections */
int failedConnectionCount;
/*
* Placement executions destined for worker node, but not assigned to any
* connection and not yet ready to start (depends on other placement
* executions).
*/
dlist_head pendingTaskQueue;
/*
* Placement executions destined for worker node, but not assigned to any
* connection and ready to start.
*/
dlist_head readyTaskQueue;
int readyTaskCount;
/*
* We keep this for enforcing the connection timeouts. In our definition, a pool
* starts when the first connection establishment starts.
*/
instr_time poolStartTime;
/* indicates whether to check for the connection timeout */
bool checkForPoolTimeout;
/* last time we opened a connection */
instr_time lastConnectionOpenTime;
/* maximum number of connections we are allowed to open at once */
uint32 maxNewConnectionsPerCycle;
/*
* Set to true if the pool is to the local node. We use this value
* to avoid re-calculating it often.
*/
bool poolToLocalNode;
/*
* This is only set in the WorkerPoolFailed() function. Once a pool fails, we do not
* use it anymore.
*/
WorkerPoolFailureState failureState;
/* execution statistics per pool, in microseconds */
uint64 totalTaskExecutionTime;
int totalExecutedTasks;
} WorkerPool;
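/* forward declaration, needed by WorkerSession; the struct itself is defined further below */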
struct TaskPlacementExecution;
/*
* WorkerSession represents a session on a worker that can execute tasks
* (sequentially) and is part of a WorkerPool.
*
* Each WorkerSession has two queues containing TaskPlacementExecutions that
* need to be executed within this particular session because the session
* accessed the same or co-located placements earlier in the transaction.
*
* TaskPlacementExecutions that are ready to execute are in readyTaskQueue.
* TaskPlacementExecutions that may need to be executed once execution on
* another worker finishes or fails are in pendingTaskQueue.
*/
typedef struct WorkerSession
{
/* only useful for debugging */
uint64 sessionId;
/* worker pool of which this session is part */
WorkerPool *workerPool;
/* connection over which the session is established */
MultiConnection *connection;
/* tasks that need to be executed on this connection, but are not ready to start */
dlist_head pendingTaskQueue;
/* tasks that need to be executed on this connection and are ready to start */
dlist_head readyTaskQueue;
/* task the worker should work on or NULL */
struct TaskPlacementExecution *currentTask;
/*
* The number of commands sent to the worker over the session. Excludes
* distributed transaction related commands such as BEGIN/COMMIT etc.
*/
uint64 commandsSent;
/* index in the wait event set */
int waitEventSetIndex;
/* events reported by the latest call to WaitEventSetWait */
int latestUnconsumedWaitEvents;
/* for some restricted scenarios, we allow a single connection retry */
bool connectionRetried;
/* keep track of if the session has an active connection */
bool sessionHasActiveConnection;
} WorkerSession;
/* GUC, determining whether Citus opens 1 connection per task */
bool ForceMaxQueryParallelization = false;
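/* GUC, the maximum size of the connection pool per worker node */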
int MaxAdaptiveExecutorPoolSize = 16;
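/* GUC, determines whether query results are transferred using the binary protocol */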
bool EnableBinaryProtocol = true;
/* GUC, number of ms to wait between opening connections to the same worker */
int ExecutorSlowStartInterval = 10;
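/* GUC, makes the executor weigh the cost of opening new connections against reusing existing sessions */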
bool EnableCostBasedConnectionEstablishment = true;
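/* GUC, prevents the execution from finishing while connection establishment is still in progress */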
bool PreventIncompleteConnectionEstablishment = true;
/*
* TaskExecutionState indicates whether a command on a shard has
* finished, or whether it has failed.
*/
typedef enum TaskExecutionState
{
TASK_EXECUTION_NOT_FINISHED,
TASK_EXECUTION_FINISHED,
TASK_EXECUTION_FAILED,
TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION
} TaskExecutionState;
/*
* PlacementExecutionOrder indicates whether a command should be executed
* on any replica, on all replicas sequentially (in order), or on all
* replicas in parallel. In other words, EXECUTION_ORDER_ANY is used for
* SELECTs, EXECUTION_ORDER_SEQUENTIAL/EXECUTION_ORDER_PARALLEL is used for
* DML/DDL.
*/
typedef enum PlacementExecutionOrder
{
EXECUTION_ORDER_ANY,
EXECUTION_ORDER_SEQUENTIAL,
EXECUTION_ORDER_PARALLEL,
} PlacementExecutionOrder;
/*
* ShardCommandExecution represents an execution of a command on a shard
* that may (need to) run across multiple placements.
*/
typedef struct ShardCommandExecution
{
/* description of the task */
Task *task;
/* cached AttInMetadata for task */
AttInMetadata **attributeInputMetadata;
/* indicates whether the attributeInputMetadata has binary or text
* encoding/decoding functions */
bool binaryResults;
/* order in which the command should be replicated on replicas */
PlacementExecutionOrder executionOrder;
/* executions of the command on the placements of the shard */
struct TaskPlacementExecution **placementExecutions;
int placementExecutionCount;
/*
* RETURNING results from other shard placements can be ignored
* after we get results from the first placement.
*/
bool gotResults;
TaskExecutionState executionState;
/*
* Indicates whether given shard command can be executed locally on
* placements. Normally determined by DistributedExecution's same field.
*/
bool localExecutionSupported;
} ShardCommandExecution;
/*
* TaskPlacementExecutionState indicates whether a command is running
* on a shard placement, has finished, or has failed.
*/
typedef enum TaskPlacementExecutionState
{
PLACEMENT_EXECUTION_NOT_READY,
PLACEMENT_EXECUTION_READY,
PLACEMENT_EXECUTION_RUNNING,
PLACEMENT_EXECUTION_FINISHED,
PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION,
PLACEMENT_EXECUTION_FAILED
} TaskPlacementExecutionState;
/*
* TaskPlacementExecution represents the execution of a command
* on a shard placement.
*/
typedef struct TaskPlacementExecution
{
/* shard command execution of which this placement execution is part */
ShardCommandExecution *shardCommandExecution;
/* shard placement on which this command runs */
ShardPlacement *shardPlacement;
/* state of the execution of the command on the placement */
TaskPlacementExecutionState executionState;
/*
* A task's query can contain multiple queries. queryIndex tracks
* which query's results we are waiting for.
*/
uint32 queryIndex;
/* worker pool on which the placement needs to be executed */
WorkerPool *workerPool;
/* the session the placement execution is assigned to or NULL */
WorkerSession *assignedSession;
/* membership in assigned task queue of a particular session */
dlist_node sessionPendingQueueNode;
/* membership in ready-to-start assigned task queue of a particular session */
dlist_node sessionReadyQueueNode;
/* membership in assigned task queue of worker */
dlist_node workerPendingQueueNode;
/* membership in ready-to-start task queue of worker */
dlist_node workerReadyQueueNode;
/* index in array of placement executions in a ShardCommandExecution */
int placementExecutionIndex;
/* execution time statistics for this placement execution */
instr_time startTime;
instr_time endTime;
} TaskPlacementExecution;
/* local functions */
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList,
ParamListInfo paramListInfo,
int targetPoolSize,
TupleDestination *
defaultTupleDest,
TransactionProperties *
xactProperties,
List *jobIdList,
bool localExecutionSupported);
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
modLevel,
List *taskList,
bool
excludeFromTransaction);
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
static void SequentialRunDistributedExecution(DistributedExecution *execution);
static void FinishDistributedExecution(DistributedExecution *execution);
static void CleanUpSessions(DistributedExecution *execution);
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList);
static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
char *nodeName, int nodePort);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
MultiConnection *connection);
static void ManageWorkerPool(WorkerPool *workerPool);
static bool ShouldWaitForSlowStart(WorkerPool *workerPool);
static int CalculateNewConnectionCount(WorkerPool *workerPool);
static bool UsingExistingSessionsCheaperThanEstablishingNewConnections(int
readyTaskCount,
WorkerPool *
workerPool);
static double AvgTaskExecutionTimeApproximation(WorkerPool *workerPool);
static double AvgConnectionEstablishmentTime(WorkerPool *workerPool);
static void OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
TransactionProperties *transactionProperties);
static void CheckConnectionTimeout(WorkerPool *workerPool);
static void MarkEstablishingSessionsTimedOut(WorkerPool *workerPool);
static int UsableConnectionCount(WorkerPool *workerPool);
static long NextEventTimeout(DistributedExecution *execution);
static WaitEventSet * BuildWaitEventSet(List *sessionList);
static void FreeExecutionWaitEvents(DistributedExecution *execution);
static void AddSessionToWaitEventSet(WorkerSession *session,
WaitEventSet *waitEventSet);
static void RebuildWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList);
static TaskPlacementExecution * PopPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopAssignedPlacementExecution(WorkerSession *session);
static TaskPlacementExecution * PopUnassignedPlacementExecution(WorkerPool *workerPool);
static bool StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static bool SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session);
static void ConnectionStateMachine(WorkerSession *session);
static bool HasUnfinishedTaskForSession(WorkerSession *session);
static void HandleMultiConnectionSuccess(WorkerSession *session);
static bool HasAnyConnectionFailure(WorkerPool *workerPool);
static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session);
static bool TransactionModifiedDistributedTable(DistributedExecution *execution);
static void TransactionStateMachine(WorkerSession *session);
static void UpdateConnectionWaitFlags(WorkerSession *session, int waitFlags);
static bool CheckConnectionReady(WorkerSession *session);
static bool ReceiveResults(WorkerSession *session, bool storeRows);
static void WorkerSessionFailed(WorkerSession *session);
static void WorkerPoolFailed(WorkerPool *workerPool);
static void PlacementExecutionDone(TaskPlacementExecution *placementExecution,
bool succeeded);
static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution,
bool succeeded);
static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *
placementExecution);
static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
shardCommandExecution);
static int GetEventSetSize(List *sessionList);
static bool ProcessSessionsWithFailedWaitEventSetOperations(
DistributedExecution *execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static void RebuildWaitEventSet(DistributedExecution *execution);
static void RebuildWaitEventSetForSessions(DistributedExecution *execution);
static void AddLatchWaitEventToExecution(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
#if PG_VERSION_NUM >= PG_VERSION_15
static void RemoteSocketClosedForAnySession(DistributedExecution *execution);
static void ProcessWaitEventsForSocketClosed(WaitEvent *events, int eventCount);
#endif
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static uint64 MicrosecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static int WorkerPoolCompare(const void *lhsKey, const void *rhsKey);
static void SetAttributeInputMetadata(DistributedExecution *execution,
ShardCommandExecution *shardCommandExecution);
static ExecutionParams * CreateDefaultExecutionParams(RowModifyLevel modLevel,
List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo);
/*
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
* run. Given that the results of our subplans are evaluated before the first call to
* the exec function of our custom scan, we make sure our subplans have executed first.
*/
void
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
{
if (scanState->finishedPreScan)
{
/*
* Cursors (and hence RETURN QUERY syntax in pl/pgsql functions)
* may trigger AdaptiveExecutorPreExecutorRun() on every fetch
* operation. However, we should only execute PreScan once.
*/
return;
}
DistributedPlan *distributedPlan = scanState->distributedPlan;
/*
* PostgreSQL takes locks on all partitions in the executor. It's not entirely
* clear why this is necessary (instead of locking the parent during DDL), but
* we do the same for consistency.
*/
LockPartitionsForDistributedPlan(distributedPlan);
ExecuteSubPlans(distributedPlan);
scanState->finishedPreScan = true;
}
/*
* AdaptiveExecutor is called on the first call of CitusExecScan.
* The function fills the tupleStore of the input scanState.
*/
TupleTableSlot *
AdaptiveExecutor(CitusScanState *scanState)
{
TupleTableSlot *resultSlot = NULL;
DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
bool randomAccess = true;
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
List *jobIdList = NIL;
Job *job = distributedPlan->workerJob;
List *taskList = job->taskList;
/* we should only call this once, before the scan has finished */
Assert(!scanState->finishedRemoteScan);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"AdaptiveExecutor",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* Reset Task fields that are only valid for a single execution */
ResetExplainAnalyzeData(taskList);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
TupleDestination *defaultTupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
bool localExecutionSupported = true;
if (RequestedForExplainAnalyze(scanState))
{
/*
* We use multiple queries per task in EXPLAIN ANALYZE which need to
* be part of the same transaction.
*/
UseCoordinatedTransaction();
taskList = ExplainAnalyzeTaskList(taskList, defaultTupleDest, tupleDescriptor,
paramListInfo);
/*
* Multiple queries per task are not supported with local execution. See the Assert in
* TupleDestDestReceiverReceive.
*/
localExecutionSupported = false;
}
bool hasDependentJobs = job->dependentJobList != NIL;
if (hasDependentJobs)
{
/* jobs use intermediate results, which require a distributed transaction */
UseCoordinatedTransaction();
jobIdList = ExecuteDependentTasks(taskList, job);
}
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
/* defer decision after ExecuteSubPlans() */
targetPoolSize = 1;
}
bool excludeFromXact = false;
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
distributedPlan->modLevel, taskList, excludeFromXact);
/*
* In some rare cases, we have prepared statements that pass a parameter
* that is never used in the query. We mark such parameters' type as
* Invalid (0), which is later used in ExtractParametersFromParamList()
* to map them to a generic datatype. We skip this for dynamic parameters.
*/
if (paramListInfo && !paramListInfo->paramFetch)
{
paramListInfo = copyParamList(paramListInfo);
MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo);
}
DistributedExecution *execution = CreateDistributedExecution(
distributedPlan->modLevel,
taskList,
paramListInfo,
targetPoolSize,
defaultTupleDest,
&xactProperties,
jobIdList,
localExecutionSupported);
/*
* Make sure that we acquire the appropriate locks even if the local tasks
* are going to be executed with local execution.
*/
StartDistributedExecution(execution);
if (ShouldRunTasksSequentially(execution->remoteTaskList))
{
SequentialRunDistributedExecution(execution);
}
else
{
RunDistributedExecution(execution);
}
/* execute tasks local to the node (if any) */
if (list_length(execution->localTaskList) > 0)
{
/* now execute the local tasks */
RunLocalExecution(scanState, execution);
}
CmdType commandType = job->jobQuery->commandType;
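/* for non-SELECT commands, report the number of affected rows */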
if (commandType != CMD_SELECT)
{
executorState->es_processed = execution->rowsProcessed;
}
FinishDistributedExecution(execution);
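/* if enabled, sort the RETURNING output of modifications to make it deterministic */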
if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
{
SortTupleStore(scanState);
}
MemoryContextSwitchTo(oldContext);
return resultSlot;
}
/*
* RunLocalExecution runs the localTaskList in the execution, fills the tuplestore
* and sets the es_processed if necessary.
*
* It also sorts the tuplestore if there are no remote tasks remaining.
*/
static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{
EState *estate = ScanStateGetExecutorState(scanState);
bool isUtilityCommand = false;
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
estate->es_param_list_info,
scanState->distributedPlan,
execution->defaultTupleDest,
isUtilityCommand);
execution->rowsProcessed += rowsProcessed;
}
/*
* ExecuteUtilityTaskList is a wrapper around executing task
* list for utility commands.
*/
uint64
ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported)
{
RowModifyLevel modLevel = ROW_MODIFY_NONE;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, utilityTaskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported
);
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, false);
executionParams->isUtilityCommand = true;
return ExecuteTaskListExtended(executionParams);
}
/*
* ExecuteUtilityTaskListExtended is a wrapper around executing task
* list for utility commands.
*/
uint64
ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool localExecutionSupported)
{
RowModifyLevel modLevel = ROW_MODIFY_NONE;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, utilityTaskList, poolSize, localExecutionSupported
);
bool excludeFromXact = false;
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList,
excludeFromXact);
executionParams->isUtilityCommand = true;
return ExecuteTaskListExtended(executionParams);
}
/*
* ExecuteTaskList is a proxy to ExecuteTaskListExtended
* with defaults for some of the arguments.
*/
uint64
ExecuteTaskList(RowModifyLevel modLevel, List *taskList)
{
bool localExecutionSupported = true;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported
);
bool excludeFromXact = false;
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, excludeFromXact);
return ExecuteTaskListExtended(executionParams);
}
/*
* ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended
* with defaults for some of the arguments.
*/
uint64
ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,