Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Jul 24, 2024
1 parent 5c3cf86 commit 1517de3
Show file tree
Hide file tree
Showing 51 changed files with 935 additions and 755 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,15 @@

package org.apache.ignite.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.AbstractCdcTest;
import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.cdc.*;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
Expand All @@ -54,15 +37,14 @@
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

import java.util.*;

import static org.apache.ignite.cdc.AbstractCdcTest.KEYS_CNT;
import static org.apache.ignite.cdc.CdcSelfTest.addData;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.testframework.GridTestUtils.stopThreads;
import static org.apache.ignite.util.CdcCommandTest.CDC;
import static org.apache.ignite.util.CdcCommandTest.RESEND;
import static org.apache.ignite.util.CdcCommandTest.runCdc;
import static org.apache.ignite.util.CdcCommandTest.waitForSize;
import static org.apache.ignite.util.CdcCommandTest.*;
import static org.apache.ignite.util.GridCommandHandlerClusterByClassTest.CACHES;

/**
Expand Down Expand Up @@ -257,10 +239,11 @@ private static class AlwaysNewResolutionManager<K, V>
AlwaysNewResolutionManager() {
rslv = new CacheVersionConflictResolver() {
@Override public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K1, V1> oldEntry,
GridCacheVersionedEntryEx<K1, V1> newEntry,
boolean atomicVerComparator
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K1, V1> oldEntry,
GridCacheVersionedEntryEx<K1, V1> newEntry,
Object prevStateMeta,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K1, V1> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

Expand Down
26 changes: 24 additions & 2 deletions modules/core/src/main/java/org/apache/ignite/cdc/CdcEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.jetbrains.annotations.Nullable;
Expand All @@ -39,12 +41,32 @@ public interface CdcEvent extends Serializable {
/**
* @return Key for the changed entry.
*/
public Object key();
public KeyCacheObject key();

/**
* @return Value for the changed entry or {@code null} in case of entry removal.
*/
@Nullable public Object value();
@Nullable public CacheObject value();

/**
* @return Previous entry state metadata if expected.
*/
@Nullable public CacheObject previousStateMetadata();

/**
* @return Key which was placed into cache. Or null if failed to convert.
*/
public Object unwrappedKey();

/**
* @return Value which was placed into cache. Or null for delete operation or for failure.
*/
public Object unwrappedValue();

/**
* @return Previous entry state metadata.
*/
public Object unwrappedPreviousStateMetadata();

/**
* @return {@code True} if event fired on primary node for partition containing this entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
* Event of single entry change.
Expand All @@ -33,87 +38,69 @@ public class CdcEventImpl implements CdcEvent {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

/** Key. */
private final Object key;
/** Entry. */
private final DataEntry entry;

/** Value. */
private final Object val;

/** {@code True} if changes made on primary node. */
private final boolean primary;

/** Partition. */
private final int part;
/**
* @param entry Entry.
*/
public CdcEventImpl(DataEntry entry) {
this.entry = entry;
}

/** Order of the entry change. */
private final CacheEntryVersion ord;
/** {@inheritDoc} */
@Override public Object unwrappedKey() {
return ((UnwrapDataEntry)(entry)).unwrappedKey();
}

/** Cache id. */
private final int cacheId;
/** {@inheritDoc} */
@Override public Object unwrappedValue() {
return ((UnwrapDataEntry)(entry)).unwrappedValue();
}

/** Expire time. */
private final long expireTime;
/** {@inheritDoc} */
@Override public Object unwrappedPreviousStateMetadata() {
return ((UnwrapDataEntry)(entry)).unwrappedPreviousStateMetadata();
}

