diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs
index 1a483d45ac1a87..0a7670b3a81179 100644
--- a/accounts-db/src/accounts_hash.rs
+++ b/accounts-db/src/accounts_hash.rs
@@ -7,7 +7,6 @@ use {
     },
     bytemuck_derive::{Pod, Zeroable},
     log::*,
-    memmap2::MmapMut,
     rayon::prelude::*,
     solana_lattice_hash::lt_hash::LtHash,
     solana_measure::{measure::Measure, measure_us},
@@ -19,159 +18,73 @@ use {
         sysvar::epoch_schedule::EpochSchedule,
     },
     std::{
-        borrow::Borrow,
+        clone,
         convert::TryInto,
-        io::{Seek, SeekFrom, Write},
+        fs::File,
+        io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
-            Arc,
+            Arc, Mutex,
         },
-        thread, time,
     },
     tempfile::tempfile_in,
 };

 pub const MERKLE_FANOUT: usize = 16;

-/// 1 file containing account hashes sorted by pubkey, mapped into memory
-struct MmapAccountHashesFile {
-    /// raw slice of `Hash` values. Can be a larger slice than `count`
-    mmap: MmapMut,
-    /// # of valid Hash entries in `mmap`
-    count: usize,
-}
-
-impl MmapAccountHashesFile {
-    /// return a slice of account hashes starting at 'index'
-    fn read(&self, index: usize) -> &[Hash] {
-        let start = std::mem::size_of::<Hash>() * index;
-        let end = std::mem::size_of::<Hash>() * self.count;
-        let bytes = &self.mmap[start..end];
-        bytemuck::cast_slice(bytes)
-    }
-
-    /// write a hash to the end of mmap file.
-    fn write(&mut self, hash: &Hash) {
-        let start = self.count * std::mem::size_of::<Hash>();
-        let end = start + std::mem::size_of::<Hash>();
-        self.mmap[start..end].copy_from_slice(hash.as_ref());
-        self.count += 1;
-    }
-}
-
 /// 1 file containing account hashes sorted by pubkey
 struct AccountHashesFile {
-    /// The mmap hash file created in the temp directory, which will be deleted on drop.
-    writer: Option<MmapAccountHashesFile>,
+    /// Writer for hash files created in the temp directory, which will be deleted on drop.
+    writer: Option<BufWriter<File>>,
+
+    /// Number of hashes in this file
+    count: usize,
 }

 impl AccountHashesFile {
-    /// create a new AccountHashesFile
-    fn new(num_hashes: usize, dir_for_temp_cache_files: impl AsRef<Path>) -> Self {
-        if num_hashes == 0 {
-            return Self { writer: None };
-        }
-
-        let capacity = num_hashes * std::mem::size_of::<Hash>();
-        let get_file = || -> Result<_, std::io::Error> {
-            let mut data = tempfile_in(&dir_for_temp_cache_files).unwrap_or_else(|err| {
-                panic!(
-                    "Unable to create file within {}: {err}",
-                    dir_for_temp_cache_files.as_ref().display(),
-                )
-            });
-
-            // Theoretical performance optimization: write a zero to the end of
-            // the file so that we won't have to resize it later, which may be
-            // expensive.
-            assert!(capacity > 0);
-            data.seek(SeekFrom::Start((capacity - 1) as u64))?;
-            data.write_all(&[0])?;
-            data.rewind()?;
-            data.flush()?;
-            Ok(data)
-        };
-
-        // Retry 5 times to allocate the AccountHashesFile. The memory might be fragmented and
-        // causes memory allocation failure. Therefore, let's retry after failure. Hoping that the
-        // kernel has the chance to defrag the memory between the retries, and retries succeed.
-        let mut num_retries = 0;
-        let data = loop {
-            num_retries += 1;
-
-            match get_file() {
-                Ok(data) => {
-                    break data;
-                }
-                Err(err) => {
-                    info!(
-                        "Unable to create account hashes file within {}: {}, retry counter {}",
-                        dir_for_temp_cache_files.as_ref().display(),
-                        err,
-                        num_retries
-                    );
-
-                    if num_retries > 5 {
-                        panic!(
-                            "Unable to create account hashes file within {}: after {} retries",
-                            dir_for_temp_cache_files.as_ref().display(),
-                            num_retries
-                        );
-                    }
-                    datapoint_info!(
-                        "retry_account_hashes_file_allocation",
-                        ("retry", num_retries, i64)
-                    );
-                    thread::sleep(time::Duration::from_millis(num_retries * 100));
-                }
-            }
-        };
-
-        //UNSAFE: Required to create a Mmap
-        let map = unsafe { MmapMut::map_mut(&data) };
-        let map = map.unwrap_or_else(|e| {
-            error!(
-                "Failed to map the data file (size: {}): {}.\n
-                Please increase sysctl vm.max_map_count or equivalent for your platform.",
-                capacity, e
-            );
-            std::process::exit(1);
+    /// Create a new AccountHashesFile with a writer to it.
+    fn new(dir_for_temp_cache_files: impl AsRef<Path>) -> Self {
+        let file = tempfile_in(dir_for_temp_cache_files.as_ref()).unwrap_or_else(|err| {
+            panic!(
+                "Unable to create file within {}: {err}",
+                dir_for_temp_cache_files.as_ref().display()
+            )
         });
-
-        let writer = MmapAccountHashesFile {
-            mmap: map,
-            count: 0,
-        };
-
-        AccountHashesFile {
-            writer: Some(writer),
-        }
+        // Now that we have a file, create a writer.
+        let writer = Some(BufWriter::new(file));
+        Self { writer, count: 0 }
     }

-    /// return a mmap reader that can be accessed by slice
-    /// The reader will be None if there are no hashes in the file. And this function should only be called once after all writes are done.
+    /// Return a file reader for the underlying file.
+    /// This function should only be called once after all writes are done.
     /// After calling this function, the writer will be None. No more writes are allowed.
-    fn get_reader(&mut self) -> Option<MmapAccountHashesFile> {
-        let mmap = self.writer.take();
-        if mmap.is_some() && mmap.as_ref().unwrap().count > 0 {
-            mmap
-        } else {
-            None
+    fn get_reader(&mut self) -> Option<Mutex<BufReader<File>>> {
+        let writer = self.writer.take();
+        if self.count == 0 {
+            // If there are no hashes, then the file is empty and we should not return a reader.
+            return None;
         }
+
+        writer.map(|writer| {
+            let reader = BufReader::new(writer.into_inner().unwrap());
+            Mutex::new(reader)
+        })
     }

     /// # hashes stored in this file
     fn count(&self) -> usize {
-        self.writer
-            .as_ref()
-            .map(|writer| writer.count)
-            .unwrap_or_default()
+        self.count
     }

     /// write 'hash' to the file
     fn write(&mut self, hash: &Hash) {
         debug_assert!(self.writer.is_some());
-        self.writer.as_mut().unwrap().write(hash);
+        self.writer
+            .as_mut()
+            .unwrap()
+            .write_all(hash.as_ref())
+            .expect("write hash success");
+        self.count += 1;
     }
 }

@@ -393,7 +306,7 @@ struct CumulativeOffsets {
 #[derive(Default)]
 struct CumulativeHashesFromFiles {
     /// source of hashes in order
-    readers: Vec<MmapAccountHashesFile>,
+    readers: Vec<Mutex<BufReader<File>>>,
     /// look up reader index and offset by overall index
     cumulative: CumulativeOffsets,
 }
@@ -405,8 +318,8 @@ impl CumulativeHashesFromFiles {
         let mut readers = Vec::with_capacity(hashes.len());
         let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| {
             // ignores all hashfiles that have zero entries
+            let count = hash_file.count();
             hash_file.get_reader().map(|reader| {
-                let count = reader.count;
                 readers.push(reader);
                 count
             })
@@ -422,13 +335,59 @@ impl CumulativeHashesFromFiles {
         self.cumulative.total_count
     }

-    // return the biggest slice possible that starts at the overall index 'start'
-    fn get_slice(&self, start: usize) -> &[Hash] {
-        let (start, offset) = self.cumulative.find(start);
+    // return as much hash data as possible starting at the overall index 'start', up to the max buffer size
+    // 'start' is a hash index, not a byte offset
+    fn get_data(&self, start: usize) -> Box<[Hash]> {
+        let (start, offset, num_hashes) = self.cumulative.find(start);
         let data_source_index = offset.index[0];
-        let data = &self.readers[data_source_index];
-        // unwrap here because we should never ask for data that doesn't exist. If we do, then cumulative calculated incorrectly.
-        data.read(start)
+        let mut data = self.readers[data_source_index].lock().unwrap();
+
+        // unwrap here because we should never ask for data that doesn't exist.
+        // If we do, then the cumulative offsets were calculated incorrectly.
+        let file_offset_in_bytes = std::mem::size_of::<Hash>() * start;
+        data.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))
+            .unwrap();
+
+        #[cfg(test)]
+        const MAX_BUFFER_SIZE_IN_HASH: usize = 4; // 4 hashes (128 bytes total) for testing
+        #[cfg(not(test))]
+        const MAX_BUFFER_SIZE_IN_HASH: usize = 2 * 1024 * 1024; // 2M hashes (64 MB total)
+
+        let remaining_num_hashes = num_hashes - start;
+        let num_hashes_to_read = remaining_num_hashes.min(MAX_BUFFER_SIZE_IN_HASH);
+        let mut hashes = vec![Hash::default(); num_hashes_to_read].into_boxed_slice();
+
+        // expect the read to succeed because the slice that we are reading
+        // into was sized using the counts recorded by the hash file writer,
+        // so it is guaranteed to match the data on disk. Otherwise, there is
+        // a bug in generating the accumulated hash files.
+        data.read_exact(bytemuck::must_cast_slice_mut(hashes.as_mut()))
+            .expect("read hash success");
+
+        hashes
+    }
+}
+
+trait AsHashSlice: std::marker::Sync + std::marker::Send + clone::Clone {
+    fn num_hashes(&self) -> usize;
+    fn get(&self, i: usize) -> &Hash;
+}
+
+impl AsHashSlice for &[Hash] {
+    fn num_hashes(&self) -> usize {
+        self.len()
+    }
+    fn get(&self, i: usize) -> &Hash {
+        &self[i]
+    }
+}
+
+impl AsHashSlice for Arc<Box<[Hash]>> {
+    fn num_hashes(&self) -> usize {
+        self.len()
+    }
+    fn get(&self, i: usize) -> &Hash {
+        &self[i]
     }
 }

@@ -476,11 +435,21 @@ impl CumulativeOffsets {
     /// given overall start index 'start'
     /// return ('start', which is the offset into the data source at 'index',
     /// and 'index', which is the data source to use)
-    fn find(&self, start: usize) -> (usize, &CumulativeOffset) {
-        let index = self.find_index(start);
-        let index = &self.cumulative_offsets[index];
+    /// and number of hashes stored in the data source
+    fn find(&self, start: usize) -> (usize, &CumulativeOffset, usize) {
+        let i = self.find_index(start);
+        let index = &self.cumulative_offsets[i];
         let start = start - index.start_offset;
-        (start, index)
+
+        let i_next = i + 1;
+        let next_start_offset = if i_next == self.cumulative_offsets.len() {
+            self.total_count
+        } else {
+            let next = &self.cumulative_offsets[i_next];
+            next.start_offset
+        };
+        let num_hashes = next_start_offset - index.start_offset;
+        (start, index, num_hashes)
     }

     // return the biggest slice possible that starts at 'start'
@@ -488,7 +457,7 @@ impl CumulativeOffsets {
     where
         U: ExtractSliceFromRawData<'b, T> + 'b,
     {
-        let (start, index) = self.find(start);
+        let (start, index, _) = self.find(start);
         raw.extract(index, start)
     }
 }
@@ -621,7 +590,7 @@ impl AccountsHasher<'_> {

     // This function is designed to allow hashes to be located in multiple, perhaps multiply deep vecs.
     // The caller provides a function to return a slice from the source data.
-    fn compute_merkle_root_from_slices<'b, F, T>(
+    fn compute_merkle_root_from_slices<'b, F, U>(
         total_hashes: usize,
         fanout: usize,
         max_levels_per_pass: Option<usize>,
@@ -630,8 +599,8 @@ impl AccountsHasher<'_> {
     ) -> (Hash, Vec<Hash>)
     where
         // returns a slice of hashes starting at the given overall index
-        F: Fn(usize) -> &'b [T] + std::marker::Sync,
-        T: Borrow<Hash> + std::marker::Sync + 'b,
+        F: Fn(usize) -> U + std::marker::Sync,
+        U: AsHashSlice + 'b,
     {
         if total_hashes == 0 {
             return (Hasher::default().result(), vec![]);
@@ -650,7 +619,7 @@ impl AccountsHasher<'_> {

         // initial fetch - could return entire slice
         let data = get_hash_slice_starting_at_index(0);
-        let data_len = data.len();
+        let data_len = data.num_hashes();

         let result: Vec<_> = (0..chunks)
             .into_par_iter()
@@ -671,7 +640,7 @@ impl AccountsHasher<'_> {
                 // if we exhaust our data, then we will request a new slice, and data_index resets to 0, the beginning of the new slice
                 let mut data_index = start_index;
                 // source data, which we may refresh when we exhaust
-                let mut data = data;
+                let mut data = data.clone();
                 // len of the source data
                 let mut data_len = data_len;

@@ -682,10 +651,10 @@ impl AccountsHasher<'_> {
                         if data_index >= data_len {
                             // we exhausted our data, fetch next slice starting at i
                             data = get_hash_slice_starting_at_index(i);
-                            data_len = data.len();
+                            data_len = data.num_hashes();
                             data_index = 0;
                         }
-                        hasher.hash(data[data_index].borrow().as_ref());
+                        hasher.hash(data.get(data_index).as_ref());
                         data_index += 1;
                     }
                 } else {
@@ -737,10 +706,10 @@ impl AccountsHasher<'_> {
                             if data_index >= data_len {
                                 // we exhausted our data, fetch next slice starting at i
                                 data = get_hash_slice_starting_at_index(i);
-                                data_len = data.len();
+                                data_len = data.num_hashes();
                                 data_index = 0;
                             }
-                            hasher_k.hash(data[data_index].borrow().as_ref());
+                            hasher_k.hash(data.get(data_index).as_ref());
                             data_index += 1;
                             i += 1;
                         }
@@ -1160,7 +1129,7 @@ impl AccountsHasher<'_> {
         let binner = PubkeyBinCalculator24::new(bins);

         // working_set hold the lowest items for each slot_group sorted by pubkey descending (min_key is the last)
-        let (mut working_set, max_inclusive_num_pubkeys) = Self::initialize_dedup_working_set(
+        let (mut working_set, _max_inclusive_num_pubkeys) = Self::initialize_dedup_working_set(
             sorted_data_by_pubkey,
             pubkey_bin,
             bins,
@@ -1168,8 +1137,7 @@ impl AccountsHasher<'_> {
             stats,
         );

-        let mut hashes =
-            AccountHashesFile::new(max_inclusive_num_pubkeys, &self.dir_for_temp_cache_files);
+        let mut hashes = AccountHashesFile::new(&self.dir_for_temp_cache_files);

         let mut overall_sum: u64 = 0;

@@ -1235,7 +1203,7 @@ impl AccountsHasher<'_> {
             cumulative.total_count(),
             MERKLE_FANOUT,
             None,
-            |start| cumulative.get_slice(start),
+            |start| Arc::new(cumulative.get_data(start)),
             None,
         );
         hash_time.stop();
@@ -1479,25 +1447,25 @@ mod tests {
     fn test_account_hashes_file() {
         let dir_for_temp_cache_files = tempdir().unwrap();
         // 0 hashes
-        let mut file = AccountHashesFile::new(0, dir_for_temp_cache_files.path());
+        let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path());
         assert!(file.get_reader().is_none());

         let hashes = (0..2)
             .map(|i| Hash::new_from_array([i; 32]))
             .collect::<Vec<_>>();

         // 1 hash
-        let mut file = AccountHashesFile::new(1, dir_for_temp_cache_files.path());
+        let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path());
         file.write(&hashes[0]);
-        let reader = file.get_reader().unwrap();
-        assert_eq!(&[hashes[0]][..], reader.read(0));
-        assert!(reader.read(1).is_empty());
+        let reader = file.get_reader();
+        assert_eq!(&[hashes[0]][..], read(&reader, 0).unwrap());
+        assert!(read(&reader, 1).unwrap().is_empty());

         // multiple hashes
-        let mut file = AccountHashesFile::new(hashes.len(), dir_for_temp_cache_files.path());
+        let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path());
         hashes.iter().for_each(|hash| file.write(hash));
-        let reader = file.get_reader().unwrap();
-        (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i)));
-        assert!(reader.read(2).is_empty());
+        let reader = file.get_reader();
+        (0..2).for_each(|i| assert_eq!(&hashes[i..], read(&reader, i).unwrap()));
+        assert!(read(&reader, 2).unwrap().is_empty());
     }

     #[test]
@@ -1511,15 +1479,15 @@ mod tests {
         let mut combined = Vec::default();

         // 0 hashes
-        let file0 = AccountHashesFile::new(0, dir_for_temp_cache_files.path());
+        let file0 = AccountHashesFile::new(dir_for_temp_cache_files.path());

         // 1 hash
-        let mut file1 = AccountHashesFile::new(1, dir_for_temp_cache_files.path());
+        let mut file1 = AccountHashesFile::new(dir_for_temp_cache_files.path());
         file1.write(&hashes[0]);
         combined.push(hashes[0]);

         // multiple hashes
-        let mut file2 = AccountHashesFile::new(hashes.len(), dir_for_temp_cache_files.path());
+        let mut file2 = AccountHashesFile::new(dir_for_temp_cache_files.path());
         hashes.iter().for_each(|hash| {
             file2.write(hash);
             combined.push(*hash);
@@ -1532,9 +1500,9 @@ mod tests {
                 vec![
                     file0,
                     file1,
-                    AccountHashesFile::new(0, dir_for_temp_cache_files.path()),
+                    AccountHashesFile::new(dir_for_temp_cache_files.path()),
                     file2,
-                    AccountHashesFile::new(0, dir_for_temp_cache_files.path()),
+                    AccountHashesFile::new(dir_for_temp_cache_files.path()),
                 ]
             } else if permutation == 2 {
                 vec![file1, file2]
@@ -1544,8 +1512,8 @@ mod tests {
                 combined.push(one);
                 vec![
                     file2,
-                    AccountHashesFile::new(0, dir_for_temp_cache_files.path()),
-                    AccountHashesFile::new(0, dir_for_temp_cache_files.path()),
+                    AccountHashesFile::new(dir_for_temp_cache_files.path()),
+                    AccountHashesFile::new(dir_for_temp_cache_files.path()),
                     file1,
                 ]
             };
@@ -1558,7 +1526,8 @@ mod tests {
             let mut cumulative_start = start;
             // read all data
             while retrieved.len() < (len - start) {
-                let this_one = cumulative.get_slice(cumulative_start);
+                let this_one_bytes = cumulative.get_data(cumulative_start);
+                let this_one = bytemuck::cast_slice(&this_one_bytes);
                 retrieved.extend(this_one.iter());
                 cumulative_start += this_one.len();
                 assert_ne!(0, this_one.len());
@@ -1684,7 +1653,8 @@ mod tests {
         let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
         let (mut hashes, lamports) =
             accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default());
-        assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0));
+        let reader = hashes.get_reader();
+        assert_eq!(&[Hash::default()], &read(&reader, 0).unwrap()[..]);
         assert_eq!(lamports, 1);
     }

@@ -1692,10 +1662,20 @@ mod tests {
         hashes.into_iter().map(get_vec).collect()
     }
     fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
-        hashes
-            .get_reader()
-            .map(|r| r.read(0).to_vec())
-            .unwrap_or_default()
+        let reader = hashes.get_reader();
+        read(&reader, 0).unwrap_or_default()
+    }
+    fn read(reader: &Option<Mutex<BufReader<File>>>, index: usize) -> std::io::Result<Vec<Hash>> {
+        let file_offset_in_bytes = std::mem::size_of::<Hash>() * index;
+        if reader.is_none() {
+            return Ok(vec![]);
+        }
+        let mut reader = reader.as_ref().unwrap().lock().unwrap();
+        reader.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))?;
+        let mut result_bytes: Vec<u8> = vec![];
+        reader.read_to_end(&mut result_bytes)?;
+        let result: Vec<Hash> = bytemuck::cast_slice(&result_bytes).to_vec();
+        Ok(result)
     }

     #[test]
@@ -2178,7 +2158,7 @@ mod tests {
             }],
             total_count: 0,
         };
-        assert_eq!(input.find(0), (0, &input.cumulative_offsets[0]));
+        assert_eq!(input.find(0), (0, &input.cumulative_offsets[0], 0));

         let input = CumulativeOffsets {
             cumulative_offsets: vec![
                 CumulativeOffset {
                     index: [0, 0],
                     start_offset: 0,
                 },
                 CumulativeOffset {
                     index: [1, 0],
                     start_offset: 2,
                 },
             ],
-            total_count: 0,
+            total_count: 2,
         };
-        assert_eq!(input.find(0), (0, &input.cumulative_offsets[0])); // = first start_offset
-        assert_eq!(input.find(1), (1, &input.cumulative_offsets[0])); // > first start_offset
-        assert_eq!(input.find(2), (0, &input.cumulative_offsets[1])); // = last start_offset
-        assert_eq!(input.find(3), (1, &input.cumulative_offsets[1])); // > last start_offset
+        assert_eq!(input.find(0), (0, &input.cumulative_offsets[0], 2)); // = first start_offset
+        assert_eq!(input.find(1), (1, &input.cumulative_offsets[0], 2)); // > first start_offset
+        assert_eq!(input.find(2), (0, &input.cumulative_offsets[1], 0)); // = last start_offset
+        assert_eq!(input.find(3), (1, &input.cumulative_offsets[1], 0)); // > last start_offset
     }

     #[test]
@@ -2502,4 +2482,62 @@ mod tests {
             2, // accounts above are in 2 groups
         );
     }
+
+    #[test]
+    fn test_get_data() {
+        // Create a temporary directory for test files
+        let temp_dir = tempdir().unwrap();
+
+        const MAX_BUFFER_SIZE_IN_BYTES: usize = 128;
+        let extra_size = 64;
+
+        // Create a test file and write some data to it
+        let file_path = temp_dir.path().join("test_file");
+        let mut file = File::create(&file_path).unwrap();
+        let test_data: Vec<u8> = (0..(MAX_BUFFER_SIZE_IN_BYTES + extra_size) as u8).collect(); // 128 + 64 bytes of test data
+        file.write_all(&test_data).unwrap();
+        file.seek(SeekFrom::Start(0)).unwrap();
+        drop(file);
+
+        let test_data: &[Hash] = bytemuck::cast_slice(&test_data);
+
+        // Create a BufReader for the test file
+        let file = File::open(&file_path).unwrap();
+        let reader = BufReader::new(file);
+        let readers = vec![Mutex::new(reader)];
+
+        // Create a CumulativeOffsets instance
+        let cumulative_offsets = CumulativeOffsets {
+            cumulative_offsets: vec![CumulativeOffset {
+                index: [0, 0],
+                start_offset: 0,
+            }],
+            total_count: test_data.len(),
+        };
+
+        // Create a CumulativeHashesFromFiles instance
+        let cumulative_hashes = CumulativeHashesFromFiles {
+            readers,
+            cumulative: cumulative_offsets,
+        };
+
+        // Test the get_data function
+        // First read: MAX_BUFFER_SIZE_IN_BYTES, i.e. 128 bytes (4 hashes)
+        let start_index = 0;
+        let result = cumulative_hashes.get_data(start_index);
+        assert_eq!(
+            result.len(),
+            MAX_BUFFER_SIZE_IN_BYTES / std::mem::size_of::<Hash>()
+        );
+        assert_eq!(&test_data[..result.len()], &result[..]);

+        // Second read: the remaining 64 bytes (2 hashes)
+        let start_index = result.len();
+        let result = cumulative_hashes.get_data(start_index);
+        assert_eq!(result.len(), extra_size / std::mem::size_of::<Hash>());
+        assert_eq!(
+            &test_data[start_index..start_index + result.len()],
+            &result[..]
+        );
+    }
 }
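
Reviewer note: the sketch below is not part of the patch. It is a minimal, standalone illustration of the buffered write-then-read pattern that `AccountHashesFile` now uses (append with `BufWriter`, then flush via `into_inner()` and read back by seeking to `index * hash_size`). It assumes only std plus the `tempfile` crate (already a dependency here); `HASH_LEN`, `write_hashes`, and `read_hash_at` are made-up names, not agave APIs, and plain `[u8; 32]` stands in for `Hash`.

```rust
use std::{
    fs::File,
    io::{BufReader, BufWriter, Read, Result, Seek, SeekFrom, Write},
    path::Path,
};

const HASH_LEN: usize = 32; // stand-in for std::mem::size_of::<Hash>()

/// Append every hash to a temp file in `dir`, then hand back a reader positioned at the start.
fn write_hashes(dir: &Path, hashes: &[[u8; HASH_LEN]]) -> Result<BufReader<File>> {
    let file = tempfile::tempfile_in(dir)?; // removed automatically when dropped
    let mut writer = BufWriter::new(file);
    for hash in hashes {
        writer.write_all(hash)?; // sequential appends; no pre-sizing or mmap needed
    }
    // into_inner() flushes the BufWriter and returns the underlying File.
    let mut file = writer.into_inner()?;
    file.seek(SeekFrom::Start(0))?;
    Ok(BufReader::new(file))
}

/// Read the hash stored at `index` by seeking to its byte offset.
fn read_hash_at(reader: &mut BufReader<File>, index: usize) -> Result<[u8; HASH_LEN]> {
    reader.seek(SeekFrom::Start((index * HASH_LEN) as u64))?;
    let mut hash = [0u8; HASH_LEN];
    reader.read_exact(&mut hash)?;
    Ok(hash)
}

fn main() -> Result<()> {
    let dir = tempfile::tempdir()?;
    let hashes = [[1u8; HASH_LEN], [2u8; HASH_LEN], [3u8; HASH_LEN]];
    let mut reader = write_hashes(dir.path(), &hashes)?;
    assert_eq!(read_hash_at(&mut reader, 2)?, [3u8; HASH_LEN]);
    Ok(())
}
```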
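A second hedged sketch, again not the PR code: it mimics the bounded-buffer strategy behind `CumulativeHashesFromFiles::get_data`, where each call seeks to the requested hash index and returns at most a fixed number of entries, so the caller keeps re-fetching as it runs out, the way `compute_merkle_root_from_slices` refreshes its slice. `read_chunk` and `MAX_HASHES_PER_READ` are illustrative names; the real constant is 2M hashes (about 64 MB) outside of tests.

```rust
use std::{
    fs::File,
    io::{BufReader, BufWriter, Read, Result, Seek, SeekFrom, Write},
};

const HASH_LEN: usize = 32;
const MAX_HASHES_PER_READ: usize = 4; // tiny on purpose, to force multiple fetches

/// Return up to MAX_HASHES_PER_READ hashes starting at hash index `start`.
fn read_chunk(
    reader: &mut BufReader<File>,
    start: usize,
    total_hashes: usize,
) -> Result<Vec<[u8; HASH_LEN]>> {
    let to_read = total_hashes.saturating_sub(start).min(MAX_HASHES_PER_READ);
    reader.seek(SeekFrom::Start((start * HASH_LEN) as u64))?;
    let mut chunk = vec![[0u8; HASH_LEN]; to_read];
    for hash in chunk.iter_mut() {
        reader.read_exact(hash)?;
    }
    Ok(chunk)
}

fn main() -> Result<()> {
    // Write 10 hashes to a temp file, then consume them in bounded chunks: whenever the
    // current chunk is exhausted, fetch the next one starting at the overall index.
    let mut writer = BufWriter::new(tempfile::tempfile()?);
    for i in 0..10u8 {
        writer.write_all(&[i; HASH_LEN])?;
    }
    let mut reader = BufReader::new(writer.into_inner()?);

    let total_hashes = 10;
    let (mut index, mut seen) = (0, 0);
    while index < total_hashes {
        let chunk = read_chunk(&mut reader, index, total_hashes)?;
        assert!(!chunk.is_empty());
        seen += chunk.len();
        index += chunk.len();
    }
    assert_eq!(seen, total_hashes);
    Ok(())
}
```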