-
Notifications
You must be signed in to change notification settings - Fork 690
/
Copy pathplacement_connection.c
1137 lines (962 loc) · 34.2 KB
/
placement_connection.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* placement_connection.c
* Per placement connection handling.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/hash.h"
#include "common/hashfn.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "pg_version_constants.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/distributed_planner.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
/*
* A connection reference is used to register that a connection has been used
* to read or modify either a) a shard placement as a particular user or b) a
* group of colocated placements (which of the two depends on whether the
* reference is from ConnectionPlacementHashEntry or
* ColocatedPlacementsHashEntry).
*/
typedef struct ConnectionReference
{
/*
* The user used to read/modify the placement. We cannot reuse connections
* that were performed using a different role, since it would not have the
* right permissions.
*/
const char *userName;
/*
* The connection. NULL while no connection has been assigned yet, and set
* back to NULL by CloseShardPlacementAssociation when the connection is
* closed before the end of the transaction.
*/
MultiConnection *connection;
/*
* Information about what the connection is used for. There can only be
* one connection executing DDL/DML for a placement to avoid deadlock
* issues/read-your-own-writes violations. The difference between DDL/DML
* currently is only used to emit more precise error messages.
*/
bool hadDML;
bool hadDDL;
/*
* Colocation group of the placement, if any, and the placement's
* representative value. Both are filled in by FindOrCreatePlacementEntry for
* references shared by co-located placements and are compared in
* ConnectionAccessedDifferentPlacement.
*/
uint32 colocationGroupId;
uint32 representativeValue;
/* placementId of the placement, used only for append distributed tables */
uint64 placementId;
/* membership in MultiConnection->referencedPlacements */
dlist_node connectionNode;
} ConnectionReference;
/* forward declaration; ConnectionPlacementHashEntry holds a pointer to it */
struct ColocatedPlacementsHashEntry;
/*
* Hash table mapping placements to a list of connections.
*
* This stores a list of connections for each placement, because multiple
* connections to the same placement may exist at the same time. E.g. an
* adaptive executor query may reference the same placement in several
* sub-tasks.
*
* We keep track of whether a connection has executed DML or DDL, since we can
* only ever allow a single connection to do either, to prevent deadlocks and
* consistency violations (e.g. read-your-own-writes).
*/
/* hash key: a placement is identified by its placement id alone */
typedef struct ConnectionPlacementHashKey
{
uint64 placementId;
} ConnectionPlacementHashKey;
/* hash entry: connection bookkeeping for a single placement */
typedef struct ConnectionPlacementHashEntry
{
ConnectionPlacementHashKey key;
/* did any remote transactions fail? */
bool failed;
/*
* Primary connection used to access the placement. When colocatedEntry is
* set, this is the very same reference as colocatedEntry->primaryConnection
* (see FindOrCreatePlacementEntry).
*/
ConnectionReference *primaryConnection;
/* are any other connections reading from the placements? */
bool hasSecondaryConnections;
/* entry for the set of co-located placements, or NULL if there is none */
struct ColocatedPlacementsHashEntry *colocatedEntry;
/* membership in ConnectionShardHashEntry->placementConnections */
dlist_node shardNode;
} ConnectionPlacementHashEntry;
/* hash table, keyed by ConnectionPlacementHashKey */
static HTAB *ConnectionPlacementHash;
/*
* A hash-table mapping colocated placements to connections. Colocated
* placements being the set of placements on a single node that represent the
* same value range. This is needed because connections for colocated
* placements (i.e. the corresponding placements for different colocated
* distributed tables) need to share connections. Otherwise things like
* foreign keys can very easily lead to unprincipled deadlocks. This means
* that there can only be one DML/DDL connection for a set of colocated
* placements.
*
* A set of colocated placements is identified, besides node identifying
* information, by the associated colocation group id and the placement's
* 'representativeValue', which currently is the lower boundary of its
* hash-range.
*
* Note that this hash-table only contains entries for hash-partitioned
* tables, because others so far don't support colocation.
*/
/* hash key: identifies one set of co-located placements on one node */
typedef struct ColocatedPlacementsHashKey
{
/* to identify host - database can't differ */
uint32 nodeId;
/* colocation group, or invalid */
uint32 colocationGroupId;
/* to represent the value range */
uint32 representativeValue;
} ColocatedPlacementsHashKey;
/* hash entry: connection bookkeeping shared by a set of co-located placements */
typedef struct ColocatedPlacementsHashEntry
{
ColocatedPlacementsHashKey key;
/* primary connection used to access the co-located placements */
ConnectionReference *primaryConnection;
/* are any other connections reading from the placements? */
bool hasSecondaryConnections;
} ColocatedPlacementsHashEntry;
/* hash table, keyed by ColocatedPlacementsHashKey */
static HTAB *ColocatedPlacementsHash;
/*
* Hash table mapping shard ids to placements.
*
* This is used to track whether placements of a shard have to be marked
* invalid after a failure, or whether the coordinated transaction has to be
* aborted instead, so that we never mark all placements of a shard invalid.
*/
/* hash key: a shard is identified by its shard id alone */
typedef struct ConnectionShardHashKey
{
uint64 shardId;
} ConnectionShardHashKey;
/* hash entry: the placements of one shard that have been accessed */
typedef struct ConnectionShardHashEntry
{
ConnectionShardHashKey key;
/*
* List of ConnectionPlacementHashEntry structs, linked through their
* shardNode member (see AssociatePlacementWithShard).
*/
dlist_head placementConnections;
} ConnectionShardHashEntry;
/* hash table, keyed by ConnectionShardHashKey */
static HTAB *ConnectionShardHash;
/* forward declarations of the file-local helper functions */
static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
const char *userName);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
ShardPlacement *placement);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *placementConnection);
static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,
ShardPlacement *placement);
static void AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
ShardPlacement *placement);
static bool HasModificationFailedForShard(ConnectionShardHashEntry *shardEntry);
/* hash and comparison callbacks for ColocatedPlacementsHashKey */
static uint32 ColocatedPlacementsHashHash(const void *key, Size keysize);
static int ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize);
/*
 * GetPlacementConnection starts a connection for a placement and then
 * finishes establishing it before handing it back to the caller.
 *
 * Returns NULL when no connection could be started, which is only expected
 * for requests carrying OPTIONAL_CONNECTION.
 *
 * See StartPlacementConnection for details on flags and userName.
 */
