Skip to content

Commit 6834b9f

Browse files
authored
IGNITE-24364 Store zone-wide tx state storage per zone (#5159)
1 parent 362cb43 commit 6834b9f

File tree

12 files changed

+379
-35
lines changed

12 files changed

+379
-35
lines changed

modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1527,7 +1527,8 @@ private class Node {
15271527
clockService,
15281528
placementDriver,
15291529
schemaSyncService,
1530-
systemDistributedConfiguration
1530+
systemDistributedConfiguration,
1531+
sharedTxStateStorage
15311532
),
15321533
minTimeCollectorService,
15331534
systemDistributedConfiguration

modules/partition-replicator/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ dependencies {
4141
implementation project(':ignite-transactions')
4242
implementation project(':ignite-storage-api')
4343
implementation project(':ignite-low-watermark')
44+
implementation project(':ignite-workers')
45+
46+
testImplementation testFixtures(project(':ignite-core'))
47+
testImplementation libs.hamcrest.core
48+
testImplementation libs.mockito.core
49+
testImplementation libs.mockito.junit
4450

4551
integrationTestImplementation testFixtures(project(':ignite-runner'))
4652
integrationTestImplementation testFixtures(project(':ignite-core'))

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ public CompletableFuture<Boolean> invoke(
549549
raftManager,
550550
partitionRaftConfigurer,
551551
view -> new LocalLogStorageFactory(),
552-
ForkJoinPool.commonPool(),
552+
threadPoolsManager.tableIoExecutor(),
553553
t -> converter.apply(t),
554554
replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsKey((ZonePartitionId) replicaGrpId))
555555
.thenApply(Entry::value)
@@ -590,6 +590,13 @@ public CompletableFuture<Boolean> invoke(
590590
systemDistributedConfiguration
591591
);
592592

593+
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
594+
storagePath.resolve("tx-state"),
595+
threadPoolsManager.commonScheduler(),
596+
threadPoolsManager.tableIoExecutor(),
597+
partitionsLogStorageFactory
598+
);
599+
593600
partitionReplicaLifecycleManager = new PartitionReplicaLifecycleManager(
594601
catalogManager,
595602
replicaManager,
@@ -603,21 +610,15 @@ public CompletableFuture<Boolean> invoke(
603610
clockService,
604611
placementDriver,
605612
schemaSyncService,
606-
systemDistributedConfiguration
613+
systemDistributedConfiguration,
614+
sharedTxStateStorage
607615
);
608616

609617
StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry
610618
.getConfiguration(StorageUpdateExtensionConfiguration.KEY).storageUpdate();
611619

612620
MinimumRequiredTimeCollectorService minTimeCollectorService = new MinimumRequiredTimeCollectorServiceImpl();
613621

614-
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
615-
storagePath.resolve("tx-state"),
616-
threadPoolsManager.commonScheduler(),
617-
threadPoolsManager.tableIoExecutor(),
618-
partitionsLogStorageFactory
619-
);
620-
621622
tableManager = new TableManager(
622623
name,
623624
registry,
@@ -729,8 +730,8 @@ public void start() {
729730
txManager,
730731
dataStorageMgr,
731732
schemaManager,
732-
partitionReplicaLifecycleManager,
733733
sharedTxStateStorage,
734+
partitionReplicaLifecycleManager,
734735
tableManager,
735736
indexManager
736737
)).thenComposeAsync(componentFuts -> {

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
4141
import static org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
4242
import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
43+
import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
4344
import static org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
4445
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
4546
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -128,8 +129,11 @@
128129
import org.apache.ignite.internal.replicator.ZonePartitionId;
129130
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
130131
import org.apache.ignite.internal.schema.SchemaSyncService;
132+
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
133+
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
131134
import org.apache.ignite.internal.util.Cursor;
132135
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
136+
import org.apache.ignite.internal.util.IgniteUtils;
133137
import org.apache.ignite.network.ClusterNode;
134138
import org.jetbrains.annotations.Nullable;
135139

@@ -213,6 +217,8 @@ public class PartitionReplicaLifecycleManager extends
213217

214218
private final ConcurrentMap<ZonePartitionId, ZonePartitionRaftListener> zonePartitionRaftListeners = new ConcurrentHashMap<>();
215219

220+
private final ZoneResourcesManager zoneResourcesManager;
221+
216222
/**
217223
* The constructor.
218224
*
@@ -228,6 +234,7 @@ public class PartitionReplicaLifecycleManager extends
228234
* @param placementDriver Placement driver.
229235
* @param schemaSyncService Schema synchronization service.
230236
* @param systemDistributedConfiguration System distributed configuration.
237+
* @param sharedTxStateStorage Shared tx state storage.
231238
*/
232239
public PartitionReplicaLifecycleManager(
233240
CatalogManager catalogMgr,
@@ -242,7 +249,8 @@ public PartitionReplicaLifecycleManager(
242249
ClockService clockService,
243250
PlacementDriver placementDriver,
244251
SchemaSyncService schemaSyncService,
245-
SystemDistributedConfiguration systemDistributedConfiguration
252+
SystemDistributedConfiguration systemDistributedConfiguration,
253+
TxStateRocksDbSharedStorage sharedTxStateStorage
246254
) {
247255
this.catalogMgr = catalogMgr;
248256
this.replicaMgr = replicaMgr;
@@ -266,6 +274,8 @@ public PartitionReplicaLifecycleManager(
266274
Integer::parseInt
267275
);
268276

277+
zoneResourcesManager = new ZoneResourcesManager(sharedTxStateStorage);
278+
269279
pendingAssignmentsRebalanceListener = createPendingAssignmentsRebalanceListener();
270280
stableAssignmentsRebalanceListener = createStableAssignmentsRebalanceListener();
271281
assignmentsSwitchRebalanceListener = createAssignmentsSwitchRebalanceListener();
@@ -415,14 +425,17 @@ private CompletableFuture<Void> calculateZoneAssignmentsAndCreateReplicationNode
415425

416426
return getOrCreateAssignments(zoneDescriptor, causalityToken, catalogVersion)
417427
.thenCompose(assignments -> writeZoneAssignmentsToMetastore(zoneId, assignments))
418-
.thenCompose(assignments -> createZoneReplicationNodes(zoneId, assignments, causalityToken));
428+
.thenCompose(
429+
assignments -> createZoneReplicationNodes(zoneId, assignments, causalityToken, zoneDescriptor.partitions())
430+
);
419431
});
420432
}
421433

422434
private CompletableFuture<Void> createZoneReplicationNodes(
423435
int zoneId,
424436
List<Assignments> assignments,
425-
long revision
437+
long revision,
438+
int partitionCount
426439
) {
427440
return inBusyLockAsync(busyLock, () -> {
428441
assert assignments != null : IgniteStringFormatter.format("Zone has empty assignments [id={}].", zoneId);
@@ -440,7 +453,8 @@ private CompletableFuture<Void> createZoneReplicationNodes(
440453
zonePartitionId,
441454
localMemberAssignment,
442455
zoneAssignment,
443-
revision
456+
revision,
457+
partitionCount
444458
);
445459
}
446460

@@ -456,13 +470,15 @@ private CompletableFuture<Void> createZoneReplicationNodes(
456470
* @param localMemberAssignment Assignment of the local member, or null if local member is not part of the assignment.
457471
* @param stableAssignments Stable assignments.
458472
* @param revision Event's revision.
473+
* @param partitionCount Number of partitions on the zone.
459474
* @return Future that completes when a replica is started.
460475
*/
461476
private CompletableFuture<?> createZonePartitionReplicationNode(
462477
ZonePartitionId zonePartitionId,
463478
@Nullable Assignment localMemberAssignment,
464479
Assignments stableAssignments,
465-
long revision
480+
long revision,
481+
int partitionCount
466482
) {
467483
if (localMemberAssignment == null) {
468484
return nullCompletedFuture();
@@ -490,6 +506,12 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
490506
);
491507

492508
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
509+
TxStatePartitionStorage txStatePartitionStorage = zoneResourcesManager.getOrCreatePartitionTxStateStorage(
510+
zonePartitionId.zoneId(),
511+
partitionCount,
512+
zonePartitionId.partitionId()
513+
);
514+
493515
try {
494516
return replicaMgr.startReplica(
495517
zonePartitionId,
@@ -499,6 +521,9 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
499521
stablePeersAndLearners,
500522
raftGroupListener,
501523
raftGroupEventsListener,
524+
// TODO: IGNITE-24371 - pass real isVolatile flag
525+
false,
526+
txStatePartitionStorage,
502527
busyLock
503528
).thenCompose(replica -> executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
504529
replicationGroupIds.add(zonePartitionId);
@@ -998,11 +1023,19 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
9981023
CompletableFuture<?> localServicesStartFuture;
9991024

10001025
if (shouldStartLocalGroupNode) {
1026+
// We can safely access the Catalog at the timestamp because:
1027+
// 1. It is guaranteed that Catalog update bringing the Catalog version has been applied (as we are now handling
1028+
// a Metastorage event that was caused by that same Catalog version we need, so the Catalog version update is already
1029+
// handled), so no Schema sync is needed.
1030+
// 2. It is guaranteed that Catalog compactor cannot remove Catalog version corresponding to pending assignments timestamp.
1031+
CatalogZoneDescriptor zoneDescriptor = zoneDescriptorAt(replicaGrpId.zoneId(), pendingAssignments.timestamp());
1032+
10011033
localServicesStartFuture = createZonePartitionReplicationNode(
10021034
replicaGrpId,
10031035
localMemberAssignment,
10041036
computedStableAssignments,
1005-
revision
1037+
revision,
1038+
zoneDescriptor.partitions()
10061039
);
10071040
} else if (pendingAssignmentsAreForced && localMemberAssignment != null) {
10081041
localServicesStartFuture = runAsync(() -> {
@@ -1041,6 +1074,16 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
10411074
}), ioExecutor);
10421075
}
10431076

1077+
private CatalogZoneDescriptor zoneDescriptorAt(int zoneId, long timestamp) {
1078+
Catalog catalog = catalogMgr.activeCatalog(timestamp);
1079+
assert catalog != null : "Catalog is not available at " + nullableHybridTimestamp(timestamp);
1080+
1081+
CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
1082+
assert zoneDescriptor != null : "Zone descriptor is not available at " + nullableHybridTimestamp(timestamp) + " for zone " + zoneId;
1083+
1084+
return zoneDescriptor;
1085+
}
1086+
10441087
private CompletableFuture<Void> changePeersOnRebalance(
10451088
ReplicaManager replicaMgr,
10461089
ZonePartitionId replicaGrpId,
@@ -1191,6 +1234,12 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
11911234
return nullCompletedFuture();
11921235
}
11931236

1237+
try {
1238+
IgniteUtils.closeAllManually(zoneResourcesManager);
1239+
} catch (Exception e) {
1240+
return failedFuture(e);
1241+
}
1242+
11941243
return nullCompletedFuture();
11951244
}
11961245

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.partition.replicator;
19+
20+
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
21+
22+
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import org.apache.ignite.internal.close.ManuallyCloseable;
25+
import org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateStorage;
26+
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
27+
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
28+
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
29+
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
30+
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
31+
import org.apache.ignite.internal.worker.ThreadAssertions;
32+
33+
/**
34+
* Manages resources of distribution zones; that is, allows creation of underlying storages and closes them on node stop.
35+
*/
36+
class ZoneResourcesManager implements ManuallyCloseable {
37+
private final TxStateRocksDbSharedStorage sharedTxStateStorage;
38+
39+
/** Map from zone IDs to their resource holders. */
40+
private final Map<Integer, ZoneResources> resourcesByZoneId = new ConcurrentHashMap<>();
41+
42+
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
43+
44+
ZoneResourcesManager(TxStateRocksDbSharedStorage sharedTxStateStorage) {
45+
this.sharedTxStateStorage = sharedTxStateStorage;
46+
}
47+
48+
/**
49+
* Gets or creates a transaction state storage for a zone partition.
50+
*
51+
* @param zoneId ID of the zone.
52+
* @param partitionCount Number of partitions in the zone.
53+
* @param partitionId Partition ID.
54+
*/
55+
TxStatePartitionStorage getOrCreatePartitionTxStateStorage(int zoneId, int partitionCount, int partitionId) {
56+
return inBusyLock(busyLock, () -> {
57+
ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
58+
zoneId,
59+
id -> createZoneResources(id, partitionCount)
60+
);
61+
62+
return zoneResources.txStateStorage.getOrCreatePartitionStorage(partitionId);
63+
});
64+
}
65+
66+
private ZoneResources createZoneResources(int zoneId, int partitionCount) {
67+
return new ZoneResources(createTxStateStorage(zoneId, partitionCount));
68+
}
69+
70+
private TxStateStorage createTxStateStorage(int zoneId, int partitionCount) {
71+
TxStateStorage txStateStorage = new TxStateRocksDbStorage(zoneId, partitionCount, sharedTxStateStorage);
72+
73+
if (ThreadAssertions.enabled()) {
74+
txStateStorage = new ThreadAssertingTxStateStorage(txStateStorage);
75+
}
76+
77+
txStateStorage.start();
78+
79+
return txStateStorage;
80+
}
81+
82+
@Override
83+
public void close() {
84+
busyLock.block();
85+
86+
for (ZoneResources zoneResources : resourcesByZoneId.values()) {
87+
zoneResources.txStateStorage.close();
88+
}
89+
}
90+
91+
private static class ZoneResources {
92+
private final TxStateStorage txStateStorage;
93+
94+
private ZoneResources(TxStateStorage txStateStorage) {
95+
this.txStateStorage = txStateStorage;
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)