/**
* @param key Key.
* @param val Value.
* @param primary {@code True} if changes made on primary node.
* @param part Partition.
* @param ord Order of the entry change.
* @param cacheId Cache id.
* @param expireTime Expire time.
*/
public CdcEventImpl(
Object key,
Object val,
boolean primary,
int part,
CacheEntryVersion ord,
int cacheId,
long expireTime
) {
this.key = key;
this.val = val;
this.primary = primary;
this.part = part;
this.ord = ord;
this.cacheId = cacheId;
this.expireTime = expireTime;
/** {@inheritDoc} */
@Override public KeyCacheObject key() {
return entry.key();
}

/** {@inheritDoc} */
@Override public Object key() {
return key;
@Override public CacheObject value() {
return entry.value();
}

/** {@inheritDoc} */
@Override public Object value() {
return val;
@Override public @Nullable CacheObject previousStateMetadata() {
return entry.previousStateMetadata();
}

/** {@inheritDoc} */
@Override public boolean primary() {
return primary;
return (entry.flags() & DataEntry.PRIMARY_FLAG) != 0;
}

/** {@inheritDoc} */
@Override public int partition() {
return part;
return entry.partitionId();
}

/** {@inheritDoc} */
@Override public CacheEntryVersion version() {
return ord;
return entry.writeVersion();
}

/** {@inheritDoc} */
@Override public int cacheId() {
return cacheId;
return entry.cacheId();
}

/** {@inheritDoc} */
@Override public long expireTime() {
return expireTime;
return entry.expireTime();
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersB
boolean interrupted;

do {
boolean commit = consumer.onRecords(iter, WalRecordsConsumer.CDC_EVENT_TRANSFORMER, null);
boolean commit = consumer.onRecords(iter, null);

if (commit)
saveStateAndRemoveProcessed(iter.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -89,21 +88,6 @@ public class WalRecordsConsumer<K, V> {
return OPERATIONS_TYPES.contains(e.op());
};

/** Event transformer. */
static final IgniteClosure<DataEntry, CdcEvent> CDC_EVENT_TRANSFORMER = e -> {
UnwrapDataEntry ue = (UnwrapDataEntry)e;

return new CdcEventImpl(
ue.unwrappedKey(),
ue.unwrappedValue(),
(e.flags() & DataEntry.PRIMARY_FLAG) != 0,
e.partitionId(),
e.writeVersion(),
e.cacheId(),
e.expireTime()
);
};

/**
* @param consumer User provided CDC consumer.
* @param log Logger.
Expand All @@ -119,15 +103,10 @@ public WalRecordsConsumer(CdcConsumer consumer, IgniteLogger log) {
* {@link DataRecord} will be stored and WAL iteration will be started from it on CDC application fail/restart.
*
* @param entries Data entries iterator.
* @param transform Event transformer.
* @param filter Optional event filter.
* @return {@code True} if current offset in WAL should be commited.
*/
public boolean onRecords(
Iterator<DataEntry> entries,
IgniteClosure<DataEntry, CdcEvent> transform,
@Nullable IgnitePredicate<? super DataEntry> filter
) {
public boolean onRecords(Iterator<DataEntry> entries, @Nullable IgnitePredicate<? super DataEntry> filter) {
Iterator<CdcEvent> evts = F.iterator(new Iterator<DataEntry>() {
@Override public boolean hasNext() {
return entries.hasNext();
Expand All @@ -142,7 +121,7 @@ public boolean onRecords(

return next;
}
}, transform, true, OPERATIONS_FILTER, filter);
}, CdcEventImpl::new, true, OPERATIONS_FILTER, filter);

return consumer.onEvents(evts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private void resendCacheData(IgniteInternalCache<?, ?> cache) throws IgniteCheck
row.expireTime(),
key.partition(),
-1,
null, // Conflict resolve should not happen at data copying.
DataEntry.flags(true))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class DataEntry {
/** */
public static final byte FROM_STORE_FLAG = 0b00000100;

/** */
public static final byte PREV_STATE_FLAG = 0b00001000;

/** Cache ID. */
@GridToStringInclude
protected int cacheId;
Expand Down Expand Up @@ -74,6 +77,9 @@ public class DataEntry {
@GridToStringInclude
protected long partCnt;

/** Previous entry state metadata. */
protected CacheObject prevStateMeta;

/**
* Bit flags.
* <ul>
Expand All @@ -100,6 +106,7 @@ private DataEntry() {
* @param expireTime Expire time.
* @param partId Partition ID.
* @param partCnt Partition counter.
* @param prevStateMeta Previous entry state metadata.
* @param flags Entry flags.
*/
public DataEntry(
Expand All @@ -112,6 +119,7 @@ public DataEntry(
long expireTime,
int partId,
long partCnt,
CacheObject prevStateMeta,
byte flags
) {
this.cacheId = cacheId;
Expand All @@ -123,8 +131,12 @@ public DataEntry(
this.expireTime = expireTime;
this.partId = partId;
this.partCnt = partCnt;
this.prevStateMeta = prevStateMeta;
this.flags = flags;

if (this.prevStateMeta != null)
this.flags |= PREV_STATE_FLAG;

// Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
assert op == GridCacheOperation.READ
|| op == GridCacheOperation.CREATE
Expand Down Expand Up @@ -231,6 +243,13 @@ public long expireTime() {
return expireTime;
}

/**
* Previous entry state metadata.
*/
public CacheObject previousStateMetadata() {
return prevStateMeta;
}

/**
* Entry flags.
* @see #flags
Expand Down
Loading

0 comments on commit 1517de3

Please sign in to comment.