MultiConnection *
GetPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
	MultiConnection *placementConnection =
		StartPlacementConnection(flags, placement, userName);

	if (placementConnection != NULL)
	{
		/* complete the establishment that was only initiated above */
		FinishConnectionEstablishment(placementConnection);
	}
	else
	{
		/* only optional connections are allowed to come back empty */
		Assert((flags & OPTIONAL_CONNECTION));
	}

	return placementConnection;
}
/*
 * StartPlacementConnection initiates a connection to a remote node,
 * associated with the placement and transaction.
 *
 * The connection is established for the current database. If userName is NULL
 * the current user is used, otherwise the provided one.
 *
 * See StartNodeUserDatabaseConnection for details.
 *
 * Flags have the corresponding meaning from StartNodeUserDatabaseConnection,
 * except that two additional flags have an effect:
 * - FOR_DML - signal that connection is going to be used for DML (modifications)
 * - FOR_DDL - signal that connection is going to be used for DDL
 *
 * Only one connection associated with the placement may have FOR_DML or
 * FOR_DDL set. For hash-partitioned tables only one connection for a set of
 * colocated placements may have FOR_DML/DDL set. This restriction prevents
 * deadlocks and wrong results due to in-progress transactions.
 *
 * Internally this wraps the placement into a single-element placement access
 * list and defers to StartPlacementListConnection.
 */
