This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Kovalchuk Vladislav M33371 HW5 #285

Merged
merged 64 commits into from
Jan 9, 2024
Commits (64)
f429a35
initial commit
Sep 28, 2023
fa15007
remove wrong flush
Sep 28, 2023
e165123
add null safety
Sep 28, 2023
07959cf
checkstyle fix
Sep 28, 2023
d9fb355
remove extra ifs & add unsafe methods
Sep 28, 2023
d822985
comment fix
Sep 28, 2023
4a264e8
just move constant to static
Sep 28, 2023
3d3b4a8
Merge branch 'main' into main
incubos Sep 28, 2023
f92dce9
review fix
Sep 28, 2023
e159d50
Merge remote-tracking branch 'origin/main'
Sep 28, 2023
a05eaa3
Merge branch 'main' into main
AlexeyShik Oct 2, 2023
7757e78
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm
Oct 4, 2023
5ac9637
initial commit
Oct 4, 2023
91f9e50
initial commit
Oct 4, 2023
45d7d6a
Merge remote-tracking branch 'origin/main'
Dalvikk Oct 5, 2023
453b290
initial commit
Oct 4, 2023
1a2797f
codeclimate
Dalvikk Oct 5, 2023
bb55a94
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
Oct 18, 2023
0c4aa95
codeclimate
Oct 18, 2023
b5c482d
codeclimate
Oct 18, 2023
82c8638
msdf
Oct 18, 2023
535d402
i hate codeclimate
Oct 18, 2023
ac821d4
codeclimate please
Oct 18, 2023
4a63784
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm
Oct 19, 2023
0448e90
Merge branch 'main' into slave-3.0
incubos Oct 19, 2023
c044b94
Merge branch 'main' into slave-3.0
Dalvikk Oct 20, 2023
0ac4a24
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
Oct 20, 2023
bdbfb6e
Merge branch 'main' into slave-3.0
Dalvikk Oct 20, 2023
67a143d
Merge branch 'main' into slave-3.0
incubos Oct 20, 2023
00c7a18
Merge branch 'main' into slave-3.0
incubos Oct 20, 2023
45b09b4
Merge branch 'main' into slave-3.0
incubos Oct 20, 2023
8cdc745
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-3.0
Nov 1, 2023
a5a0281
review fixes
Nov 1, 2023
ab1d07f
Merge remote-tracking branch 'origin/slave-3.0' into slave-3.0
Nov 1, 2023
2600814
hotfix
Nov 1, 2023
bb8c62b
initial commit
Nov 1, 2023
f3a2e15
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
Nov 1, 2023
a3febfc
cringe test
Nov 1, 2023
f8df75c
codeclimate fix
Nov 1, 2023
fa012ae
Merge branch 'main' of https://github.com/Dalvikk/2023-nosql-lsm into…
Nov 21, 2023
ffbd90c
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
Nov 21, 2023
f88981a
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
Nov 22, 2023
2b93168
remove serializer
Nov 22, 2023
d6fc004
revert
Nov 22, 2023
18c1b57
revert 2
Nov 22, 2023
ab95570
Merge branch 'main' of github.com:polis-vk/2023-nosql-lsm into slave-4.0
Nov 23, 2023
3fc588f
review fixes
Nov 23, 2023
c5c1d4b
add fo
Nov 22, 2023
af8a96f
fix
Nov 29, 2023
bfd772e
merge branch slave-6.0
Dec 18, 2023
c529e98
fix codeciamte
Dec 18, 2023
7184cbd
Merge branch 'main' into slave-5.0
Dalvikk Dec 25, 2023
a7012c4
fix
Dec 25, 2023
c63ff7a
fix again
Dec 25, 2023
b74b778
unused fix
Dec 25, 2023
c031fa6
test fix
Dec 25, 2023
16d1bb3
final fix
Dec 25, 2023
576fba6
final fix
Dec 26, 2023
6bd95ea
Merge branch 'main' into slave-5.0
Dalvikk Dec 26, 2023
4352e3a
review fixes
Dec 28, 2023
a69e849
Merge remote-tracking branch 'origin/slave-5.0' into slave-5.0
Dec 28, 2023
6025bb9
remove accidental changes
Dec 28, 2023
eca6c7a
codeclimate fix
Dec 28, 2023
3c64dc8
Merge branch 'main' into slave-5.0
Dalvikk Dec 29, 2023
@@ -1,223 +1,148 @@
package ru.vk.itmo.kovalchukvladislav;

import ru.vk.itmo.Config;
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
import ru.vk.itmo.kovalchukvladislav.model.DaoIterator;
import ru.vk.itmo.kovalchukvladislav.model.EntryExtractor;
import ru.vk.itmo.kovalchukvladislav.model.TableInfo;
import ru.vk.itmo.kovalchukvladislav.model.SimpleDaoLoggerUtility;
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorage;
import ru.vk.itmo.kovalchukvladislav.storage.InMemoryStorageImpl;
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorage;
import ru.vk.itmo.kovalchukvladislav.storage.SSTableStorageImpl;

import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.logging.Level;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> extends AbstractInMemoryDao<D, E> {
// ===================================
// Constants
// ===================================
public abstract class AbstractBasedOnSSTableDao<D, E extends Entry<D>> implements Dao<D, E> {
private final Logger logger = SimpleDaoLoggerUtility.createLogger(getClass());
private static final String DB_FILENAME_PREFIX = "db_";
private static final String METADATA_FILENAME = "metadata";
private static final String OFFSETS_FILENAME_PREFIX = "offsets_";
private static final String DB_FILENAME_PREFIX = "db_";

// ===================================
// Variables
// ===================================

private final Path basePath;
private final Arena arena = Arena.ofShared();
private final long flushThresholdBytes;
private final EntryExtractor<D, E> extractor;
private final SSTableMemorySegmentWriter<D, E> writer;

// ===================================
// Storages
// ===================================

private int storagesCount;
private volatile boolean closed;
private final List<MemorySegment> dbMappedSegments;
private final List<MemorySegment> offsetMappedSegments;
private final Logger logger = Logger.getLogger(getClass().getSimpleName());
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isFlushingOrCompacting = new AtomicBoolean(false);
private final ExecutorService flushOrCompactQueue = Executors.newSingleThreadExecutor();

/**
* In get(), upsert() and compact(), inMemoryStorage and ssTableStorage do not need to be synchronized with each other.
* The only exceptions are flush() and compact().
* It must be ensured that at every point in time the two storages together hold the complete data.
*/
private final InMemoryStorage<D, E> inMemoryStorage;
private final SSTableStorage<D, E> ssTableStorage;

protected AbstractBasedOnSSTableDao(Config config, EntryExtractor<D, E> extractor) throws IOException {
super(extractor);
this.closed = false;
this.storagesCount = 0;
this.extractor = extractor;
this.flushThresholdBytes = config.flushThresholdBytes();
this.basePath = Objects.requireNonNull(config.basePath());
this.dbMappedSegments = new ArrayList<>();
this.offsetMappedSegments = new ArrayList<>();
reloadFilesAndMapToSegment();
this.writer = new SSTableMemorySegmentWriter<>(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX,
METADATA_FILENAME, extractor);
logger.setLevel(Level.OFF); // so as not to clutter the output on GitHub, if that is even possible
}

// ===================================
// Restoring state
// ===================================

private void reloadFilesAndMapToSegment() throws IOException {
if (!Files.exists(basePath)) {
Files.createDirectory(basePath);
}
logger.info(() -> String.format("Reloading files from %s", basePath));
List<String> ssTableIds = getSSTableIds();
for (String ssTableId : ssTableIds) {
readFileAndMapToSegment(ssTableId);
}
logger.info(() -> String.format("Reloaded %d files", storagesCount));
}

private void readFileAndMapToSegment(String timestamp) throws IOException {
Path dbPath = basePath.resolve(DB_FILENAME_PREFIX + timestamp);
Path offsetsPath = basePath.resolve(OFFSETS_FILENAME_PREFIX + timestamp);
if (!Files.exists(dbPath) || !Files.exists(offsetsPath)) {
logger.severe(() -> String.format("File under path %s or %s doesn't exists", dbPath, offsetsPath));
return;
}

logger.info(() -> String.format("Reading files with timestamp %s", timestamp));

try (FileChannel dbChannel = FileChannel.open(dbPath, StandardOpenOption.READ);
FileChannel offsetChannel = FileChannel.open(offsetsPath, StandardOpenOption.READ)) {

MemorySegment db = dbChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(dbPath), arena);
MemorySegment offsets = offsetChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(offsetsPath), arena);
dbMappedSegments.add(db);
offsetMappedSegments.add(offsets);
storagesCount++;
}
logger.info(() -> String.format("Successfully read files with %s timestamp", timestamp));
}

private List<String> getSSTableIds() throws IOException {
Path metadataPath = basePath.resolve(METADATA_FILENAME);
if (!Files.exists(metadataPath)) {
return Collections.emptyList();
}
return Files.readAllLines(metadataPath, StandardCharsets.UTF_8);
}

private Path[] getAllTablesPath() throws IOException {
List<String> ssTableIds = getSSTableIds();
int size = ssTableIds.size();
Path[] files = new Path[2 * size];

for (int i = 0; i < size; i++) {
String id = ssTableIds.get(i);
files[2 * i] = basePath.resolve(DB_FILENAME_PREFIX + id);
files[2 * i + 1] = basePath.resolve(OFFSETS_FILENAME_PREFIX + id);
}
return files;
this.inMemoryStorage = new InMemoryStorageImpl<>(extractor, config.flushThresholdBytes());
this.ssTableStorage = new SSTableStorageImpl<>(basePath, METADATA_FILENAME,
DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX, extractor);
}

// ===================================
// Finding in storage
// ===================================
@Override
public Iterator<E> get(D from, D to) {
Iterator<E> inMemotyIterator = super.get(from, to);
return new DaoIterator<>(from, to, inMemotyIterator, dbMappedSegments, offsetMappedSegments, extractor);
List<Iterator<E>> iterators = new ArrayList<>();
iterators.addAll(inMemoryStorage.getIterators(from, to));
iterators.addAll(ssTableStorage.getIterators(from, to));
return new DaoIterator<>(iterators, extractor);
}

@Override
public E get(D key) {
E e = dao.get(key);
E e = inMemoryStorage.get(key);
if (e != null) {
return e.value() == null ? null : e;
}
E fromFile = findInStorages(key);
E fromFile = ssTableStorage.get(key);
return (fromFile == null || fromFile.value() == null) ? null : fromFile;
}

private E findInStorages(D key) {
for (int i = storagesCount - 1; i >= 0; i--) {
MemorySegment storage = dbMappedSegments.get(i);
MemorySegment offsets = offsetMappedSegments.get(i);

long offset = extractor.findLowerBoundValueOffset(key, storage, offsets);
if (offset == -1) {
continue;
}
D lowerBoundKey = extractor.readValue(storage, offset);

if (comparator.compare(lowerBoundKey, key) == 0) {
long valueOffset = offset + extractor.size(lowerBoundKey);
D value = extractor.readValue(storage, valueOffset);
return extractor.createEntry(lowerBoundKey, value);
}
@Override
public void upsert(E entry) {
long size = inMemoryStorage.upsertAndGetSize(entry);

So if the storage is full and someone has already triggered a flush in another thread, you get an exception? And you don't catch it anywhere?

Contributor Author


No exception is thrown as long as flush() keeps up with the incoming writes.

Suppose nothing is currently being flushed to disk and the storage is full, and flush() and upsert() are called at the same time.
Inside flush(), under the writeLock, the state is switched to RUNNING, the map is cleared and the size drops to 0.
Inside upsert(), a MemoryOverflowException is thrown if the size exceeds the limit and the state is RUNNING or FAILED; the size and the state are read under the readLock.

Thanks to the locks everything stays consistent.
Throwing the exception is reasonable: in the RUNNING case flush() is already executing, and if we have managed to fill a whole new table while it is still running, we simply cannot keep up. In the FAILED case an error occurred during flush(), and nothing was removed from memory so that no data is lost; we throw MemoryOverflowException here as well, since the full table plus the table pending flush exceed the limit.
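The locking scheme described in this reply can be illustrated with a minimal sketch. InMemoryStorageImpl itself is not part of this diff, so the class, field and method names below are assumptions made only to visualize the size/state check under the read lock versus the map hand-off under the write lock; a plain IllegalStateException stands in for the MemoryOverflowException mentioned above, and the actual implementation in the PR may differ.

// Illustrative sketch only: all names here are hypothetical, not the PR's code.
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MemTableSketch<K extends Comparable<K>, V> {
    enum FlushState { IDLE, RUNNING, FAILED }

    private final long flushThresholdBytes;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicLong sizeBytes = new AtomicLong();
    private ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
    private volatile FlushState state = FlushState.IDLE;

    MemTableSketch(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // upsert(): the size and the state are checked under the read lock, so a concurrent
    // prepareFlush() cannot swap the map out between the check and the insert.
    long upsertAndGetSize(K key, V value, long entrySize) {
        lock.readLock().lock();
        try {
            if (sizeBytes.get() + entrySize > flushThresholdBytes && state != FlushState.IDLE) {
                // RUNNING: the previous flush has not finished, yet the new table is already full.
                // FAILED: the previous flush failed and its data is still held in memory.
                throw new IllegalStateException("Memory overflow: flush cannot keep up with upserts");
            }
            map.put(key, value); // size accounting for replaced keys is ignored for brevity
            return sizeBytes.addAndGet(entrySize);
        } finally {
            lock.readLock().unlock();
        }
    }

    // prepareFlush(): under the write lock the state becomes RUNNING, the current table is
    // handed off to the flushing thread and the in-memory size is reset to zero.
    ConcurrentSkipListMap<K, V> prepareFlush() {
        lock.writeLock().lock();
        try {
            state = FlushState.RUNNING;
            ConcurrentSkipListMap<K, V> flushing = map;
            map = new ConcurrentSkipListMap<>();
            sizeBytes.set(0);
            return flushing;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void completeFlush() {
        state = FlushState.IDLE;
    }

    void failFlush() {
        // In the real implementation the handed-off table is kept so that no data is lost.
        state = FlushState.FAILED;
    }
}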

if (size >= flushThresholdBytes) {
flush();
}
return null;
}

// ===================================
// Some utils
// ===================================

private TableInfo getInMemoryDaoSizeInfo() {
long size = 0;
for (E entry : dao.values()) {
size += extractor.size(entry);
@Override
public void flush() {
if (!isFlushingOrCompacting.compareAndSet(false, true)) {
logger.info("Flush or compact already in process");
return;
}
return new TableInfo(dao.size(), size);
}

private TableInfo getSSTableDaoSizeInfo() {
Iterator<E> allIterator = all();
long entriesCount = 0;
long daoSize = 0;

while (allIterator.hasNext()) {
E next = allIterator.next();
entriesCount++;
daoSize += extractor.size(next);
Callable<String> flushCallable = inMemoryStorage.prepareFlush(
basePath,
DB_FILENAME_PREFIX,
OFFSETS_FILENAME_PREFIX);
if (flushCallable == null) {
isFlushingOrCompacting.set(false);
return;
}

return new TableInfo(entriesCount, daoSize);
submitFlushAndAddSSTable(flushCallable);
}

// ===================================
// Flush and close
// ===================================

@Override
public synchronized void flush() throws IOException {
if (dao.isEmpty()) {
return;
}
writer.flush(dao.values().iterator(), getInMemoryDaoSizeInfo());
private void submitFlushAndAddSSTable(Callable<String> flushCallable) {
flushOrCompactQueue.execute(() -> {
try {
String newTimestamp = flushCallable.call();
ssTableStorage.addSSTableId(newTimestamp, true);
inMemoryStorage.completeFlush();
} catch (Exception e) {
inMemoryStorage.failFlush();
} finally {
isFlushingOrCompacting.set(false);
}
});
}

@Override
public synchronized void close() throws IOException {
if (closed) {
public void close() {
if (!isClosed.compareAndSet(false, true)) {
return;
}
flush();
if (arena.scope().isAlive()) {
arena.close();

flushOrCompactQueue.close();
try {
String newTimestamp = inMemoryStorage.close(basePath, DB_FILENAME_PREFIX, OFFSETS_FILENAME_PREFIX);
if (newTimestamp != null) {
ssTableStorage.addSSTableId(newTimestamp, false);
}
} catch (Exception e) {
logger.severe(() -> "Error while flushing on close: " + e.getMessage());
}
closed = true;
ssTableStorage.close();
}

@Override
public synchronized void compact() throws IOException {
if (storagesCount <= 1 && dao.isEmpty()) {
public void compact() {
if (!isFlushingOrCompacting.compareAndSet(false, true)) {
logger.info("Flush or compact already in process");
return;
}
Path[] oldTables = getAllTablesPath();
writer.compact(all(), getSSTableDaoSizeInfo());
writer.deleteUnusedFiles(oldTables);
flushOrCompactQueue.execute(() -> {
try {
ssTableStorage.compact();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
isFlushingOrCompacting.set(false);
}
});
}
}

This file was deleted.

@@ -80,6 +80,9 @@ public long size(MemorySegment value) {

@Override
public long size(Entry<MemorySegment> entry) {
if (entry == null) {
return 0;
}
return size(entry.key()) + size(entry.value());
}
