This repository has been archived by the owner on Mar 3, 2024. It is now read-only.
Ковальчук Владислав M33371 HW5 #285
Merged
Changes from 59 commits
Commits
64 commits
f429a35
initial commit
fa15007
remove wrong flush
e165123
add null safety
07959cf
checkstyle fix
d9fb355
remove extra ifs & add unsafe methods
d822985
comment fix
4a264e8
just move constant to static
3d3b4a8
Merge branch 'main' into main
incubos f92dce9
review fix
e159d50
Merge remote-tracking branch 'origin/main'
a05eaa3
Merge branch 'main' into main
AlexeyShik 7757e78
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm
5ac9637
initial commit
91f9e50
initial commit
45d7d6a
Merge remote-tracking branch 'origin/main'
Dalvikk 453b290
initial commit
1a2797f
codeclimate
Dalvikk bb55a94
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
0c4aa95
codeclimate
b5c482d
codeclimate
82c8638
msdf
535d402
i hate codeclimate
ac821d4
codeclimate please
4a63784
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm
0448e90
Merge branch 'main' into slave-3.0
incubos c044b94
Merge branch 'main' into slave-3.0
Dalvikk 0ac4a24
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
bdbfb6e
Merge branch 'main' into slave-3.0
Dalvikk 67a143d
Merge branch 'main' into slave-3.0
incubos 00c7a18
Merge branch 'main' into slave-3.0
incubos 45b09b4
Merge branch 'main' into slave-3.0
incubos 8cdc745
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
a5a0281
review fixes
ab1d07f
Merge remote-tracking branch 'origin/slave-3.0' into slave-3.0
2600814
hotfix
bb8c62b
initial commit
f3a2e15
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
a3febfc
cringe test
f8df75c
codeclimate fix
fa012ae
Merge branch 'main' of https://github.com/Dalvikk/2023-nosql-lsm into…
ffbd90c
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
f88981a
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
2b93168
remove serializer
d6fc004
revert
18c1b57
revert 2
ab95570
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
3fc588f
review fixes
c5c1d4b
add fo
af8a96f
fix
bfd772e
merge branch slave-6.0
c529e98
fix codeciamte
7184cbd
Merge branch 'main' into slave-5.0
Dalvikk a7012c4
fix
c63ff7a
fix again
b74b778
unused fix
c031fa6
test fix
16d1bb3
final fix
576fba6
final fix
6bd95ea
Merge branch 'main' into slave-5.0
Dalvikk 4352e3a
review fixes
a69e849
Merge remote-tracking branch 'origin/slave-5.0' into slave-5.0
6025bb9
remove accidental changes
eca6c7a
codeclimate fix
3c64dc8
Merge branch 'main' into slave-5.0
Dalvikk
255 changes: 90 additions & 165 deletions
src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractBasedOnSSTableDao.java
@@ -1,223 +1,148 @@ | ||
package ru.vk.itmo.kovalchukvladislav; | ||
|
||
import ru.vk.itmo.Config; | ||
import ru.vk.itmo.Dao; | ||
import ru.vk.itmo.Entry; | ||
import ru.vk.itmo.kovalchukvladislav.model.DaoIterator; | ||
import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor; | ||
import ru.vk.itmo.kovalchukvladislav.model.TableInfo; | ||
import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility; | ||
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorage; | ||
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorageImpl; | ||
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorage; | ||
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorageImpl; | ||
|
||
import java.io.IOException; | ||
import java.lang.foreign.Arena; | ||
import java.lang.foreign.MemorySegment; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.io.UncheckedIOException; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.logging.Level; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.logging.Logger; | ||
|
||
public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> extends AbstractInMemoryDao<D, E> { | ||
// =================================== | ||
// Constants | ||
// =================================== | ||
public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> implements Dao<D, E> { | ||
private final Logger logger = SimpleDaoLoggerUtility.createLogger(getClass()); | ||
private static final String DB_FILENAME_PREFIX = "db_"; | ||
private static final String METADATA_FILENAME = "metadata"; | ||
private static final String OFFSETS_FILENAME_PREFIX = "offsets_"; | ||
private static final String DB_FILENAME_PREFIX = "db_"; | ||
|
||
// =================================== | ||
// Variables | ||
// =================================== | ||
|
||
private final Path basePath; | ||
private final Arena arena = Arena.ofShared(); | ||
private final long flushThresholdBytes; | ||
private final EntryExtractor<D, E> extractor; | ||
private final SSTableMemorySegmentWriter<D, E> writer; | ||
|
||
// =================================== | ||
// Storages | ||
// =================================== | ||
|
||
private int storagesCount; | ||
private volatile boolean closed; | ||
private final List<MemorySegment> dbMappedSegments; | ||
private final List<MemorySegment> offsetMappedSegments; | ||
private final Logger logger = Logger.getLogger(getClass().getSimpleName()); | ||
private final AtomicBoolean isClosed = new AtomicBoolean(false); | ||
private final AtomicBoolean isFlushingOrCompacting = new AtomicBoolean(false); | ||
private final ExecutorService flushOrCompactQueue = Executors.newSingleThreadExecutor(); | ||
|
||
/** | ||
* In get(), upsert() and compact(), inMemoryStorage and ssTableStorage do not need to be synchronized with each other. | ||
* The only exceptions are flush() and compact(). | ||
* It must be ensured that at any point both storages together hold the complete data. | ||
*/ | ||
private final InMemoryStorage<D, E> inMemoryStorage; | ||
private final SSTableStorage<D, E> ssTableStorage; | ||
|
||
protected AbstractBasedOnSSTableDao(Config config, EntryExtractor<D, E> extractor) throws IOException { | ||
super(extractor); | ||
this.closed = false; | ||
this.storagesCount = 0; | ||
this.extractor = extractor; | ||
this.flushThresholdBytes = config.flushThresholdBytes(); | ||
this.basePath = Objects.requireNonNull(config.basePath()); | ||
this.dbMappedSegments = new ArrayList<>(); | ||
this.offsetMappedSegments = new ArrayList<>(); | ||
reloadFilesAndMapToSegment(); | ||
this.writer = new SSTableMemorySegmentWriter<>(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX, | ||
METADATA_FILENAME, extractor); | ||
logger.setLevel(Level.OFF); // to avoid cluttering the GitHub output, if that is even possible | ||
} | ||
|
||
// =================================== | ||
// Restoring state | ||
// =================================== | ||
|
||
private void reloadFilesAndMapToSegment() throws IOException { | ||
if (!Files.exists(basePath)) { | ||
Files.createDirectory(basePath); | ||
} | ||
logger.info(() -> String.format("Reloading files from %s", basePath)); | ||
List<String> ssTableIds = getSSTableIds(); | ||
for (String ssTableId : ssTableIds) { | ||
readFileAndMapToSegment(ssTableId); | ||
} | ||
logger.info(() -> String.format("Reloaded %d files", storagesCount)); | ||
} | ||
|
||
private void readFileAndMapToSegment(String timestamp) throws IOException { | ||
Path dbPath = basePath.resolve(DB_FILENAME_PREFIX + timestamp); | ||
Path offsetsPath = basePath.resolve(OFFSETS_FILENAME_PREFIX + timestamp); | ||
if (!Files.exists(dbPath) || !Files.exists(offsetsPath)) { | ||
logger.severe(() -> String.format("File under path %s or %s doesn't exists", dbPath, offsetsPath)); | ||
return; | ||
} | ||
|
||
logger.info(() -> String.format("Reading files with timestamp %s", timestamp)); | ||
|
||
try (FileChannel dbChannel = FileChannel.open(dbPath, StandardOpenOption.READ); | ||
FileChannel offsetChannel = FileChannel.open(offsetsPath, StandardOpenOption.READ)) { | ||
|
||
MemorySegment db = dbChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(dbPath), arena); | ||
MemorySegment offsets = offsetChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(offsetsPath), arena); | ||
dbMappedSegments.add(db); | ||
offsetMappedSegments.add(offsets); | ||
storagesCount++; | ||
} | ||
logger.info(() -> String.format("Successfully read files with %s timestamp", timestamp)); | ||
} | ||
|
||
private List<String> getSSTableIds() throws IOException { | ||
Path metadataPath = basePath.resolve(METADATA_FILENAME); | ||
if (!Files.exists(metadataPath)) { | ||
return Collections.emptyList(); | ||
} | ||
return Files.readAllLines(metadataPath, StandardCharsets.UTF_8); | ||
} | ||
|
||
private Path[] getAllTablesPath() throws IOException { | ||
List<String> ssTableIds = getSSTableIds(); | ||
int size = ssTableIds.size(); | ||
Path[] files = new Path[2 * size]; | ||
|
||
for (int i = 0; i < size; i++) { | ||
String id = ssTableIds.get(i); | ||
files[2 * i] = basePath.resolve(DB_FILENAME_PREFIX + id); | ||
files[2 * i + 1] = basePath.resolve(OFFSETS_FILENAME_PREFIX + id); | ||
} | ||
return files; | ||
this.inMemoryStorage = new InMemoryStorageImpl<>(extractor, config.flushThresholdBytes()); | ||
this.ssTableStorage = new SSTableStorageImpl<>(basePath, METADATA_FILENAME, | ||
DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX, extractor); | ||
} | ||
|
||
// =================================== | ||
// Finding in storage | ||
// =================================== | ||
@Override | ||
public Iterator<E> get(D from, D to) { | ||
Iterator<E> inMemotyIterator = super.get(from, to); | ||
return new DaoIterator<>(from, to, inMemotyIterator, dbMappedSegments, offsetMappedSegments, extractor); | ||
List<Iterator<E>> iterators = new ArrayList<>(); | ||
iterators.addAll(inMemoryStorage.getIterators(from, to)); | ||
iterators.addAll(ssTableStorage.getIterators(from, to)); | ||
return new DaoIterator<>(iterators, extractor); | ||
} | ||
|
||
@Override | ||
public E get(D key) { | ||
E e = dao.get(key); | ||
E e = inMemoryStorage.get(key); | ||
if (e != null) { | ||
return e.value() == null ? null : e; | ||
} | ||
E fromFile = findInStorages(key); | ||
E fromFile = ssTableStorage.get(key); | ||
return (fromFile == null || fromFile.value() == null) ? null : fromFile; | ||
} | ||
|
||
private E findInStorages(D key) { | ||
for (int i = storagesCount - 1; i >= 0; i--) { | ||
MemorySegment storage = dbMappedSegments.get(i); | ||
MemorySegment offsets = offsetMappedSegments.get(i); | ||
|
||
long offset = extractor.findLowerBoundValueOffset(key, storage, offsets); | ||
if (offset == -1) { | ||
continue; | ||
} | ||
D lowerBoundKey = extractor.readValue(storage, offset); | ||
|
||
if (comparator.compare(lowerBoundKey, key) == 0) { | ||
long valueOffset = offset + extractor.size(lowerBoundKey); | ||
D value = extractor.readValue(storage, valueOffset); | ||
return extractor.createEntry(lowerBoundKey, value); | ||
} | ||
@Override | ||
public void upsert(E entry) { | ||
long size = inMemoryStorage.upsertAndGetSize(entry); | ||
if (size >= flushThresholdBytes) { | ||
flush(); | ||
} | ||
return null; | ||
} | ||
|
||
// =================================== | ||
// Some utils | ||
// =================================== | ||
|
||
private TableInfo getInMemoryDaoSizeInfo() { | ||
long size = 0; | ||
for (E entry : dao.values()) { | ||
size += extractor.size(entry); | ||
@Override | ||
public void flush() { | ||
if (!isFlushingOrCompacting.compareAndSet(false, true)) { | ||
logger.info("Flush or compact already in process"); | ||
return; | ||
} | ||
return new TableInfo(dao.size(), size); | ||
} | ||
|
||
private TableInfo getSSTableDaoSizeInfo() { | ||
Iterator<E> allIterator = all(); | ||
long entriesCount = 0; | ||
long daoSize = 0; | ||
|
||
while (allIterator.hasNext()) { | ||
E next = allIterator.next(); | ||
entriesCount++; | ||
daoSize += extractor.size(next); | ||
Callable<String> flushCallable = inMemoryStorage.prepareFlush( | ||
basePath, | ||
DB_FILENAME_PREFIX, | ||
OFFSETS_FILENAME_PREFIX); | ||
if (flushCallable == null) { | ||
isFlushingOrCompacting.set(false); | ||
return; | ||
} | ||
|
||
return new TableInfo(entriesCount, daoSize); | ||
submitFlushAndAddSSTable(flushCallable); | ||
} | ||
|
||
// =================================== | ||
// Flush and close | ||
// =================================== | ||
|
||
@Override | ||
public synchronized void flush() throws IOException { | ||
if (dao.isEmpty()) { | ||
return; | ||
} | ||
writer.flush(dao.values().iterator(), getInMemoryDaoSizeInfo()); | ||
private void submitFlushAndAddSSTable(Callable<String> flushCallable) { | ||
flushOrCompactQueue.execute(() -> { | ||
try { | ||
String newTimestamp = flushCallable.call(); | ||
ssTableStorage.addSSTableId(newTimestamp, true); | ||
inMemoryStorage.completeFlush(); | ||
} catch (Exception e) { | ||
inMemoryStorage.failFlush(); | ||
} finally { | ||
isFlushingOrCompacting.set(false); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public synchronized void close() throws IOException { | ||
if (closed) { | ||
public void close() { | ||
if (!isClosed.compareAndSet(false, true)) { | ||
return; | ||
} | ||
flush(); | ||
if (arena.scope().isAlive()) { | ||
arena.close(); | ||
|
||
flushOrCompactQueue.close(); | ||
try { | ||
String newTimestamp = inMemoryStorage.close(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX); | ||
if (newTimestamp != null) { | ||
ssTableStorage.addSSTableId(newTimestamp, false); | ||
} | ||
} catch (Exception e) { | ||
logger.severe(() -> "Error while flushing on close: " + e.getMessage()); | ||
} | ||
closed = true; | ||
ssTableStorage.close(); | ||
} | ||
|
||
@Override | ||
public synchronized void compact() throws IOException { | ||
if (storagesCount <= 1 && dao.isEmpty()) { | ||
public void compact() { | ||
if (!isFlushingOrCompacting.compareAndSet(false, true)) { | ||
logger.info("Flush or compact already in process"); | ||
return; | ||
} | ||
Path[] oldTables = getAllTablesPath(); | ||
writer.compact(all(), getSSTableDaoSizeInfo()); | ||
writer.deleteUnusedFiles(oldTables); | ||
flushOrCompactQueue.execute(() -> { | ||
try { | ||
ssTableStorage.compact(); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} finally { | ||
isFlushingOrCompacting.set(false); | ||
} | ||
}); | ||
} | ||
} |
44 changes: 0 additions & 44 deletions
src/main/java/ru/vk/itmo/kovalchukvladislav/AbstractInMemoryDao.java
This file was deleted.
So if the storage is full and someone has already triggered a flush in another thread, an error will be thrown? And you don't catch it anywhere?
No error is thrown as long as flush keeps up with the incoming data.
Suppose nothing is currently being flushed to disk and the storage is full.
flush() and upsert() are called at the same time.
Inside flush(), under the writeLock, the state is switched to RUNNING, the map is cleared, and the size becomes 0.
Inside upsert(), a MemoryOverflowException is thrown if the size exceeds the limit and the state is RUNNING or FAILED. The size and the state are read under the readLock.
Thanks to the locks, everything stays consistent.
Throwing the error is reasonable: in the RUNNING case a flush() is already in progress, and if we managed to accumulate another full table while it is still running, we are simply not keeping up. In the FAILED case the flush() ended with an error, and nothing was removed from memory so that no data is lost. We throw MemoryOverflowException there as well, because a full table plus the table pending flush exceeds the limit.
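
To make the invariant concrete, here is a minimal sketch of the locking scheme described above. It assumes a ReentrantReadWriteLock guarding the memtable size and flush state; the class name InMemoryStorageSketch, the FlushState enum and the method names are illustrative assumptions, not the actual InMemoryStorageImpl code from this PR.

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: names and signatures are assumptions, not the PR's actual classes.
class InMemoryStorageSketch<K extends Comparable<K>, V> {

    static class MemoryOverflowException extends RuntimeException {
        MemoryOverflowException(String message) {
            super(message);
        }
    }

    enum FlushState { IDLE, RUNNING, FAILED }

    private final long flushThresholdBytes;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicLong sizeBytes = new AtomicLong();
    private ConcurrentSkipListMap<K, V> memTable = new ConcurrentSkipListMap<>();
    private FlushState state = FlushState.IDLE;

    InMemoryStorageSketch(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // upsert: size and state are read under the readLock; if the table is already full
    // while a flush is RUNNING (or the previous flush FAILED), throw instead of
    // accumulating a second full table in memory.
    long upsertAndGetSize(K key, V value, long entrySize) {
        lock.readLock().lock();
        try {
            if (sizeBytes.get() >= flushThresholdBytes && state != FlushState.IDLE) {
                throw new MemoryOverflowException("Flush cannot keep up with upserts");
            }
            memTable.put(key, value);
            return sizeBytes.addAndGet(entrySize); // simplified: ignores replaced entries
        } finally {
            lock.readLock().unlock();
        }
    }

    // flush: under the writeLock the state becomes RUNNING, the map is swapped out and
    // the size is reset to 0; the snapshot is then written to disk outside the lock.
    ConcurrentSkipListMap<K, V> prepareFlush() {
        lock.writeLock().lock();
        try {
            state = FlushState.RUNNING;
            ConcurrentSkipListMap<K, V> snapshot = memTable;
            memTable = new ConcurrentSkipListMap<>();
            sizeBytes.set(0);
            return snapshot;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void completeFlush() { setState(FlushState.IDLE); }   // disk write succeeded

    // NOTE: the real storage keeps the un-flushed snapshot on failure so data is not
    // lost; that bookkeeping is omitted here for brevity.
    void failFlush() { setState(FlushState.FAILED); }

    private void setState(FlushState newState) {
        lock.writeLock().lock();
        try {
            state = newState;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

With this layout upsert() only ever takes the readLock, so concurrent writers do not block each other; only prepareFlush() and the state transitions take the writeLock, which matches the ordering argued for in the reply above.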