diff --git a/samples/compaction_operations.py b/samples/compaction_operations.py
new file mode 100644
index 0000000..48582a1
--- /dev/null
+++ b/samples/compaction_operations.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3
+"""
+TidesDB Background Compaction with Enhanced Monitoring
+"""
+import os
+import sys
+import time
+import glob
+import logging
+from pathlib import Path
+from typing import Optional, Dict
+from collections import defaultdict
+
+from tidesdb import (
+    TidesDB,
+    TidesDBCompressionAlgo,
+    TidesDBMemtableDS,
+)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+class CompactionMonitor:
+    def __init__(self, db_path: str):
+        self.db_path = db_path
+        self.db: Optional[TidesDB] = None
+        self.cf_name = "compaction_test_cf"
+        self.total_records = 0
+
+    def analyze_sstables(self) -> Dict:
+        """Count the column family's SSTables and bucket them by size in MB."""
+        cf_path = os.path.join(self.db_path, self.cf_name)
+        sst_files = glob.glob(os.path.join(cf_path, "sstable_*.sst"))
+
+        analysis = {
+            'total_count': len(sst_files),
+            'size_distribution': defaultdict(int),
+            'files': sorted(os.path.basename(f) for f in sst_files)
+        }
+
+        for sst in sst_files:
+            size = os.path.getsize(sst)
+            analysis['size_distribution'][size // (1024 * 1024)] += 1
+
+        return analysis
+
+    def save_current_state(self, filename: str, max_records: int) -> None:
+        """Save a snapshot of the current database state to a text file."""
+        try:
+            cf_path = os.path.join(self.db_path, self.cf_name)
+            output_path = os.path.join(cf_path, filename)
+            analysis = self.analyze_sstables()
+
+            with open(output_path, 'w') as f:
+                f.write(f"Database state snapshot - {filename}\n")
+                f.write("-" * 50 + "\n\n")
+
+                f.write("Current SST Files:\n")
+                for sst_file in analysis['files']:
+                    f.write(f"- {sst_file}\n")
+                f.write("\n" + "-" * 50 + "\n\n")
+
+                count = 0
+                for i in range(max_records):
+                    key = f"key:{i:08d}".encode()
+                    try:
+                        value = self.db.get(self.cf_name, key)
+                        if value:
+                            count += 1
+                            # Round-robin attribution for display only; the API
+                            # does not report which SSTable holds a given key.
+                            current_sst = (analysis['files'][count % len(analysis['files'])]
+                                           if analysis['files'] else "unknown")
+                            f.write(f"Key: {key.decode()}, Value: {value.decode()[:50]}... "
| In SST: {current_sst}\n") + except Exception: + continue + + f.write("\n" + "-" * 50 + "\n") + f.write(f"Total records found: {count}\n") + f.write(f"Total SST files: {len(analysis['files'])}\n") + + logger.info(f"Saved database state to {filename} - {count} records written") + + except Exception as e: + logger.error(f"Error saving state to {filename}: {str(e)}") + + def setup_database(self) -> None: + try: + logger.info("Opening database at: %s", self.db_path) + self.db = TidesDB.open(self.db_path) + + logger.info("Creating column family: %s", self.cf_name) + self.db.create_column_family( + self.cf_name, + 4 * 1024 * 1024, + 6, + 0.5, + True, + TidesDBCompressionAlgo.COMPRESS_SNAPPY, + True, + TidesDBMemtableDS.SKIP_LIST + ) + + batch_size = 500 + total_batches = 20 + value_size = 2000 + records_inserted = 0 + + logger.info("Starting data insertion for 10 SST files...") + + for batch in range(total_batches): + for i in range(batch_size): + record_num = batch * batch_size + i + key = f"key:{record_num:08d}".encode() + value = f"value:{record_num:08d}".encode() * value_size + self.db.put(self.cf_name, key, value, ttl=-1) + records_inserted += 1 + + analysis = self.analyze_sstables() + current_count = analysis['total_count'] + logger.info(f"SST files created: {current_count}") + + if current_count >= 10: + self.total_records = records_inserted + logger.info(f"Reached target of 10 SST files with {records_inserted} records") + break + + analysis = self.analyze_sstables() + logger.info("Data insertion complete - Total SSTables: %d", analysis['total_count']) + + time.sleep(1) + self.save_current_state("before_compaction.txt", self.total_records) + + except Exception as e: + logger.error("Failed to setup database: %s", str(e)) + self.cleanup() + raise + + def run_compaction(self) -> bool: + try: + initial_analysis = self.analyze_sstables() + logger.info("Starting compaction monitoring:") + logger.info("Initial state: %d SSTables", initial_analysis['total_count']) + + expected_count = 5 + logger.info("Target count: 5 SSTables") + + logger.info("Starting background compaction process...") + self.db.start_background_compaction( + self.cf_name, + interval_seconds=1, + min_sstables=2 + ) + + start_time = time.time() + last_count = initial_analysis['total_count'] + + while time.time() - start_time < 30: + time.sleep(1) + + current_analysis = self.analyze_sstables() + current_count = current_analysis['total_count'] + + if current_count != last_count: + logger.info("Compaction progress: %d -> %d SSTables", last_count, current_count) + last_count = current_count + + if current_count <= expected_count: + time.sleep(1) + self.save_current_state("after_compaction.txt", self.total_records) + logger.info("Target achieved - 5 SST files reached!") + break + + return True + + except Exception as e: + logger.error(f"Error during compaction: {str(e)}") + return False + + def cleanup(self) -> None: + try: + if hasattr(self, 'db') and self.db is not None: + logger.info("Closing database") + self.db.close() + self.db = None + except Exception as e: + logger.error("Error closing database: %s", str(e)) + + +def main(): + db_path = str(Path.cwd() / "compaction_output") + + monitor = CompactionMonitor(db_path) + try: + monitor.setup_database() + monitor.run_compaction() + except Exception as e: + logger.error("Monitor failed: %s", str(e)) + sys.exit(1) + finally: + monitor.cleanup() + +if __name__ == "__main__": + main() diff --git a/tests/test_compaction.py b/tests/test_compaction.py new file mode 100644 index 
index 0000000..e6153f8
--- /dev/null
+++ b/tests/test_compaction.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+"""
+Test suite for TidesDB background compaction.
+"""
+import unittest
+import time
+import tempfile
+import shutil
+import logging
+from pathlib import Path
+
+from tidesdb import (
+    TidesDB,
+    TidesDBCompressionAlgo,
+    TidesDBMemtableDS,
+)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+class TestBackgroundCompaction(unittest.TestCase):
+    def setUp(self):
+        """Set up a test database for background compaction."""
+        logger.info("Setting up test environment for compaction")
+        self.test_dir = tempfile.mkdtemp()
+        logger.info("Created temporary directory: %s", self.test_dir)
+        self.db = None
+        self.cf_name = "compaction_test_cf"
+
+        try:
+            logger.info("Opening database")
+            self.db = TidesDB.open(self.test_dir)
+
+            logger.info("Creating column family: %s", self.cf_name)
+            self.db.create_column_family(
+                self.cf_name,
+                64 * 1024 * 1024,
+                12,
+                0.24,
+                True,
+                TidesDBCompressionAlgo.COMPRESS_SNAPPY,
+                True,
+                TidesDBMemtableDS.SKIP_LIST
+            )
+            logger.info("Test environment setup completed successfully")
+        except Exception as e:
+            logger.error("Failed to setup test environment: %s", str(e))
+            self.tearDown()
+            raise
+
+    def tearDown(self):
+        """Clean up test resources."""
+        logger.info("Cleaning up test resources")
+        if self.db:
+            try:
+                logger.info("Closing database")
+                self.db.close()
+            except Exception as e:
+                logger.error("Error closing database: %s", str(e))
+
+        logger.info("Removing temporary directory: %s", self.test_dir)
+        shutil.rmtree(self.test_dir, ignore_errors=True)
+        logger.info("Cleanup completed")
+
+    def test_background_compaction(self):
+        """Test the background compaction process."""
+        logger.info("Starting background compaction test")
+
+        logger.info("Inserting data to trigger compaction")
+        batch_size = 500
+        total_batches = 5
+
+        for batch in range(total_batches):
+            for i in range(batch_size):
+                key = f"key:{batch * batch_size + i:08d}".encode()
+                value = f"value:{batch * batch_size + i:08d}".encode() * 2000
+                self.db.put(self.cf_name, key, value, ttl=-1)
+
+        logger.info(f"Inserted {batch_size * total_batches} records")
+
+        logger.info("Starting background compaction")
+        self.db.start_background_compaction(
+            self.cf_name,
+            interval_seconds=1,
+            min_sstables=2
+        )
+
+        logger.info("Monitoring compaction progress...")
+        start_time = time.time()
+        analysis = self.analyze_sstables()
+        while time.time() - start_time < 30:
+            time.sleep(1)
+            analysis = self.analyze_sstables()
+            logger.info(f"SSTable count: {analysis['total_count']}")
+            if analysis['total_count'] <= 5:
+                logger.info("Compaction target achieved.")
+                break
+
+        self.assertLessEqual(
+            analysis['total_count'], 5,
+            "Compaction failed to reduce SSTables to the target."
+        )
+        logger.info("Background compaction test completed successfully")
+
+    def analyze_sstables(self):
+        """Count SSTable files and bucket them by size in MB."""
+        cf_path = Path(self.test_dir) / self.cf_name
+        sst_files = list(cf_path.glob("sstable_*.sst"))
+        analysis = {
+            'total_count': len(sst_files),
+            'size_distribution': {}
+        }
+
+        for sst in sst_files:
+            size_mb = sst.stat().st_size // (1024 * 1024)
+            analysis['size_distribution'][size_mb] = analysis['size_distribution'].get(size_mb, 0) + 1
+
+        return analysis
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tidesdb/core.py b/tidesdb/core.py
index 19ec3d5..836f05a 100644
--- a/tidesdb/core.py
+++ b/tidesdb/core.py
@@ -42,6 +42,21 @@ class TidesDBMemtableDS:
 else:
     raise FileNotFoundError(f"Library '{library_name}' not found")
 
+lib.tidesdb_compact_sstables.argtypes = [
+    POINTER(ctypes.c_void_p),
+    c_char_p,
+    c_int
+]
+lib.tidesdb_compact_sstables.restype = c_int
+
+lib.tidesdb_start_background_partial_merge.argtypes = [
+    POINTER(ctypes.c_void_p),
+    c_char_p,
+    c_int,
+    c_int
+]
+lib.tidesdb_start_background_partial_merge.restype = c_int
+
 class TidesDB:
     """TidesDB main database class."""
 
@@ -104,6 +119,21 @@ def delete(self, column_family_name, key):
         result = lib.tidesdb_delete(self.tdb, c_name, c_key, c_size_t(len(key)))
         if result != 0:
             raise Exception("Failed to delete key-value pair")
+
+    def compact_sstables(self, column_family_name, max_threads):
+        """Merge the column family's SSTables once, using up to max_threads threads."""
+        c_name = create_string_buffer(column_family_name.encode('utf-8'))
+        result = lib.tidesdb_compact_sstables(self.tdb, c_name, c_int(max_threads))
+        if result != 0:
+            raise Exception("Failed to compact SSTables")
+
+    def start_background_compaction(self, column_family_name, interval_seconds, min_sstables):
+        """Start a background partial merge that runs every interval_seconds
+        once the column family has at least min_sstables SSTables."""
+        c_name = create_string_buffer(column_family_name.encode('utf-8'))
+        result = lib.tidesdb_start_background_partial_merge(self.tdb, c_name, c_int(interval_seconds), c_int(min_sstables))
+        if result != 0:
+            raise Exception("Failed to start background compaction")
 
 class Cursor:
     """Cursor class for iterating over column family key-value pairs."""
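
For reviewers, a minimal end-to-end sketch of the two new entry points added in this change. It only uses calls that appear in the diff; the path "/tmp/compaction_demo", the "demo_cf" column-family parameters (copied from the sample above), and the max_threads/interval_seconds/min_sstables values are illustrative assumptions, not recommendations.

    from tidesdb import TidesDB, TidesDBCompressionAlgo, TidesDBMemtableDS

    db = TidesDB.open("/tmp/compaction_demo")
    db.create_column_family(
        "demo_cf",
        4 * 1024 * 1024,  # memtable flush threshold (bytes)
        6,                # skip list max level
        0.5,              # skip list probability
        True,             # enable compression
        TidesDBCompressionAlgo.COMPRESS_SNAPPY,
        True,             # enable bloom filter
        TidesDBMemtableDS.SKIP_LIST,
    )

    # Write enough data to flush several SSTables.
    for i in range(10_000):
        db.put("demo_cf", f"key:{i:08d}".encode(), b"v" * 1024, ttl=-1)

    # One-off merge of the existing SSTables, using up to two worker threads.
    db.compact_sstables("demo_cf", max_threads=2)

    # Alternatively, hand the work to a background thread: every 5 seconds,
    # merge whenever at least 2 SSTables have accumulated.
    db.start_background_compaction("demo_cf", interval_seconds=5, min_sstables=2)

    db.close()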