MultiConnection *
StartPlacementConnection(uint32 flags, ShardPlacement *placement, const char *userName)
{
	/* plain reads are the default; FOR_DDL wins over FOR_DML if both are set */
	ShardPlacementAccessType accessType = PLACEMENT_ACCESS_SELECT;

	if (flags & FOR_DDL)
	{
		accessType = PLACEMENT_ACCESS_DDL;
	}
	else if (flags & FOR_DML)
	{
		accessType = PLACEMENT_ACCESS_DML;
	}

	ShardPlacementAccess *singleAccess =
		(ShardPlacementAccess *) palloc0(sizeof(ShardPlacementAccess));

	singleAccess->placement = placement;
	singleAccess->accessType = accessType;

	List *singleAccessList = list_make1(singleAccess);

	return StartPlacementListConnection(flags, singleAccessList, userName);
}
/*
 * StartPlacementListConnection returns a connection to a remote node that is
 * suitable for the given placement accesses (SELECT, DML, DDL), or throws an
 * error if no suitable connection can be used because doing so would cause a
 * self-deadlock or consistency violation.
 *
 * If userName is NULL, the current user's name is used.
 *
 * A connection already associated with the placements in this transaction is
 * preferred. Otherwise a node connection is started for the node of the first
 * placement in the list. The chosen connection is then registered for all
 * placement accesses in the list.
 *
 * Returns NULL when no connection could be started, which is only expected
 * for requests carrying OPTIONAL_CONNECTION. In that case nothing is
 * registered.
 */
