Skip to content

Commit 3c9a87b

Browse files
committed
Refactor tests
1 parent a51e35d commit 3c9a87b

File tree

9 files changed

+96
-130
lines changed

9 files changed

+96
-130
lines changed

Diff for: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ public class CdcConsumerState {
7272
public static final String CACHES_STATE_FILE_NAME = "cdc-caches-state" + FILE_SUFFIX;
7373

7474
/**
75-
* The file stores state of CDC mode. {@link CdcManager} creates the file and writes into it, {@link CdcMain} only reads it.
76-
* Content of the file is a {@link CdcMode} value:
75+
* The file stores state of CDC mode. Content of the file is a {@link CdcMode} value:
7776
* <ul>
7877
* <li>{@link CdcMode#CDC_UTILITY_ACTIVE} means that {@link CdcMain} utility captures data.</li>
7978
* <li>{@link CdcMode#IGNITE_NODE_ACTIVE} means that {@link CdcManager} captures data within Ignite node.</li>

Diff for: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java

-6
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.apache.ignite.internal.pagemem.wal.WALIterator;
5454
import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
5555
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
56-
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
5756
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
5857
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
5958
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
@@ -971,11 +970,6 @@ private void ackAsciiLogo() {
971970
}
972971
}
973972

974-
/** @return Path to CDC state directory. */
975-
public static File stateDirFile(GridCacheSharedContext<?, ?> cctx) {
976-
return new File(((FileWriteAheadLogManager)cctx.wal(true)).walCdcDirectory(), STATE_DIR);
977-
}
978-
979973
/** */
980974
public static String cdcInstanceName(String igniteInstanceName) {
981975
return "cdc-" + igniteInstanceName;

Diff for: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,14 @@
5959
*/
6060
public interface CdcManager extends GridCacheSharedManager {
6161
/**
62-
* Callback is invoked once, after Ignite restores memory on start-up. It runs within Ignite start thread before
63-
* {@link IgniteWriteAheadLogManager} starts writing WAL records.
62+
* Callback is invoked once, after Ignite restores memory on start-up. It invokes before {@link IgniteWriteAheadLogManager}
63+
* starts writing WAL records amd before the first call of {@link #collect(ByteBuffer)}.
6464
*
65-
* This method invoked before the first call of {@link #collect(ByteBuffer)}.
65+
* Implementation suggestions:
66+
* <ul>
67+
* <li>This method can be used for restoring CDC state on Ignite node start, collecting missed events from WAL segments.</li>
68+
* <li>Be aware, this method runs in Ignite main thread and might lengthen the Ignite start procedure.</li>
69+
* </ul>
6670
*/
6771
public void afterMemoryRestore() throws IgniteCheckedException;
6872

@@ -97,7 +101,7 @@ public interface CdcManager extends GridCacheSharedManager {
97101
public void collect(ByteBuffer dataBuf);
98102

99103
/**
100-
* If this manager isn't active then Ignite skips preparing data for the {@link #collect(ByteBuffer)} method.
104+
* If this manager isn't active then Ignite skips calls of {@link #afterMemoryRestore()} and {@link #collect(ByteBuffer)} methods.
101105
*
102106
* @return {@code true} if manager is active, otherwise {@code false}.
103107
*/

Diff for: modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import org.apache.ignite.IgniteCheckedException;
2323
import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
2424
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
25+
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
2526
import org.apache.ignite.internal.util.typedef.internal.U;
2627

27-
import static org.apache.ignite.internal.cdc.CdcMain.stateDirFile;
28+
import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR;
2829

2930
/**
3031
* CDC manager that delegates consuming CDC events to the {@link CdcMain} utility.
@@ -35,7 +36,7 @@ public class CdcUtilityActiveCdcManager extends GridCacheSharedManagerAdapter im
3536

3637
/** {@inheritDoc} */
3738
@Override protected void start0() throws IgniteCheckedException {
38-
File stateDir = stateDirFile(cctx);
39+
File stateDir = new File(((FileWriteAheadLogManager)cctx.wal(true)).walCdcDirectory(), STATE_DIR);
3940

4041
if (stateDir.exists()) {
4142
CdcConsumerState state = new CdcConsumerState(log, stateDir.toPath());

Diff for: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1099,7 +1099,7 @@ private RestoreBinaryState restoreBinaryMemory(
10991099
else if (restored != null)
11001100
U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
11011101

1102-
if (cctx.cdc() != null)
1102+
if (cctx.cdc() != null && cctx.cdc().active())
11031103
cctx.cdc().afterMemoryRestore();
11041104

11051105
// Wal logging is now available.

Diff for: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1067,7 +1067,7 @@ public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throw
10671067
if (ptr != null)
10681068
ptr = ptr.next();
10691069

1070-
if (cctx.cdc() != null)
1070+
if (cctx.cdc() != null && cctx.cdc().active())
10711071
cctx.cdc().afterMemoryRestore();
10721072

10731073
cctx.wal(true).startAutoReleaseSegments();

Diff for: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ private void fsyncReadSegment(SegmentedRingByteBuffer.ReadSegment seg) throws Ig
541541

542542
fsync((MappedByteBuffer)buf.buf, off, len);
543543

544-
if (cctx.cdc() != null) {
544+
if (cctx.cdc() != null && cctx.cdc().active()) {
545545
try {
546546
ByteBuffer cdcBuf = buf.buf.asReadOnlyBuffer();
547547
cdcBuf.position(off);

Diff for: modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java

+14-42
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
4747
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
4848
import org.apache.ignite.internal.processors.metric.MetricRegistry;
49+
import org.apache.ignite.internal.util.lang.RunnableX;
4950
import org.apache.ignite.internal.util.typedef.T2;
5051
import org.apache.ignite.plugin.AbstractTestPluginProvider;
5152
import org.apache.ignite.plugin.PluginContext;
@@ -119,7 +120,7 @@ public class CdcIgniteNodeActiveModeTest extends AbstractCdcTest {
119120

120121
/** */
121122
@Test
122-
public void testCdcMetricsIgniteNodeActiveMode() throws Exception {
123+
public void testCdcMetrics() throws Exception {
123124
awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
124125

125126
IgniteCache<Integer, User> cache = ign.cache(DEFAULT_CACHE_NAME);
@@ -157,25 +158,7 @@ public void testSwitchToCdcUtilityActiveMode() throws Exception {
157158

158159
/** */
159160
@Test
160-
public void testCdcModeWritesByIgniteNodeOnly() throws Exception {
161-
writeCdcManagerStopCdcRecord();
162-
163-
rollSegment();
164-
165-
awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
166-
167-
cdcMainFut.cancel();
168-
169-
cdcMain = createCdc(new UserCdcConsumer(), ign.configuration());
170-
171-
cdcMainFut = runAsync(cdcMain);
172-
173-
awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
174-
}
175-
176-
/** */
177-
@Test
178-
public void testSwitchToPreviousSegment() throws Exception {
161+
public void testSkipsNodeCommittedData() throws Exception {
179162
List<Integer> expUsers = new ArrayList<>();
180163

181164
addData(0, 1, null);
@@ -207,7 +190,7 @@ public void testSwitchToPreviousSegment() throws Exception {
207190

208191
/** */
209192
@Test
210-
public void testCleanOldWals() throws Exception {
193+
public void testCleanWalsAfterCdcManagerRecord() throws Exception {
211194
checkCdcSegmentsExists(0, -1); // No segments stored in CDC dir.
212195

213196
rollSegment();
@@ -240,20 +223,13 @@ public void testCleanOldWals() throws Exception {
240223

241224
/** */
242225
@Test
243-
public void testIteratorSimple() throws Exception {
244-
checkIterator(false);
245-
}
246-
247-
/** */
248-
@Test
249-
public void testIteratorWithCdcRestart() throws Exception {
250-
checkIterator(true);
251-
}
226+
public void testRestoreModeOnRestart() throws Exception {
227+
RunnableX restartUtil = () -> {
228+
cdcMainFut.cancel();
229+
cdcMain = createCdc(cnsmr, getConfiguration(getTestIgniteInstanceName()));
230+
cdcMainFut = runAsync(cdcMain);
231+
};
252232

253-
/**
254-
* @param restartCdc Restart cdc.
255-
*/
256-
private void checkIterator(boolean restartCdc) throws Exception {
257233
List<Integer> expUsers = new ArrayList<>();
258234

259235
addData(0, 1, null);
@@ -262,13 +238,7 @@ private void checkIterator(boolean restartCdc) throws Exception {
262238

263239
addData(2, 3, null);
264240

265-
if (restartCdc) {
266-
cdcMainFut.cancel();
267-
268-
cdcMain = createCdc(cnsmr, getConfiguration(getTestIgniteInstanceName()));
269-
270-
cdcMainFut = runAsync(cdcMain);
271-
}
241+
restartUtil.run();
272242

273243
rollSegment();
274244

@@ -279,6 +249,8 @@ private void checkIterator(boolean restartCdc) throws Exception {
279249

280250
rollSegment();
281251

252+
restartUtil.run();
253+
282254
awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
283255

284256
checkConsumerData(expUsers);
@@ -338,7 +310,7 @@ private void awaitCdcModeValue(CdcMode expVal) throws IgniteInterruptedCheckedEx
338310
ObjectMetric<String> m = GridTestUtils.<StandaloneGridKernalContext>getFieldValue(cdcMain, "kctx")
339311
.metric().registry("cdc").findMetric(CDC_MODE);
340312

341-
return expVal.name().equals(m.value());
313+
return m != null && expVal.name().equals(m.value());
342314
}
343315
catch (Exception e) {
344316
return false;

0 commit comments

Comments
 (0)