Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi committed Nov 9, 2024
1 parent d1d777f commit d9b0c8a
Showing 1 changed file with 43 additions and 81 deletions.
124 changes: 43 additions & 81 deletions accounts-db/src/accounts_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use {
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex,
},
thread, time,
},
tempfile::tempfile_in,
};
Expand All @@ -39,9 +38,6 @@ struct AccountHashesFile {
/// Writer for hash files
writer: Option<BufWriter<File>>,

/// Reader for hash files
reader: Option<Mutex<BufReader<File>>>,

/// The directory where temporary cache files are put
dir_for_temp_cache_files: PathBuf,

Expand All @@ -50,13 +46,12 @@ struct AccountHashesFile {
}

impl AccountHashesFile {
/// return a mmap reader that can be accessed by slice
/// Return a file reader for the underlying file.
fn get_reader(&mut self) -> Option<Mutex<BufReader<File>>> {
// Drop writer to flush the file to disk
if self.writer.is_some() {
drop(self.writer.take());
}
std::mem::take(&mut self.reader)
std::mem::take(&mut self.writer).map(|writer| {
let reader = BufReader::new(writer.into_inner().unwrap());
Mutex::new(reader)
})
}

/// # hashes stored in this file
Expand All @@ -69,74 +64,27 @@ impl AccountHashesFile {
fn write(&mut self, hash: &Hash) {
if self.writer.is_none() {
// we have hashes to write but no file yet, so create a file that will auto-delete on drop
let get_file = || -> Result<_, std::io::Error> {
let data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
panic!(
"Unable to create file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
});
Ok(data)
};

// Retry 5 times to allocate the AccountHashesFile. The memory might be fragmented and
// causes memory allocation failure. Therefore, let's retry after failure. Hoping that the
// kernel has the chance to defrag the memory between the retries, and retries succeed.
let mut num_retries = 0;
let data = loop {
num_retries += 1;

match get_file() {
Ok(data) => {
break data;
}
Err(err) => {
info!(
"Unable to create account hashes file within {}: {}, retry counter {}",
self.dir_for_temp_cache_files.display(),
err,
num_retries
);

if num_retries > 5 {
panic!(
"Unable to create account hashes file within {}: after {} retries",
self.dir_for_temp_cache_files.display(),
num_retries
);
}
datapoint_info!(
"retry_account_hashes_file_allocation",
("retry", num_retries, i64)
);
thread::sleep(time::Duration::from_millis(num_retries * 100));
}
}
};

// Now that we have a file, create a writer and reader.
self.writer = Some(BufWriter::new(data.try_clone().unwrap()));
self.reader = Some(Mutex::new(BufReader::new(data)));
let data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
panic!(
"Unable to create file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
});
// Now that we have a file, create a writer.
self.writer = Some(BufWriter::new(data));
}
self.writer
.as_mut()
.unwrap()
.write_all(hash.as_ref())
.unwrap();
.unwrap_or_else(|err| {
panic!(
"Unable to write file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
});
self.count += 1;
}

#[cfg(test)]
fn read(&self, index: usize) -> std::io::Result<Vec<Hash>> {
let start = std::mem::size_of::<Hash>() * index;
let mut reader = self.reader.as_ref().unwrap().lock().unwrap();
let file_offset_in_bytes = std::mem::size_of::<Hash>() * start;
reader.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))?;
let mut result_bytes: Vec<u8> = vec![];
reader.read_to_end(&mut result_bytes)?;
let result: Vec<Hash> = bytemuck::cast_slice(&result_bytes).to_vec();
Ok(result)
}
}

/// parameters to calculate accounts hash
Expand Down Expand Up @@ -1316,7 +1264,6 @@ impl<'a> AccountsHasher<'a> {

let mut hashes = AccountHashesFile {
writer: None,
reader: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
count: 0,
};
Expand Down Expand Up @@ -1539,7 +1486,6 @@ mod tests {
fn new(dir_for_temp_cache_files: PathBuf) -> Self {
Self {
writer: None,
reader: None,
dir_for_temp_cache_files,
count: 0,
}
Expand Down Expand Up @@ -1637,15 +1583,17 @@ mod tests {

// 1 hash
file.write(&hashes[0]);
assert_eq!(&[hashes[0]][..], file.read(0).unwrap());
assert!(file.read(1).is_err());
let reader = file.get_reader();
assert_eq!(&[hashes[0]][..], read(&reader, 0).unwrap());
assert!(read(&reader, 1).unwrap().is_empty());

// multiple hashes
let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
assert!(file.get_reader().is_none());
hashes.iter().for_each(|hash| file.write(hash));
(0..2).for_each(|i| assert_eq!(&hashes[i..], file.read(i).unwrap()));
assert!(file.read(2).is_err());
let reader = file.get_reader();
(0..2).for_each(|i| assert_eq!(&hashes[i..], read(&reader, i).unwrap()));
assert!(read(&reader, 2).unwrap().is_empty());
}

#[test]
Expand Down Expand Up @@ -1829,17 +1777,31 @@ mod tests {
let slice = convert_to_slice(&temp_vec);
let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
let (hashes, lamports) =
let (mut hashes, lamports) =
accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default());
assert_eq!(&[Hash::default()], &hashes.read(0).unwrap()[..]);
let reader = hashes.get_reader();
assert_eq!(&[Hash::default()], &read(&reader, 0).unwrap()[..]);
assert_eq!(lamports, 1);
}

fn get_vec_vec(hashes: Vec<AccountHashesFile>) -> Vec<Vec<Hash>> {
hashes.into_iter().map(get_vec).collect()
}
fn get_vec(hashes: AccountHashesFile) -> Vec<Hash> {
hashes.read(0).unwrap_or_default()
fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
let reader = hashes.get_reader();
read(&reader, 0).unwrap_or_default()
}
fn read(reader: &Option<Mutex<BufReader<File>>>, index: usize) -> std::io::Result<Vec<Hash>> {
let file_offset_in_bytes = std::mem::size_of::<Hash>() * index;
if reader.is_none() {
return Ok(vec![]);
}
let mut reader = reader.as_ref().unwrap().lock().unwrap();
reader.seek(SeekFrom::Start(file_offset_in_bytes.try_into().unwrap()))?;
let mut result_bytes: Vec<u8> = vec![];
reader.read_to_end(&mut result_bytes)?;
let result: Vec<Hash> = bytemuck::cast_slice(&result_bytes).to_vec();
Ok(result)
}

#[test]
Expand Down

0 comments on commit d9b0c8a

Please sign in to comment.