MultiConnection *
StartPlacementListConnection(uint32 flags, List *placementAccessList,
							 const char *userName)
{
	char *freeUserName = NULL;

	if (userName == NULL)
	{
		userName = freeUserName = CurrentUserName();
	}

	MultiConnection *chosenConnection = FindPlacementListConnection(flags,
																	placementAccessList,
																	userName);
	if (chosenConnection == NULL)
	{
		/* use the first placement from the list to extract nodename and nodeport */
		ShardPlacementAccess *placementAccess =
			(ShardPlacementAccess *) linitial(placementAccessList);
		ShardPlacement *placement = placementAccess->placement;
		char *nodeName = placement->nodeName;
		int nodePort = placement->nodePort;

		/*
		 * No suitable connection in the placement->connection mapping, get one from
		 * the node->connection pool.
		 */
		chosenConnection = StartNodeUserDatabaseConnection(flags, nodeName, nodePort,
														   userName, NULL);

		if (chosenConnection != NULL &&
			(flags & REQUIRE_CLEAN_CONNECTION) &&
			ConnectionAccessedDifferentPlacement(chosenConnection, placement))
		{
			/*
			 * Cached connection accessed a non-co-located placement in the same
			 * table or co-location group, while the caller asked for a clean
			 * connection. Open a new connection instead.
			 *
			 * We use this for situations in which we want to use a different
			 * connection for every placement, such as COPY. If we blindly returned
			 * a cached connection that already modified a different, non-co-located
			 * placement B in the same table or in a table with the same co-location
			 * ID as the current placement, then we'd no longer be able to write to
			 * placement B later in the COPY.
			 */
			chosenConnection = StartNodeUserDatabaseConnection(flags |
															   FORCE_NEW_CONNECTION,
															   nodeName, nodePort,
															   userName, NULL);

			Assert(chosenConnection == NULL ||
				   !ConnectionAccessedDifferentPlacement(chosenConnection, placement));
		}
	}

	if (chosenConnection != NULL)
	{
		/* remember which connection we're going to use to access the placements */
		AssignPlacementListToConnection(placementAccessList, chosenConnection);
	}
	else
	{
		/* connection can only be NULL for optional connections */
		Assert((flags & OPTIONAL_CONNECTION));
	}

	/*
	 * Single exit point: release the user name we looked up ourselves on every
	 * path, including the one where no connection could be started.
	 */
	if (freeUserName != NULL)
	{
		pfree(freeUserName);
	}

	return chosenConnection;
}
/*
* AssignPlacementListToConnection assigns a set of shard placement accesses to a
* given connection, meaning that connection must be used for all (conflicting)
* accesses of the same shard placements to make sure reads see writes and to
* make sure we don't take conflicting locks.
*
* For every placement access in the list (dummy placements with an invalid
* shard id are skipped) this updates the placement's connection reference,
* flags additional reading connections via hasSecondaryConnections, records
* DDL/DML usage on the reference and records the relation access.
*
* The user name stored on the reference is taken from connection->user.
*/
void
AssignPlacementListToConnection(List *placementAccessList, MultiConnection *connection)
{
const char *userName = connection->user;
ShardPlacementAccess *placementAccess = NULL;
foreach_ptr(placementAccess, placementAccessList)
{
ShardPlacement *placement = placementAccess->placement;
ShardPlacementAccessType accessType = placementAccess->accessType;
if (placement->shardId == INVALID_SHARD_ID)
{
/*
* When a SELECT prunes down to 0 shard, we use a dummy placement
* which is only used to route the query to a worker node, but
* the SELECT doesn't actually access any shard placement.
*
* FIXME: this can be removed if we evaluate empty SELECTs locally.
*/
continue;
}
ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
placement);
/* may be a reference shared with co-located placements */
ConnectionReference *placementConnection = placementEntry->primaryConnection;
if (placementConnection->connection == connection)
{
/* using the connection that was already assigned to the placement */
}
else if (placementConnection->connection == NULL)
{
/* placement does not have a connection assigned yet */
placementConnection->connection = connection;
placementConnection->hadDDL = false;
placementConnection->hadDML = false;
/* copy the name so that it lives as long as the reference itself */
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
placementConnection->placementId = placementAccess->placement->placementId;
/* record association with connection */
dlist_push_tail(&connection->referencedPlacements,
&placementConnection->connectionNode);
}
else
{
/* using a different connection than the one assigned to the placement */
if (accessType != PLACEMENT_ACCESS_SELECT)
{
/*
* We previously read from the placement, but now we're writing to
* it (if we had written to the placement, we would have either chosen
* the same connection, or errored out). Update the connection reference
* to point to the connection used for writing. We don't need to remember
* the existing connection since we won't be able to reuse it for
* accessing the placement. However, we do register that it exists in
* hasSecondaryConnections.
*/
placementConnection->connection = connection;
placementConnection->userName = MemoryContextStrdup(TopTransactionContext,
userName);
Assert(!placementConnection->hadDDL);
Assert(!placementConnection->hadDML);
/*
* record association with connection
*
* NOTE(review): connectionNode appears to still be linked into the
* previous connection's referencedPlacements list at this point, and
* it is pushed onto a second list without being unlinked first.
* Confirm that this cannot corrupt the previous connection's list.
*/
dlist_push_tail(&connection->referencedPlacements,
&placementConnection->connectionNode);
}
/*
* There are now multiple connections that read from the placement
* and DDL commands are forbidden.
*/
placementEntry->hasSecondaryConnections = true;
if (placementEntry->colocatedEntry != NULL)
{
/* we also remember this for co-located placements */
placementEntry->colocatedEntry->hasSecondaryConnections = true;
}
}
/*
* Remember that we used the current connection for writes.
*/
if (accessType == PLACEMENT_ACCESS_DDL)
{
placementConnection->hadDDL = true;
}
if (accessType == PLACEMENT_ACCESS_DML)
{
placementConnection->hadDML = true;
}
/* record the relation access */
Oid relationId = RelationIdForShard(placement->shardId);
RecordRelationAccessIfNonDistTable(relationId, accessType);
}
}
/*
 * GetConnectionIfPlacementAccessedInXact returns the connection over which
 * the placements in the given access list have been accessed in the current
 * transaction, or NULL if there is no such connection.
 *
 * If userName is NULL, the current user's name is used for the lookup.
 */
