Skip to content

Commit

Permalink
IGNITE-23333 СDC is no longer experimental (#11567)
Browse files Browse the repository at this point in the history
  • Loading branch information
luchnikovbsk authored Feb 7, 2025
1 parent fda9dbd commit 2d57d27
Show file tree
Hide file tree
Showing 17 changed files with 33 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
// limitations under the License.
= Cross-cluster Replication Extension

WARNING: Change Data Capture (CDC) and Cross-cluster Replication Extension are experimental features. API or design architecture might be changed.

== Overview
link:https://github.com/apache/ignite-extensions/tree/master/modules/cdc-ext[Cross-cluster Replication Extension] module provides the following ways to set up cross-cluster replication based on CDC.

Expand Down
2 changes: 0 additions & 2 deletions docs/_docs/persistence/change-data-capture.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
== Overview
Change Data Capture (link:https://en.wikipedia.org/wiki/Change_data_capture[CDC]) is a data processing pattern used to asynchronously receive entries that have been changed on the local node so that action can be taken using the changed entry.

WARNING: CDC is an experimental feature. API or design architecture might be changed.

Below are some CDC use cases:

* Streaming changes in Warehouse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.systemview.view.CacheView;

/**
Expand All @@ -34,7 +33,6 @@
* @see CacheConfiguration
* @see QueryEntity
*/
@IgniteExperimental
public interface CdcCacheEvent {
/**
* @return Cache ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
package org.apache.ignite.cdc;

import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;

/**
* This class defines {@link CdcMain} runtime configuration.
* Configuration is passed to {@link CdcMain} constructor.
* This class defines ignite-cdc runtime configuration.
*/
@IgniteExperimental
public class CdcConfiguration {
/** */
private static final int DFLT_LOCK_TIMEOUT = 1000;
Expand All @@ -48,7 +44,7 @@ public class CdcConfiguration {
private boolean keepBinary = DFLT_KEEP_BINARY;

/**
* {@link CdcMain} acquire file lock on startup to ensure exclusive consumption.
* Ignite-cdc process acquire file lock on startup to ensure exclusive consumption.
* This property specifies amount of time to wait for lock acquisition.<br>
* Default is {@code 1000 ms}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import org.apache.ignite.binary.BinaryIdMapper;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.systemview.view.CacheView;

/**
* Consumer of WAL data change events.
* This consumer will receive data change events during {@link CdcMain} application invocation.
* This consumer will receive data change events during ignite-cdc process invocation.
* The lifecycle of the consumer is the following:
* <ul>
* <li>Start of the consumer {@link #start(MetricRegistry)}.</li>
Expand Down Expand Up @@ -61,11 +59,9 @@
* </ul>
* Note, {@link CdcConsumer} receive notifications on each running CDC application(node).
*
* @see CdcMain
* @see CdcEvent
* @see CacheEntryVersion
*/
@IgniteExperimental
public interface CdcConsumer {
/**
* Starts the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.jetbrains.annotations.Nullable;

/**
* Event of single entry change.
* Instance presents new value of modified entry.
*
* @see CdcMain
* @see CdcConsumer
*/
@IgniteExperimental
public interface CdcEvent extends Serializable {
/**
* @return Key for the changed entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.binary.BinaryIdMapper;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.platform.PlatformType;

Expand All @@ -38,7 +37,6 @@
* @see CdcConsumer#onMappings(Iterator)
* @see MarshallerContext#registerClassName(byte, int, String, boolean)
*/
@IgniteExperimental
public interface TypeMapping extends Serializable {
/** @return Type id. */
public int typeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.mem.MemoryAllocator;
import org.apache.ignite.mxbean.MetricsMxBean;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -157,7 +156,6 @@ public final class DataRegionConfiguration implements Serializable {
@Nullable private MemoryAllocator memoryAllocator = null;

/** Change Data Capture enabled flag. */
@IgniteExperimental
private boolean cdcEnabled;

/**
Expand Down Expand Up @@ -566,7 +564,6 @@ public DataRegionConfiguration setWarmUpConfiguration(@Nullable WarmUpConfigurat
* @param cdcEnabled CDC enabled flag.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataRegionConfiguration setCdcEnabled(boolean cdcEnabled) {
this.cdcEnabled = cdcEnabled;

Expand All @@ -579,7 +576,6 @@ public DataRegionConfiguration setCdcEnabled(boolean cdcEnabled) {
*
* @return CDC enabled flag.
*/
@IgniteExperimental
public boolean isCdcEnabled() {
return cdcEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.mem.MemoryAllocator;
import org.apache.ignite.mxbean.MetricsMxBean;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -239,11 +238,9 @@ public class DataStorageConfiguration implements Serializable {
private String walArchivePath = DFLT_WAL_ARCHIVE_PATH;

/** Change Data Capture path. */
@IgniteExperimental
private String cdcWalPath = DFLT_WAL_CDC_PATH;

/** Change Data Capture directory size limit. */
@IgniteExperimental
private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;

/**
Expand Down Expand Up @@ -298,7 +295,6 @@ public class DataStorageConfiguration implements Serializable {
private long walAutoArchiveAfterInactivity = -1;

/** Time interval (in milliseconds) after last log of data change for force archiving of incompletely WAL segment. */
@IgniteExperimental
private long walForceArchiveTimeout = -1;

/** If true, threads that generate dirty pages too fast during ongoing checkpoint will be throttled. */
Expand Down Expand Up @@ -787,7 +783,6 @@ public DataStorageConfiguration setWalArchivePath(String walArchivePath) {
*
* @return CDC directory.
*/
@IgniteExperimental
public String getCdcWalPath() {
return cdcWalPath;
}
Expand All @@ -799,7 +794,6 @@ public String getCdcWalPath() {
* @param cdcWalPath CDC directory.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataStorageConfiguration setCdcWalPath(String cdcWalPath) {
A.notNull(cdcWalPath, "CDC WAL");

Expand All @@ -813,7 +807,6 @@ public DataStorageConfiguration setCdcWalPath(String cdcWalPath) {
*
* @return CDC directory maximum size in bytes.
*/
@IgniteExperimental
public long getCdcWalDirectoryMaxSize() {
return cdcWalDirMaxSize;
}
Expand All @@ -827,7 +820,6 @@ public long getCdcWalDirectoryMaxSize() {
* @param cdcWalDirMaxSize CDC directory maximum size in bytes.
* @return {@code this} for chaining.
*/
@IgniteExperimental
public DataStorageConfiguration setCdcWalDirectoryMaxSize(long cdcWalDirMaxSize) {
this.cdcWalDirMaxSize = cdcWalDirMaxSize;

Expand Down Expand Up @@ -1149,7 +1141,6 @@ public long getWalAutoArchiveAfterInactivity() {
* Zero or negative value disables forcefull auto archiving.
* @return current configuration instance for chaining.
*/
@IgniteExperimental
public DataStorageConfiguration setWalForceArchiveTimeout(long walForceArchiveTimeout) {
this.walForceArchiveTimeout = walForceArchiveTimeout;

Expand All @@ -1160,7 +1151,6 @@ public DataStorageConfiguration setWalForceArchiveTimeout(long walForceArchiveTi
* @return time interval (in milliseconds) after last log of data change
* for force archiving of incompletely WAL segment.
*/
@IgniteExperimental
public long getWalForceArchiveTimeout() {
return walForceArchiveTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@
* limitations under the License.
*/

package org.apache.ignite.cdc;
package org.apache.ignite.internal.cdc;

import java.net.URL;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteExperimental;

import static org.apache.ignite.internal.IgniteComponentType.SPRING;

/**
* Utility class to load {@link CdcMain} from Spring XML configuration.
*/
@IgniteExperimental
public class CdcLoader {
/**
* Loads {@link CdcMain} from XML configuration file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public IgniteCommandRegistry() {
new PersistenceCommand(),
new DefragmentationCommand(),
new PerformanceStatisticsCommand(),
new ConsistencyCommand(),
new CdcCommand()
new CdcCommand(),
new ConsistencyCommand()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.management.api.ComputeCommand;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;

import static org.apache.ignite.internal.management.api.CommandUtils.node;
import static org.apache.ignite.internal.management.api.CommandUtils.servers;

/**
* Command to delete lost segment links.
*/
@IgniteExperimental
public class CdcDeleteLostSegmentLinksCommand implements ComputeCommand<CdcDeleteLostSegmentLinksCommandArg, Void> {
/** {@inheritDoc} */
@Override public String description() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

import java.util.function.Consumer;
import org.apache.ignite.internal.management.api.ComputeCommand;
import org.apache.ignite.lang.IgniteExperimental;

/**
* The command to forcefully resend all cache data to CDC.
* Iterates over given caches and writes data entries to the WAL to get captured by CDC.
*/
@IgniteExperimental
public class CdcResendCommand implements ComputeCommand<CdcResendCommandArg, Void> {
/** {@inheritDoc} */
@Override public String description() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cdc.CdcLoader;
import org.apache.ignite.internal.cdc.CdcLoader;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.X;
import org.jetbrains.annotations.Nullable;
Expand All @@ -41,7 +41,6 @@
* this startup and you can use them as an example.
* <p>
*
* @see CdcMain
*/
public class CdcCommandLineStartup {
/** Quite log flag. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,18 @@ If the file name isn't specified the output file name is: '<typeId>.bin':
Get status of collecting performance statistics in the cluster:
control.(sh|bat) --performance-statistics status

Delete lost segment CDC links:
control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes]

Parameters:
--node-id node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.

Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC:
control.(sh|bat) --cdc resend --caches cache1,...,cacheN

Parameters:
--caches cache1,...,cacheN - specifies a comma-separated list of cache names.

[EXPERIMENTAL]
Check/Repair cache consistency using Read Repair approach:
control.(sh|bat) --consistency repair --cache cache --partitions partition --strategy LWW|PRIMARY|RELATIVE_MAJORITY|REMOVE|CHECK_ONLY [--parallel]
Expand All @@ -450,20 +462,6 @@ If the file name isn't specified the output file name is: '<typeId>.bin':
Finalize partitions update counters:
control.(sh|bat) --consistency finalize

[EXPERIMENTAL]
Delete lost segment CDC links:
control.(sh|bat) --cdc delete_lost_segment_links [--node-id node_id] [--yes]

Parameters:
--node-id node_id - ID of the node to delete lost segment links from. If not set, the command will affect all server nodes.

[EXPERIMENTAL]
Forcefully resend all cache data to CDC. Iterates over caches and writes primary copies of data entries to the WAL to get captured by CDC:
control.(sh|bat) --cdc resend --caches cache1,...,cacheN

Parameters:
--caches cache1,...,cacheN - specifies a comma-separated list of cache names.

By default commands affecting the cluster require interactive confirmation.
Use --yes option to disable it.

Expand Down
Loading

0 comments on commit 2d57d27

Please sign in to comment.