diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs index 7a46b4d7fb2740..8df2e19552de73 100644 --- a/accounts-db/src/accounts_hash.rs +++ b/accounts-db/src/accounts_hash.rs @@ -27,7 +27,6 @@ use { atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, - thread, time, }, tempfile::tempfile_in, }; @@ -39,9 +38,6 @@ struct AccountHashesFile { /// Writer for hash files writer: Option>, - /// Reader for hash files - reader: Option>>, - /// The directory where temporary cache files are put dir_for_temp_cache_files: PathBuf, @@ -50,13 +46,13 @@ struct AccountHashesFile { } impl AccountHashesFile { - /// return a mmap reader that can be accessed by slice + /// return a buffer FileReader for the underlying file fn get_reader(&mut self) -> Option>> { - // Drop writer to flush the file to disk - if self.writer.is_some() { - drop(self.writer.take()); - } - std::mem::take(&mut self.reader) + std::mem::take(&mut self.writer).map(|writer| { + let file = Some(writer.into_inner().unwrap()); + let reader = BufReader::new(file.unwrap()); + Mutex::new(reader) + }) } /// # hashes stored in this file @@ -69,74 +65,27 @@ impl AccountHashesFile { fn write(&mut self, hash: &Hash) { if self.writer.is_none() { // we have hashes to write but no file yet, so create a file that will auto-delete on drop - let get_file = || -> Result<_, std::io::Error> { - let data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { - panic!( - "Unable to create file within {}: {err}", - self.dir_for_temp_cache_files.display() - ) - }); - Ok(data) - }; - - // Retry 5 times to allocate the AccountHashesFile. The memory might be fragmented and - // causes memory allocation failure. Therefore, let's retry after failure. Hoping that the - // kernel has the chance to defrag the memory between the retries, and retries succeed. - let mut num_retries = 0; - let data = loop { - num_retries += 1; - - match get_file() { - Ok(data) => { - break data; - } - Err(err) => { - info!( - "Unable to create account hashes file within {}: {}, retry counter {}", - self.dir_for_temp_cache_files.display(), - err, - num_retries - ); - - if num_retries > 5 { - panic!( - "Unable to create account hashes file within {}: after {} retries", - self.dir_for_temp_cache_files.display(), - num_retries - ); - } - datapoint_info!( - "retry_account_hashes_file_allocation", - ("retry", num_retries, i64) - ); - thread::sleep(time::Duration::from_millis(num_retries * 100)); - } - } - }; - + let data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { + panic!( + "Unable to create file within {}: {err}", + self.dir_for_temp_cache_files.display() + ) + }); // Now that we have a file, create a writer and reader. - self.writer = Some(BufWriter::new(data.try_clone().unwrap())); - self.reader = Some(Mutex::new(BufReader::new(data))); + self.writer = Some(BufWriter::new(data)); } self.writer .as_mut() .unwrap() .write_all(hash.as_ref()) - .unwrap(); + .unwrap_or_else(|err| { + panic!( + "Unable to write file within {}: {err}", + self.dir_for_temp_cache_files.display() + ) + }); self.count += 1; } - - #[cfg(test)] - fn read(&self, index: usize) -> std::io::Result> { - let start = std::mem::size_of::() * index; - let mut reader = self.reader.as_ref().unwrap().lock().unwrap(); - let file_offset_in_bytes = std::mem::size_of::() * start; - reader.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))?; - let mut result_bytes: Vec = vec![]; - reader.read_to_end(&mut result_bytes)?; - let result: Vec = bytemuck::cast_slice(&result_bytes).to_vec(); - Ok(result) - } } /// parameters to calculate accounts hash @@ -1316,7 +1265,6 @@ impl<'a> AccountsHasher<'a> { let mut hashes = AccountHashesFile { writer: None, - reader: None, dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), count: 0, }; @@ -1539,7 +1487,6 @@ mod tests { fn new(dir_for_temp_cache_files: PathBuf) -> Self { Self { writer: None, - reader: None, dir_for_temp_cache_files, count: 0, } @@ -1637,15 +1584,17 @@ mod tests { // 1 hash file.write(&hashes[0]); - assert_eq!(&[hashes[0]][..], file.read(0).unwrap()); - assert!(file.read(1).is_err()); + let reader = file.get_reader(); + assert_eq!(&[hashes[0]][..], read(&reader, 0).unwrap()); + assert!(read(&reader, 1).unwrap().is_empty()); // multiple hashes let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()); assert!(file.get_reader().is_none()); hashes.iter().for_each(|hash| file.write(hash)); - (0..2).for_each(|i| assert_eq!(&hashes[i..], file.read(i).unwrap())); - assert!(file.read(2).is_err()); + let reader = file.get_reader(); + (0..2).for_each(|i| assert_eq!(&hashes[i..], read(&reader, i).unwrap())); + assert!(read(&reader, 2).unwrap().is_empty()); } #[test] @@ -1829,17 +1778,31 @@ mod tests { let slice = convert_to_slice(&temp_vec); let dir_for_temp_cache_files = tempdir().unwrap(); let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf()); - let (hashes, lamports) = + let (mut hashes, lamports) = accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default()); - assert_eq!(&[Hash::default()], &hashes.read(0).unwrap()[..]); + let reader = hashes.get_reader(); + assert_eq!(&[Hash::default()], &read(&reader, 0).unwrap()[..]); assert_eq!(lamports, 1); } fn get_vec_vec(hashes: Vec) -> Vec> { hashes.into_iter().map(get_vec).collect() } - fn get_vec(hashes: AccountHashesFile) -> Vec { - hashes.read(0).unwrap_or_default() + fn get_vec(mut hashes: AccountHashesFile) -> Vec { + let reader = hashes.get_reader(); + read(&reader, 0).unwrap_or_default() + } + fn read(reader: &Option>>, index: usize) -> std::io::Result> { + let file_offset_in_bytes = std::mem::size_of::() * index; + if reader.is_none() { + return Ok(vec![]); + } + let mut reader = reader.as_ref().unwrap().lock().unwrap(); + reader.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))?; + let mut result_bytes: Vec = vec![]; + reader.read_to_end(&mut result_bytes)?; + let result: Vec = bytemuck::cast_slice(&result_bytes).to_vec(); + Ok(result) } #[test]