MultiConnection *
GetConnectionIfPlacementAccessedInXact(int flags, List *placementAccessList,
									   const char *userName)
{
	char *currentUserName = NULL;
	const char *lookupUserName = userName;

	if (lookupUserName == NULL)
	{
		/* fall back to the current user; we own this copy and free it below */
		currentUserName = CurrentUserName();
		lookupUserName = currentUserName;
	}

	MultiConnection *accessedConnection =
		FindPlacementListConnection(flags, placementAccessList, lookupUserName);

	if (currentUserName != NULL)
	{
		pfree(currentUserName);
	}

	return accessedConnection;
}
/*
* FindPlacementListConnection determines whether there is a connection that must
* be used to perform the given placement accesses.
*
* If a placement was only read in this transaction, then the same connection must
* be used for DDL to prevent self-deadlock. If a placement was modified in this
* transaction, then the same connection must be used for all subsequent accesses
* to ensure read-your-writes consistency and prevent self-deadlock. If those
* conditions cannot be met, because a connection is in use or the placements in
* the placement access list were modified over multiple connections, then this
* function throws an error.
*
* The function returns the connection that needs to be used, if such a connection
* exists, and NULL otherwise. userName must not be NULL, since it is compared
* with strcmp against the user names stored on the connection references.
*
* Note that looking up a placement creates its hash entry if it does not exist
* yet (see FindOrCreatePlacementEntry).
*/
static MultiConnection *
FindPlacementListConnection(int flags, List *placementAccessList, const char *userName)
{
bool foundModifyingConnection = false;
MultiConnection *chosenConnection = NULL;
/*
* Go through all placement accesses to find a suitable connection.
*
* If none of the placements have been accessed in this transaction, connection
* remains NULL.
*
* If one or more of the placements have been modified in this transaction, then
* use the connection that performed the write. If placements have been written
* over multiple connections or the connection is not available, error out.
*
* If placements have only been read in this transaction, then use the last
* suitable connection found for a placement in the placementAccessList.
*/
ShardPlacementAccess *placementAccess = NULL;
foreach_ptr(placementAccess, placementAccessList)
{
ShardPlacement *placement = placementAccess->placement;
ShardPlacementAccessType accessType = placementAccess->accessType;
if (placement->shardId == INVALID_SHARD_ID)
{
/*
* When a SELECT prunes down to 0 shard, we use a dummy placement.
* In that case, we can fall back to the default connection.
*
* FIXME: this can be removed if we evaluate empty SELECTs locally.
*/
continue;
}
ConnectionPlacementHashEntry *placementEntry = FindOrCreatePlacementEntry(
placement);
ColocatedPlacementsHashEntry *colocatedEntry = placementEntry->colocatedEntry;
ConnectionReference *placementConnection = placementEntry->primaryConnection;
/*
* note: the Asserts below are primarily for clarifying the conditions;
* placementConnection has already been dereferenced by the time they run
*/
if (placementConnection->connection == NULL)
{
/* no connection has been chosen for the placement */
}
else if (accessType == PLACEMENT_ACCESS_DDL &&
placementEntry->hasSecondaryConnections)
{
/*
* If a placement has been read over multiple connections (typically as
* a result of a reference table join) then a DDL command on the placement
* would create a self-deadlock.
*/
Assert(placementConnection != NULL);
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform DDL on placement " UINT64_FORMAT
", which has been read over multiple connections",
placement->placementId)));
}
else if (accessType == PLACEMENT_ACCESS_DDL && colocatedEntry != NULL &&
colocatedEntry->hasSecondaryConnections)
{
/*
* If a placement has been read over multiple (uncommitted) connections
* then a DDL command on a co-located placement may create a self-deadlock
* if there exists some relationship between the co-located placements
* (e.g. foreign key, partitioning).
*/
Assert(placementConnection != NULL);
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform DDL on placement " UINT64_FORMAT
" since a co-located placement has been read over multiple connections",
placement->placementId)));
}
else if (foundModifyingConnection)
{
/*
* We already found a connection that performed writes on one of the
* placements and must use it.
*/
if ((placementConnection->hadDDL || placementConnection->hadDML) &&
placementConnection->connection != chosenConnection)
{
/*
* The current placement may have been modified over a different
* connection. Neither connection is guaranteed to see all uncommitted
* writes and therefore we cannot proceed.
*/
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform query with placements that were "
"modified over multiple connections")));
}
}
else if (accessType == PLACEMENT_ACCESS_SELECT &&
placementEntry->hasSecondaryConnections &&
!placementConnection->hadDDL && !placementConnection->hadDML)
{
/*
* Two separate connections have already selected from this placement
* and it was not modified. There is no benefit to using this connection.
*/
}
else if (CanUseExistingConnection(flags, userName, placementConnection))
{
/*
* There is an existing connection for the placement and we can use it.
*/
Assert(placementConnection != NULL);
chosenConnection = placementConnection->connection;
if (placementConnection->hadDDL || placementConnection->hadDML)
{
/* this connection performed writes, we must use it */
foundModifyingConnection = true;
}
}
else if (placementConnection->hadDDL || placementConnection->hadDML)
{
/*
* The placement was modified, but the modifying connection cannot be
* used. Report the user mismatch if that is the reason; otherwise fall
* through to the generic error below.
*/
if (strcmp(placementConnection->userName, userName) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform query on placements that were "
"modified in this transaction by a different "
"user")));
}
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot perform query, because modifications were "
"made over a connection that cannot be used at "
"this time. This is most likely a Citus bug so "
"please report it"
)));
}
}
return chosenConnection;
}
/*
 * FindOrCreatePlacementEntry looks up the entry of the given placement in the
 * placement->connection hash and creates and initializes it if the placement
 * has not yet been accessed in the current transaction.
 *
 * A new entry for a DISTRIBUTE_BY_HASH or DISTRIBUTE_BY_NONE placement shares
 * the connection reference of its set of co-located placements, which is
 * looked up (and created on demand) in the co-located placements->connection
 * hash. A new entry for any other placement gets a connection reference of
 * its own.
 *
 * In all cases the entry is associated with its shard before being returned.
 */
