From 336f0a15aa6ce01b68eff8a50a5e3a9aaf5b75e7 Mon Sep 17 00:00:00 2001 From: lushiji Date: Sun, 26 Jun 2022 22:50:06 +0800 Subject: [PATCH 1/6] add async or sync for :ledger metadata index's rocksdb write --- .../storage/ldb/LedgerMetadataIndex.java | 82 ++--------- .../storage/ldb/LedgerMetadataIndexAsync.java | 117 +++++++++++++++ .../storage/ldb/LedgerMetadataIndexSync.java | 139 ++++++++++++++++++ .../storage/ldb/LocationsIndexRebuildOp.java | 3 +- .../ldb/SingleDirectoryDbLedgerStorage.java | 3 +- .../bookkeeper/conf/ServerConfiguration.java | 23 +++ .../ldb/LedgerMetadataIndexAsyncTest.java | 90 ++++++++++++ .../ldb/LedgerMetadataIndexSyncTest.java | 90 ++++++++++++ 8 files changed, 478 insertions(+), 69 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java index 23ec7aaeb3e..98386fe3809 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java @@ -50,22 +50,22 @@ * *

The key is the ledgerId and the value is the {@link LedgerData} content. */ -public class LedgerMetadataIndex implements Closeable { +public abstract class LedgerMetadataIndex implements Closeable { // Non-ledger data should have negative ID - private static final long STORAGE_FLAGS = -0xeefd; + protected static final long STORAGE_FLAGS = -0xeefd; // Contains all ledgers stored in the bookie private final ConcurrentLongHashMap ledgers; private final AtomicInteger ledgersCount; - private final KeyValueStorage ledgersDb; + protected final KeyValueStorage ledgersDb; private final LedgerMetadataIndexStats stats; // Holds ledger modifications applied in memory map, and pending to be flushed on db - private final ConcurrentLinkedQueue> pendingLedgersUpdates; + protected final ConcurrentLinkedQueue> pendingLedgersUpdates; // Holds ledger ids that were delete from memory map, and pending to be flushed on db - private final ConcurrentLinkedQueue pendingDeletedLedgers; + protected final ConcurrentLinkedQueue pendingDeletedLedgers; private final ReentrantLock[] locks = new ReentrantLock[16]; public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, @@ -302,43 +302,9 @@ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException { /** * Flushes all pending changes. */ - public void flush() throws IOException { - LongWrapper key = LongWrapper.get(); - - int updatedLedgers = 0; - while (!pendingLedgersUpdates.isEmpty()) { - Entry entry = pendingLedgersUpdates.poll(); - key.set(entry.getKey()); - byte[] value = entry.getValue().toByteArray(); - ledgersDb.put(key.array, value); - ++updatedLedgers; - } - - if (log.isDebugEnabled()) { - log.debug("Persisting updates to {} ledgers", updatedLedgers); - } - - ledgersDb.sync(); - key.recycle(); - } - - public void removeDeletedLedgers() throws IOException { - LongWrapper key = LongWrapper.get(); + public abstract void flush() throws IOException; - int deletedLedgers = 0; - while (!pendingDeletedLedgers.isEmpty()) { - long ledgerId = pendingDeletedLedgers.poll(); - key.set(ledgerId); - ledgersDb.delete(key.array); - } - - if (log.isDebugEnabled()) { - log.debug("Persisting deletes of ledgers {}", deletedLedgers); - } - - ledgersDb.sync(); - key.recycle(); - } + public abstract void removeDeletedLedgers() throws IOException; private ReentrantLock lockForLedger(long ledgerId) { return locks[Math.abs((int) ledgerId) % locks.length]; @@ -363,32 +329,7 @@ int getStorageStateFlags() throws IOException { } } - boolean setStorageStateFlags(int expected, int newFlags) throws IOException { - LongWrapper keyWrapper = LongWrapper.get(); - LongWrapper currentWrapper = LongWrapper.get(); - LongWrapper newFlagsWrapper = LongWrapper.get(); - - try { - keyWrapper.set(STORAGE_FLAGS); - newFlagsWrapper.set(newFlags); - synchronized (ledgersDb) { - int current = 0; - if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { - current = (int) currentWrapper.getValue(); - } - if (current == expected) { - ledgersDb.put(keyWrapper.array, newFlagsWrapper.array); - ledgersDb.sync(); - return true; - } - } - } finally { - keyWrapper.recycle(); - currentWrapper.recycle(); - newFlagsWrapper.recycle(); - } - return false; - } + abstract boolean setStorageStateFlags(int expected, int newFlags) throws IOException; private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndex.class); @@ -412,4 +353,11 @@ void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException { } } + public static LedgerMetadataIndex newInstance(ServerConfiguration conf, KeyValueStorageFactory storageFactory, + String basePath, StatsLogger stats) throws IOException { + if (!conf.getDbLedgerMetadataIndexSyncEnable()) { + return new LedgerMetadataIndexAsync(conf, storageFactory, basePath, stats); + } + return new LedgerMetadataIndexSync(conf, storageFactory, basePath, stats); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java new file mode 100644 index 00000000000..cf78e56aa4e --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains an index for the ledgers metadata. + * + *

Asynchronous write mode class. + *

The key is the ledgerId and the value is the {@link LedgerData} content. + */ +public class LedgerMetadataIndexAsync extends LedgerMetadataIndex { + + private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndexAsync.class); + + public LedgerMetadataIndexAsync(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, + StatsLogger stats) throws IOException { + super(conf, storageFactory, basePath, stats); + } + + /** + * Flushes all pending changes. + */ + @Override + public void flush() throws IOException { + LongWrapper key = LongWrapper.get(); + + int updatedLedgers = 0; + while (!pendingLedgersUpdates.isEmpty()) { + Entry entry = pendingLedgersUpdates.poll(); + key.set(entry.getKey()); + byte[] value = entry.getValue().toByteArray(); + ledgersDb.put(key.array, value); + ++updatedLedgers; + } + + if (log.isDebugEnabled()) { + log.debug("Persisting updates to {} ledgers", updatedLedgers); + } + + ledgersDb.sync(); + key.recycle(); + } + + @Override + public void removeDeletedLedgers() throws IOException { + LongWrapper key = LongWrapper.get(); + + int deletedLedgers = 0; + while (!pendingDeletedLedgers.isEmpty()) { + long ledgerId = pendingDeletedLedgers.poll(); + key.set(ledgerId); + ledgersDb.delete(key.array); + } + + if (log.isDebugEnabled()) { + log.debug("Persisting deletes of ledgers {}", deletedLedgers); + } + + ledgersDb.sync(); + key.recycle(); + } + + @Override + boolean setStorageStateFlags(int expected, int newFlags) throws IOException { + LongWrapper keyWrapper = LongWrapper.get(); + LongWrapper currentWrapper = LongWrapper.get(); + LongWrapper newFlagsWrapper = LongWrapper.get(); + + try { + keyWrapper.set(STORAGE_FLAGS); + newFlagsWrapper.set(newFlags); + synchronized (ledgersDb) { + int current = 0; + if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { + current = (int) currentWrapper.getValue(); + } + if (current == expected) { + ledgersDb.put(keyWrapper.array, newFlagsWrapper.array); + ledgersDb.sync(); + return true; + } + } + } finally { + keyWrapper.recycle(); + currentWrapper.recycle(); + newFlagsWrapper.recycle(); + } + return false; + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java new file mode 100644 index 00000000000..ad5b4c7f7c5 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains an index for the ledgers metadata. + * + *

Synchronous write mode class. + *

The key is the ledgerId and the value is the {@link LedgerData} content. + */ +public class LedgerMetadataIndexSync extends LedgerMetadataIndex { + + private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndexSync.class); + + public LedgerMetadataIndexSync(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, + StatsLogger stats) throws IOException { + super(conf, storageFactory, basePath, stats); + } + + /** + * Flushes all pending changes. + */ + @Override + public void flush() throws IOException { + LongWrapper key = LongWrapper.get(); + KeyValueStorage.Batch batch = ledgersDb.newBatch(); + + try { + int updatedLedgers = 0; + while (!pendingLedgersUpdates.isEmpty()) { + Entry entry = pendingLedgersUpdates.poll(); + key.set(entry.getKey()); + byte[] value = entry.getValue().toByteArray(); + batch.put(key.array, value); + ++updatedLedgers; + } + + if (log.isDebugEnabled()) { + log.debug("Persisting updates to {} ledgers", updatedLedgers); + } + } finally { + try { + batch.flush(); + batch.clear(); + } finally { + key.recycle(); + batch.close(); + } + } + } + + @Override + public void removeDeletedLedgers() throws IOException { + LongWrapper key = LongWrapper.get(); + KeyValueStorage.Batch batch = ledgersDb.newBatch(); + + try { + int deletedLedgers = 0; + while (!pendingDeletedLedgers.isEmpty()) { + long ledgerId = pendingDeletedLedgers.poll(); + key.set(ledgerId); + batch.remove(key.array); + } + + if (log.isDebugEnabled()) { + log.debug("Persisting deletes of ledgers {}", deletedLedgers); + } + } finally { + try { + batch.flush(); + batch.clear(); + } finally { + key.recycle(); + batch.close(); + } + } + } + + @Override + boolean setStorageStateFlags(int expected, int newFlags) throws IOException { + LongWrapper keyWrapper = LongWrapper.get(); + LongWrapper currentWrapper = LongWrapper.get(); + LongWrapper newFlagsWrapper = LongWrapper.get(); + + KeyValueStorage.Batch batch = ledgersDb.newBatch(); + try { + keyWrapper.set(STORAGE_FLAGS); + newFlagsWrapper.set(newFlags); + synchronized (ledgersDb) { + int current = 0; + if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { + current = (int) currentWrapper.getValue(); + } + if (current == expected) { + batch.put(keyWrapper.array, newFlagsWrapper.array); + return true; + } + } + } finally { + try { + batch.flush(); + batch.clear(); + } finally { + keyWrapper.recycle(); + currentWrapper.recycle(); + newFlagsWrapper.recycle(); + batch.close(); + } + } + return false; + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java index 5d53c66081a..f278366930a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java @@ -123,7 +123,8 @@ public boolean accept(long ledgerId) { private Set getActiveLedgers(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath) throws IOException { - LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf, storageFactory, basePath, NullStatsLogger.INSTANCE); + LedgerMetadataIndex ledgers = LedgerMetadataIndex.newInstance(conf, + storageFactory, basePath, NullStatsLogger.INSTANCE); Set activeLedgers = Sets.newHashSet(); for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE)) { activeLedgers.add(ledger); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index 30516f82c67..93e86c13cf1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -174,7 +174,8 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le readCache = new ReadCache(allocator, readCacheMaxSize); - ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger); + ledgerIndex = LedgerMetadataIndex.newInstance(conf, + KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger); entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, ledgerDirStatsLogger); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 3101dbbe37b..34ec20e9f85 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -171,6 +171,7 @@ public class ServerConfiguration extends AbstractConfiguration Date: Mon, 27 Jun 2022 10:12:45 +0800 Subject: [PATCH 2/6] checkstyle fixed --- .../storage/ldb/LedgerMetadataIndexAsync.java | 4 +-- .../storage/ldb/LedgerMetadataIndexSync.java | 4 +-- .../ldb/LedgerMetadataIndexAsyncTest.java | 24 ++++++++++-------- .../ldb/LedgerMetadataIndexSyncTest.java | 25 +++++++++++-------- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java index cf78e56aa4e..098c05ddb92 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java @@ -32,8 +32,8 @@ /** * Maintains an index for the ledgers metadata. * - *

Asynchronous write mode class. - *

The key is the ledgerId and the value is the {@link LedgerData} content. + *

Asynchronous write mode class, + * the key is the ledgerId and the value is the {@link LedgerData} content. */ public class LedgerMetadataIndexAsync extends LedgerMetadataIndex { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java index ad5b4c7f7c5..56bb6041eee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java @@ -32,8 +32,8 @@ /** * Maintains an index for the ledgers metadata. * - *

Synchronous write mode class. - *

The key is the ledgerId and the value is the {@link LedgerData} content. + *

Synchronous write mode class, + * the key is the ledgerId and the value is the {@link LedgerData} content. */ public class LedgerMetadataIndexSync extends LedgerMetadataIndex { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java index 65984714d98..8cb4a1f781c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java @@ -20,17 +20,17 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import static org.junit.Assert.assertEquals; + import com.google.protobuf.ByteString; + +import java.io.File; + import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.Test; -import java.io.File; -import java.io.IOException; - -import static org.junit.Assert.assertEquals; - /** * Unit test for {@link EntryLocationIndex}. */ @@ -49,13 +49,17 @@ public void asyncOperationTest() throws Exception { tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); // Add some dummy indexes - idx.set(40312, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true).setMasterKey(ByteString.EMPTY).build()); - idx.set(40313, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false).setMasterKey(ByteString.EMPTY).build()); + idx.set(40312, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true) + .setMasterKey(ByteString.EMPTY).build()); + idx.set(40313, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false) + .setMasterKey(ByteString.EMPTY).build()); idx.flush(); // Add more indexes in a different batch - idx.set(40314, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true).setMasterKey(ByteString.EMPTY).build()); - idx.set(40315, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false).setMasterKey(ByteString.EMPTY).build()); + idx.set(40314, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true) + .setMasterKey(ByteString.EMPTY).build()); + idx.set(40315, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false) + .setMasterKey(ByteString.EMPTY).build()); idx.flush(); assertEquals(true, idx.get(40312).getExists()); @@ -77,7 +81,7 @@ public void asyncOperationTest() throws Exception { try { idx.get(40313); } catch (Bookie.NoLedgerException e) { - assertEquals(Bookie.NoLedgerException.class,e.getClass()); + assertEquals(Bookie.NoLedgerException.class, e.getClass()); } assertEquals(true, idx.get(40314).getExists()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java index c7b6e444d20..06d595831a0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java @@ -20,23 +20,24 @@ */ package org.apache.bookkeeper.bookie.storage.ldb; +import static org.junit.Assert.assertEquals; + import com.google.protobuf.ByteString; + +import java.io.File; + import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.junit.Test; -import java.io.File; - -import static org.junit.Assert.assertEquals; - /** * Unit test for {@link EntryLocationIndex}. */ public class LedgerMetadataIndexSyncTest { private final ServerConfiguration serverConfiguration = new ServerConfiguration(); - + @Test public void syncOperationTest() throws Exception { File tmpDir = File.createTempFile("bkTest", ".dir"); @@ -49,13 +50,17 @@ public void syncOperationTest() throws Exception { tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); // Add some dummy indexes - idx.set(40312, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true).setMasterKey(ByteString.EMPTY).build()); - idx.set(40313, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false).setMasterKey(ByteString.EMPTY).build()); + idx.set(40312, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true) + .setMasterKey(ByteString.EMPTY).build()); + idx.set(40313, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false) + .setMasterKey(ByteString.EMPTY).build()); idx.flush(); // Add more indexes in a different batch - idx.set(40314, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true).setMasterKey(ByteString.EMPTY).build()); - idx.set(40315, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false).setMasterKey(ByteString.EMPTY).build()); + idx.set(40314, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(true) + .setMasterKey(ByteString.EMPTY).build()); + idx.set(40315, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(false) + .setMasterKey(ByteString.EMPTY).build()); idx.flush(); assertEquals(true, idx.get(40312).getExists()); @@ -77,7 +82,7 @@ public void syncOperationTest() throws Exception { try { idx.get(40313); } catch (Bookie.NoLedgerException e) { - assertEquals(Bookie.NoLedgerException.class,e.getClass()); + assertEquals(Bookie.NoLedgerException.class, e.getClass()); } assertEquals(true, idx.get(40314).getExists()); From f7736a81cce2ed197e8c6bea8d7d0dd822db5aca Mon Sep 17 00:00:00 2001 From: lushiji Date: Tue, 26 Jul 2022 01:40:06 +0800 Subject: [PATCH 3/6] solve conflicts with the base branch --- .../bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index daccd90a922..7b2128fd684 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -185,9 +185,8 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le readCache = new ReadCache(allocator, readCacheMaxSize); ledgerIndex = LedgerMetadataIndex.newInstance(conf, - KeyValueStorageRocksDB.factory, indexBaseDir, ledgerDirStatsLogger); - - ledgerIndex = new LedgerMetadataIndex(conf, + KeyValueStorageRocksDB.factory, indexBaseDir, ledgerIndexDirStatsLogger); + entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, indexBaseDir, ledgerIndexDirStatsLogger); transientLedgerInfoCache = ConcurrentLongHashMap.newBuilder() From 8ec4d5bb1418b97c996f1b32cdf1bf434246e7d4 Mon Sep 17 00:00:00 2001 From: lushiji Date: Fri, 19 Aug 2022 17:36:43 +0800 Subject: [PATCH 4/6] change sync type --- .../storage/ldb/LedgerMetadataIndex.java | 12 +++++------ .../storage/ldb/LedgerMetadataIndexAsync.java | 20 +++++++++---------- .../storage/ldb/LedgerMetadataIndexSync.java | 18 ++++++++--------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java index 98386fe3809..c3d436acaef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java @@ -310,19 +310,17 @@ private ReentrantLock lockForLedger(long ledgerId) { return locks[Math.abs((int) ledgerId) % locks.length]; } - int getStorageStateFlags() throws IOException { + synchronized int getStorageStateFlags() throws IOException { LongWrapper keyWrapper = LongWrapper.get(); LongWrapper currentWrapper = LongWrapper.get(); try { keyWrapper.set(STORAGE_FLAGS); - synchronized (ledgersDb) { - int current = 0; - if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { - current = (int) currentWrapper.getValue(); - } - return current; + int current = 0; + if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { + current = (int) currentWrapper.getValue(); } + return current; } finally { keyWrapper.recycle(); currentWrapper.recycle(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java index 098c05ddb92..09487bc2aef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java @@ -88,7 +88,7 @@ public void removeDeletedLedgers() throws IOException { } @Override - boolean setStorageStateFlags(int expected, int newFlags) throws IOException { + synchronized boolean setStorageStateFlags(int expected, int newFlags) throws IOException { LongWrapper keyWrapper = LongWrapper.get(); LongWrapper currentWrapper = LongWrapper.get(); LongWrapper newFlagsWrapper = LongWrapper.get(); @@ -96,16 +96,14 @@ boolean setStorageStateFlags(int expected, int newFlags) throws IOException { try { keyWrapper.set(STORAGE_FLAGS); newFlagsWrapper.set(newFlags); - synchronized (ledgersDb) { - int current = 0; - if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { - current = (int) currentWrapper.getValue(); - } - if (current == expected) { - ledgersDb.put(keyWrapper.array, newFlagsWrapper.array); - ledgersDb.sync(); - return true; - } + int current = 0; + if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { + current = (int) currentWrapper.getValue(); + } + if (current == expected) { + ledgersDb.put(keyWrapper.array, newFlagsWrapper.array); + ledgersDb.sync(); + return true; } } finally { keyWrapper.recycle(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java index 56bb6041eee..da5c2522871 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java @@ -104,7 +104,7 @@ public void removeDeletedLedgers() throws IOException { } @Override - boolean setStorageStateFlags(int expected, int newFlags) throws IOException { + synchronized boolean setStorageStateFlags(int expected, int newFlags) throws IOException { LongWrapper keyWrapper = LongWrapper.get(); LongWrapper currentWrapper = LongWrapper.get(); LongWrapper newFlagsWrapper = LongWrapper.get(); @@ -113,15 +113,13 @@ boolean setStorageStateFlags(int expected, int newFlags) throws IOException { try { keyWrapper.set(STORAGE_FLAGS); newFlagsWrapper.set(newFlags); - synchronized (ledgersDb) { - int current = 0; - if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { - current = (int) currentWrapper.getValue(); - } - if (current == expected) { - batch.put(keyWrapper.array, newFlagsWrapper.array); - return true; - } + int current = 0; + if (ledgersDb.get(keyWrapper.array, currentWrapper.array) >= 0) { + current = (int) currentWrapper.getValue(); + } + if (current == expected) { + batch.put(keyWrapper.array, newFlagsWrapper.array); + return true; } } finally { try { From 1148a927d4118833be38c6501090ddb35823661d Mon Sep 17 00:00:00 2001 From: lushiji Date: Fri, 19 Aug 2022 19:20:15 +0800 Subject: [PATCH 5/6] checkstyle fix --- .../bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java | 2 -- .../bookie/storage/ldb/LedgerMetadataIndexAsync.java | 1 - .../bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java | 1 - .../bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java | 3 --- .../bookie/storage/ldb/LedgerMetadataIndexSyncTest.java | 2 -- 5 files changed, 9 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java index c3d436acaef..2fa8eaf556f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; - import java.io.Closeable; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; @@ -33,7 +32,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java index 09487bc2aef..e95ecc7b92e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsync.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Map.Entry; - import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java index da5c2522871..a127249109b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSync.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Map.Entry; - import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java index 8cb4a1f781c..12f4172a285 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java @@ -21,11 +21,8 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static org.junit.Assert.assertEquals; - import com.google.protobuf.ByteString; - import java.io.File; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java index 06d595831a0..729513531e1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexSyncTest.java @@ -23,9 +23,7 @@ import static org.junit.Assert.assertEquals; import com.google.protobuf.ByteString; - import java.io.File; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; From 825296ac2f8e992e19f90e04dd1b5ff74ee936ff Mon Sep 17 00:00:00 2001 From: lushiji Date: Sat, 20 Aug 2022 00:11:19 +0800 Subject: [PATCH 6/6] checkstyle fix --- .../bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java index 12f4172a285..64e53d4940c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndexAsyncTest.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static org.junit.Assert.assertEquals; + import com.google.protobuf.ByteString; import java.io.File; import org.apache.bookkeeper.bookie.Bookie;