Skip to content

Commit

Permalink
Rename CheckpointManager to CheckpointStore (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored Jan 11, 2018
1 parent be560d6 commit aaada2e
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import lombok.Builder.Default;
import lombok.Getter;
import lombok.Singular;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointManager;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointStore;
import org.apache.distributedlog.common.coder.Coder;

/**
Expand All @@ -43,7 +43,7 @@ public class StateStoreSpec {
private String stream;
private ScheduledExecutorService writeIOScheduler;
private ScheduledExecutorService readIOScheduler;
private CheckpointManager checkpointManager;
private CheckpointStore checkpointStore;
@Default private Duration checkpointDuration = Duration.ofMinutes(1);
@Singular private Map<String, Object> configs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* A checkpoint factory that provides inputstream and outputstream for reading checkpoints.
*/
public interface CheckpointManager {
public interface CheckpointStore {

List<String> listFiles(String filePath) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointManager;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointStore;
import org.apache.distributedlog.api.statestore.exceptions.StateStoreException;
import org.apache.distributedlog.statestore.impl.rocksdb.RocksUtils;
import org.apache.distributedlog.statestore.proto.CheckpointMetadata;
Expand All @@ -41,21 +41,21 @@ public class RocksCheckpointer implements AutoCloseable {

public static CheckpointMetadata restore(String dbName,
File dbPath,
CheckpointManager checkpointManager)
CheckpointStore checkpointStore)
throws StateStoreException {
try {
String dbPrefix = String.format("%s", dbName);

Pair<String, CheckpointMetadata> latestCheckpoint = getLatestCheckpoint(
dbPrefix, checkpointManager);
dbPrefix, checkpointStore);
File checkpointsDir = new File(dbPath, "checkpoints");
String checkpointId = latestCheckpoint.getLeft();
CheckpointMetadata checkpointMetadata = latestCheckpoint.getRight();
if (checkpointId != null) {
RocksdbRestoreTask task = new RocksdbRestoreTask(
dbName,
checkpointsDir,
checkpointManager);
checkpointStore);
task.restore(checkpointId, checkpointMetadata);
} else {
// no checkpoints available, create an empty directory
Expand All @@ -77,8 +77,8 @@ public static CheckpointMetadata restore(String dbName,
}

private static Pair<String, CheckpointMetadata > getLatestCheckpoint(
String dbPrefix, CheckpointManager checkpointManager) throws IOException {
List<String> files = checkpointManager.listFiles(dbPrefix);
String dbPrefix, CheckpointStore checkpointStore) throws IOException {
List<String> files = checkpointStore.listFiles(dbPrefix);
CheckpointMetadata latestCheckpoint = null;
String latestCheckpointId = null;

Expand All @@ -91,7 +91,7 @@ private static Pair<String, CheckpointMetadata > getLatestCheckpoint(
dbPrefix,
checkpointId);

try (InputStream is = checkpointManager.openInputStream(metadataPath)) {
try (InputStream is = checkpointStore.openInputStream(metadataPath)) {
CheckpointMetadata ckpt = CheckpointMetadata.parseFrom(is);
if (null == latestCheckpoint) {
latestCheckpointId = checkpointId;
Expand All @@ -108,16 +108,16 @@ private static Pair<String, CheckpointMetadata > getLatestCheckpoint(
private final String dbName;
private final File dbPath;
private final Checkpoint checkpoint;
private final CheckpointManager checkpointManager;
private final CheckpointStore checkpointStore;

public RocksCheckpointer(String dbName,
File dbPath,
RocksDB rocksDB,
CheckpointManager checkpointManager) {
CheckpointStore checkpointStore) {
this.dbName = dbName;
this.dbPath = dbPath;
this.checkpoint = Checkpoint.create(rocksDB);
this.checkpointManager = checkpointManager;
this.checkpointStore = checkpointStore;
}

void checkpointAtTxid(byte[] txid) throws StateStoreException {
Expand All @@ -126,7 +126,7 @@ void checkpointAtTxid(byte[] txid) throws StateStoreException {
dbName,
checkpoint,
new File(dbPath, "checkpoints"),
checkpointManager);
checkpointStore);
task.checkpoint(txid);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointManager;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointStore;
import org.apache.distributedlog.api.statestore.exceptions.StateStoreException;
import org.apache.distributedlog.statestore.impl.rocksdb.RocksUtils;
import org.apache.distributedlog.statestore.proto.CheckpointMetadata;
Expand All @@ -45,17 +45,17 @@ public class RocksdbCheckpointTask {
private final String dbName;
private final Checkpoint checkpoint;
private final File checkpointDir;
private final CheckpointManager checkpointManager;
private final CheckpointStore checkpointStore;
private final String dbPrefix;

public RocksdbCheckpointTask(String dbName,
Checkpoint checkpoint,
File checkpointDir,
CheckpointManager checkpointManager) {
CheckpointStore checkpointStore) {
this.dbName = dbName;
this.checkpoint = checkpoint;
this.checkpointDir = checkpointDir;
this.checkpointManager = checkpointManager;
this.checkpointStore = checkpointStore;
this.dbPrefix = String.format("%s", dbName);
}

Expand Down Expand Up @@ -110,7 +110,7 @@ private List<File> getFilesToCopy(File checkpointedDir) throws IOException {
// sst files
String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
// TODO: do more validation on the file
if (!checkpointManager.fileExists(destSstPath)) {
if (!checkpointStore.fileExists(destSstPath)) {
fileToCopy.add(file);
}
} else {
Expand All @@ -132,7 +132,7 @@ private void copyFilesToDest(String checkpointId, List<File> files) throws IOExc
*/
private void copyFileToDest(String checkpointId, File file) throws IOException {
String destPath = RocksUtils.getDestPath(dbPrefix, checkpointId, file);
try (OutputStream os = checkpointManager.openOutputStream(destPath)) {
try (OutputStream os = checkpointStore.openOutputStream(destPath)) {
Files.copy(file, os);
}
}
Expand All @@ -146,7 +146,7 @@ private void finalizeCopyFiles(String checkpointId, List<File> files) throws IOE
String destSstTempPath = RocksUtils.getDestTempSstPath(
dbPrefix, checkpointId, file);
String destSstPath = RocksUtils.getDestSstPath(dbPrefix, file);
checkpointManager.rename(destSstTempPath, destSstPath);
checkpointStore.rename(destSstTempPath, destSstPath);
}
}
}
Expand All @@ -166,7 +166,7 @@ private void finalizeCheckpoint(String checkpointId,
metadataBuilder.setCreatedAt(System.currentTimeMillis());

String destCheckpointPath = RocksUtils.getDestCheckpointMetadataPath(dbPrefix, checkpointId);
try (OutputStream os = checkpointManager.openOutputStream(destCheckpointPath)) {
try (OutputStream os = checkpointStore.openOutputStream(destCheckpointPath)) {
os.write(metadataBuilder.build().toByteArray());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.nio.file.StandardCopyOption;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointManager;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointStore;
import org.apache.distributedlog.api.statestore.exceptions.StateStoreException;
import org.apache.distributedlog.statestore.impl.rocksdb.RocksUtils;
import org.apache.distributedlog.statestore.proto.CheckpointMetadata;
Expand All @@ -39,15 +39,15 @@ public class RocksdbRestoreTask {

private final String dbName;
private final File checkpointDir;
private final CheckpointManager checkpointManager;
private final CheckpointStore checkpointStore;
private final String dbPrefix;

public RocksdbRestoreTask(String dbName,
File checkpointDir,
CheckpointManager checkpointManager) {
CheckpointStore checkpointStore) {
this.dbName = dbName;
this.checkpointDir = checkpointDir;
this.checkpointManager = checkpointManager;
this.checkpointStore = checkpointStore;
this.dbPrefix = String.format("%s", dbName);
}

Expand Down Expand Up @@ -89,7 +89,7 @@ private List<String> getFilesToCopy(String checkpointId,
srcFile = RocksUtils.getDestPath(dbPrefix, checkpointId, localFile);
}

long srcFileLength = checkpointManager.getFileLength(srcFile);
long srcFileLength = checkpointStore.getFileLength(srcFile);
long localFileLength = localFile.length();
if (srcFileLength != localFileLength) {
filesToCopy.add(fileName);
Expand Down Expand Up @@ -119,7 +119,7 @@ private void copyFileFromRemote(String checkpointId,
}

byte[] data = new byte[128 * 1024];
try (InputStream is = checkpointManager.openInputStream(remoteFilePath)) {
try (InputStream is = checkpointStore.openInputStream(remoteFilePath)) {
Files.copy(
is,
Paths.get(localFile.getAbsolutePath()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointManager;
import org.apache.distributedlog.api.statestore.checkpoint.CheckpointStore;

/**
* Filesystem based checkpoint factory.
*/
public class FSCheckpointManager implements CheckpointManager {
public class FSCheckpointManager implements CheckpointStore {

@Override
public List<String> listFiles(String filePath) throws IOException {
Expand Down

0 comments on commit aaada2e

Please sign in to comment.