static ConnectionPlacementHashEntry *
FindOrCreatePlacementEntry(ShardPlacement *placement)
{
	bool entryExists = false;
	ConnectionPlacementHashKey placementKey = {
		.placementId = placement->placementId
	};

	ConnectionPlacementHashEntry *entry =
		hash_search(ConnectionPlacementHash, &placementKey, HASH_ENTER, &entryExists);

	if (!entryExists)
	{
		bool sharesColocatedReference =
			(placement->partitionMethod == DISTRIBUTE_BY_HASH ||
			 placement->partitionMethod == DISTRIBUTE_BY_NONE);

		/* start out with no connection chosen for this placement */
		entry->failed = false;
		entry->primaryConnection = NULL;
		entry->hasSecondaryConnections = false;
		entry->colocatedEntry = NULL;

		if (!sharesColocatedReference)
		{
			/* the placement gets a zeroed connection reference of its own */
			entry->primaryConnection =
				(ConnectionReference *) MemoryContextAllocZero(
					TopTransactionContext, sizeof(ConnectionReference));
		}
		else
		{
			bool colocatedExists = false;
			ColocatedPlacementsHashKey colocatedKey = {
				.nodeId = placement->nodeId,
				.colocationGroupId = placement->colocationGroupId,
				.representativeValue = placement->representativeValue
			};

			/* look for a connection assigned to co-located placements */
			ColocatedPlacementsHashEntry *colocatedEntry =
				hash_search(ColocatedPlacementsHash, &colocatedKey, HASH_ENTER,
							&colocatedExists);

			if (!colocatedExists)
			{
				/*
				 * First access to this set of co-located placements: create
				 * the connection reference that the entire set will share.
				 */
				ConnectionReference *sharedReference =
					(ConnectionReference *) MemoryContextAllocZero(
						TopTransactionContext, sizeof(ConnectionReference));

				/*
				 * Keep the co-location group information on the reference, so
				 * that we can later determine whether a connection accessed
				 * different placements of the same co-location group.
				 */
				sharedReference->colocationGroupId = placement->colocationGroupId;
				sharedReference->representativeValue = placement->representativeValue;

				colocatedEntry->primaryConnection = sharedReference;
				colocatedEntry->hasSecondaryConnections = false;
			}

			/* the placement uses the reference of its co-located set */
			entry->primaryConnection = colocatedEntry->primaryConnection;
			entry->colocatedEntry = colocatedEntry;
		}
	}

	/* record association with shard, for invalidation */
	AssociatePlacementWithShard(entry, placement);

	return entry;
}
/*
 * CanUseExistingConnection is a helper function for
 * FindPlacementListConnection() that checks whether the connection recorded
 * in the given connection reference can be reused.
 *
 * The connection is reusable only if it exists, is not claimed exclusively,
 * the caller did not ask for FORCE_NEW_CONNECTION, and it was recorded for
 * the same user name.
 */
