diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
index 23ec7aaeb3e..2fa8eaf556f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -24,7 +24,6 @@
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
@@ -33,7 +32,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
@@ -50,22 +48,22 @@
*
*
The key is the ledgerId and the value is the {@link LedgerData} content.
*/
-public class LedgerMetadataIndex implements Closeable {
+public abstract class LedgerMetadataIndex implements Closeable {
// Non-ledger data should have negative ID
- private static final long STORAGE_FLAGS = -0xeefd;
+ protected static final long STORAGE_FLAGS = -0xeefd;
// Contains all ledgers stored in the bookie
private final ConcurrentLongHashMap ledgers;
private final AtomicInteger ledgersCount;
- private final KeyValueStorage ledgersDb;
+ protected final KeyValueStorage ledgersDb;
private final LedgerMetadataIndexStats stats;
// Holds ledger modifications applied in memory map, and pending to be flushed on db
- private final ConcurrentLinkedQueue> pendingLedgersUpdates;
+ protected final ConcurrentLinkedQueue> pendingLedgersUpdates;
// Holds ledger ids that were delete from memory map, and pending to be flushed on db
- private final ConcurrentLinkedQueue pendingDeletedLedgers;
+ protected final ConcurrentLinkedQueue pendingDeletedLedgers;
private final ReentrantLock[] locks = new ReentrantLock[16];
public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
@@ -302,93 +300,32 @@ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
/**
* Flushes all pending changes.
*/
- public void flush() throws IOException {
- LongWrapper key = LongWrapper.get();
-
- int updatedLedgers = 0;
- while (!pendingLedgersUpdates.isEmpty()) {
- Entry entry = pendingLedgersUpdates.poll();
- key.set(entry.getKey());
- byte[] value = entry.getValue().toByteArray();
- ledgersDb.put(key.array, value);
- ++updatedLedgers;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Persisting updates to {} ledgers", updatedLedgers);
- }
-
- ledgersDb.sync();
- key.recycle();
- }
+ public abstract void flush() throws IOException;
- public void removeDeletedLedgers() throws IOException {
- LongWrapper key = LongWrapper.get();
-
- int deletedLedgers = 0;
- while (!pendingDeletedLedgers.isEmpty()) {
- long ledgerId = pendingDeletedLedgers.poll();
- key.set(ledgerId);
- ledgersDb.delete(key.array);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Persisting deletes of ledgers {}", deletedLedgers);
- }
-
- ledgersDb.sync();
- key.recycle();
- }
+ public abstract void removeDeletedLedgers() throws IOException;
private ReentrantLock lockForLedger(long ledgerId) {
return locks[Math.abs((int) ledgerId) % locks.length];
}
- int getStorageStateFlags() throws IOException {
+ synchronized int getStorageStateFlags() throws IOException {
LongWrapper keyWrapper = LongWrapper.get();
LongWrapper currentWrapper = LongWrapper.get();
try {
keyWrapper.set(STORAGE_FLAGS);
- synchronized (ledgersDb) {
- int current = 0;
- if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) {
- current = (int) currentWrapper.getValue();
- }
- return current;
+ int current = 0;
+ if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) {
+ current = (int) currentWrapper.getValue();
}
+ return current;
} finally {
keyWrapper.recycle();
currentWrapper.recycle();
}
}
- boolean setStorageStateFlags(int expected, int newFlags) throws IOException {
- LongWrapper keyWrapper = LongWrapper.get();
- LongWrapper currentWrapper = LongWrapper.get();
- LongWrapper newFlagsWrapper = LongWrapper.get();
-
- try {
- keyWrapper.set(STORAGE_FLAGS);
- newFlagsWrapper.set(newFlags);
- synchronized (ledgersDb) {
- int current = 0;
- if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) {
- current = (int) currentWrapper.getValue();
- }
- if (current == expected) {
- ledgersDb.put(keyWrapper.array, newFlagsWrapper.array);
- ledgersDb.sync();
- return true;
- }
- }
- } finally {
- keyWrapper.recycle();
- currentWrapper.recycle();
- newFlagsWrapper.recycle();
- }
- return false;
- }
+ abstract boolean setStorageStateFlags(int expected, int newFlags) throws IOException;
private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndex.class);
@@ -412,4 +349,11 @@ void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
}
}
+ public static LedgerMetadataIndex newInstance(ServerConfiguration conf, KeyValueStorageFactory storageFactory,
+ String basePath, StatsLogger stats) throws IOException {
+ if (!conf.getDbLedgerMetadataIndexSyncEnable()) {
+ return new LedgerMetadataIndexAsync(conf, storageFactory, basePath, stats);
+ }
+ return new LedgerMetadataIndexSync(conf, storageFactory, basePath, stats);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java
new file mode 100644
index 00000000000..e95ecc7b92e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * Asynchronous write mode class,
+ * the key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndexAsync extends LedgerMetadataIndex {
+
+ private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndexAsync.class);
+
+ public LedgerMetadataIndexAsync(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+ StatsLogger stats) throws IOException {
+ super(conf, storageFactory, basePath, stats);
+ }
+
+ /**
+ * Flushes all pending changes.
+ */
+ @Override
+ public void flush() throws IOException {
+ LongWrapper key = LongWrapper.get();
+
+ int updatedLedgers = 0;
+ while (!pendingLedgersUpdates.isEmpty()) {
+ Entry entry = pendingLedgersUpdates.poll();
+ key.set(entry.getKey());
+ byte[] value = entry.getValue().toByteArray();
+ ledgersDb.put(key.array, value);
+ ++updatedLedgers;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting updates to {} ledgers", updatedLedgers);
+ }
+
+ ledgersDb.sync();
+ key.recycle();
+ }
+
+ @Override
+ public void removeDeletedLedgers() throws IOException {
+ LongWrapper key = LongWrapper.get();
+
+ int deletedLedgers = 0;
+ while (!pendingDeletedLedgers.isEmpty()) {
+ long ledgerId = pendingDeletedLedgers.poll();
+ key.set(ledgerId);
+ ledgersDb.delete(key.array);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+ }
+
+ ledgersDb.sync();
+ key.recycle();
+ }
+
+ @Override
+ synchronized boolean setStorageStateFlags(int expected, int newFlags) throws IOException {
+ LongWrapper keyWrapper = LongWrapper.get();
+ LongWrapper currentWrapper = LongWrapper.get();
+ LongWrapper newFlagsWrapper = LongWrapper.get();
+
+ try {
+ keyWrapper.set(STORAGE_FLAGS);
+ newFlagsWrapper.set(newFlags);
+ int current = 0;
+ if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) {
+ current = (int) currentWrapper.getValue();
+ }
+ if (current == expected) {
+ ledgersDb.put(keyWrapper.array, newFlagsWrapper.array);
+ ledgersDb.sync();
+ return true;
+ }
+ } finally {
+ keyWrapper.recycle();
+ currentWrapper.recycle();
+ newFlagsWrapper.recycle();
+ }
+ return false;
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java
new file mode 100644
index 00000000000..a127249109b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java
@@ -0,0 +1,136 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * Synchronous write mode class,
+ * the key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndexSync extends LedgerMetadataIndex {
+
+ private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndexSync.class);
+
+ public LedgerMetadataIndexSync(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+ StatsLogger stats) throws IOException {
+ super(conf, storageFactory, basePath, stats);
+ }
+
+ /**
+ * Flushes all pending changes.
+ */
+ @Override
+ public void flush() throws IOException {
+ LongWrapper key = LongWrapper.get();
+ KeyValueStorage.Batch batch = ledgersDb.newBatch();
+
+ try {
+ int updatedLedgers = 0;
+ while (!pendingLedgersUpdates.isEmpty()) {
+ Entry entry = pendingLedgersUpdates.poll();
+ key.set(entry.getKey());
+ byte[] value = entry.getValue().toByteArray();
+ batch.put(key.array, value);
+ ++updatedLedgers;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting updates to {} ledgers", updatedLedgers);
+ }
+ } finally {
+ try {
+ batch.flush();
+ batch.clear();
+ } finally {
+ key.recycle();
+ batch.close();
+ }
+ }
+ }
+
+ @Override
+ public void removeDeletedLedgers() throws IOException {
+ LongWrapper key = LongWrapper.get();
+ KeyValueStorage.Batch batch = ledgersDb.newBatch();
+
+ try {
+ int deletedLedgers = 0;
+ while (!pendingDeletedLedgers.isEmpty()) {
+ long ledgerId = pendingDeletedLedgers.poll();
+ key.set(ledgerId);
+ batch.remove(key.array);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+ }
+ } finally {
+ try {
+ batch.flush();
+ batch.clear();
+ } finally {
+ key.recycle();
+ batch.close();
+ }
+ }
+ }
+
+ @Override
+ synchronized boolean setStorageStateFlags(int expected, int newFlags) throws IOException {
+ LongWrapper keyWrapper = LongWrapper.get();
+ LongWrapper currentWrapper = LongWrapper.get();
+ LongWrapper newFlagsWrapper = LongWrapper.get();
+
+ KeyValueStorage.Batch batch = ledgersDb.newBatch();
+ try {
+ keyWrapper.set(STORAGE_FLAGS);
+ newFlagsWrapper.set(newFlags);
+ int current = 0;
+ if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) {
+ current = (int) currentWrapper.getValue();
+ }
+ if (current == expected) {
+ batch.put(keyWrapper.array, newFlagsWrapper.array);
+ return true;
+ }
+ } finally {
+ try {
+ batch.flush();
+ batch.clear();
+ } finally {
+ keyWrapper.recycle();
+ currentWrapper.recycle();
+ newFlagsWrapper.recycle();
+ batch.close();
+ }
+ }
+ return false;
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 84a9da552d5..e5886392255 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -135,7 +135,8 @@ public boolean accept(long ledgerId) {
private Set getActiveLedgers(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath)
throws IOException {
- LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf, storageFactory, basePath, NullStatsLogger.INSTANCE);
+ LedgerMetadataIndex ledgers = LedgerMetadataIndex.newInstance(conf,
+ storageFactory, basePath, NullStatsLogger.INSTANCE);
Set activeLedgers = Sets.newHashSet();
for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
activeLedgers.add(ledger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index a1131df8157..7b2128fd684 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -184,7 +184,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
readCache = new ReadCache(allocator, readCacheMaxSize);
- ledgerIndex = new LedgerMetadataIndex(conf,
+ ledgerIndex = LedgerMetadataIndex.newInstance(conf,
KeyValueStorageRocksDB.factory, indexBaseDir, ledgerIndexDirStatsLogger);
entryLocationIndex = new EntryLocationIndex(conf,
KeyValueStorageRocksDB.factory, indexBaseDir, ledgerIndexDirStatsLogger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 5518bf03623..d52cc9563ec 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -171,6 +171,7 @@ public class ServerConfiguration extends AbstractConfiguration