Skip to content

Commit

Permalink
Refactor and rename translog manager implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Bharadwaj <[email protected]>
  • Loading branch information
varunbharadwaj committed Feb 26, 2025
1 parent cb010fa commit f56f90f
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.EngineTranslogManager;
import org.opensearch.index.translog.NoOpInternalTranslogManager;
import org.opensearch.index.translog.NoOpEngineTranslogManager;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.StreamPoller;
Expand Down Expand Up @@ -268,12 +268,12 @@ public DocumentMapperForType getDocumentMapperForType() {
}

@Override
protected EngineTranslogManager createTranslogManager(
protected TranslogManager createTranslogManager(
String translogUUID,
TranslogDeletionPolicy translogDeletionPolicy,
CompositeTranslogEventListener translogEventListener
) {
return new NoOpInternalTranslogManager(engineConfig, translogUUID);
return new NoOpEngineTranslogManager(engineConfig, translogUUID);
}

protected Map<String, String> commitDataAsMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.translog.EngineTranslogManager;
import org.opensearch.index.translog.InternalTranslogManager;
import org.opensearch.index.translog.InternalEngineTranslogManager;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
Expand Down Expand Up @@ -156,7 +155,7 @@ public class InternalEngine extends Engine {
*/
private volatile long lastDeleteVersionPruneTimeMSec;

protected final EngineTranslogManager translogManager;
protected final TranslogManager translogManager;
protected final IndexWriter indexWriter;
protected final LocalCheckpointTracker localCheckpointTracker;
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
Expand Down Expand Up @@ -248,7 +247,7 @@ public TranslogManager translogManager() {
ExternalReaderManager externalReaderManager = null;
OpenSearchReaderManager internalReaderManager = null;
EngineMergeScheduler scheduler = null;
EngineTranslogManager translogManagerRef = null;
TranslogManager translogManagerRef = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
Expand Down Expand Up @@ -352,12 +351,12 @@ public void onFailure(String reason, Exception ex) {
logger.trace("created new InternalEngine");
}

protected EngineTranslogManager createTranslogManager(
protected TranslogManager createTranslogManager(
String translogUUID,
TranslogDeletionPolicy translogDeletionPolicy,
CompositeTranslogEventListener translogEventListener
) throws IOException {
return new InternalTranslogManager(
return new InternalEngineTranslogManager(
engineConfig.getTranslogConfig(),
engineConfig.getPrimaryTermSupplier(),
engineConfig.getGlobalCheckpointSupplier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.WriteOnlyInternalEngineTranslogManager;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;

Expand Down Expand Up @@ -61,7 +61,7 @@ public class NRTReplicationEngine extends Engine {
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final WriteOnlyInternalEngineTranslogManager translogManager;
private final Lock flushLock = new ReentrantLock();
protected final ReplicaFileTracker replicaFileTracker;

Expand All @@ -73,7 +73,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
WriteOnlyTranslogManager translogManagerRef = null;
WriteOnlyInternalEngineTranslogManager translogManagerRef = null;
boolean success = false;
try {
this.replicaFileTracker = new ReplicaFileTracker(store::deleteQuiet);
Expand All @@ -99,7 +99,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
}
final Map<String, String> userData = this.lastCommittedSegmentInfos.getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
translogManagerRef = new WriteOnlyInternalEngineTranslogManager(
engineConfig.getTranslogConfig(),
engineConfig.getPrimaryTermSupplier(),
engineConfig.getGlobalCheckpointSupplier(),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @opensearch.internal
*/
public class InternalTranslogManager implements EngineTranslogManager {
public class InternalEngineTranslogManager implements TranslogManager {

private final ReleasableLock readLock;
private final LifecycleAware engineLifeCycleAware;
Expand All @@ -46,7 +46,7 @@ public class InternalTranslogManager implements EngineTranslogManager {
private final Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier;
private final Logger logger;

public InternalTranslogManager(
public InternalEngineTranslogManager(
TranslogConfig translogConfig,
LongSupplier primaryTermSupplier,
LongSupplier globalCheckpointSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

/**
* A no-op {@link TranslogManager} implementation capable of interfacing with the {@link org.opensearch.index.engine.Engine}
* to be used when the engine does not require a translog.
*
* @opensearch.internal
*/
public class NoOpInternalTranslogManager implements EngineTranslogManager {
public class NoOpEngineTranslogManager implements TranslogManager {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final String translogUUID;

public NoOpInternalTranslogManager(EngineConfig engineConfig, String translogUUID) {
public NoOpEngineTranslogManager(EngineConfig engineConfig, String translogUUID) {
this.translogUUID = translogUUID;
this.translogDeletionPolicy = new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -134,4 +135,40 @@ public Releasable drainSync() {
public Translog.TranslogGeneration getTranslogGeneration() {
return null;
}

@Override
public long getLastSyncedGlobalCheckpoint() {
return 0;

Check warning on line 141 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L141

Added line #L141 was not covered by tests
}

@Override
public long getMaxSeqNo() {
return SequenceNumbers.NO_OPS_PERFORMED;

Check warning on line 146 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L146

Added line #L146 was not covered by tests
}

@Override
public void trimUnreferencedReaders() throws IOException {}

Check warning on line 150 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L150

Added line #L150 was not covered by tests

@Override
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
return false;

Check warning on line 154 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L154

Added line #L154 was not covered by tests
}

@Override
public Exception getTragicExceptionIfClosed() {
return null;

Check warning on line 159 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L159

Added line #L159 was not covered by tests
}

@Override
public TranslogDeletionPolicy getDeletionPolicy() {
return null;

Check warning on line 164 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L164

Added line #L164 was not covered by tests
}

@Override
public String getTranslogUUID() {
return "";

Check warning on line 169 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L169

Added line #L169 was not covered by tests
}

@Override
public void close() throws IOException {}

Check warning on line 173 in server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java#L173

Added line #L173 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;

import java.io.Closeable;
import java.io.IOException;
import java.util.stream.Stream;

Expand All @@ -20,7 +21,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public interface TranslogManager {
public interface TranslogManager extends Closeable {

/**
* Rolls the translog generation and cleans unneeded.
Expand Down Expand Up @@ -142,4 +143,46 @@ public interface TranslogManager {
Releasable drainSync();

Translog.TranslogGeneration getTranslogGeneration();

/**
* Retrieves last synced global checkpoint.
*/
long getLastSyncedGlobalCheckpoint();

/**
* Retrieves the max seq no.
*/
long getMaxSeqNo();

/**
* Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum required
* generation.
*/
void trimUnreferencedReaders() throws IOException;

/**
*
* @param localCheckpointOfLastCommit local checkpoint reference of last commit to translog
* @param flushThreshold threshold to flush to translog
* @return if the translog should be flushed
*/
boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold);

/**
* Retrieves the underlying translog tragic exception
* @return the tragic exception
*/
Exception getTragicExceptionIfClosed();

/**
* Retrieves the translog deletion policy
* @return TranslogDeletionPolicy
*/
TranslogDeletionPolicy getDeletionPolicy();

/**
* Retrieves the translog unique identifier
* @return the uuid of the translog
*/
String getTranslogUUID();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
*
* @opensearch.internal
*/
public class WriteOnlyTranslogManager extends InternalTranslogManager {
public class WriteOnlyInternalEngineTranslogManager extends InternalEngineTranslogManager {

public WriteOnlyTranslogManager(
public WriteOnlyInternalEngineTranslogManager(
TranslogConfig translogConfig,
LongSupplier primaryTermSupplier,
LongSupplier globalCheckpointSupplier,
Expand Down
Loading

0 comments on commit f56f90f

Please sign in to comment.