static bool
CanUseExistingConnection(uint32 flags, const char *userName,
						 ConnectionReference *connectionReference)
{
	MultiConnection *existingConnection = connectionReference->connection;

	if (existingConnection == NULL)
	{
		/* no connection (anymore), so there is nothing to reuse */
		return false;
	}

	if (existingConnection->claimedExclusively)
	{
		/* already in use elsewhere */
		return false;
	}

	if (flags & FORCE_NEW_CONNECTION)
	{
		/* the caller does not want connection reuse */
		return false;
	}

	/* a connection recorded for a different user is not reusable */
	return strcmp(connectionReference->userName, userName) == 0;
}
/*
 * ConnectionAccessedDifferentPlacement returns true if the connection
 * references a placement that conflicts with the given one, which is the case
 * when either:
 * - the given placement is not hash distributed and the connection references
 *   a different placement id, or
 * - the given placement has a valid colocation group and the connection
 *   references the same colocation group with a different representative
 *   value, meaning the placements are not strictly colocated.
 */
static bool
ConnectionAccessedDifferentPlacement(MultiConnection *connection,
									 ShardPlacement *placement)
{
	bool isHashDistributed = (placement->partitionMethod == DISTRIBUTE_BY_HASH);
	bool hasColocationGroup =
		(placement->colocationGroupId != INVALID_COLOCATION_ID);
	dlist_iter referenceIter;

	dlist_foreach(referenceIter, &connection->referencedPlacements)
	{
		ConnectionReference *reference =
			dlist_container(ConnectionReference, connectionNode, referenceIter.cur);

		/* placements that are not hash distributed are compared by id */
		bool differentPlacementId =
			!isHashDistributed &&
			placement->placementId != reference->placementId;

		/* non-co-located placements from the same co-location group */
		bool sameGroupDifferentRange =
			hasColocationGroup &&
			placement->colocationGroupId == reference->colocationGroupId &&
			placement->representativeValue != reference->representativeValue;

		if (differentPlacementId || sameGroupDifferentRange)
		{
			return true;
		}
	}

	return false;
}
/*
 * ConnectionModifiedPlacement returns true if any DML or DDL is executed over
 * the connection on any placement/table.
 *
 * A connection whose remote transaction has not started is never considered
 * to have modified anything. A connection that references no placements at
 * all is conservatively treated as having performed a modification.
 */
bool
ConnectionModifiedPlacement(MultiConnection *connection)
{
	dlist_head *references = &connection->referencedPlacements;
	dlist_iter referenceIter;

	/*
	 * StartPlacementListConnection() sets hadDDL/hadDML even before the
	 * actual command is sent to the remote node. If we get here at that
	 * point, we should not assume that the connection has already done any
	 * modifications.
	 */
	if (connection->remoteTransaction.transactionState == REMOTE_TRANS_NOT_STARTED)
	{
		return false;
	}

	/*
	 * When referencedPlacements is empty, we come here from an API that uses
	 * a node connection (i.e. not a placement connection), which doesn't set
	 * placements. The command sent could have been either a write or a read,
	 * so we assume it was a write to be on the safe side.
	 */
	if (dlist_is_empty(references))
	{
		return true;
	}

	dlist_foreach(referenceIter, references)
	{
		ConnectionReference *reference =
			dlist_container(ConnectionReference, connectionNode, referenceIter.cur);

		if (reference->hadDDL || reference->hadDML)
		{
			return true;
		}
	}

	return false;
}
/*
 * AssociatePlacementWithShard records the shard->placement relation in
 * ConnectionShardHash, creating the shard's entry on first use. A placement
 * that is already on the shard's list is not added a second time.
 *
 * That association is later used, in CheckForFailedPlacements, to invalidate
 * shard placements if necessary.
 */
