diff --git a/rs/config/src/execution_environment.rs b/rs/config/src/execution_environment.rs
index db3346376c03..9b3f841eb6a0 100644
--- a/rs/config/src/execution_environment.rs
+++ b/rs/config/src/execution_environment.rs
@@ -365,8 +365,8 @@ pub struct Config {
     /// Enables the replicated inter-canister calls to `fetch_canister_logs`.
     pub replicated_inter_canister_log_fetch: FlagStatus,
 
-    /// Enables filtering by range in `fetch_canister_logs`.
-    pub fetch_canister_logs_filter: FlagStatus,
+    /// Enables the log memory store feature.
+    pub log_memory_store_feature: FlagStatus,
 }
 
 impl Default for Config {
@@ -455,7 +455,7 @@ impl Default for Config {
             max_environment_variable_name_length: MAX_ENVIRONMENT_VARIABLE_NAME_LENGTH,
             max_environment_variable_value_length: MAX_ENVIRONMENT_VARIABLE_VALUE_LENGTH,
             replicated_inter_canister_log_fetch: FlagStatus::Disabled,
-            fetch_canister_logs_filter: FlagStatus::Disabled,
+            log_memory_store_feature: FlagStatus::Disabled,
         }
     }
 }
diff --git a/rs/embedders/src/wasmtime_embedder/system_api/sandbox_safe_system_state.rs b/rs/embedders/src/wasmtime_embedder/system_api/sandbox_safe_system_state.rs
index 1d33d9090b0a..651bc2e96d58 100644
--- a/rs/embedders/src/wasmtime_embedder/system_api/sandbox_safe_system_state.rs
+++ b/rs/embedders/src/wasmtime_embedder/system_api/sandbox_safe_system_state.rs
@@ -513,10 +513,23 @@ impl SystemStateModifications {
             system_state.global_timer = new_global_timer;
         }
 
+        // TODO(DSM-11): clean up the population logic after the migration is done.
+        // We need to copy the existing canister_log into log_memory_store so that
+        // no log records are lost until the migration is complete.
+        if system_state.log_memory_store.is_empty() && !system_state.canister_log.is_empty() {
+            system_state
+                .log_memory_store
+                .append_delta_log(&mut system_state.canister_log.clone());
+        }
+
         // Append delta log to the total canister log.
+        let mut canister_log_copy = self.canister_log.clone();
         system_state
             .canister_log
             .append_delta_log(&mut self.canister_log);
+        system_state
+            .log_memory_store
+            .append_delta_log(&mut canister_log_copy);
 
         // Bump the canister version after all changes have been applied.
        if self.should_bump_canister_version {
diff --git a/rs/execution_environment/src/canister_logs.rs b/rs/execution_environment/src/canister_logs.rs
index 93c2ff30cb1c..37741411e17d 100644
--- a/rs/execution_environment/src/canister_logs.rs
+++ b/rs/execution_environment/src/canister_logs.rs
@@ -1,10 +1,7 @@
-use std::collections::VecDeque;
-
 use ic_config::flag_status::FlagStatus;
 use ic_error_types::{ErrorCode, UserError};
 use ic_management_canister_types_private::{
-    CanisterLogRecord, FetchCanisterLogsFilter, FetchCanisterLogsRange, FetchCanisterLogsRequest,
-    FetchCanisterLogsResponse, LogVisibilityV2,
+    FetchCanisterLogsRequest, FetchCanisterLogsResponse, LogVisibilityV2,
 };
 use ic_replicated_state::ReplicatedState;
 use ic_types::PrincipalId;
@@ -13,7 +10,7 @@ pub(crate) fn fetch_canister_logs(
     sender: PrincipalId,
     state: &ReplicatedState,
     args: FetchCanisterLogsRequest,
-    fetch_canister_logs_filter: FlagStatus,
+    log_memory_store_feature: FlagStatus,
 ) -> Result<FetchCanisterLogsResponse, UserError> {
     let canister_id = args.get_canister_id();
     let canister = state.canister_state(&canister_id).ok_or_else(|| {
@@ -26,10 +23,15 @@ pub(crate) fn fetch_canister_logs(
     // Check if the sender has permission to access logs
     check_log_visibility_permission(&sender, canister.log_visibility(), canister.controllers())?;
 
-    let records = canister.system_state.canister_log.records();
-    let canister_log_records = match fetch_canister_logs_filter {
-        FlagStatus::Disabled => records.iter().cloned().collect(),
-        FlagStatus::Enabled => filter_records(&args, records)?,
+    let canister_log_records = match log_memory_store_feature {
+        FlagStatus::Disabled => canister
+            .system_state
+            .canister_log
+            .records()
+            .iter()
+            .cloned()
+            .collect(),
+        FlagStatus::Enabled => canister.system_state.log_memory_store.records(args.filter),
     };
 
     Ok(FetchCanisterLogsResponse {
@@ -60,27 +62,3 @@ pub(crate) fn check_log_visibility_permission(
         ))
     }
 }
-
-fn filter_records(
-    args: &FetchCanisterLogsRequest,
-    records: &VecDeque<CanisterLogRecord>,
-) -> Result<Vec<CanisterLogRecord>, UserError> {
-    let Some(filter) = &args.filter else {
-        return Ok(records.iter().cloned().collect());
-    };
-
-    let (range, key): (&FetchCanisterLogsRange, fn(&CanisterLogRecord) -> u64) = match filter {
-        FetchCanisterLogsFilter::ByIdx(r) => (r, |rec| rec.idx),
-        FetchCanisterLogsFilter::ByTimestampNanos(r) => (r, |rec| rec.timestamp_nanos),
-    };
-
-    if range.is_empty() {
-        return Ok(Vec::new());
-    }
-
-    Ok(records
-        .iter()
-        .filter(|r| range.contains(key(r)))
-        .cloned()
-        .collect())
-}
diff --git a/rs/execution_environment/src/canister_manager.rs b/rs/execution_environment/src/canister_manager.rs
index 08cb7f084e3e..e2e793e147b8 100644
--- a/rs/execution_environment/src/canister_manager.rs
+++ b/rs/execution_environment/src/canister_manager.rs
@@ -562,8 +562,11 @@ impl CanisterManager {
         if let Some(log_visibility) = settings.log_visibility() {
             canister.system_state.log_visibility = log_visibility.clone();
         }
-        if let Some(_log_memory_limit) = settings.log_memory_limit() {
-            // TODO: populate log_memory_store with the new limit.
+        if let Some(log_memory_limit) = settings.log_memory_limit() {
+            canister
+                .system_state
+                .log_memory_store
+                .set_log_memory_limit(log_memory_limit.get() as usize);
         }
         if let Some(wasm_memory_limit) = settings.wasm_memory_limit() {
             canister.system_state.wasm_memory_limit = Some(wasm_memory_limit);
@@ -1099,7 +1102,7 @@ impl CanisterManager {
         let freeze_threshold = canister.system_state.freeze_threshold;
         let reserved_cycles_limit = canister.system_state.reserved_balance_limit();
         let log_visibility = canister.system_state.log_visibility.clone();
-        let log_memory_limit = canister.system_state.canister_log.byte_capacity();
+        let log_memory_limit = canister.system_state.log_memory_store.byte_capacity();
         let wasm_memory_limit = canister.system_state.wasm_memory_limit;
         let wasm_memory_threshold = canister.system_state.wasm_memory_threshold;
@@ -3170,7 +3173,8 @@ pub fn uninstall_canister(
     canister.execution_state = None;
 
     // Clear log.
-    canister.clear_log();
+    canister.system_state.canister_log.clear();
+    canister.system_state.log_memory_store.clear(fd_factory);
 
     // Clear the Wasm chunk store.
     canister.system_state.wasm_chunk_store = WasmChunkStore::new(fd_factory);
diff --git a/rs/execution_environment/src/execution/install_code.rs b/rs/execution_environment/src/execution/install_code.rs
index 4bd75340f546..b85237b3e143 100644
--- a/rs/execution_environment/src/execution/install_code.rs
+++ b/rs/execution_environment/src/execution/install_code.rs
@@ -18,7 +18,9 @@ use ic_logger::{error, fatal, info, warn};
 use ic_management_canister_types_private::{
     CanisterChangeDetails, CanisterChangeOrigin, CanisterInstallModeV2,
 };
-use ic_replicated_state::canister_state::system_state::ReservationError;
+use ic_replicated_state::canister_state::system_state::{
+    ReservationError, log_memory_store::LogMemoryStore,
+};
 use ic_replicated_state::metadata_state::subnet_call_context_manager::InstallCodeCallId;
 use ic_replicated_state::{CanisterState, ExecutionState, num_bytes_try_from};
 use ic_state_layout::{CanisterLayout, CheckpointLayout, ReadOnly};
@@ -149,9 +151,9 @@ impl InstallCodeHelper {
         self.canister.system_state.certified_data = Vec::new();
     }
 
-    pub fn clear_log(&mut self) {
+    pub fn clear_log_obsolete(&mut self) {
         self.steps.push(InstallCodeStep::ClearLog);
-        self.canister.clear_log();
+        self.canister.clear_log_obsolete();
     }
 
     pub fn deactivate_global_timer(&mut self) {
@@ -241,7 +243,14 @@ impl InstallCodeHelper {
         paused: PausedInstallCodeHelper,
         original: &OriginalContext,
         round: &RoundContext,
-    ) -> Result<Self, (CanisterManagerError, NumInstructions, CanisterLog)> {
+    ) -> Result<
+        Self,
+        (
+            CanisterManagerError,
+            NumInstructions,
+            (CanisterLog, LogMemoryStore),
+        ),
+    > {
         let mut helper = Self::new(clean_canister, original);
         let paused_instructions_left = paused.instructions_left;
         for state_change in paused.steps.into_iter() {
@@ -594,8 +603,12 @@ impl InstallCodeHelper {
     }
 
     /// Takes the canister log.
-    pub(crate) fn take_canister_log(&mut self) -> CanisterLog {
-        self.canister.system_state.canister_log.take()
+    pub(crate) fn take_canister_log(&mut self) -> (CanisterLog, LogMemoryStore) {
+        // TODO: Remove duplication when the migration is fully done.
+        (
+            self.canister.system_state.canister_log.take(),
+            self.canister.system_state.log_memory_store.clone(),
+        )
     }
 
     /// Checks the result of Wasm execution and applies the state changes.
@@ -756,7 +769,7 @@ impl InstallCodeHelper { Ok(()) } InstallCodeStep::ClearLog => { - self.clear_log(); + self.clear_log_obsolete(); Ok(()) } InstallCodeStep::DeactivateGlobalTimer => { @@ -856,7 +869,7 @@ pub(crate) fn finish_err( original: OriginalContext, round: RoundContext, err: CanisterManagerError, - new_canister_log: CanisterLog, + new_canister_log: (CanisterLog, LogMemoryStore), ) -> DtsInstallCodeResult { let mut new_canister = clean_canister; diff --git a/rs/execution_environment/src/execution_environment.rs b/rs/execution_environment/src/execution_environment.rs index 09e0abfc0f80..84e9cbc6bc84 100644 --- a/rs/execution_environment/src/execution_environment.rs +++ b/rs/execution_environment/src/execution_environment.rs @@ -1586,7 +1586,7 @@ impl ExecutionEnvironment { sender, &state, args, - self.config.fetch_canister_logs_filter, + self.config.log_memory_store_feature, ) }) .map(|resp| { diff --git a/rs/execution_environment/src/query_handler.rs b/rs/execution_environment/src/query_handler.rs index 4bac47c2c365..a21fd8d2d032 100644 --- a/rs/execution_environment/src/query_handler.rs +++ b/rs/execution_environment/src/query_handler.rs @@ -191,7 +191,7 @@ impl InternalHttpQueryHandler { query.source(), state.get_ref(), FetchCanisterLogsRequest::decode(&query.method_payload)?, - self.config.fetch_canister_logs_filter, + self.config.log_memory_store_feature, )?; let result = Ok(WasmResult::Reply(Encode!(&response).unwrap())); self.metrics.observe_subnet_query_message( diff --git a/rs/execution_environment/tests/canister_logging.rs b/rs/execution_environment/tests/canister_logging.rs index 8c5628397bd7..c82fb4f4697d 100644 --- a/rs/execution_environment/tests/canister_logging.rs +++ b/rs/execution_environment/tests/canister_logging.rs @@ -75,7 +75,7 @@ fn readable_logs_without_backtraces( fn setup_env_with( replicated_inter_canister_log_fetch: FlagStatus, - fetch_canister_logs_filter: FlagStatus, + log_memory_store_feature: FlagStatus, ) -> StateMachine { let subnet_type = SubnetType::Application; let mut subnet_config = SubnetConfig::new(subnet_type); @@ -86,7 +86,7 @@ fn setup_env_with( subnet_config, ExecutionConfig { replicated_inter_canister_log_fetch, - fetch_canister_logs_filter, + log_memory_store_feature, ..Default::default() }, ); @@ -99,10 +99,10 @@ fn setup_env_with( fn setup_env() -> StateMachine { let replicated_inter_canister_log_fetch = FlagStatus::Disabled; - let fetch_canister_logs_filter = FlagStatus::Disabled; + let log_memory_store_feature = FlagStatus::Disabled; setup_env_with( replicated_inter_canister_log_fetch, - fetch_canister_logs_filter, + log_memory_store_feature, ) } diff --git a/rs/replicated_state/src/canister_state.rs b/rs/replicated_state/src/canister_state.rs index 0f201e91321d..d9c9ee5e28c5 100644 --- a/rs/replicated_state/src/canister_state.rs +++ b/rs/replicated_state/src/canister_state.rs @@ -6,7 +6,9 @@ mod tests; use crate::canister_state::execution_state::WasmExecutionMode; use crate::canister_state::queues::CanisterOutputQueuesIterator; -use crate::canister_state::system_state::{ExecutionTask, SystemState}; +use crate::canister_state::system_state::{ + ExecutionTask, SystemState, log_memory_store::LogMemoryStore, +}; use crate::{InputQueueType, StateError}; pub use execution_state::{EmbedderCache, ExecutionState, ExportedFunctions}; use ic_config::embedders::Config as HypervisorConfig; @@ -383,8 +385,8 @@ impl CanisterState { /// The amount of memory currently being used by the canister. 
/// /// This includes execution memory (heap, stable, globals, Wasm), - /// canister history memory, wasm chunk storage and snapshots that - /// belong to this canister. + /// canister history memory, wasm chunk storage, log storage + /// and snapshots that belong to this canister. /// /// This amount is used to periodically charge the canister for the memory /// resources it consumes and can be used to calculate the canister's @@ -393,6 +395,7 @@ impl CanisterState { self.execution_memory_usage() + self.canister_history_memory_usage() + self.wasm_chunk_store_memory_usage() + + self.log_memory_store_memory_usage() + self.snapshots_memory_usage() } @@ -459,6 +462,12 @@ impl CanisterState { self.system_state.wasm_chunk_store.memory_usage() } + /// Returns the memory usage of the log memory store in bytes. + pub fn log_memory_store_memory_usage(&self) -> NumBytes { + NumBytes::new(self.system_state.log_memory_store.total_allocated_bytes() as u64) + } + + /// Returns the memory usage of the snapshots in bytes. pub fn snapshots_memory_usage(&self) -> NumBytes { self.system_state.snapshots_memory_usage } @@ -586,13 +595,14 @@ impl CanisterState { } /// Clears the canister log. - pub fn clear_log(&mut self) { + pub fn clear_log_obsolete(&mut self) { self.system_state.canister_log.clear(); } /// Sets the new canister log. - pub fn set_log(&mut self, other: CanisterLog) { - self.system_state.canister_log = other; + pub fn set_log(&mut self, (canister_log, log_memory_store): (CanisterLog, LogMemoryStore)) { + self.system_state.canister_log = canister_log; + self.system_state.log_memory_store = log_memory_store; } /// Returns the cumulative amount of heap delta represented by this canister's state. diff --git a/rs/replicated_state/src/canister_state/system_state.rs b/rs/replicated_state/src/canister_state/system_state.rs index 65f222c4cf0d..26acf831df2c 100644 --- a/rs/replicated_state/src/canister_state/system_state.rs +++ b/rs/replicated_state/src/canister_state/system_state.rs @@ -1,15 +1,15 @@ mod call_context_manager; +pub mod log_memory_store; pub mod proto; mod task_queue; pub mod wasm_chunk_store; -// TODO(DSM-11): remove testing cofiguration when log memory store is used in production. -#[cfg(test)] -pub mod log_memory_store; - pub use self::task_queue::{TaskQueue, is_low_wasm_memory_hook_condition_satisfied}; -use self::wasm_chunk_store::{WasmChunkStore, WasmChunkStoreMetadata}; +use self::{ + log_memory_store::LogMemoryStore, + wasm_chunk_store::{WasmChunkStore, WasmChunkStoreMetadata}, +}; use super::queues::refunds::RefundPool; use super::queues::{CanisterInput, can_push}; pub use super::queues::{CanisterOutputQueuesIterator, memory_usage_of_request}; @@ -340,6 +340,10 @@ pub struct SystemState { #[validate_eq(CompareWithValidateEq)] pub canister_log: CanisterLog, + /// The memory used for storing log entries. + #[validate_eq(CompareWithValidateEq)] + pub log_memory_store: LogMemoryStore, + /// The Wasm memory limit. This is a field in developer-visible canister /// settings that allows the developer to limit the usage of the Wasm memory /// by the canister to leave some room in 4GiB for upgrade calls. 
@@ -484,7 +488,8 @@ impl SystemState {
             initial_cycles,
             freeze_threshold,
             CanisterStatus::new_running(),
-            WasmChunkStore::new(fd_factory),
+            WasmChunkStore::new(fd_factory.clone()),
+            LogMemoryStore::new(fd_factory),
         )
     }
 
@@ -495,6 +500,7 @@
         freeze_threshold: NumSeconds,
         status: CanisterStatus,
         wasm_chunk_store: WasmChunkStore,
+        log_memory_store: LogMemoryStore,
     ) -> Self {
         Self {
             canister_id,
@@ -521,6 +527,7 @@
             // therefore it should not scale to memory limit from above.
             // Remove this field after migration is done.
             canister_log: CanisterLog::default_aggregate(),
+            log_memory_store,
             wasm_memory_limit: None,
             next_snapshot_id: 0,
             snapshots_memory_usage: NumBytes::new(0),
@@ -550,12 +557,14 @@
         wasm_chunk_store_metadata: WasmChunkStoreMetadata,
         log_visibility: LogVisibilityV2,
         canister_log: CanisterLog,
+        log_memory_store_data: PageMap,
         wasm_memory_limit: Option<NumBytes>,
         next_snapshot_id: u64,
         snapshots_memory_usage: NumBytes,
         environment_variables: BTreeMap<String, String>,
         metrics: &dyn CheckpointLoadingMetrics,
     ) -> Self {
+        let log_memory_limit = 4096; // TODO: properly populate.
         let system_state = Self {
             controllers,
             canister_id,
@@ -580,6 +589,10 @@
             ),
             log_visibility,
             canister_log,
+            log_memory_store: LogMemoryStore::from_checkpoint(
+                log_memory_store_data,
+                log_memory_limit,
+            ),
             wasm_memory_limit,
             next_snapshot_id,
             snapshots_memory_usage,
@@ -653,6 +666,7 @@
             freeze_threshold,
             status,
             WasmChunkStore::new_for_testing(),
+            LogMemoryStore::new_for_testing(),
         )
     }
 
@@ -2219,6 +2233,7 @@ pub mod testing {
             // therefore it should not scale to memory limit from above.
             // Remove this field after migration is done.
             canister_log: CanisterLog::default_aggregate(),
+            log_memory_store: LogMemoryStore::new_for_testing(),
             wasm_memory_limit: Default::default(),
             next_snapshot_id: Default::default(),
             snapshots_memory_usage: Default::default(),
diff --git a/rs/replicated_state/src/canister_state/system_state/log_memory_store/header.rs b/rs/replicated_state/src/canister_state/system_state/log_memory_store/header.rs
index 63b410576b9c..a03698570016 100644
--- a/rs/replicated_state/src/canister_state/system_state/log_memory_store/header.rs
+++ b/rs/replicated_state/src/canister_state/system_state/log_memory_store/header.rs
@@ -1,8 +1,11 @@
 use crate::canister_state::system_state::log_memory_store::{
     memory::{MemoryAddress, MemoryPosition, MemorySize},
-    ring_buffer::{DATA_REGION_OFFSET, INDEX_TABLE_PAGES, MAGIC},
+    ring_buffer::{DATA_REGION_OFFSET, INDEX_TABLE_PAGES},
 };
 
+/// Magic prefix that marks a properly initialized canister log buffer.
+pub(crate) const MAGIC: &[u8; 3] = b"CLB";
+
 /// Header structure for the log memory store (version 1).
 /// This is the in-memory representation of the header.
 #[derive(Debug, PartialEq, Clone, Copy)]
diff --git a/rs/replicated_state/src/canister_state/system_state/log_memory_store/mod.rs b/rs/replicated_state/src/canister_state/system_state/log_memory_store/mod.rs
index 054eeec5e367..cee0cf1ade50 100644
--- a/rs/replicated_state/src/canister_state/system_state/log_memory_store/mod.rs
+++ b/rs/replicated_state/src/canister_state/system_state/log_memory_store/mod.rs
@@ -25,7 +25,16 @@ const DELTA_LOG_SIZES_CAP: usize = 10_000;
 #[derive(Clone, Eq, PartialEq, Debug, ValidateEq)]
 pub struct LogMemoryStore {
     #[validate_eq(Ignore)]
-    pub page_map: PageMap,
+    page_map: PageMap,
+
+    /// Stores the log memory limit for the canister.
+    ///
+    /// This is important for cases where the canister
+    /// is created without a Wasm module or after an uninstall.
+    /// In these cases the canister should not be charged,
+    /// so the page_map must be empty, but we still need to
+    /// preserve the log_memory_limit.
+    log_memory_limit: usize,
 
     /// (!) No need to preserve across checkpoints.
     /// Tracks the size of each delta log appended during a round.
@@ -37,23 +46,30 @@
 }
 
 impl LogMemoryStore {
+    /// Creates a new store with an empty ring buffer to avoid unnecessary log-memory charges.
     pub fn new(fd_factory: Arc<dyn PageAllocatorFileDescriptor>) -> Self {
-        Self::new_inner(PageMap::new(fd_factory))
+        // This creates a new empty page map with an invalid ring buffer header.
+        Self::new_inner(RingBuffer::load_raw(PageMap::new(fd_factory)).to_page_map())
     }
 
     /// Creates a new store that will use the temp file system for allocating new pages.
     pub fn new_for_testing() -> Self {
-        Self::new_inner(PageMap::new_for_testing())
+        Self::new_inner(RingBuffer::load_raw(PageMap::new_for_testing()).to_page_map())
     }
 
-    pub fn from_checkpoint(page_map: PageMap) -> Self {
-        Self::new_inner(page_map)
+    fn new_inner(page_map: PageMap) -> Self {
+        Self {
+            page_map,
+            delta_log_sizes: VecDeque::new(),
+            log_memory_limit: DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT,
+        }
     }
 
-    pub fn new_inner(page_map: PageMap) -> Self {
+    pub fn from_checkpoint(page_map: PageMap, log_memory_limit: usize) -> Self {
         Self {
             page_map,
             delta_log_sizes: VecDeque::new(),
+            log_memory_limit,
         }
     }
@@ -65,76 +81,102 @@ impl LogMemoryStore {
         &mut self.page_map
     }
 
-    fn load_ring_buffer(&self) -> RingBuffer {
-        RingBuffer::load_or_new(
-            self.page_map.clone(),
-            MemorySize::new(DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT as u64),
-        )
+    /// Clears the canister log records.
+    pub fn clear(&mut self, fd_factory: Arc<dyn PageAllocatorFileDescriptor>) {
+        // This creates a new empty page map with an invalid ring buffer header.
+        self.page_map = PageMap::new(fd_factory);
+    }
+
+    /// Loads the ring buffer from the page map.
+    fn load_ring_buffer(&self) -> Option<RingBuffer> {
+        RingBuffer::load_checked(self.page_map.clone())
     }
 
     /// Returns the total allocated bytes for the ring buffer
     /// including header, index table and data region.
-    #[allow(dead_code)]
     pub fn total_allocated_bytes(&self) -> usize {
-        self.load_ring_buffer().total_allocated_bytes()
+        self.load_ring_buffer()
+            .map(|rb| rb.total_allocated_bytes())
+            .unwrap_or(0)
     }
 
     /// Returns the data capacity of the ring buffer.
     pub fn byte_capacity(&self) -> usize {
-        self.load_ring_buffer().byte_capacity()
+        self.load_ring_buffer()
+            .map(|rb| rb.byte_capacity())
+            .unwrap_or(0)
     }
 
     /// Returns the data size of the ring buffer.
     pub fn bytes_used(&self) -> usize {
-        self.load_ring_buffer().bytes_used()
-    }
-
-    /// Set the ring buffer capacity — preserves existing records by collecting and re-appending them.
-    pub fn set_byte_capacity(&mut self, new_byte_capacity: usize) {
-        let new_byte_capacity = new_byte_capacity.max(DATA_CAPACITY_MIN);
-
-        // TODO: PageMap cannot be shrunk today; reducing capacity does not free allocated pages
-        // (practical ring buffer max currently ~55 MB). Future improvement: allocate a new PageMap
-        // with the desired capacity, refeed records, then drop the old map or provide a `PageMap::shrink` API.
-        let old = self.load_ring_buffer();
-        if old.byte_capacity() == new_byte_capacity {
-            return;
+        self.load_ring_buffer()
+            .map(|rb| rb.bytes_used())
+            .unwrap_or(0)
+    }
+
+    pub fn log_memory_limit(&self) -> usize {
+        self.log_memory_limit
+    }
+
+    /// Sets the log memory limit for this canister.
+    ///
+    /// The ring buffer is updated only when it already exists and the new
+    /// limit changes its byte capacity. This avoids creating a ring buffer
+    /// for canisters without a Wasm module or after uninstall, preventing
+    /// unnecessary log-memory charges.
+    pub fn set_log_memory_limit(&mut self, new_log_memory_limit: usize) {
+        // Enforce a safe minimum for data capacity.
+        let new_log_memory_limit = new_log_memory_limit.max(DATA_CAPACITY_MIN);
+        self.log_memory_limit = new_log_memory_limit;
+
+        // Only resize an existing ring buffer.
+        if let Some(old) = self.load_ring_buffer() {
+            // Only resize when the capacity actually changes.
+            if old.byte_capacity() != new_log_memory_limit {
+                // NOTE: PageMap cannot be shrunk today. Reducing capacity keeps
+                // allocated pages in place; in practice the ring buffer max is
+                // currently ~55 MB. Future improvement: allocate a new PageMap
+                // with the desired capacity, refeed records, then drop the old
+                // map or add a `PageMap::shrink` API to reclaim pages.
+                //
+                // Recreate a ring buffer with the new capacity and restore records.
+                let mut new = RingBuffer::new(
+                    self.page_map.clone(),
+                    MemorySize::new(new_log_memory_limit as u64),
+                );
+                new.append_log(old.all_records());
+                self.page_map = new.to_page_map();
+            }
         }
-
-        // Recreate ring buffer with new capacity and restore records.
-        let mut new = RingBuffer::new(
-            self.page_map.clone(),
-            MemorySize::new(new_byte_capacity as u64),
-        );
-        new.append_log(old.all_records());
-        self.page_map = new.to_page_map();
     }
 
     /// Returns the next log record `idx`.
     pub fn next_idx(&self) -> u64 {
-        self.load_ring_buffer().next_idx()
+        self.load_ring_buffer().map(|rb| rb.next_idx()).unwrap_or(0)
     }
 
     pub fn is_empty(&self) -> bool {
-        self.load_ring_buffer().is_empty()
+        self.load_ring_buffer()
+            .map(|rb| rb.is_empty())
+            .unwrap_or(true)
     }
 
     /// Returns the canister log records, optionally filtered.
     pub fn records(&self, filter: Option<FetchCanisterLogsFilter>) -> Vec<CanisterLogRecord> {
-        self.load_ring_buffer().records(filter)
+        self.load_ring_buffer()
+            .map(|rb| rb.records(filter))
+            .unwrap_or_default()
     }
 
     /// Returns all canister log records.
     pub fn all_records(&self) -> Vec<CanisterLogRecord> {
-        self.load_ring_buffer().all_records()
-    }
-
-    /// Clears the canister log records.
-    pub fn clear(&mut self) {
-        self.update_ring_buffer(|rb| rb.clear());
+        self.load_ring_buffer()
+            .map(|rb| rb.all_records())
+            .unwrap_or_default()
     }
 
     /// Appends a delta log to the ring buffer.
+    /// If the ring buffer does not exist, it is created with the current log memory limit.
     pub fn append_delta_log(&mut self, delta_log: &mut CanisterLog) {
         // Record the size of the appended delta log for metrics.
self.push_delta_log_size(delta_log.bytes_used()); @@ -143,12 +185,11 @@ impl LogMemoryStore { .iter_mut() .map(std::mem::take) .collect(); - self.update_ring_buffer(|rb| rb.append_log(records)); - } - - fn update_ring_buffer(&mut self, f: impl FnOnce(&mut RingBuffer)) { - let mut ring_buffer = self.load_ring_buffer(); - f(&mut ring_buffer); + let mut ring_buffer = self.load_ring_buffer().unwrap_or(RingBuffer::new( + self.page_map.clone(), + MemorySize::new(self.log_memory_limit as u64), + )); + ring_buffer.append_log(records); self.page_map = ring_buffer.to_page_map(); } @@ -222,8 +263,42 @@ mod tests { assert_eq!(s.next_idx(), 0); assert!(s.is_empty()); assert_eq!(s.bytes_used(), 0); - assert_eq!(s.byte_capacity(), DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.byte_capacity(), 0); + assert_eq!(s.log_memory_limit(), DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.total_allocated_bytes(), 0); + assert_eq!(s.records(None).len(), 0); + } + + #[test] + fn test_memory_usage_after_appending_logs() { + let s = LogMemoryStore::new_for_testing(); + + // Canister created, but no wasm module uploaded, so no logs recorded. + assert_eq!(s.next_idx(), 0); + assert!(s.is_empty()); + assert_eq!(s.bytes_used(), 0); + assert_eq!(s.byte_capacity(), 0); + assert_eq!(s.log_memory_limit(), DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.total_allocated_bytes(), 0); assert_eq!(s.records(None).len(), 0); + + // Append some logs. + let mut delta = CanisterLog::default_delta(); + delta.add_record(100, b"a".to_vec()); + delta.add_record(200, b"bb".to_vec()); + delta.add_record(300, b"ccc".to_vec()); + let mut s = LogMemoryStore::new_for_testing(); + s.append_delta_log(&mut delta); + + // Assert memory usage. + assert_eq!(s.next_idx(), 3); + assert!(!s.is_empty()); + assert!(s.bytes_used() > 0); + assert!(s.bytes_used() < DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.byte_capacity(), DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.log_memory_limit(), DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert!(s.total_allocated_bytes() > DEFAULT_AGGREGATE_LOG_MEMORY_LIMIT); + assert_eq!(s.records(None).len(), 3); } #[test] @@ -282,7 +357,7 @@ mod tests { let aggregate_capacity = 50_000; // keep small for the test. let start_idx = 0; let mut s = LogMemoryStore::new_for_testing(); - s.set_byte_capacity(aggregate_capacity); + s.set_log_memory_limit(aggregate_capacity); // Append 100k records in batches of 10k deltas of ~1KB record each. append_deltas(&mut s, start_idx, 100_000, 10_000, 1_000); @@ -315,7 +390,7 @@ mod tests { ); let mut s = LogMemoryStore::new_for_testing(); - s.set_byte_capacity(aggregate_capacity); + s.set_log_memory_limit(aggregate_capacity); // Append 5 MB records in batches of 1 MB deltas of ~1KB record each. append_deltas(&mut s, start_idx, 5_000_000, 1_000_000, 1_000); @@ -337,7 +412,7 @@ mod tests { ); let mut s = LogMemoryStore::new_for_testing(); - s.set_byte_capacity(aggregate_capacity); + s.set_log_memory_limit(aggregate_capacity); // Append 5 MB records in batches of 1 MB deltas of ~1KB record each. append_deltas(&mut s, start_idx, 5_000_000, 1_000_000, 1_000); @@ -376,7 +451,7 @@ mod tests { ); let mut s = LogMemoryStore::new_for_testing(); - s.set_byte_capacity(aggregate_capacity); + s.set_log_memory_limit(aggregate_capacity); // Append 5 MB records in batches of 1 MB deltas of ~1KB record each. 
append_deltas(&mut s, start_idx, 5_000_000, 1_000_000, 1_000); @@ -407,7 +482,7 @@ mod tests { #[test] fn test_increasing_capacity_preserves_records() { let mut store = LogMemoryStore::new_for_testing(); - store.set_byte_capacity(1_000_000); // 1 MB + store.set_log_memory_limit(1_000_000); // 1 MB // Append 200 KB records in batches of 100 KB deltas of ~1KB record each. append_deltas(&mut store, 0, 200_000, 100_000, 1_000); @@ -416,7 +491,7 @@ mod tests { let bytes_used_before = store.bytes_used(); // Increase capacity. - store.set_byte_capacity(2_000_000); // 2 MB + store.set_log_memory_limit(2_000_000); // 2 MB let records_after = store.records(None); let bytes_used_after = store.bytes_used(); @@ -429,7 +504,7 @@ mod tests { #[test] fn test_decreasing_capacity_drops_oldest_records_but_preserves_recent() { let mut store = LogMemoryStore::new_for_testing(); - store.set_byte_capacity(500_000); // 500 KB + store.set_log_memory_limit(500_000); // 500 KB // Append 200 KB records in batches of 100 KB deltas of ~1KB record each. append_deltas(&mut store, 0, 200_000, 100_000, 1_000); @@ -439,7 +514,7 @@ mod tests { let next_idx_before = store.next_idx(); // Decrease capacity. - store.set_byte_capacity(100_000); // 100 KB + store.set_log_memory_limit(100_000); // 100 KB let records_after = store.records(None); let bytes_used_after = store.bytes_used(); @@ -456,7 +531,7 @@ mod tests { let mut store = LogMemoryStore::new_for_testing(); // Set a very small capacity, smaller than 146 bytes (INDEX_ENTRY_COUNT_MAX). // 146 entries. If capacity is 100. 100 / 146 = 0. - store.set_byte_capacity(100); + store.set_log_memory_limit(100); let mut delta = CanisterLog::new_delta_with_next_index(0, 100); // Add multiple records. @@ -483,7 +558,7 @@ mod tests { fn test_multiple_records_in_same_segment() { let mut store = LogMemoryStore::new_for_testing(); // Capacity 100KB. Segment size ~685 bytes. - store.set_byte_capacity(100_000); + store.set_log_memory_limit(100_000); let mut delta = CanisterLog::new_delta_with_next_index(0, 100_000); // Add 10 records, each ~21 bytes. All should fit in segment 0. @@ -512,10 +587,11 @@ mod tests { fn test_very_small_capacity_single_byte() { let mut store = LogMemoryStore::new_for_testing(); // Set capacity to 1 byte - this will be clamped to DATA_CAPACITY_MIN (4096 bytes). - store.set_byte_capacity(1); + store.set_log_memory_limit(1); - // Verify capacity was clamped to minimum. - assert_eq!(store.byte_capacity(), 4096); + // Verify log memory limit was clamped to minimum. + assert_eq!(store.log_memory_limit(), 4096); + assert_eq!(store.byte_capacity(), 0); // Actual capacity is 0 because no records were added. let mut delta = CanisterLog::new_delta_with_next_index(0, 4096); // Add a record - it should fit within the minimum capacity. @@ -524,6 +600,7 @@ mod tests { store.append_delta_log(&mut delta); // The record should be stored. + assert_eq!(store.byte_capacity(), 4096); assert_eq!(store.next_idx(), 1); assert_eq!(store.bytes_used(), 21); let records = store.records(None); @@ -534,12 +611,11 @@ mod tests { #[test] fn test_small_capacity_with_eviction() { let mut store = LogMemoryStore::new_for_testing(); - // Set capacity to minimum (4096 bytes). let capacity = 4096; - store.set_byte_capacity(capacity); + store.set_log_memory_limit(capacity); - // Verify capacity is at minimum. - assert_eq!(store.byte_capacity(), capacity); + assert_eq!(store.log_memory_limit(), capacity); + assert_eq!(store.byte_capacity(), 0); // Actual capacity is 0 because no records were added. 
// Fill the buffer with records until we're close to capacity. // Each record is ~21 bytes (8+8+4+1), so we can fit ~195 records. @@ -554,6 +630,7 @@ mod tests { } // Verify all records fit. + assert_eq!(store.byte_capacity(), capacity); assert_eq!(store.next_idx(), max_records as u64); let records = store.records(None); assert_eq!(records.len(), max_records); @@ -575,7 +652,7 @@ mod tests { fn test_filtering_with_multiple_records_in_same_segment() { let mut store = LogMemoryStore::new_for_testing(); // Capacity 100KB. Segment size ~685 bytes. - store.set_byte_capacity(100_000); + store.set_log_memory_limit(100_000); let mut delta = CanisterLog::new_delta_with_next_index(0, 100_000); // Add 20 records, each ~21 bytes. All should fit in segment 0. diff --git a/rs/replicated_state/src/canister_state/system_state/log_memory_store/ring_buffer.rs b/rs/replicated_state/src/canister_state/system_state/log_memory_store/ring_buffer.rs index 024a44c3a862..a8c72ca2d0bd 100644 --- a/rs/replicated_state/src/canister_state/system_state/log_memory_store/ring_buffer.rs +++ b/rs/replicated_state/src/canister_state/system_state/log_memory_store/ring_buffer.rs @@ -1,5 +1,5 @@ use crate::canister_state::system_state::log_memory_store::{ - header::Header, + header::{Header, MAGIC}, log_record::LogRecord, memory::{MemoryAddress, MemorySize}, struct_io::StructIO, @@ -9,22 +9,22 @@ use ic_management_canister_types_private::{CanisterLogRecord, DataSize, FetchCan // PageMap file layout. // Header layout constants. -pub const HEADER_OFFSET: MemoryAddress = MemoryAddress::new(0); -pub const HEADER_SIZE: MemorySize = MemorySize::new(PAGE_SIZE as u64); -pub const MAGIC: &[u8; 3] = b"CLB"; // Canister Log Buffer +pub(crate) const HEADER_OFFSET: MemoryAddress = MemoryAddress::new(0); +pub(crate) const HEADER_SIZE: MemorySize = MemorySize::new(PAGE_SIZE as u64); // Index table layout constants. -pub const INDEX_TABLE_OFFSET: MemoryAddress = HEADER_OFFSET.add_size(HEADER_SIZE); -pub const INDEX_TABLE_PAGES: usize = 1; -pub const INDEX_TABLE_SIZE: MemorySize = MemorySize::new((INDEX_TABLE_PAGES * PAGE_SIZE) as u64); -pub const INDEX_ENTRY_SIZE: MemorySize = MemorySize::new(28); -pub const INDEX_ENTRY_COUNT_MAX: u64 = INDEX_TABLE_SIZE.get() / INDEX_ENTRY_SIZE.get(); +pub(crate) const INDEX_TABLE_OFFSET: MemoryAddress = HEADER_OFFSET.add_size(HEADER_SIZE); +pub(crate) const INDEX_TABLE_PAGES: usize = 1; +pub(crate) const INDEX_TABLE_SIZE: MemorySize = + MemorySize::new((INDEX_TABLE_PAGES * PAGE_SIZE) as u64); +pub(crate) const INDEX_ENTRY_SIZE: MemorySize = MemorySize::new(28); +pub(crate) const INDEX_ENTRY_COUNT_MAX: u64 = INDEX_TABLE_SIZE.get() / INDEX_ENTRY_SIZE.get(); // Data region layout constants. -pub const DATA_REGION_OFFSET: MemoryAddress = INDEX_TABLE_OFFSET.add_size(INDEX_TABLE_SIZE); +pub(crate) const DATA_REGION_OFFSET: MemoryAddress = INDEX_TABLE_OFFSET.add_size(INDEX_TABLE_SIZE); // Ring buffer constraints. /// Maximum total size of log records returned in a single message. -pub const RESULT_MAX_SIZE: MemorySize = MemorySize::new(2_000_000); +pub(crate) const RESULT_MAX_SIZE: MemorySize = MemorySize::new(2_000_000); const _: () = assert!(RESULT_MAX_SIZE.get() <= 2_000_000, "Exceeds 2 MB"); // With index table of 1 page (4 KiB) and 28 bytes per entry -> 146 entries max. @@ -32,12 +32,12 @@ const _: () = assert!(RESULT_MAX_SIZE.get() <= 2_000_000, "Exceeds 2 MB"); // say 20% of that (400 KB). So 146 segments turns into ~55 MB total data capacity. 
 // Small segments help to reduce work on refining log records filtering
 // when fetching logs.
-pub const DATA_CAPACITY_MAX: MemorySize = MemorySize::new(55_000_000); // 55 MB
+pub(crate) const DATA_CAPACITY_MAX: MemorySize = MemorySize::new(55_000_000); // 55 MB
 const DATA_SEGMENT_SIZE_MAX: u64 = DATA_CAPACITY_MAX.get() / INDEX_ENTRY_COUNT_MAX;
 // Ensure data segment size is significantly smaller than max result size, say 20%.
 const _: () = assert!(5 * DATA_SEGMENT_SIZE_MAX <= RESULT_MAX_SIZE.get());
 
-pub const DATA_CAPACITY_MIN: usize = PAGE_SIZE;
+pub(crate) const DATA_CAPACITY_MIN: usize = PAGE_SIZE;
 const _: () = assert!(PAGE_SIZE <= DATA_CAPACITY_MIN); // data capacity must be at least one page.
 
 pub(super) struct RingBuffer {
@@ -57,27 +57,26 @@ impl RingBuffer {
         Self { io }
     }
 
-    /// Returns an existing ring buffer if present, or initializes a new one.
-    pub fn load_or_new(page_map: PageMap, data_capacity: MemorySize) -> Self {
+    /// Loads a raw ring buffer from the given `page_map` without validating its contents.
+    pub fn load_raw(page_map: PageMap) -> Self {
+        Self {
+            io: StructIO::new(page_map),
+        }
+    }
+
+    /// Returns an existing ring buffer if present.
+    pub fn load_checked(page_map: PageMap) -> Option<Self> {
         let io = StructIO::new(page_map);
         if io.load_header().magic != *MAGIC {
-            // Not initialized yet — set up a new header.
-            return Self::new(io.to_page_map(), data_capacity);
+            return None;
         }
-
-        Self { io }
+        Some(Self { io })
     }
 
     pub fn to_page_map(&self) -> PageMap {
         self.io.to_page_map()
     }
 
-    /// Clears the canister log records.
-    pub fn clear(&mut self) {
-        let data_capacity = self.io.load_header().data_capacity;
-        self.io.save_header(&Header::new(data_capacity));
-    }
-
     /// Returns the total allocated bytes for the ring buffer
     /// including header, index table and data region.
     pub fn total_allocated_bytes(&self) -> usize {
@@ -340,23 +339,6 @@ mod tests {
         assert!(rb.pop_front().is_none());
     }
 
-    #[test]
-    fn test_clear() {
-        let page_map = PageMap::new_for_testing();
-        let data_capacity = TEST_DATA_CAPACITY;
-        let mut rb = RingBuffer::new(page_map, data_capacity);
-
-        let r0 = log_record(0, 100, "a");
-        let r1 = log_record(1, 200, "bb");
-        rb.append(&r0);
-        rb.append(&r1);
-        rb.clear();
-
-        assert_eq!(rb.bytes_used(), 0);
-        assert_eq!(rb.byte_capacity(), data_capacity.get() as usize);
-        assert_eq!(rb.pop_front(), None);
-    }
-
     #[test]
     fn test_exact_fit_no_eviction() {
         let record_size: usize = 25;
diff --git a/rs/state_layout/src/state_layout.rs b/rs/state_layout/src/state_layout.rs
index 0b84a21c09a6..8351067cd403 100644
--- a/rs/state_layout/src/state_layout.rs
+++ b/rs/state_layout/src/state_layout.rs
@@ -76,6 +76,7 @@ pub const OVERLAY: &str = "overlay";
 pub const VMEMORY_0: &str = "vmemory_0";
 pub const STABLE_MEMORY: &str = "stable_memory";
 pub const WASM_CHUNK_STORE: &str = "wasm_chunk_store";
+pub const LOG_MEMORY_STORE: &str = "log_memory_store";
 pub const BIN_FILE: &str = "bin";
 
 /// `ReadOnly` is the access policy used for reading checkpoints. We
@@ -308,7 +309,8 @@ struct CheckpointRefData {
 /// │   │    ├── software.wasm
 /// │   │    ├── stable_memory.bin
 /// │   │    ├── vmemory_0.bin
-/// │   │    └── wasm_chunk_store.bin
+/// │   │    ├── wasm_chunk_store.bin
+/// │   │    └── log_memory_store.bin
 /// │   ├── snapshots
 /// │   │    └── <canister_id>
 /// │   │         └── <snapshot_id>
@@ -2196,6 +2198,7 @@ impl<Permissions: AccessPolicy> CanisterLayout<Permissions> {
             self.vmemory_0(),
             self.stable_memory(),
             self.wasm_chunk_store(),
+            self.log_memory_store(),
         ]
         .into_iter()
         {
@@ -2232,6 +2235,15 @@ impl<Permissions: AccessPolicy> CanisterLayout<Permissions> {
             _checkpoint: self.checkpoint.clone(),
         }
     }
+
+    pub fn log_memory_store(&self) -> PageMapLayout<Permissions> {
+        PageMapLayout {
+            root: self.canister_root.clone(),
+            name_stem: LOG_MEMORY_STORE.into(),
+            permissions_tag: PhantomData,
+            _checkpoint: self.checkpoint.clone(),
+        }
+    }
 }
 
 pub struct SnapshotLayout<Permissions: AccessPolicy> {
@@ -2286,6 +2298,7 @@ impl<Permissions: AccessPolicy> SnapshotLayout<Permissions> {
             self.vmemory_0(),
             self.stable_memory(),
             self.wasm_chunk_store(),
+            // log_memory_store is not included in canister snapshots.
         ]
         .into_iter()
         {
@@ -2322,6 +2335,8 @@
             _checkpoint: self.checkpoint.clone(),
         }
     }
+
+    // log_memory_store is not included in canister snapshots.
 }
 
 impl<Permissions: AccessPolicy> SnapshotLayout<Permissions> {
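A note on the lazy-initialization contract that the `LogMemoryStore` changes above rely on: a store reports zero capacity until its ring buffer is created by the first append, which is what keeps canisters without a Wasm module (or after uninstall) from being charged for log memory. The following self-contained sketch illustrates the pattern in isolation; `Store` and its `Vec<u8>` backing are hypothetical stand-ins for `LogMemoryStore`, `RingBuffer`, and `PageMap`, and only the `CLB` magic prefix mirrors the real header.

    // Minimal sketch (assumed simplification of the real types).
    const MAGIC: &[u8; 3] = b"CLB"; // Same prefix the real header uses.

    struct Store {
        memory: Vec<u8>, // Plays the role of the PageMap.
        limit: usize,    // Desired data capacity; applied lazily.
    }

    impl Store {
        fn new(limit: usize) -> Self {
            // No header is written yet, so nothing is allocated or charged.
            Store { memory: Vec::new(), limit }
        }

        // Mirrors RingBuffer::load_checked: only a valid magic prefix counts.
        fn is_initialized(&self) -> bool {
            self.memory.get(..3) == Some(MAGIC.as_slice())
        }

        fn byte_capacity(&self) -> usize {
            if self.is_initialized() { self.limit } else { 0 }
        }

        // The first append creates the buffer with the current limit.
        fn append(&mut self, record: &[u8]) {
            if !self.is_initialized() {
                self.memory.extend_from_slice(MAGIC);
            }
            self.memory.extend_from_slice(record);
        }
    }

    fn main() {
        let mut s = Store::new(4096);
        assert_eq!(s.byte_capacity(), 0); // Uninitialized: no charge.
        s.append(b"hello");
        assert_eq!(s.byte_capacity(), 4096); // Created on first append.
    }

Keeping `log_memory_limit` outside the page map, as the diff does, is what lets `set_log_memory_limit` take effect without allocating anything until log records actually arrive.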
diff --git a/rs/state_manager/src/checkpoint.rs b/rs/state_manager/src/checkpoint.rs
index 0f561824a34f..49e0b43edab4 100644
--- a/rs/state_manager/src/checkpoint.rs
+++ b/rs/state_manager/src/checkpoint.rs
@@ -179,6 +179,7 @@ pub(crate) enum PageMapType {
     WasmMemory(CanisterId),
     StableMemory(CanisterId),
     WasmChunkStore(CanisterId),
+    LogMemoryStore(CanisterId),
     SnapshotWasmMemory(SnapshotId),
     SnapshotStableMemory(SnapshotId),
     SnapshotWasmChunkStore(SnapshotId),
@@ -190,6 +191,7 @@ impl PageMapType {
         let mut result = vec![];
         for (id, canister) in &state.canister_states {
             result.push(Self::WasmChunkStore(id.to_owned()));
+            result.push(Self::LogMemoryStore(id.to_owned()));
             if canister.execution_state.is_some() {
                 result.push(Self::WasmMemory(id.to_owned()));
                 result.push(Self::StableMemory(id.to_owned()));
@@ -223,6 +225,7 @@ impl PageMapType {
             PageMapType::WasmMemory(id) => Ok(layout.canister(id)?.vmemory_0()),
             PageMapType::StableMemory(id) => Ok(layout.canister(id)?.stable_memory()),
             PageMapType::WasmChunkStore(id) => Ok(layout.canister(id)?.wasm_chunk_store()),
+            PageMapType::LogMemoryStore(id) => Ok(layout.canister(id)?.log_memory_store()),
             PageMapType::SnapshotWasmMemory(id) => Ok(layout.snapshot(id)?.vmemory_0()),
             PageMapType::SnapshotStableMemory(id) => Ok(layout.snapshot(id)?.stable_memory()),
             PageMapType::SnapshotWasmChunkStore(id) => Ok(layout.snapshot(id)?.wasm_chunk_store()),
@@ -245,6 +248,9 @@ impl PageMapType {
             PageMapType::WasmChunkStore(id) => state
                 .canister_state(id)
                 .map(|can| can.system_state.wasm_chunk_store.page_map()),
+            PageMapType::LogMemoryStore(id) => state
+                .canister_state(id)
+                .map(|can| can.system_state.log_memory_store.page_map()),
             PageMapType::SnapshotWasmMemory(id) => state
                 .canister_snapshots
                 .get(*id)
@@ -275,6 +281,11 @@ fn strip_page_map_deltas(
             .wasm_chunk_store
             .page_map_mut()
             .strip_all_deltas(Arc::clone(&fd_factory));
+        canister
+            .system_state
+            .log_memory_store
+            .page_map_mut()
+            .strip_all_deltas(Arc::clone(&fd_factory));
         if let Some(execution_state) = canister.execution_state.as_mut() {
             execution_state
                 .wasm_memory
@@ -356,6 +367,10 @@ pub(crate) fn flush_canister_snapshots_and_page_maps(
             PageMapType::WasmChunkStore(id.to_owned()),
             canister.system_state.wasm_chunk_store.page_map_mut(),
         );
+        add_to_pagemaps_and_strip(
+            PageMapType::LogMemoryStore(id.to_owned()),
+            canister.system_state.log_memory_store.page_map_mut(),
+        );
         if let Some(execution_state) = canister.execution_state.as_mut() {
             add_to_pagemaps_and_strip(
                 PageMapType::WasmMemory(id.to_owned()),
@@ -881,6 +896,15 @@ pub fn load_canister_state(
     )?;
     durations.insert("wasm_chunk_store", starting_time.elapsed());
 
+    let starting_time = Instant::now();
+    let log_memory_store_layout = canister_layout.log_memory_store();
+    let log_memory_store_data = PageMap::open(
+        Box::new(log_memory_store_layout),
+        height,
+        Arc::clone(&fd_factory),
+    )?;
+    durations.insert("log_memory_store", starting_time.elapsed());
+
     let system_state = SystemState::new_from_checkpoint(
         canister_state_bits.controllers,
         *canister_id,
@@ -903,6 +927,7 @@ pub fn load_canister_state(
         canister_state_bits.wasm_chunk_store_metadata,
         canister_state_bits.log_visibility,
         canister_state_bits.canister_log,
+        log_memory_store_data,
         canister_state_bits.wasm_memory_limit,
         canister_state_bits.next_snapshot_id,
         canister_state_bits.snapshots_memory_usage,
diff --git a/rs/state_manager/src/split/tests.rs b/rs/state_manager/src/split/tests.rs
index 2811a96cf552..a66a8e09090b 100644
--- a/rs/state_manager/src/split/tests.rs
+++ b/rs/state_manager/src/split/tests.rs
@@ -85,10 +85,13 @@ const SUBNET_B_RANGES: &[CanisterIdRange] = &[
 /// Note that any queue files are missing as they would be empty.
 fn subnet_a_files() -> &'static [&'static str] {
     &[
+        "canister_states/00000000000000010101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000010101/canister.pbuf",
         "canister_states/00000000000000010101/software.wasm",
+        "canister_states/00000000000000020101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000020101/canister.pbuf",
         "canister_states/00000000000000020101/software.wasm",
+        "canister_states/00000000000000030101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000030101/canister.pbuf",
         "canister_states/00000000000000030101/software.wasm",
         INGRESS_HISTORY_FILE,
@@ -103,8 +106,10 @@ fn subnet_a_files() -> &'static [&'static str] {
 /// Full list of files expected to be listed in the manifest of subnet A'.
 fn subnet_a_prime_files() -> &'static [&'static str] {
     &[
+        "canister_states/00000000000000010101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000010101/canister.pbuf",
         "canister_states/00000000000000010101/software.wasm",
+        "canister_states/00000000000000030101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000030101/canister.pbuf",
         "canister_states/00000000000000030101/software.wasm",
         INGRESS_HISTORY_FILE,
@@ -120,6 +125,7 @@ fn subnet_a_prime_files() -> &'static [&'static str] {
 /// Full list of files expected to be listed in the manifest of subnet B.
 fn subnet_b_files() -> &'static [&'static str] {
     &[
+        "canister_states/00000000000000020101/000000000000002a_0000_log_memory_store.overlay",
         "canister_states/00000000000000020101/canister.pbuf",
         "canister_states/00000000000000020101/software.wasm",
         INGRESS_HISTORY_FILE,
diff --git a/rs/state_manager/src/tip.rs b/rs/state_manager/src/tip.rs
index 8fcd0a83a85d..6b5ed6869fdb 100644
--- a/rs/state_manager/src/tip.rs
+++ b/rs/state_manager/src/tip.rs
@@ -574,6 +574,18 @@ fn switch_to_checkpoint(
             )
             .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?,
         );
+        tip_canister
+            .system_state
+            .log_memory_store
+            .page_map_mut()
+            .switch_to_checkpoint(
+                &PageMap::open(
+                    Box::new(canister_layout.log_memory_store()),
+                    layout.height(),
+                    Arc::clone(fd_factory),
+                )
+                .map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?,
+            );
 
     if let Some(tip_execution) = tip_canister.execution_state.as_mut() {
         tip_execution.wasm_memory.page_map.switch_to_checkpoint(
@@ -763,6 +775,7 @@ fn backup(
         &canister_layout.wasm_chunk_store(),
         &snapshot_layout.wasm_chunk_store(),
     )?;
+    // No need to copy log_memory_store, as it is not part of the snapshot.
 
     WasmFile::hardlink_file(&canister_layout.wasm(), &snapshot_layout.wasm())?;
@@ -1213,6 +1226,16 @@ fn serialize_canister_wasm_binary_and_pagemaps(
             lsmt_config,
             metrics,
         )?;
+    canister_state
+        .system_state
+        .log_memory_store
+        .page_map()
+        .persist_delta(
+            &canister_layout.log_memory_store(),
+            tip.height(),
+            lsmt_config,
+            metrics,
+        )?;
 
     Ok(())
 }
diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs
index 956cbd457b34..0857e3d6bdf3 100644
--- a/rs/state_manager/tests/state_manager.rs
+++ b/rs/state_manager/tests/state_manager.rs
@@ -366,7 +366,7 @@ fn lazy_pagemaps() {
     let canister_id = env.install_canister_wat(TEST_CANISTER, vec![], None);
     env.tick();
 
-    assert_eq!(page_maps_by_status("loaded", &env), 0);
+    assert_eq!(page_maps_by_status("loaded", &env), 1);
     assert!(page_maps_by_status("not_loaded", &env) > 0);
 
     env.execute_ingress(canister_id, "write_heap_64k", vec![])
@@ -2957,21 +2957,23 @@ fn can_state_sync_from_cache_alone() {
     // The state sync won't complete because all the chunks have to be fetched from scratch.
     // file idx    | file size  | chunk idx | path
     // ------------+------------+-----------+------------------------------------------------------
-    // 0           | 331        | 0         | canister_states/00000000000000640101/canister.pbuf
-    // 1           | 18         | 1         | canister_states/00000000000000640101/software.wasm
-    // 2           | 0          | N/A       | canister_states/00000000000000640101/stable_memory.bin
-    // 3           | 0          | N/A       | canister_states/00000000000000640101/vmemory_0.bin
-    // 4           | 0          | N/A       | canister_states/00000000000000640101/wasm_chunk_store.bin
-    // 5           | 331        | 2         | canister_states/00000000000000c80101/canister.pbuf
-    // 6           | 18         | 3         | canister_states/00000000000000c80101/software.wasm
-    // 7           | 0          | N/A       | canister_states/00000000000000c80101/stable_memory.bin
-    // 8           | 0          | N/A       | canister_states/00000000000000c80101/vmemory_0.bin
-    // 9           | 0          | N/A       | canister_states/00000000000000c80101/wasm_chunk_store.bin
-    // 10          | 97         | 4         | system_metadata.pbuf
+    // 0           | 4132       | 0         | canister_states/00000000000000640101/0000000000000001_0000_log_memory_store.overlay
+    // 1           | 331        | 1         | canister_states/00000000000000640101/canister.pbuf
+    // 2           | 18         | 2         | canister_states/00000000000000640101/software.wasm
+    // 3           | 0          | N/A       | canister_states/00000000000000640101/stable_memory.bin
+    // 4           | 0          | N/A       | canister_states/00000000000000640101/vmemory_0.bin
+    // 5           | 0          | N/A       | canister_states/00000000000000640101/wasm_chunk_store.bin
+    // 6           | 4132       | 3         | canister_states/00000000000000c80101/0000000000000001_0000_log_memory_store.overlay
+    // 7           | 331        | 4         | canister_states/00000000000000c80101/canister.pbuf
+    // 8           | 18         | 5         | canister_states/00000000000000c80101/software.wasm
+    // 9           | 0          | N/A       | canister_states/00000000000000c80101/stable_memory.bin
+    // 10          | 0          | N/A       | canister_states/00000000000000c80101/vmemory_0.bin
+    // 11          | 0          | N/A       | canister_states/00000000000000c80101/wasm_chunk_store.bin
+    // 12          | 97         | 6         | system_metadata.pbuf
-    // Given the current state layout, the chunk for `software.wasm` of the first canister has the index 1.
+    // Given the current state layout, the chunk for `software.wasm` of the first canister has the index 2.
     // If there are changes to the state layout that affect the chunk's position in the chunk table,
     // the assertion below will panic and we need to adjust the selected chunk id accordingly for this test.
- let chunk_table_idx_to_omit = 1; + let chunk_table_idx_to_omit = 2; let chunk_id_to_omit = ChunkId::new(chunk_table_idx_to_omit as u32 + 1); let file_table_idx_to_omit = msg.manifest.chunk_table[chunk_table_idx_to_omit].file_index as usize; diff --git a/rs/types/types/src/canister_log.rs b/rs/types/types/src/canister_log.rs index 9dce1fac3a64..dc5ea109a57a 100644 --- a/rs/types/types/src/canister_log.rs +++ b/rs/types/types/src/canister_log.rs @@ -222,6 +222,11 @@ impl CanisterLog { self.records.bytes_used } + /// Returns true if the canister log is empty. + pub fn is_empty(&self) -> bool { + self.records.bytes_used == 0 + } + /// Returns the remaining space in the canister log buffer. pub fn remaining_bytes(&self) -> usize { let records = &self.records;
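The dual write in `sandbox_safe_system_state.rs` (first hunk of this diff) is the delicate part of the migration: the one-time backfill of `canister_log` into `log_memory_store` must run before the round's delta is appended, otherwise records produced before the feature was enabled would land behind newer ones. A minimal sketch of that ordering, using plain `Vec<u64>`s as hypothetical stand-ins for the two log stores:

    // `old_log` plays CanisterLog, `new_store` plays LogMemoryStore (stand-ins only).
    fn apply_round(old_log: &mut Vec<u64>, new_store: &mut Vec<u64>, delta: Vec<u64>) {
        // One-time backfill: copy pre-existing records first so none are lost.
        if new_store.is_empty() && !old_log.is_empty() {
            new_store.extend(old_log.iter().copied());
        }
        // Dual write: the round's delta goes to both stores.
        old_log.extend(delta.iter().copied());
        new_store.extend(delta);
    }

    fn main() {
        let mut old_log = vec![1, 2]; // Records from before the feature flag flipped.
        let mut new_store = Vec::new(); // Empty until the first round afterwards.
        apply_round(&mut old_log, &mut new_store, vec![3, 4]);
        // The backfill ran before the delta, so ordering is preserved in both stores.
        assert_eq!(old_log, vec![1, 2, 3, 4]);
        assert_eq!(new_store, vec![1, 2, 3, 4]);
    }

Once every canister's `log_memory_store` is non-empty the backfill branch never fires again, at which point the duplicated `canister_log` path can be deleted; that is what the TODO(DSM-11) markers track.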