diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java index 5f1a490fa6d434..20ac33baa2edaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java @@ -23,6 +23,8 @@ import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.lang.IgniteExperimental; import org.apache.ignite.spi.systemview.view.CacheView; import org.jetbrains.annotations.Nullable; @@ -39,12 +41,32 @@ public interface CdcEvent extends Serializable { /** * @return Key for the changed entry. */ - public Object key(); + public KeyCacheObject key(); /** * @return Value for the changed entry or {@code null} in case of entry removal. */ - @Nullable public Object value(); + @Nullable public CacheObject value(); + + /** + * @return Previous entry state metadata if expected. + */ + @Nullable public CacheObject previousStateMetadata(); + + /** + * @return Key which was placed into cache. Or null if failed to convert. + */ + public Object unwrappedKey(); + + /** + * @return Value which was placed into cache. Or null for delete operation or for failure. + */ + public Object unwrappedValue(); + + /** + * @return Previous entry state metadata. + */ + public Object unwrappedPreviousStateMetadata(); /** * @return {@code True} if event fired on primary node for partition containing this entry. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java index 96f6adfd07986a..d124eef7b63373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcEventImpl.java @@ -20,7 +20,12 @@ import org.apache.ignite.cache.CacheEntryVersion; import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * Event of single entry change. @@ -33,87 +38,69 @@ public class CdcEventImpl implements CdcEvent { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Key. */ - private final Object key; + /** Entry. */ + private final DataEntry entry; - /** Value. */ - private final Object val; - - /** {@code True} if changes made on primary node. */ - private final boolean primary; - - /** Partition. */ - private final int part; + /** + * @param entry Entry. + */ + public CdcEventImpl(DataEntry entry) { + this.entry = entry; + } - /** Order of the entry change. */ - private final CacheEntryVersion ord; + /** {@inheritDoc} */ + @Override public Object unwrappedKey() { + return ((UnwrappedDataEntry)(entry)).unwrappedKey(); + } - /** Cache id. */ - private final int cacheId; + /** {@inheritDoc} */ + @Override public Object unwrappedValue() { + return ((UnwrappedDataEntry)(entry)).unwrappedValue(); + } - /** Expire time. */ - private final long expireTime; + /** {@inheritDoc} */ + @Override public Object unwrappedPreviousStateMetadata() { + return ((UnwrappedDataEntry)(entry)).unwrappedPreviousStateMetadata(); + } - /** - * @param key Key. - * @param val Value. - * @param primary {@code True} if changes made on primary node. - * @param part Partition. - * @param ord Order of the entry change. - * @param cacheId Cache id. - * @param expireTime Expire time. - */ - public CdcEventImpl( - Object key, - Object val, - boolean primary, - int part, - CacheEntryVersion ord, - int cacheId, - long expireTime - ) { - this.key = key; - this.val = val; - this.primary = primary; - this.part = part; - this.ord = ord; - this.cacheId = cacheId; - this.expireTime = expireTime; + /** {@inheritDoc} */ + @Override public KeyCacheObject key() { + return entry.key(); } /** {@inheritDoc} */ - @Override public Object key() { - return key; + @Override public CacheObject value() { + return entry.value(); } /** {@inheritDoc} */ - @Override public Object value() { - return val; + @Override public @Nullable CacheObject previousStateMetadata() { + return entry.previousStateMetadata(); } /** {@inheritDoc} */ @Override public boolean primary() { - return primary; + return (entry.flags() & DataEntry.PRIMARY_FLAG) != 0; } /** {@inheritDoc} */ @Override public int partition() { - return part; + return entry.partitionId(); } /** {@inheritDoc} */ @Override public CacheEntryVersion version() { - return ord; + return entry.writeVersion(); } /** {@inheritDoc} */ @Override public int cacheId() { - return cacheId; + return entry.cacheId(); } /** {@inheritDoc} */ @Override public long expireTime() { - return expireTime; + return entry.expireTime(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index c76f41fe54cd02..15ec7b588fd735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -602,7 +602,7 @@ private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersB boolean interrupted; do { - boolean commit = consumer.onRecords(iter, WalRecordsConsumer.CDC_EVENT_TRANSFORMER, null); + boolean commit = consumer.onRecords(iter, null); if (commit) saveStateAndRemoveProcessed(iter.state()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 3b111d50a197ec..54035e455e28fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -43,7 +43,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -89,21 +88,6 @@ public class WalRecordsConsumer { return OPERATIONS_TYPES.contains(e.op()); }; - /** Event transformer. */ - static final IgniteClosure CDC_EVENT_TRANSFORMER = e -> { - UnwrapDataEntry ue = (UnwrapDataEntry)e; - - return new CdcEventImpl( - ue.unwrappedKey(), - ue.unwrappedValue(), - (e.flags() & DataEntry.PRIMARY_FLAG) != 0, - e.partitionId(), - e.writeVersion(), - e.cacheId(), - e.expireTime() - ); - }; - /** * @param consumer User provided CDC consumer. * @param log Logger. @@ -119,15 +103,10 @@ public WalRecordsConsumer(CdcConsumer consumer, IgniteLogger log) { * {@link DataRecord} will be stored and WAL iteration will be started from it on CDC application fail/restart. * * @param entries Data entries iterator. - * @param transform Event transformer. * @param filter Optional event filter. * @return {@code True} if current offset in WAL should be commited. */ - public boolean onRecords( - Iterator entries, - IgniteClosure transform, - @Nullable IgnitePredicate filter - ) { + public boolean onRecords(Iterator entries, @Nullable IgnitePredicate filter) { Iterator evts = F.iterator(new Iterator() { @Override public boolean hasNext() { return entries.hasNext(); @@ -142,7 +121,7 @@ public boolean onRecords( return next; } - }, transform, true, OPERATIONS_FILTER, filter); + }, CdcEventImpl::new, true, OPERATIONS_FILTER, filter); return consumer.onEvents(evts); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java index c38a94df8cda59..7ef0920b8c6c7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcCacheDataResendTask.java @@ -213,6 +213,7 @@ private void resendCacheData(IgniteInternalCache cache) throws IgniteCheck row.expireTime(), key.partition(), -1, + null, // Conflict resolve should not happen at data copying. DataEntry.flags(true)) ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java index 096b1e8e1ce709..a861d38a2bafd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java @@ -41,6 +41,9 @@ public class DataEntry { /** */ public static final byte FROM_STORE_FLAG = 0b00000100; + /** */ + public static final byte PREV_STATE_FLAG = 0b00001000; + /** Cache ID. */ @GridToStringInclude protected int cacheId; @@ -74,6 +77,9 @@ public class DataEntry { @GridToStringInclude protected long partCnt; + /** Previous entry state metadata. */ + protected CacheObject prevStateMeta; + /** * Bit flags. *
    @@ -100,6 +106,7 @@ private DataEntry() { * @param expireTime Expire time. * @param partId Partition ID. * @param partCnt Partition counter. + * @param prevStateMeta Previous entry state metadata. * @param flags Entry flags. */ public DataEntry( @@ -112,6 +119,7 @@ public DataEntry( long expireTime, int partId, long partCnt, + CacheObject prevStateMeta, byte flags ) { this.cacheId = cacheId; @@ -123,8 +131,12 @@ public DataEntry( this.expireTime = expireTime; this.partId = partId; this.partCnt = partCnt; + this.prevStateMeta = prevStateMeta; this.flags = flags; + if (this.prevStateMeta != null) + this.flags |= PREV_STATE_FLAG; + // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL. assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE @@ -231,6 +243,13 @@ public long expireTime() { return expireTime; } + /** + * Previous entry state metadata. + */ + public CacheObject previousStateMetadata() { + return prevStateMeta; + } + /** * Entry flags. * @see #flags diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java index 5819496c4cacaa..dbcc0e53ceaa4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyDataEntry.java @@ -47,6 +47,12 @@ public class LazyDataEntry extends DataEntry { /** Value value bytes. */ private byte[] valBytes; + /** Previous entry state metadata bytes type code. See {@link CacheObject} for built-in value type codes */ + private byte prevStateMetaType; + + /** Previous entry state metadata bytes. */ + private byte[] prevStateMetaBytes; + /** * @param cctx Shared context. * @param cacheId Cache ID. @@ -60,6 +66,8 @@ public class LazyDataEntry extends DataEntry { * @param expireTime Expire time. * @param partId Partition ID. * @param partCnt Partition counter. + * @param prevStateMetaType Object type code for previous entry state metadata. + * @param prevStateMetaBytes Previous entry state metadata bytes. * @param flags Flags. */ public LazyDataEntry( @@ -75,15 +83,19 @@ public LazyDataEntry( long expireTime, int partId, long partCnt, + byte prevStateMetaType, + byte[] prevStateMetaBytes, byte flags ) { - super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, flags); + super(cacheId, null, null, op, nearXidVer, writeVer, expireTime, partId, partCnt, null, flags); this.cctx = cctx; this.keyType = keyType; this.keyBytes = keyBytes; this.valType = valType; this.valBytes = valBytes; + this.prevStateMetaType = prevStateMetaType; + this.prevStateMetaBytes = prevStateMetaBytes; } /** {@inheritDoc} */ @@ -126,6 +138,22 @@ public LazyDataEntry( return val; } + /** {@inheritDoc} */ + @Override public CacheObject previousStateMetadata() { + if (prevStateMeta == null && prevStateMetaBytes != null) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (cacheCtx == null) + throw new IgniteException("Failed to find cache context for the given cache ID: " + cacheId); + + IgniteCacheObjectProcessor co = cctx.kernalContext().cacheObjects(); + + prevStateMeta = co.toCacheObject(cacheCtx.cacheObjectContext(), prevStateMetaType, prevStateMetaBytes); + } + + return prevStateMeta; + } + /** @return Data Entry Key type code. See {@link CacheObject} for built-in value type codes */ public byte getKeyType() { return keyType; @@ -145,4 +173,14 @@ public byte getValType() { public byte[] getValBytes() { return valBytes; } + + /** {@inheritDoc} */ + @Override public byte getPreviousStateMetadataType() { + return prevStateMetaType; + } + + /** {@inheritDoc} */ + @Override public byte[] getPreviousStateMetadataBytes() { + return prevStateMetaBytes; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/LazyMvccDataEntry.java new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MvccDataEntry.java new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java index e535e22f0b8d42..3e88d133a90ca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapDataEntry.java @@ -47,7 +47,8 @@ public class UnwrapDataEntry extends DataEntry { * @param partId Partition ID. * @param partCnt Partition counter. * @param cacheObjValCtx cache object value context for unwrapping objects. - * @param keepBinary disable unwrapping for non primitive objects, Binary Objects would be returned instead. + * @param keepBinary disable unwrapping for non-primitive objects, Binary Objects would be returned instead. + * @param prevStateMeta Previous state metadata. * @param flags Flags. */ public UnwrapDataEntry( @@ -62,8 +63,10 @@ public UnwrapDataEntry( final long partCnt, final CacheObjectValueContext cacheObjValCtx, final boolean keepBinary, + CacheObject prevStateMeta, final byte flags) { - super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, flags); + super(cacheId, key, val, op, nearXidVer, writeVer, expireTime, partId, partCnt, prevStateMeta, flags); + this.cacheObjValCtx = cacheObjValCtx; this.keepBinary = keepBinary; } @@ -98,7 +101,19 @@ public Object unwrappedValue() { } catch (Exception e) { cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class) - .error("Unable to convert value [" + value() + "]", e); + .error("Unable to convert value [" + val + "]", e); + return null; + } + } + + /** {@inheritDoc} */ + @Override public Object unwrappedPreviousStateMetadata() { + try { + return unwrapValue(prevStateMeta, keepBinary, cacheObjValCtx); + } + catch (Exception e) { + cacheObjValCtx.kernalContext().log(UnwrapDataEntry.class) + .error("Unable to convert previous state metadata [" + prevStateMeta + "]", e); return null; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrapMvccDataEntry.java new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrappedDataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/UnwrappedDataEntry.java new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index fbf11bf85a1858..33ee3569f861a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1630,16 +1630,23 @@ public CacheVersionConflictResolver conflictResolver() { * * @param oldEntry Old entry. * @param newEntry New entry. + * @param prevStateMeta Previous entry state metadata. * @param atomicVerComp Whether to use atomic version comparator. * @return Conflict resolution result. * @throws IgniteCheckedException In case of exception. */ - public GridCacheVersionConflictContext conflictResolve(GridCacheVersionedEntryEx oldEntry, - GridCacheVersionedEntryEx newEntry, boolean atomicVerComp) throws IgniteCheckedException { + public GridCacheVersionConflictContext conflictResolve( + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + CacheObject prevStateMeta, + boolean atomicVerComp) throws IgniteCheckedException { assert conflictRslvr != null : "Should not reach this place."; - GridCacheVersionConflictContext ctx = conflictRslvr.resolve(cacheObjCtx, oldEntry, newEntry, - atomicVerComp); + Object prevStateMetaObj = prevStateMeta != null ? + cacheObjCtx.unwrapBinaryIfNeeded(prevStateMeta, false, true, null) : null; + + GridCacheVersionConflictContext ctx = + conflictRslvr.resolve(cacheObjCtx, oldEntry, newEntry, prevStateMetaObj, atomicVerComp); if (ctx.isManualResolve()) drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 195b9cb6e4828b..f54e83739b347a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -431,6 +431,7 @@ public GridCacheUpdateTxResult innerRemove( * @param conflictTtl Conflict TTL (if any). * @param conflictExpireTime Conflict expire time (if any). * @param conflictVer DR version (if any). + * @param prevStateMeta Previous entry state metadata. * @param conflictResolve If {@code true} then performs conflicts resolution. * @param intercept If {@code true} then calls cache interceptor. * @param taskName Task name. @@ -468,6 +469,7 @@ public GridCacheUpdateAtomicResult innerUpdate( long conflictTtl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer, + CacheObject prevStateMeta, boolean conflictResolve, boolean intercept, String taskName, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index fea3e4cd7e5110..4a995fbe1a90ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -1108,7 +1109,8 @@ else if (interceptorVal != val0) updateCntr0 = nextPartitionCounter(tx, updateCntr); if (tx != null && cctx.group().logDataRecords()) - logPtr = logTxUpdate(tx, val, addConflictVersion(tx.writeVersion(), newVer), expireTime, updateCntr0); + logPtr = logTxUpdate( + tx, val, addConflictVersion(tx.writeVersion(), newVer), expireTime, updateCntr0, previousStateMetadata()); update(val, expireTime, ttl, newVer, true); @@ -1338,7 +1340,7 @@ protected Object keyValue(boolean cpy) { updateCntr0 = nextPartitionCounter(tx, updateCntr); if (tx != null && cctx.group().logDataRecords()) - logPtr = logTxUpdate(tx, null, addConflictVersion(tx.writeVersion(), newVer), 0, updateCntr0); + logPtr = logTxUpdate(tx, null, addConflictVersion(tx.writeVersion(), newVer), 0, updateCntr0, previousStateMetadata()); drReplicate(drType, null, newVer, topVer); @@ -1463,6 +1465,7 @@ else if (log.isDebugEnabled()) final long explicitTtl, final long explicitExpireTime, @Nullable final GridCacheVersion conflictVer, + @Nullable final CacheObject prevStateMeta, final boolean conflictResolve, final boolean intercept, final String taskName, @@ -1512,6 +1515,7 @@ else if (log.isDebugEnabled()) explicitTtl, explicitExpireTime, conflictVer, + prevStateMeta, conflictResolve, intercept, updateCntr, @@ -2612,6 +2616,7 @@ else if (deletedUnlocked()) expireTime, partition(), updateCntr, + previousStateMetadata(), DataEntry.flags(primary, preload, fromStore) ))); } @@ -3465,6 +3470,7 @@ protected boolean storeValue( * @param expireTime Expire time. * @param updCntr Update counter. * @param primary {@code True} if node is primary for entry in the moment of logging. + * @param prevStateMeta Previous state metadata. */ protected void logUpdate( GridCacheOperation op, @@ -3472,7 +3478,8 @@ protected void logUpdate( GridCacheVersion writeVer, long expireTime, long updCntr, - boolean primary + boolean primary, + CacheObject prevStateMeta ) throws IgniteCheckedException { // We log individual updates only in ATOMIC cache. assert cctx.atomic(); @@ -3489,6 +3496,7 @@ protected void logUpdate( expireTime, partition(), updCntr, + prevStateMeta, DataEntry.flags(primary)))); } catch (StorageException e) { @@ -3503,6 +3511,7 @@ protected void logUpdate( * @param writeVer New entry version. * @param expireTime Expire time (or 0 if not applicable). * @param updCntr Update counter. + * @param prevStateMeta Previous state metadata. * @throws IgniteCheckedException In case of log failure. */ protected WALPointer logTxUpdate( @@ -3510,7 +3519,8 @@ protected WALPointer logTxUpdate( CacheObject val, GridCacheVersion writeVer, long expireTime, - long updCntr + long updCntr, + CacheObject prevStateMeta ) throws IgniteCheckedException { assert cctx.transactional(); @@ -3531,6 +3541,7 @@ protected WALPointer logTxUpdate( expireTime, key.partition(), updCntr, + prevStateMeta, DataEntry.flags(CU.txOnPrimary(tx))))); } else @@ -4221,6 +4232,21 @@ private void unlockListenerReadLock() { context().evicts().touch(this); } + /** */ + private CacheObject previousStateMetadata() throws IgniteCheckedException { + CacheVersionConflictResolver resolver = cctx.conflictResolver(); + + if (resolver == null) + return null; + + CacheObject res = cctx.toCacheObject(resolver.previousStateMetadata(this)); + + if (res != null) + res.prepareForCache(cctx.cacheObjectContext()); + + return res; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. @@ -4532,6 +4558,9 @@ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManag /** */ private final boolean conflictResolve; + /** */ + private final CacheObject prevStateMeta; + /** */ private final boolean intercept; @@ -4578,6 +4607,7 @@ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManag long explicitTtl, long explicitExpireTime, @Nullable GridCacheVersion conflictVer, + @Nullable CacheObject prevStateMeta, boolean conflictResolve, boolean intercept, @Nullable Long updateCntr, @@ -4601,6 +4631,7 @@ private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManag this.explicitTtl = explicitTtl; this.explicitExpireTime = explicitExpireTime; this.conflictVer = conflictVer; + this.prevStateMeta = prevStateMeta; this.conflictResolve = conflictResolve; this.intercept = intercept; this.updateCntr = updateCntr; @@ -4702,7 +4733,7 @@ else if (oldVal != null && entry.deletedUnlocked()) GridCacheVersionConflictContext conflictCtx = null; if (conflictResolve) { - conflictCtx = resolveConflict(newVal, invokeRes); + conflictCtx = resolveConflict(newVal, prevStateMeta, invokeRes); if (updateRes != null) { assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx; @@ -5058,7 +5089,7 @@ else if (interceptorVal != updated0) { long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr); - entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0, primary); + entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0, primary, previousStateMetadata()); if (!entry.isNear()) { newRow = entry.localPartition().dataStore().createRow( @@ -5157,7 +5188,7 @@ private void remove(@Nullable GridCacheVersionConflictContext conflictCtx, long updateCntr0 = entry.nextPartitionCounter(topVer, primary, false, updateCntr); - entry.logUpdate(op, null, newVer, 0, updateCntr0, primary); + entry.logUpdate(op, null, newVer, 0, updateCntr0, primary, previousStateMetadata()); if (oldVal != null) { assert !entry.deletedUnlocked(); @@ -5201,14 +5232,33 @@ private void remove(@Nullable GridCacheVersionConflictContext conflictCtx, transformed); } + /** */ + private CacheObject previousStateMetadata() { + GridCacheContext ctx = entry.context(); + + CacheVersionConflictResolver resolver = ctx.conflictResolver(); + + if (resolver == null) + return null; + + CacheObject res = ctx.toCacheObject(resolver.previousStateMetadata(entry)); + + if (res != null) + res.prepareForCache(ctx.cacheObjectContext()); + + return res; + } + /** * @param newVal New entry value. + * @param prevStateMeta Previous entry state metadata. * @param invokeRes Entry processor result (for invoke operation). * @return Conflict context. * @throws IgniteCheckedException If failed. */ private GridCacheVersionConflictContext resolveConflict( CacheObject newVal, + CacheObject prevStateMeta, @Nullable IgniteBiTuple invokeRes) throws IgniteCheckedException { GridCacheContext cctx = entry.context(); @@ -5241,7 +5291,7 @@ private void remove(@Nullable GridCacheVersionConflictContext conflictCtx, keepBinary); // Resolve conflict. - GridCacheVersionConflictContext conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); + GridCacheVersionConflictContext conflictCtx = cctx.conflictResolve(oldEntry, newEntry, prevStateMeta, verCheck); assert conflictCtx != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 5754a89d978a7e..ba696d28aafc41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -599,6 +600,11 @@ else if (conflictCtx.isMerge()) { if (dataEntries == null) dataEntries = new ArrayList<>(entries.size()); + CacheVersionConflictResolver resolver = cacheCtx.conflictResolver(); + + CacheObject prevStateMeta = resolver == null ? null : + cacheCtx.toCacheObject(resolver.previousStateMetadata(cached)); + dataEntry = new DataEntry( cacheCtx.cacheId(), txEntry.key(), @@ -609,6 +615,7 @@ else if (conflictCtx.isMerge()) { 0, txEntry.key().partition(), txEntry.updateCounter(), + prevStateMeta, DataEntry.flags(CU.txOnPrimary(this)) ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 174e091ac5c95a..e5c8ca23a7d5ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -487,6 +487,7 @@ private void addMapping( existing.conflictExpireTime(e.conflictExpireTime()); existing.conflictVersion(e.conflictVersion()); + existing.previousStateMetadata(e.previousStateMetadata()); } else { existing = e; @@ -615,6 +616,7 @@ IgniteInternalFuture lockAllAsync( -1L, -1L, null, + null, skipStore, keepBinary, nearCache); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4d07658898bde9..ba346cf369aed5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2515,6 +2515,7 @@ private void updateSingle( GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); + CacheObject prevStateMeta = req.previousStateMetadata(i); assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; @@ -2548,6 +2549,7 @@ private void updateSingle( newConflictTtl, newConflictExpireTime, newConflictVer, + prevStateMeta, /*conflictResolve*/true, intercept, taskName, @@ -2833,6 +2835,7 @@ else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, null, + null, /*conflict resolve*/false, /*intercept*/false, taskName, @@ -3301,6 +3304,7 @@ && writeThrough() && !req.skipStore(), ttl, expireTime, req.conflictVersion(i), + null, false, intercept, taskName, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index c06f82861b7a23..60d716036b2caf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -437,12 +437,14 @@ private boolean isFlag(int mask) { * @param conflictTtl Conflict TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param prevStateMeta Previous entry state metadata (optional). */ abstract void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer); + @Nullable GridCacheVersion conflictVer, + @Nullable Object prevStateMeta); /** * @return Keys for this update request. @@ -472,6 +474,12 @@ abstract void addUpdateEntry(KeyCacheObject key, */ public abstract CacheObject writeValue(int idx); + /** + * @param idx Index to get. + * @return Previous entry state metadata. + */ + public abstract CacheObject previousStateMetadata(int idx); + /** * @return Conflict versions. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 95db4eac657a19..83b5aa55fb36a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -50,7 +50,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; @@ -70,6 +69,10 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat @GridDirectCollection(CacheObject.class) private List vals; + /** Previous state metadatas. */ + @GridDirectCollection(CacheObject.class) + private List prevStateMetas; + /** Entry processors. */ @GridDirectTransient private List> entryProcessors; @@ -176,7 +179,8 @@ public GridNearAtomicFullUpdateRequest() { @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + @Nullable Object prevStateMeta) { EntryProcessor entryProc = null; if (op == TRANSFORM) { @@ -185,8 +189,6 @@ public GridNearAtomicFullUpdateRequest() { entryProc = (EntryProcessor)val; } - assert val != null || op == DELETE; - keys.add(key); if (entryProc != null) { @@ -218,6 +220,19 @@ else if (val != null) { else if (conflictVers != null) conflictVers.add(null); + if (prevStateMeta != null) { + if (prevStateMetas == null) { + prevStateMetas = new ArrayList<>(initSize); + + for (int i = 0; i < keys.size() - 1; i++) + prevStateMetas.add(null); + } + + prevStateMetas.add((CacheObject)prevStateMeta); + } + else if (prevStateMetas != null) + prevStateMetas.add(null); + if (conflictTtl >= 0) { if (conflictTtls == null) { conflictTtls = new GridLongList(keys.size()); @@ -285,6 +300,14 @@ else if (conflictVers != null) return null; } + /** {@inheritDoc} */ + @Override public CacheObject previousStateMetadata(int idx) { + if (prevStateMetas != null) + return prevStateMetas.get(idx); + + return null; + } + /** {@inheritDoc} */ @Override @Nullable public List conflictVersions() { return conflictVers; @@ -377,6 +400,8 @@ else if (conflictVers != null) } else prepareMarshalCacheObjects(vals, cctx); + + prepareMarshalCacheObjects(prevStateMetas, cctx); } /** {@inheritDoc} */ @@ -406,6 +431,8 @@ else if (conflictVers != null) } else finishUnmarshalCacheObjects(vals, cctx, ldr); + + finishUnmarshalCacheObjects(prevStateMetas, cctx, ldr); } /** {@inheritDoc} */ @@ -479,6 +506,12 @@ else if (conflictVers != null) writer.incrementState(); case 18: + if (!writer.writeCollection("prevStateMetas", prevStateMetas, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 19: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -565,6 +598,14 @@ else if (conflictVers != null) reader.incrementState(); case 18: + prevStateMetas = reader.readCollection("prevStateMetas", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -596,7 +637,7 @@ else if (conflictVers != null) /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 19; + return 20; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index db5af0d82c3584..1debc7674bd137 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -612,6 +612,7 @@ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, + null, null); return new PrimaryRequestState(req, nodes, true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java index c772be970032d3..adcabd12a0a911 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java @@ -115,18 +115,13 @@ public GridNearAtomicSingleUpdateInvokeRequest() { this.invokeArgs = invokeArgs; } - /** - * @param key Key to add. - * @param val Optional update value. - * @param conflictTtl Conflict TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - */ + /** {@inheritDoc} */ @Override public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + @Nullable Object prevStateMeta) { assert conflictTtl < 0 : conflictTtl; assert conflictExpireTime < 0 : conflictExpireTime; assert conflictVer == null : conflictVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 1f121549fdfb44..50928292bc9256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -107,18 +107,13 @@ public GridNearAtomicSingleUpdateRequest() { return key.partition(); } - /** - * @param key Key to add. - * @param val Optional update value. - * @param conflictTtl Conflict TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - */ + /** {@inheritDoc} */ @Override public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + @Nullable Object prevStateMeta) { assert op != TRANSFORM; assert val != null || op == DELETE; assert conflictTtl < 0 : conflictTtl; @@ -179,6 +174,11 @@ public GridNearAtomicSingleUpdateRequest() { return val; } + /** {@inheritDoc} */ + @Override public CacheObject previousStateMetadata(int idx) { + return null; + } + /** {@inheritDoc} */ @Nullable @Override public List conflictVersions() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 08e221c2fad5bd..f1a1dfdc09f6f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -924,12 +924,14 @@ private Map mapUpdate(Collection topNode GridCacheVersion conflictVer; long conflictTtl; long conflictExpireTime; + Object prevStateMeta; if (vals != null) { val = it.next(); conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; if (val == null) throw new NullPointerException("Null value."); @@ -941,18 +943,21 @@ else if (conflictPutVals != null) { conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); + prevStateMeta = conflictPutVal.previousStateMetadata(); } else if (conflictRmvVals != null) { val = null; conflictVer = conflictRmvValsIt.next(); conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; } else { val = null; conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; } if (val == null && op != GridCacheOperation.DELETE) @@ -1019,7 +1024,7 @@ else if (conflictRmvVals != null) { if (mapped.req.initMappingLocally()) mapped.addMapping(nodes); - mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer); + mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, prevStateMeta); } return pendingMappings; @@ -1040,6 +1045,7 @@ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long GridCacheVersion conflictVer; long conflictTtl; long conflictExpireTime; + Object prevStateMeta; if (vals != null) { // Regular PUT. @@ -1047,6 +1053,7 @@ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; } else if (conflictPutVals != null) { // Conflict PUT. @@ -1056,6 +1063,7 @@ else if (conflictPutVals != null) { conflictVer = conflictPutVal.version(); conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); + prevStateMeta = conflictPutVal.previousStateMetadata(); } else if (conflictRmvVals != null) { // Conflict REMOVE. @@ -1063,6 +1071,7 @@ else if (conflictRmvVals != null) { conflictVer = F.first(conflictRmvVals); conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; } else { // Regular REMOVE. @@ -1070,13 +1079,14 @@ else if (conflictRmvVals != null) { conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + prevStateMeta = null; } // We still can get here if user pass map with single element. if (key == null) throw new NullPointerException("Null key."); - if (val == null && op != GridCacheOperation.DELETE) + if (val == null && op != GridCacheOperation.DELETE && conflictPutVals == null /*null values allowed at dr*/) throw new NullPointerException("Null value."); KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); @@ -1128,7 +1138,8 @@ else if (conflictRmvVals != null) { val, conflictTtl, conflictExpireTime, - conflictVer); + conflictVer, + prevStateMeta); return new PrimaryRequestState(req, nodes, true); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 64fbb0387cdace..a1acf706a46f86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -79,7 +79,8 @@ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) { GridCacheVersion writeVer, long expireTime, long updCntr, - boolean primary + boolean primary, + CacheObject prevStateMeta ) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary or backup nodes. } @@ -90,7 +91,8 @@ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) { CacheObject val, GridCacheVersion writeVer, long expireTime, - long updCntr + long updCntr, + CacheObject prevStateMeta ) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ac361269221749..53286f378753ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -271,6 +271,7 @@ private void processNearAtomicUpdateResponse( ttl, expireTime, null, + null, false, false, taskName, @@ -375,6 +376,7 @@ private void processNearAtomicUpdateResponse( ttl, expireTime, null, + null, false, intercept, taskName, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 7d2bcb5a410bbd..e957b0d852157f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -467,7 +467,7 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx, /** {@inheritDoc} */ @Override protected void logUpdate(GridCacheOperation op, CacheObject val, GridCacheVersion ver, long expireTime, - long updCntr, boolean primary) { + long updCntr, boolean primary, CacheObject prevStateMeta) { // No-op: queries are disabled for near cache. } @@ -477,7 +477,8 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx, CacheObject val, GridCacheVersion writeVer, long expireTime, - long updCntr + long updCntr, + CacheObject prevStateMeta ) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index a3205a8a5a01f9..6ff2005f8e4602 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -921,11 +921,12 @@ private IgniteInternalFuture enlistWrite( expiryPlc, retval, filter, - /*drVer*/drVer, - /*drTtl*/-1L, - /*drExpireTime*/-1L, + drVer, + -1L, + -1L, + null, ret, - /*enlisted*/null, + null, skipStore, false, hasFilters, @@ -1064,6 +1065,7 @@ private IgniteInternalFuture enlistWrite( GridCacheVersion drVer; long drTtl; long drExpireTime; + CacheObject prevStateMeta; if (drPutMap != null) { GridCacheDrInfo info = drPutMap.get(key); @@ -1073,6 +1075,7 @@ private IgniteInternalFuture enlistWrite( drVer = info.version(); drTtl = info.ttl(); drExpireTime = info.expireTime(); + prevStateMeta = info.previousStateMetadata(); } else if (drRmvMap != null) { assert drRmvMap.get(key) != null; @@ -1080,19 +1083,22 @@ else if (drRmvMap != null) { drVer = drRmvMap.get(key); drTtl = -1L; drExpireTime = -1L; + prevStateMeta = null; } else if (dataCenterId != null) { drVer = cacheCtx.cache().nextVersion(dataCenterId); drTtl = -1L; drExpireTime = -1L; + prevStateMeta = null; } else { drVer = null; drTtl = -1L; drExpireTime = -1L; + prevStateMeta = null; } - if (!rmv && val == null && entryProc == null) { + if (!rmv && val == null && entryProc == null && drPutMap == null /*null values allowed at dr*/) { setRollbackOnly(); throw new NullPointerException("Null value."); @@ -1112,6 +1118,7 @@ else if (dataCenterId != null) { drVer, drTtl, drExpireTime, + prevStateMeta, ret, enlisted, skipStore, @@ -1183,6 +1190,7 @@ else if (dataCenterId != null) { * @param drVer DR version. * @param drTtl DR ttl. * @param drExpireTime DR expire time. + * @param prevStateMeta Previous entry state meta. * @param ret Return value. * @param enlisted Enlisted keys collection. * @param skipStore Skip store flag. @@ -1205,6 +1213,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, final GridCacheVersion drVer, final long drTtl, long drExpireTime, + @Nullable CacheObject prevStateMeta, final GridCacheReturn ret, @Nullable final Collection enlisted, boolean skipStore, @@ -1326,6 +1335,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drTtl, drExpireTime, drVer, + prevStateMeta, skipStore, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -1342,6 +1352,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, -1L, -1L, null, + prevStateMeta, skipStore, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -1378,6 +1389,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drTtl, drExpireTime, drVer, + prevStateMeta, skipStore, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -1497,6 +1509,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drTtl, drExpireTime, drVer, + prevStateMeta, skipStore, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -2419,6 +2432,7 @@ private Collection enlistRead( -1L, -1L, null, + null, skipStore, !deserializeBinary, CU.isNearEnabled(cacheCtx)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index 14e801cd246bf5..b7bb6603fe0999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * Cache DR info used as argument in PUT cache internal interfaces. @@ -43,6 +44,9 @@ public class GridCacheDrInfo implements Externalizable { /** DR version. */ private GridCacheVersion ver; + /** Previous entry state metadata. */ + protected CacheObject prevStateMeta; + /** * {@link Externalizable} support. */ @@ -64,6 +68,21 @@ public GridCacheDrInfo(CacheObject val, GridCacheVersion ver) { this.ver = ver; } + /** + * Constructor. + * + * @param val Value. + * @param ver Version. + * @param prevStateMeta Previous entry state metadata. + */ + public GridCacheDrInfo(@Nullable CacheObject val, GridCacheVersion ver, CacheObject prevStateMeta) { + assert ver != null; + + this.val = val; + this.ver = ver; + this.prevStateMeta = prevStateMeta; + } + /** * Constructor. * @@ -115,6 +134,13 @@ public GridCacheVersion version() { return ver; } + /** + * @return Previous entry state metadata. + */ + public CacheObject previousStateMetadata() { + return prevStateMeta; + } + /** * @return TTL. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index 7246379dabe2b9..b5ebb5483f2892 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -369,10 +369,13 @@ private boolean checkBounds(long idx) { if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) // "curIdx" is an index in walFileDescriptors list. if (curIdx == walFileDescriptors.size() - 1) - // This means that there is no explicit last sengment, so we stop as if we reached the end + // This means that there is no explicit last segment, so we stop as if we reached the end // of the WAL. - if (highBound.equals(DFLT_HIGH_BOUND)) + if (highBound.equals(DFLT_HIGH_BOUND)) { + log.warning("Corrupted or partially written (last) WAL segment found.", e); + return null; + } return super.handleRecordException(e, ptr); } @@ -429,6 +432,8 @@ private boolean checkBounds(long idx) { final KeyCacheObject key; final CacheObject val; + final CacheObject prevStateMeta; + boolean keepBinary = this.keepBinary || !fakeCacheObjCtx.kernalContext().marshallerContext().initialized(); if (dataEntry instanceof LazyDataEntry) { @@ -438,19 +443,27 @@ private boolean checkBounds(long idx) { lazyDataEntry.getKeyType(), lazyDataEntry.getKeyBytes()); - final byte type = lazyDataEntry.getValType(); + final byte valType = lazyDataEntry.getValType(); - val = type == 0 ? null : + val = valType == 0 ? null : processor.toCacheObject(fakeCacheObjCtx, - type, + valType, lazyDataEntry.getValBytes()); + + final byte prevMetaStateType = lazyDataEntry.getPreviousStateMetadataType(); + + prevStateMeta = prevMetaStateType == 0 ? null : + processor.toCacheObject(fakeCacheObjCtx, + prevMetaStateType, + lazyDataEntry.getPreviousStateMetadataBytes()); } else { key = dataEntry.key(); val = dataEntry.value(); + prevStateMeta = dataEntry.previousStateMetadata(); } - return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, keepBinary); + return unwrapDataEntry(fakeCacheObjCtx, dataEntry, key, val, prevStateMeta, keepBinary); } /** @@ -459,11 +472,18 @@ private boolean checkBounds(long idx) { * @param dataEntry Data entry. * @param key Entry key. * @param val Entry value. - * @param keepBinary Don't convert non primitive types. + * @param keepBinary Don't convert non-primitive types. + * @param prevStateMeta Previous state metadata. * @return Unwrapped entry. */ - private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry, - KeyCacheObject key, CacheObject val, boolean keepBinary) { + private DataEntry unwrapDataEntry( + CacheObjectContext coCtx, + DataEntry dataEntry, + KeyCacheObject key, + CacheObject val, + CacheObject prevStateMeta, + boolean keepBinary + ) { return new UnwrapDataEntry( dataEntry.cacheId(), key, @@ -476,6 +496,7 @@ private DataEntry unwrapDataEntry(CacheObjectContext coCtx, DataEntry dataEntry, dataEntry.partitionCounter(), coCtx, keepBinary, + prevStateMeta, dataEntry.flags()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index 0a77869f3ff496..9ca218c9e4c1bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -124,6 +124,7 @@ import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.pagemem.wal.record.DataEntry.PREV_STATE_FLAG; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2; @@ -2020,6 +2021,12 @@ else if (!entry.value().putValue(buf)) buf.putLong(entry.expireTime()); buf.put(entry.flags()); + + CacheObject prevStare = entry.previousStateMetadata(); + + if (prevStare != null) + if (!prevStare.putValue(buf)) + throw new AssertionError(); } /** @@ -2124,6 +2131,16 @@ DataEntry readPlainDataEntry(ByteBufferBackedDataInput in, RecordType type) thro long expireTime = in.readLong(); byte flags = type == DATA_RECORD_V2 || type == CDC_DATA_RECORD ? in.readByte() : (byte)0; + byte[] prevStateMetaBytes = null; + byte prevStateMetaType = 0; + + if ((flags & PREV_STATE_FLAG) == PREV_STATE_FLAG) { + int prevStateMetaSize = in.readInt(); + prevStateMetaType = in.readByte(); + prevStateMetaBytes = new byte[prevStateMetaSize]; + in.readFully(prevStateMetaBytes); + } + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null) { @@ -2135,35 +2152,39 @@ DataEntry readPlainDataEntry(ByteBufferBackedDataInput in, RecordType type) thro key.partition(partId); CacheObject val = valBytes != null ? co.toCacheObject(coCtx, valType, valBytes) : null; + CacheObject prevStateMeta = prevStateMetaBytes != null ? co.toCacheObject(coCtx, prevStateMetaType, prevStateMetaBytes) : null; return new DataEntry( - cacheId, - key, - val, - op, - nearXidVer, - writeVer, - expireTime, - partId, - partCntr, - flags + cacheId, + key, + val, + op, + nearXidVer, + writeVer, + expireTime, + partId, + partCntr, + prevStateMeta, + flags ); } else return new LazyDataEntry( - cctx, - cacheId, - keyType, - keyBytes, - valType, - valBytes, - op, - nearXidVer, - writeVer, - expireTime, - partId, - partCntr, - flags + cctx, + cacheId, + keyType, + keyBytes, + valType, + valBytes, + op, + nearXidVer, + writeVer, + expireTime, + partId, + partCntr, + prevStateMetaType, + prevStateMetaBytes, + flags ); } @@ -2328,7 +2349,8 @@ protected int entrySize(DataEntry entry) throws IgniteCheckedException { /*part ID*/4 + /*expire Time*/8 + /*part cnt*/8 + - /*flags*/1; + /*flags*/1 + + /*prev state meta*/(entry.previousStateMetadata() == null ? 0 : entry.previousStateMetadata().valueBytesLength(coCtx)); } /** @@ -2361,7 +2383,17 @@ private int cacheStatesSize(Map states) { public static class EncryptedDataEntry extends DataEntry { /** Constructor. */ EncryptedDataEntry() { - super(0, null, null, READ, null, null, 0, 0, 0, EMPTY_FLAGS); + super(0, + null, + null, + READ, + null, + null, + 0, + 0, + 0, + null, + EMPTY_FLAGS); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 64aee86418e881..a90d02cbc46d2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1802,7 +1802,7 @@ else if (op == UPDATE) false, txEntry.keepBinary()); - GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false); + GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, txEntry.previousStateMetadata(), false); if (ctx.isMerge()) { Object resVal = ctx.mergeValue(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 98611dba9a100d..efc18086f7e722 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -155,6 +155,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Conflict version. */ private GridCacheVersion conflictVer; + /** Previous entry state Metadata. */ + private CacheObject prevStateMeta; + /** Explicit lock version if there is one. */ @GridToStringInclude private GridCacheVersion explicitVer; @@ -290,6 +293,7 @@ public IgniteTxEntry(GridCacheContext ctx, * @param entry Cache entry. * @param filters Put filters. * @param conflictVer Data center replication version. + * @param prevStateMeta Previous entry state metadata. * @param skipStore Skip store flag. * @param addReader Add reader flag. */ @@ -303,6 +307,7 @@ public IgniteTxEntry(GridCacheContext ctx, GridCacheEntryEx entry, CacheEntryPredicate[] filters, GridCacheVersion conflictVer, + CacheObject prevStateMeta, boolean skipStore, boolean keepBinary, boolean addReader @@ -319,6 +324,7 @@ public IgniteTxEntry(GridCacheContext ctx, this.ttl = ttl; this.filters = filters; this.conflictVer = conflictVer; + this.prevStateMeta = prevStateMeta; skipStore(skipStore); keepBinary(keepBinary); @@ -867,6 +873,20 @@ public GridCacheVersion explicitVersion() { return conflictVer; } + /** + * @param prevStateMeta Previous state metadata. + */ + public void previousStateMetadata(CacheObject prevStateMeta) { + this.prevStateMeta = prevStateMeta; + } + + /** + * @return Previous entry state metadata. + */ + public CacheObject previousStateMetadata() { + return prevStateMeta; + } + /** * @param conflictVer Conflict version. */ @@ -949,6 +969,9 @@ public void marshal(GridCacheSharedContext ctx, boolean transferExpiry) th if (oldVal != null) oldVal.marshal(context()); + + if (prevStateMeta != null) + prevStateMeta.prepareMarshal(context().cacheObjectContext()); } /** @@ -1181,6 +1204,11 @@ public void clearEntryReadVersion() { writer.incrementState(); + case 13: + if (!writer.writeMessage("conflictMeta", prevStateMeta)) + return false; + + writer.incrementState(); } return true; @@ -1298,6 +1326,13 @@ public void clearEntryReadVersion() { reader.incrementState(); + case 13: + prevStateMeta = reader.readMessage("conflictMeta"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(IgniteTxEntry.class); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ed89cf1a2357c8..b4cc73f5569182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1358,6 +1358,7 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { * @param drTtl DR TTL (if any). * @param drExpireTime DR expire time (if any). * @param drVer DR version. + * @param prevStateMeta Previous entry state meta. * @param skipStore Skip store flag. * @return Transaction entry. */ @@ -1372,6 +1373,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer, + @Nullable CacheObject prevStateMeta, boolean skipStore, boolean keepBinary, boolean addReader @@ -1443,6 +1445,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, entry, filter, drVer, + prevStateMeta, skipStore, keepBinary, addReader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java index d3a7e3fdb117e5..854f6c8dd6c155 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java @@ -19,6 +19,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; /** * Cache version conflict resolver. @@ -29,6 +30,7 @@ public interface CacheVersionConflictResolver { * * @param oldEntry Old entry. * @param newEntry New entry. + * @param prevStateMeta Previous entry state metadata. * @param atomicVerComparator Whether to use atomic version comparator. * @return Conflict resolution context. * @throws IgniteCheckedException If failed. @@ -37,6 +39,17 @@ public GridCacheVersionConflictContext resolve( CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator ) throws IgniteCheckedException; + + /** + * Generates serialized previous entry state metadata. + * + * @param entry Cache entry. + * @return Serialized previous state metadata. + */ + public default Object previousStateMetadata(GridCacheEntryEx entry) { + return null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java index d8034a89c24de3..d0bd618b087164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java @@ -25,7 +25,7 @@ /** * Plain versioned entry. */ -public class GridCachePlainVersionedEntry implements GridCacheVersionedEntryEx { +public abstract class GridCachePlainVersionedEntry implements GridCacheVersionedEntryEx { /** Key. */ @GridToStringInclude protected K key; diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 12a3b2d037e9c6..f653e6e036fef9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -377,7 +378,7 @@ public static class UserCdcConsumer extends TestCdcConsumer { if (evt.value() == null) return; - User user = (User)evt.value(); + User user = (User)evt.unwrappedValue(); assertTrue(user.getName().startsWith(JOHN)); assertTrue(user.getAge() >= 42); @@ -385,7 +386,7 @@ public static class UserCdcConsumer extends TestCdcConsumer { /** {@inheritDoc} */ @Override public Integer extract(CdcEvent evt) { - return (Integer)evt.key(); + return (Integer)evt.unwrappedKey(); } /** {@inheritDoc} */ @@ -514,6 +515,35 @@ public int getAge() { public byte[] getPayload() { return payload; } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + User user = (User)o; + + if (age != user.age) + return false; + + if (!Objects.equals(name, user.name)) + return false; + + return Arrays.equals(payload, user.payload); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = name != null ? name.hashCode() : 0; + + result = 31 * result + age; + result = 31 * result + Arrays.hashCode(payload); + + return result; + } } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java index aa33d3859b3306..44e012816b8ec6 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcCacheVersionTest.java @@ -44,8 +44,10 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -223,15 +225,18 @@ public void testConflictVersionWritten() throws Exception { CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator ) { - GridCacheVersionConflictContext res = - new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); res.useNew(); assertEquals(OTHER_CLUSTER_ID, newEntry.version().dataCenterId()); + // Sets as value on addition with a conflict. + assertEquals(newEntry.value(ctx), prevStateMeta); + if (!oldEntry.isStartVersion()) assertEquals(OTHER_CLUSTER_ID, oldEntry.version().dataCenterId()); @@ -358,10 +363,18 @@ public void testOrderIncrease() throws Exception { DataRecord dataRec = (DataRecord)rec; for (int i = 0; i < dataRec.entryCount(); i++) { - assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataRec.get(i).cacheId()); - assertEquals(KEY_TO_UPD, (int)dataRec.get(i).key().value(null, false)); + int cacheId = CU.cacheId(DEFAULT_CACHE_NAME); + int key = dataRec.get(i).key().value(null, false); + + assertEquals(cacheId, dataRec.get(i).cacheId()); + assertEquals(KEY_TO_UPD, key); assertTrue(dataRec.get(i).writeVersion().order() > prevOrder); + CacheObjectContext coCtx = context().cacheObjectContext(cacheId); + + // Provided as a key by conflict resolver. + assertEquals(key, coCtx.unwrapBinaryIfNeeded(dataRec.get(i).previousStateMetadata(), false, true, null)); + prevOrder = dataRec.get(i).writeVersion().order(); walRecCheckedCntr.incrementAndGet(); @@ -371,6 +384,31 @@ public void testOrderIncrease() throws Exception { } }; + conflictResolutionMgrSupplier = () -> new CacheVersionConflictResolver() { + @Override public GridCacheVersionConflictContext resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + res.useNew(); + + return res; + } + + @Override public Object previousStateMetadata(GridCacheEntryEx entry) { + // Checking Previous state meta delivery to the log records as well (using key); + return entry.key(); + } + + @Override public String toString() { + return "TestCacheConflictResolutionManager"; + } + }; + IgniteConfiguration cfg = getConfiguration("ignite-0"); IgniteEx ign = startGrid(cfg); @@ -418,7 +456,8 @@ private void addConflictData( val.prepareMarshal(intCache.context().cacheObjectContext()); - drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID))); + // Checking Previous state meta delivery to the resolver as well (using value). + drMap.put(key, new GridCacheDrInfo(val, new GridCacheVersion(1, i, 1, OTHER_CLUSTER_ID), val)); } if (concurrency != null) { diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index a9807d4720150b..a24b2abe67ef62 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -54,9 +54,16 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -66,7 +73,12 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorTaskArgument; import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.plugin.AbstractCachePluginProvider; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -113,6 +125,9 @@ public class CdcSelfTest extends AbstractCdcTest { /** */ private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** */ + private volatile Supplier conflictResolutionMgrSupplier; + /** */ @Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}") public static Collection parameters() { @@ -148,6 +163,23 @@ public static Collection parameters() { .setBackups(1) ); + cfg.setPluginProviders(new AbstractTestPluginProvider() { + @Override public String name() { + return "ConflictResolverProvider"; + } + + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return new AbstractCachePluginProvider() { + @Override public @Nullable Object createComponent(Class cls) { + if (cls != CacheConflictResolutionManager.class || conflictResolutionMgrSupplier == null) + return null; + + return new TestCacheConflictResolutionManager<>(); + } + }; + } + }); + return cfg; } @@ -186,6 +218,46 @@ public void testReadAllKeysCommitEachEvent() throws Exception { }, true); } + /** */ + @Test + public void testPreviousStateMetadataWritten() throws Exception { + conflictResolutionMgrSupplier = () -> new CacheVersionConflictResolver() { + @Override public GridCacheVersionConflictContext resolve( + CacheObjectValueContext ctx, + GridCacheVersionedEntryEx oldEntry, + GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, + boolean atomicVerComparator + ) { + GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); + + res.useNew(); + + return res; + } + + @Override public Object previousStateMetadata(GridCacheEntryEx entry) { + // Checking Previous state meta delivery to the log records as well (using key); + return entry.key(); + } + + @Override public String toString() { + return "TestCacheConflictResolutionManager"; + } + }; + + try { + readAll(new UserCdcConsumer() { + @Override public void checkEvent(CdcEvent evt) { + assertEquals(evt.unwrappedKey(), evt.unwrappedPreviousStateMetadata()); + } + }, true); + } + finally { + conflictResolutionMgrSupplier = null; + } + } + /** */ @Test public void testReadExpireTime() throws Exception { @@ -220,7 +292,7 @@ public void testReadExpireTime() throws Exception { @Override public void checkEvent(CdcEvent evt) { super.checkEvent(evt); - Integer key = (Integer)evt.key(); + Integer key = (Integer)evt.unwrappedKey(); if (evt.value() == null || key % 2 != 0) { assertEquals("Expire time must not be set [key=" + key + ']', CU.EXPIRE_TIME_ETERNAL, evt.expireTime()); @@ -356,6 +428,15 @@ public void testReadOneByOneForBackup() throws Exception { AtomicBoolean firstEvt = new AtomicBoolean(true); CdcConsumer cnsmr = new CdcConsumer() { + @Override + public void start(MetricRegistry mreg) { + // No-op. + } + + @Override public void stop() { + // No-op. + } + @Override public boolean onEvents(Iterator evts) { if (!evts.hasNext()) return true; @@ -363,7 +444,7 @@ public void testReadOneByOneForBackup() throws Exception { if (!firstEvt.get()) throw new RuntimeException("Expected fail."); - data.add((Integer)evts.next().key()); + data.add((Integer)evts.next().unwrappedKey()); firstEvt.set(false); @@ -389,14 +470,6 @@ public void testReadOneByOneForBackup() throws Exception { @Override public void onCacheDestroy(Iterator caches) { caches.forEachRemaining(ce -> assertNotNull(ce)); } - - @Override public void stop() { - // No-op. - } - - @Override public void start(MetricRegistry mreg) { - // No-op. - } }; for (int j = 0; j < keysCnt; j++) { @@ -455,7 +528,7 @@ public void testReadFromNextEntry() throws Exception { CdcEvent evt = evts.next(); - assertEquals(expKey.get(), evt.key()); + assertEquals(expKey.get(), evt.unwrappedKey()); expKey.incrementAndGet(); @@ -748,7 +821,7 @@ public void testReReadWhenStateWasNotStored() throws Exception { data.computeIfAbsent( F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()), - k -> new ArrayList<>()).add((Integer)evt.key() + k -> new ArrayList<>()).add((Integer)evt.unwrappedKey() ); if (consumeHalf.get()) @@ -901,4 +974,14 @@ private void removeData(IgniteCache cache, int from, int to) { for (int i = from; i < to; i++) cache.remove(i); } + + /** */ + public class TestCacheConflictResolutionManager extends GridCacheManagerAdapter + implements CacheConflictResolutionManager { + + /** {@inheritDoc} */ + @Override public CacheVersionConflictResolver conflictResolver() { + return conflictResolutionMgrSupplier.get(); + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/TransformedCdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/TransformedCdcSelfTest.java index 72880aec0a3aaf..c11d868d98cbc2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/TransformedCdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/TransformedCdcSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.transform.TestCacheObjectTransformerPluginProvider; import org.apache.ignite.internal.processors.cache.transform.TestCacheObjectTransformerProcessorAdapter; +import org.apache.ignite.plugin.PluginProvider; import static org.apache.ignite.internal.binary.GridBinaryMarshaller.TRANSFORMED; @@ -30,8 +31,15 @@ public class TransformedCdcSelfTest extends CdcSelfTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return super.getConfiguration(igniteInstanceName).setPluginProviders( - new TestCacheObjectTransformerPluginProvider(new CacheObjectShiftTransformer())); + PluginProvider[] providers = super.getConfiguration(igniteInstanceName).getPluginProviders(); + PluginProvider[] extendedProviders = new PluginProvider[providers.length + 1]; + + System.arraycopy(providers, 0, extendedProviders, 0, providers.length); + + extendedProviders[extendedProviders.length - 1] = + new TestCacheObjectTransformerPluginProvider(new CacheObjectShiftTransformer()); + + return super.getConfiguration(igniteInstanceName).setPluginProviders(extendedProviders); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index c73b9634823476..e084baf888dc00 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -17,10 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -35,42 +31,37 @@ import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.jetbrains.annotations.Nullable; +import javax.cache.Cache; +import java.util.Collection; +import java.util.UUID; + /** * Test entry. */ @SuppressWarnings("unchecked") public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements GridCacheEntryEx { + /** Context */ + private final GridCacheContext context; + /** Key. */ - private KeyCacheObject key; + private final KeyCacheObject key; /** Val. */ - private CacheObject val; - - /** TTL. */ - private long ttl; + private final CacheObject val; /** Version. */ - private GridCacheVersion ver = new GridCacheVersion(0, 0, 1, 0); - - /** Obsolete version. */ - private GridCacheVersion obsoleteVer = ver; - - /** MVCC. */ - private GridCacheMvcc mvcc; + private final GridCacheVersion ver; - /** - * @param ctx Context. - * @param key Key. - */ - GridCacheTestEntryEx(GridCacheContext ctx, Object key) { - mvcc = new GridCacheMvcc(ctx); - - this.key = ctx.toCacheKeyObject(key); - } + /** TTL. */ + private final long ttl; - /** {@inheritDoc} */ - @Override public int memorySize() throws IgniteCheckedException { - return 1024; + /** */ + public GridCacheTestEntryEx(GridCacheContext context, KeyCacheObject key, CacheObject val, GridCacheVersion ver, long ttl) { + this.context = context; + this.key = key; + this.val = val; + this.ver = ver; + this.ttl = ttl; } /** {@inheritDoc} */ @@ -81,9 +72,21 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr return false; } + /** {@inheritDoc} */ + @Override + public boolean initialValue(CacheObject val, GridCacheVersion ver, long ttl, long expireTime, boolean preload, AffinityTopologyVersion topVer, GridDrType drType, boolean fromStore, boolean primary, @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException { + return false; + } + + /** {@inheritDoc} */ + @Override + public int memorySize() throws IgniteCheckedException { + return 0; + } + /** {@inheritDoc} */ @Override public boolean isInternal() { - return key instanceof GridCacheInternal; + return false; } /** {@inheritDoc} */ @@ -112,13 +115,13 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Nullable @Override public GridCacheContext context() { - return null; + @Override public boolean deleted() { + return false; } /** {@inheritDoc} */ - @Nullable @Override public EvictableEntry wrapEviction() { - return null; + @Override public GridCacheContext context() { + return context; } /** {@inheritDoc} */ @@ -127,640 +130,360 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Override public boolean partitionValid() { - return true; - } - - /** - * @param threadId Thread ID. - * @param ver Lock version. - * @param timeout Lock acquisition timeout. - * @param reenter Reentry flag ({@code true} if reentry is allowed). - * @param tx Transaction flag. - * @return New lock candidate if lock was added, or current owner if lock was reentered, - * or null if lock was owned by another thread and timeout is negative. - */ - @Nullable GridCacheMvccCandidate addLocal( - long threadId, - GridCacheVersion ver, - long timeout, - boolean reenter, - boolean tx) { - return mvcc.addLocal( - this, - threadId, - ver, - timeout, - reenter, - tx, - false, - false - ); - } - - /** - * Adds new lock candidate. - * - * @param nodeId Node ID. - * @param threadId Thread ID. - * @param ver Lock version. - * @param tx Transaction flag. - * @return Remote candidate. - */ - GridCacheMvccCandidate addRemote(UUID nodeId, long threadId, GridCacheVersion ver, - boolean tx) { - return mvcc.addRemote(this, nodeId, null, threadId, ver, tx, true, false); - } - - /** - * Adds new lock candidate. - * - * @param nodeId Node ID. - * @param threadId Thread ID. - * @param ver Lock version. - * @param tx Transaction flag. - * @return Remote candidate. - */ - GridCacheMvccCandidate addNearLocal(UUID nodeId, long threadId, GridCacheVersion ver, - boolean tx) { - return mvcc.addNearLocal(this, nodeId, null, threadId, ver, tx, true, false); - } - - /** - * - * @param baseVer Base version. - */ - void salvageRemote(GridCacheVersion baseVer) { - mvcc.salvageRemote(baseVer, false); - } - - /** - * Moves completed candidates right before the base one. Note that - * if base is not found, then nothing happens and {@code false} is - * returned. - * - * @param baseVer Base version. - * @param committedVers Committed versions relative to base. - * @param rolledbackVers Rolled back versions relative to base. - */ - void orderCompleted(GridCacheVersion baseVer, - Collection committedVers, Collection rolledbackVers) { - mvcc.orderCompleted(baseVer, committedVers, rolledbackVers); - } - - /** - * @param ver Version. - */ - void doneRemote(GridCacheVersion ver) { - mvcc.doneRemote(ver, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList()); - } - - /** - * @param baseVer Base version. - * @param owned Owned. - */ - void orderOwned(GridCacheVersion baseVer, GridCacheVersion owned) { - mvcc.markOwned(baseVer, owned); - } - - /** - * @param ver Lock version to acquire or set to ready. - */ - void readyLocal(GridCacheVersion ver) { - mvcc.readyLocal(ver); - } - - /** - * @param ver Ready near lock version. - * @param mapped Mapped version. - * @param committedVers Committed versions. - * @param rolledbackVers Rolled back versions. - * @param pending Pending versions. - */ - void readyNearLocal(GridCacheVersion ver, GridCacheVersion mapped, - Collection committedVers, Collection rolledbackVers, - Collection pending) { - mvcc.readyNearLocal(ver, mapped, committedVers, rolledbackVers, pending); - } - - /** - * @param cand Candidate to set to ready. - */ - void readyLocal(GridCacheMvccCandidate cand) { - mvcc.readyLocal(cand); - } - - /** - * Local release. - * - * @param threadId ID of the thread. - */ - void releaseLocal(long threadId) { - mvcc.releaseLocal(threadId); - } - - /** - * - */ - void recheckLock() { - mvcc.recheck(); - } - - /** {@inheritDoc} */ - @Override public GridCacheEntryInfo info() { - GridCacheEntryInfo info = new GridCacheEntryInfo(); - - info.key(key()); - info.value(val); - info.ttl(ttl()); - info.expireTime(expireTime()); - info.version(version()); - - return info; - } - - /** {@inheritDoc} */ - @Override public boolean valid(AffinityTopologyVersion topVer) { - return true; - } - - /** @inheritDoc */ @Override public KeyCacheObject key() { return key; } /** {@inheritDoc} */ @Override public IgniteTxKey txKey() { - return new IgniteTxKey(key, 0); + return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public CacheObject rawGet() { return val; } /** {@inheritDoc} */ @Override public boolean hasValue() { - return val != null; + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public CacheObject rawPut(CacheObject val, long ttl) { - CacheObject old = this.val; - - this.ttl = ttl; - this.val = val; - - return old; + return null; } - /** @inheritDoc */ - @Override public Cache.Entry wrap() { - assert false; - + /** {@inheritDoc} */ + @Override public Cache.Entry wrap() { return null; } - /** @inheritDoc */ - @Override public Cache.Entry wrapLazyValue(boolean keepBinary) { - assert false; - + /** {@inheritDoc} */ + @Override public Cache.Entry wrapLazyValue(boolean keepBinary) { return null; } /** {@inheritDoc} */ - @Override public CacheEntryImplEx wrapVersioned() { - assert false; - + @Override public @Nullable CacheObject peekVisibleValue() { return null; } - /** @inheritDoc */ - @Nullable @Override public CacheObject peekVisibleValue() { - assert false; + /** {@inheritDoc} */ + @Override public EvictableEntry wrapEviction() { + return null; + } + /** {@inheritDoc} */ + @Override public CacheEntryImplEx wrapVersioned() { return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public GridCacheVersion obsoleteVersion() { - return obsoleteVer; + return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean obsolete() { - return obsoleteVer != null; + return false; + } + + /** {@inheritDoc} */ + @Override public boolean obsoleteOrDeleted() { + return false; } /** {@inheritDoc} */ @Override public boolean obsolete(GridCacheVersion exclude) { - return obsoleteVer != null && !obsoleteVer.equals(exclude); + return false; } - /** @inheritDoc */ - @Override public boolean invalidate(GridCacheVersion newVer) - throws IgniteCheckedException { - assert false; + /** {@inheritDoc} */ + @Override public @Nullable GridCacheEntryInfo info() { + return null; + } + /** {@inheritDoc} */ + @Override public boolean invalidate(GridCacheVersion newVer) throws IgniteCheckedException { return false; } - /** @inheritDoc */ - @Override public boolean evictInternal(GridCacheVersion obsoleteVer, - @Nullable CacheEntryPredicate[] filter, boolean evictOffheap) { - assert false; - + /** {@inheritDoc} */ + @Override public boolean evictInternal(GridCacheVersion obsoleteVer, @Nullable CacheEntryPredicate[] filter, + boolean evictOffheap) { return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ + @Override public void onMarkedObsolete() { + + } + + /** {@inheritDoc} */ @Override public boolean isNew() { - assert false; return false; + return false; } /** {@inheritDoc} */ - @Override public boolean isNewLocked() throws GridCacheEntryRemovedException { - assert false; return false; + @Override public boolean isNewLocked() { + return false; } - /** @inheritDoc */ - @Override public CacheObject innerGet( - @Nullable GridCacheVersion ver, - @Nullable IgniteInternalTx tx, - boolean readThrough, - boolean updateMetrics, - boolean evt, - Object transformClo, - String taskName, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) { - return val; + /** {@inheritDoc} */ + @Override public boolean valid(AffinityTopologyVersion topVer) { + return false; } - /** @inheritDoc */ - @Override public void clearReserveForLoad(GridCacheVersion ver) { - assert false; + /** {@inheritDoc} */ + @Override public boolean partitionValid() { + return false; } - /** @inheritDoc */ - @Override public EntryGetResult innerGetAndReserveForLoad( - boolean updateMetrics, - boolean evt, - String taskName, + /** {@inheritDoc} */ + @Override public CacheObject innerGet(@Nullable GridCacheVersion ver, @Nullable IgniteInternalTx tx, boolean readThrough, + boolean updateMetrics, boolean evt, Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary, - @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException { - assert false; - + boolean keepBinary) { return null; } - /** @inheritDoc */ - @Nullable @Override public EntryGetResult innerGetVersioned( - @Nullable GridCacheVersion ver, - IgniteInternalTx tx, - boolean updateMetrics, - boolean evt, - Object transformClo, - String taskName, - @Nullable IgniteCacheExpiryPolicy expiryPlc, + /** {@inheritDoc} */ + @Override public EntryGetResult innerGetVersioned(@Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean updateMetrics, + boolean evt, Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, @Nullable ReaderArguments readerArgs) { - assert false; - return null; } - /** @inheritDoc */ - @Override public CacheObject innerReload() { - return val; + /** {@inheritDoc} */ + @Override public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics, boolean evt, String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean keepBinary, + @Nullable ReaderArguments readerArgs) { + return null; } - /** @inheritDoc */ - @Override public GridCacheUpdateTxResult innerSet(@Nullable IgniteInternalTx tx, - UUID evtNodeId, - UUID affNodeId, - @Nullable CacheObject val, - boolean writeThrough, - boolean retval, - long ttl, - boolean evt, - boolean metrics, - boolean keepBinary, - boolean hasOldVal, - @Nullable CacheObject oldVal, - AffinityTopologyVersion topVer, - CacheEntryPredicate[] filter, - GridDrType drType, - long drExpireTime, - @Nullable GridCacheVersion drVer, - String taskName, - @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr - ) throws IgniteCheckedException, GridCacheEntryRemovedException { - rawPut(val, ttl); - - return new GridCacheUpdateTxResult(true); - } - - /** {@inheritDoc} */ - @Override public GridCacheUpdateAtomicResult innerUpdate( - GridCacheVersion ver, - UUID evtNodeId, - UUID affNodeId, - GridCacheOperation op, - @Nullable Object val, - @Nullable Object[] invokeArgs, - boolean writeThrough, - boolean readThrough, - boolean retval, - boolean keepBinary, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean evt, - boolean metrics, - boolean primary, - boolean checkVer, - boolean readRepairRecovery, - AffinityTopologyVersion topVer, - @Nullable CacheEntryPredicate[] filter, - GridDrType drType, - long conflictTtl, - long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean conflictResolve, - boolean intercept, - String taskName, - @Nullable CacheObject prevVal, - @Nullable Long updateCntr, - @Nullable GridDhtAtomicAbstractUpdateFuture fut, - boolean transformOp) - throws IgniteCheckedException, GridCacheEntryRemovedException { - assert false; + /** {@inheritDoc} */ + @Override public void clearReserveForLoad(GridCacheVersion ver) { + + } + /** {@inheritDoc} */ + @Override public @Nullable CacheObject innerReload() { return null; } - /** @inheritDoc */ - @Override public GridCacheUpdateTxResult innerRemove( - @Nullable IgniteInternalTx tx, - UUID evtNodeId, - UUID affNodeId, - boolean retval, - boolean evt, - boolean metrics, - boolean keepBinary, - boolean oldValPresent, - @Nullable CacheObject oldVal, - AffinityTopologyVersion topVer, - CacheEntryPredicate[] filter, - GridDrType drType, - @Nullable GridCacheVersion drVer, - String taskName, - @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr - ) throws IgniteCheckedException, GridCacheEntryRemovedException { - obsoleteVer = ver; + /** {@inheritDoc} */ + @Override public GridCacheUpdateTxResult innerSet(@Nullable IgniteInternalTx tx, UUID evtNodeId, UUID affNodeId, + @Nullable CacheObject val, boolean writeThrough, boolean retval, long ttl, boolean evt, boolean metrics, + boolean keepBinary, boolean oldValPresent, @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, + CacheEntryPredicate[] filter, GridDrType drType, long drExpireTime, @Nullable GridCacheVersion explicitVer, + String taskName, @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr) { + return null; + } - val = null; + /** {@inheritDoc} */ + @Override public GridCacheUpdateTxResult innerRemove(@Nullable IgniteInternalTx tx, UUID evtNodeId, UUID affNodeId, + boolean retval, boolean evt, boolean metrics, boolean keepBinary, boolean oldValPresent, + @Nullable CacheObject oldVal, AffinityTopologyVersion topVer, CacheEntryPredicate[] filter, GridDrType drType, + @Nullable GridCacheVersion explicitVer, String taskName, @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr) { + return null; + } - return new GridCacheUpdateTxResult(true); + /** {@inheritDoc} */ + @Override public GridCacheUpdateAtomicResult innerUpdate(GridCacheVersion ver, UUID evtNodeId, UUID affNodeId, + GridCacheOperation op, @Nullable Object val, @Nullable Object[] invokeArgs, boolean writeThrough, + boolean readThrough, boolean retval, boolean keepBinary, @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean evt, boolean metrics, boolean primary, boolean checkVer, boolean readRepairRecovery, + AffinityTopologyVersion topVer, @Nullable CacheEntryPredicate[] filter, GridDrType drType, long conflictTtl, + long conflictExpireTime, @Nullable GridCacheVersion conflictVer, CacheObject prevStateMeta, + boolean conflictResolve, boolean intercept, String taskName, @Nullable CacheObject prevVal, + @Nullable Long updateCntr, @Nullable GridDhtAtomicAbstractUpdateFuture fut, + boolean transformOp) { + return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean clear(GridCacheVersion ver, boolean readers) throws IgniteCheckedException { - if (ver == null || ver.equals(this.ver)) { - val = null; - - return true; - } - return false; } - /** @inheritDoc */ - @Override public boolean tmLock(IgniteInternalTx tx, - long timeout, - @Nullable GridCacheVersion serOrder, - GridCacheVersion serReadVer, + /** {@inheritDoc} */ + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, @Nullable GridCacheVersion serOrder, + @Nullable GridCacheVersion serReadVer, boolean read) { - assert false; return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public void txUnlock(IgniteInternalTx tx) { - assert false; - } - - /** @inheritDoc */ - @Override public boolean removeLock(GridCacheVersion ver) { - GridCacheMvccCandidate doomed = mvcc.candidate(ver); - - mvcc.remove(ver); - return doomed != null; } - /** @inheritDoc */ - @Override public boolean markObsolete(GridCacheVersion ver) { - if (ver == null || ver.equals(obsoleteVer)) { - obsoleteVer = ver; - - val = null; - - return true; - } - + /** {@inheritDoc} */ + @Override public boolean removeLock(GridCacheVersion ver) { return false; } /** {@inheritDoc} */ - @Override public void onMarkedObsolete() { - // No-op. + @Override public boolean markObsolete(GridCacheVersion ver) { + return false; } /** {@inheritDoc} */ - @Override public boolean markObsoleteIfEmpty(GridCacheVersion ver) { - if (val == null) - obsoleteVer = ver; - - return obsoleteVer != null; + @Override public boolean markObsoleteIfEmpty(@Nullable GridCacheVersion ver) { + return false; } /** {@inheritDoc} */ @Override public boolean markObsoleteVersion(GridCacheVersion ver) { - if (this.ver.equals(ver)) { - obsoleteVer = ver; - - return true; - } - return false; } - /** @inheritDoc */ - @Override public GridCacheVersion version() { + /** {@inheritDoc} */ + @Override public GridCacheVersion version() throws GridCacheEntryRemovedException { return ver; } - /** @inheritDoc */ - @Override public boolean checkSerializableReadVersion(GridCacheVersion ver) { - assert false; - + /** {@inheritDoc} */ + @Override public boolean checkSerializableReadVersion(GridCacheVersion serReadVer) { return false; } - /** @inheritDoc */ - @Override public boolean initialValue( - CacheObject val, - GridCacheVersion ver, - long ttl, - long expireTime, - boolean preload, - AffinityTopologyVersion topVer, - GridDrType drType, - boolean fromStore, - boolean primary, - CacheDataRow row - ) throws IgniteCheckedException, GridCacheEntryRemovedException { - assert false; + /** {@inheritDoc} */ + @Override public @Nullable CacheObject peek(boolean heap, boolean offheap, AffinityTopologyVersion topVer, + @Nullable IgniteCacheExpiryPolicy plc) throws GridCacheEntryRemovedException, IgniteCheckedException { + return null; + } - return false; + /** {@inheritDoc} */ + @Override public @Nullable CacheObject peek() throws GridCacheEntryRemovedException, IgniteCheckedException { + return null; } - /** @inheritDoc */ - @Override public GridCacheVersionedEntryEx versionedEntry(final boolean keepBinary) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public GridCacheVersionedEntryEx versionedEntry(boolean keepBinary) { return null; } - /** @inheritDoc */ - @Override public EntryGetResult versionedValue(CacheObject val, - GridCacheVersion curVer, - GridCacheVersion newVer, - @Nullable IgniteCacheExpiryPolicy loadExpiryPlc, + /** {@inheritDoc} */ + @Override public EntryGetResult versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, + @Nullable GridCacheVersion newVer, @Nullable IgniteCacheExpiryPolicy loadExpiryPlc, @Nullable ReaderArguments readerArgs) { - assert false; - return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean hasLockCandidate(GridCacheVersion ver) { - return mvcc.hasCandidate(ver); + return false; } - /** @inheritDoc */ - @Override public boolean lockedByAny(GridCacheVersion... exclude) { - return !mvcc.isEmpty(exclude); + /** {@inheritDoc} */ + @Override public boolean hasLockCandidate(long threadId) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean lockedByAny(GridCacheVersion... exclude) throws GridCacheEntryRemovedException { + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedByThread() { - return lockedByThread(Thread.currentThread().getId()); + return false; } - /** @inheritDoc */ - @Override public boolean lockedLocally(GridCacheVersion lockVer) { - return mvcc.isLocallyOwned(lockVer); + /** {@inheritDoc} */ + @Override public boolean lockedLocallyByIdOrThread(GridCacheVersion lockVer, long threadId) { + return false; } /** {@inheritDoc} */ - @Override public boolean lockedLocallyByIdOrThread(GridCacheVersion lockVer, long threadId) - throws GridCacheEntryRemovedException { - return lockedLocally(lockVer) || lockedByThread(threadId); + @Override public boolean lockedLocally(GridCacheVersion lockVer) { + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedByThread(long threadId, GridCacheVersion exclude) { - return mvcc.isLocallyOwnedByThread(threadId, false, exclude); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedByThread(long threadId) { - return mvcc.isLocallyOwnedByThread(threadId, true); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedBy(GridCacheVersion ver) { - return mvcc.isOwnedBy(ver); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedByThreadUnsafe(long threadId) { - return mvcc.isLocallyOwnedByThread(threadId, true); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedByUnsafe(GridCacheVersion ver) { - return mvcc.isOwnedBy(ver); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean lockedLocallyUnsafe(GridCacheVersion lockVer) { - return mvcc.isLocallyOwned(lockVer); + return false; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public boolean hasLockCandidateUnsafe(GridCacheVersion ver) { - return mvcc.hasCandidate(ver); + return false; } - /** @inheritDoc */ - @Override public Collection localCandidates(GridCacheVersion... exclude) { - return mvcc.localCandidates(exclude); + /** {@inheritDoc} */ + @Override public @Nullable GridCacheMvccCandidate localCandidate(long threadId) { + return null; } - /** @inheritDoc */ - Collection localCandidates(boolean reentries, GridCacheVersion... exclude) { - return mvcc.localCandidates(reentries, exclude); + /** {@inheritDoc} */ + @Override public Collection localCandidates(@Nullable GridCacheVersion... exclude) { + return null; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public Collection remoteMvccSnapshot(GridCacheVersion... exclude) { - return mvcc.remoteCandidates(exclude); + return null; } /** {@inheritDoc} */ - @Override public GridCacheMvccCandidate localCandidate(long threadId) throws GridCacheEntryRemovedException { - return mvcc.localCandidate(threadId); - } - - /** @inheritDoc */ - @Override public GridCacheMvccCandidate candidate(GridCacheVersion ver) { - return mvcc.candidate(ver); + @Override public @Nullable GridCacheMvccCandidate candidate(GridCacheVersion ver) { + return null; } /** {@inheritDoc} */ - @Override public GridCacheMvccCandidate candidate(UUID nodeId, long threadId) - throws GridCacheEntryRemovedException { - return mvcc.remoteCandidate(nodeId, threadId); + @Override public @Nullable GridCacheMvccCandidate candidate(UUID nodeId, long threadId) { + return null; } - /** - * @return Any MVCC owner. - */ - GridCacheMvccCandidate anyOwner() { - return mvcc.anyOwner(); + /** {@inheritDoc} */ + @Override public @Nullable GridCacheMvccCandidate localOwner() { + return null; } - /** @inheritDoc */ - @Override public GridCacheMvccCandidate localOwner() { - return mvcc.localOwner(); + /** {@inheritDoc} */ + @Override public CacheObject valueBytes() throws GridCacheEntryRemovedException { + return null; } - /** @inheritDoc */ - @Override public CacheObject valueBytes() { - assert false; - + /** {@inheritDoc} */ + @Override public @Nullable CacheObject valueBytes( + @Nullable GridCacheVersion ver) throws IgniteCheckedException, GridCacheEntryRemovedException { return null; } - /** @inheritDoc */ - @Override public CacheObject valueBytes(GridCacheVersion ver) { - assert false; + /** {@inheritDoc} */ + @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) { - return null; } /** {@inheritDoc} */ @@ -768,8 +491,8 @@ GridCacheMvccCandidate anyOwner() { return 0; } - /** @inheritDoc */ - @Override public long expireTime() { + /** {@inheritDoc} */ + @Override public long expireTime() throws GridCacheEntryRemovedException { return 0; } @@ -788,83 +511,51 @@ GridCacheMvccCandidate anyOwner() { return ttl; } - /** @inheritDoc */ - @Override public long ttl() { + /** {@inheritDoc} */ + @Override public long ttl() throws GridCacheEntryRemovedException { return ttl; } - /** @inheritDoc */ + /** {@inheritDoc} */ @Override public void updateTtl(GridCacheVersion ver, IgniteCacheExpiryPolicy expiryPlc) { - throw new UnsupportedOperationException(); - } - /** @inheritDoc */ - @Override public void updateTtl(GridCacheVersion ver, long ttl) { - throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public CacheObject unswap() throws IgniteCheckedException { - return null; - } + @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) { - /** {@inheritDoc} */ - @Override public CacheObject unswap(boolean needVal) throws IgniteCheckedException { - return null; } /** {@inheritDoc} */ - @Override public CacheObject unswap(CacheDataRow row) throws IgniteCheckedException { + @Override public @Nullable CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException { return null; } /** {@inheritDoc} */ - @Override public boolean hasLockCandidate(long threadId) throws GridCacheEntryRemovedException { - return localCandidate(threadId) != null; - } - - /** {@inheritDoc} */ - @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean deleted() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean obsoleteOrDeleted() { - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject peek(boolean heap, - boolean offheap, - AffinityTopologyVersion topVer, - @Nullable IgniteCacheExpiryPolicy plc) { + @Override public @Nullable CacheObject unswap( + CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException { return null; } /** {@inheritDoc} */ - @Nullable @Override public CacheObject peek() - throws GridCacheEntryRemovedException, IgniteCheckedException { + @Override + public @Nullable CacheObject unswap(boolean needVal) throws IgniteCheckedException, GridCacheEntryRemovedException { return null; } /** {@inheritDoc} */ @Override public void onUnlock() { - // No-op. + } /** {@inheritDoc} */ @Override public void lockEntry() { - // No-op. + } /** {@inheritDoc} */ @Override public void unlockEntry() { - // No-op. + } /** {@inheritDoc} */ @@ -879,6 +570,6 @@ GridCacheMvccCandidate anyOwner() { /** {@inheritDoc} */ @Override public void touch() { - context().evicts().touch(this); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ReadRepairDataGenerator.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ReadRepairDataGenerator.java index f73ed0f31a8b94..811251d06a8930 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ReadRepairDataGenerator.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ReadRepairDataGenerator.java @@ -385,6 +385,7 @@ private InconsistentMapping setDifferentValuesForSameKey( 0, 0, null, + null, false, false, null, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 05d3d894720c9c..5b59e6dc3566f7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -1555,6 +1555,7 @@ public static class TestCacheConflictResolutionManager extends GridCacheMa CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator ) { GridCacheVersionConflictContext res = diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java index 7793c994f3822a..42b54b08a88cae 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java @@ -336,7 +336,7 @@ private void checkDataWalEntries() throws Exception { entries.add(new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(), 0L, - cctx.affinity().partition(i), i, DataEntry.EMPTY_FLAGS)); + cctx.affinity().partition(i), i, null, DataEntry.EMPTY_FLAGS)); } UUID cpId = UUID.randomUUID(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java index 730b725799882b..90797f7fae5221 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java @@ -892,6 +892,7 @@ public void testSwitchHistoricalRebalanceToFullWhileIteratingOverWAL() throws Ex 0, 0, 0, + null, DataEntry.EMPTY_FLAGS ))); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java index 81dbd9a87cfa02..3780905fe02663 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelf2Test.java @@ -535,6 +535,7 @@ public void testCheckFailOnCorruptedData() throws Exception { 0, 0, null, + null, false, false, null, @@ -1068,6 +1069,7 @@ public class TestCacheConflictResolutionManager extends GridCacheManagerAd CacheObjectValueContext ctx, GridCacheVersionedEntryEx oldEntry, GridCacheVersionedEntryEx newEntry, + Object prevStateMeta, boolean atomicVerComparator ) { GridCacheVersionConflictContext res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry); diff --git a/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java b/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java index 9c696e81355c07..de9aa8adcb523c 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java @@ -66,6 +66,7 @@ public static void corruptDataEntry( 0L, partId, breakCntr ? locPart.updateCounter() + 1 : locPart.updateCounter(), + null, DataEntry.EMPTY_FLAGS); IgniteCacheDatabaseSharedManager db = ctx.shared().database(); diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java index d63c198e392452..c6d6ea5f5930d7 100644 --- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java +++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/DataEntryWrapper.java @@ -66,6 +66,7 @@ public DataEntryWrapper( dataEntry.expireTime(), dataEntry.partitionId(), dataEntry.partitionCounter(), + dataEntry.previousStateMetadata(), dataEntry.flags() ); diff --git a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java index 866b5087e9b86c..61d614e42c9b67 100644 --- a/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java +++ b/modules/dev-utils/src/test/java/org/apache/ignite/development/utils/IgniteWalConverterSensitiveDataTest.java @@ -300,6 +300,7 @@ private Collection withSensitiveData() { 0, 0, 0, + null, DataEntry.EMPTY_FLAGS ); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java index 8691a0a91d28b5..d8ca8d756857de 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java @@ -233,12 +233,12 @@ public static class BinaryCdcConsumer extends TestCdcConsumer { return; if (evt.cacheId() == cacheId(USER)) { - int id = ((BinaryObject)evt.key()).field(ID); - int cityId = ((BinaryObject)evt.key()).field(CITY_ID); + int id = ((BinaryObject)evt.unwrappedKey()).field(ID); + int cityId = ((BinaryObject)evt.unwrappedKey()).field(CITY_ID); assertEquals(42 * id, cityId); - String name = ((BinaryObject)evt.value()).field(NAME); + String name = ((BinaryObject)evt.unwrappedValue()).field(NAME); if (id % 2 == 0) assertTrue(name.startsWith(JOHN)); @@ -246,9 +246,9 @@ public static class BinaryCdcConsumer extends TestCdcConsumer { assertTrue(name.startsWith(SARAH)); } else { - int id = (Integer)evt.key(); - String name = ((BinaryObject)evt.value()).field(NAME); - String zipCode = ((BinaryObject)evt.value()).field(ZIP_CODE); + int id = (Integer)evt.unwrappedKey(); + String name = ((BinaryObject)evt.unwrappedValue()).field(NAME); + String zipCode = ((BinaryObject)evt.unwrappedValue()).field(ZIP_CODE); assertEquals(Integer.toString(127000 + id), zipCode);