static void
AssociatePlacementWithShard(ConnectionPlacementHashEntry *placementEntry,
							ShardPlacement *placement)
{
	bool shardExists = false;
	bool alreadyAssociated = false;
	dlist_iter entryIter;
	ConnectionShardHashKey shardKey = {
		.shardId = placement->shardId
	};

	ConnectionShardHashEntry *shardEntry =
		hash_search(ConnectionShardHash, &shardKey, HASH_ENTER, &shardExists);

	if (!shardExists)
	{
		/* first placement seen for this shard, start with an empty list */
		dlist_init(&shardEntry->placementConnections);
	}

	/*
	 * Check if placement is already associated with shard (happens if there's
	 * multiple connections for a placement). There'll usually only be few
	 * placements per shard, so the price of iterating isn't large.
	 */
	dlist_foreach(entryIter, &shardEntry->placementConnections)
	{
		ConnectionPlacementHashEntry *listedEntry =
			dlist_container(ConnectionPlacementHashEntry, shardNode, entryIter.cur);

		if (listedEntry->key.placementId == placement->placementId)
		{
			alreadyAssociated = true;
			break;
		}
	}

	if (!alreadyAssociated)
	{
		dlist_push_tail(&shardEntry->placementConnections, &placementEntry->shardNode);
	}
}
/*
* CloseShardPlacementAssociation handles a connection being closed before
* transaction end.
*
* This should only be called by connection_management.c.
*/
void
CloseShardPlacementAssociation(struct MultiConnection *connection)
{
dlist_iter placementIter;
/* set connection to NULL for all references to the connection */
dlist_foreach(placementIter, &connection->referencedPlacements)
{
ConnectionReference *reference =
dlist_container(ConnectionReference, connectionNode, placementIter.cur);
reference->connection = NULL;
/*
* Note that we don't reset ConnectionPlacementHashEntry's
* primaryConnection here, that'd be more complicated than it seems
* worth. That means we'll error out spuriously if a DML/DDL
* executing connection is closed earlier in a transaction.
*/
}
}
/*
* ResetShardPlacementAssociation resets the association of connections to
* shard placements at the end of a transaction.
*
* This should only be called by connection_management.c.
*/
void
ResetShardPlacementAssociation(struct MultiConnection *connection)
{
dlist_init(&connection->referencedPlacements);
}
/*
* ResetPlacementConnectionManagement() dissociates connections from
* placements and shards. This will be called at the end of XACT_EVENT_COMMIT
* and XACT_EVENT_ABORT.
*/
void
ResetPlacementConnectionManagement(void)
{
/* Simply delete all entries */
hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash);
hash_delete_all(ColocatedPlacementsHash);
/*
* NB: memory for ConnectionReference structs and subordinate data is
* deleted by virtue of being allocated in TopTransactionContext.
*/
}
/*
* ErrorIfPostCommitFailedShardPlacements throws an error if any of the placements
* that modified the database and involved in the transaction has failed.
*
* Note that Citus already fails queries/commands in case of any failures during query
* processing. However, there are certain failures that can only be detected on the
* COMMIT time. And, this check mainly ensures to catch errors that happens on the
* COMMIT time on the placements.
*
* The most common example for this case is the deferred errors that are thrown by
* triggers or constraints at the COMMIT time.
*/
void
ErrorIfPostCommitFailedShardPlacements(void)
{
HASH_SEQ_STATUS status;
ConnectionShardHashEntry *shardEntry = NULL;
hash_seq_init(&status, ConnectionShardHash);
while ((shardEntry = (ConnectionShardHashEntry *) hash_seq_search(&status)) != 0)