From 3c7995d25780c1f963fb8ccf405c4d576e04f3b4 Mon Sep 17 00:00:00 2001 From: Anubhav Pandey <73362441+anubhav-pandey1@users.noreply.github.com> Date: Mon, 26 Feb 2024 22:06:05 +0530 Subject: [PATCH] Fix concurrent bulk generation issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The root cause of the issue with concurrent bulk generation of Snowflake IDs resulting in duplicate IDs lies in how the sequence variable is being managed within the Snowflake structure in a Rust environment. I think the problem arises due to the lack of synchronisation mechanisms around the access and update of shared state—in this case, the sequence and last_timestamp fields of the Snowflake struct—when accessed by multiple threads. Why Does This Happen? In a concurrent environment, multiple threads might call the get_unique_id method on the same Snowflake instance at the same microsecond. Since the current implementation does not include any form of locking or synchronisation, there's a race condition on the sequence field: multiple threads read the same last_timestamp, see that it hasn't changed, and then concurrently attempt to increment the sequence. However, without proper synchronisation, they might not see each other's updates, resulting in the same sequence value being used for multiple IDs. To fix this, we need to introduce thread-safety into the ID generation process to ensure that concurrent accesses to the sequence and last_timestamp fields are correctly synchronised. In Rust, this can be achieved using synchronisation primitives from the std::sync module with Mutex or Atomic types. Given that the performance of the ID generation is critical and must be high-throughput, using atomic operations is preferable because they incur less overhead than a mutex lock. --- src/lib.rs | 142 +++++++++++++++++++++++++++-------------------------- 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e75018a..56da037 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ mod utils; -use crate::utils::{current_time, random}; - -use serde::{Serialize, Deserialize}; +use crate::utils::{ current_time, random }; +use serde::{ Serialize, Deserialize }; use wasm_bindgen::prelude::*; +use std::sync::atomic::{ AtomicU64, AtomicU16, Ordering }; //// GLOBAL CONSTANTS /////////////////////////////////////////////////////////////////////// @@ -28,7 +28,8 @@ const MAX_SEQUENCE: u16 = (1 << SEQUENCE_BITS) - 1; //// CUSTOM TYPESCRIPT EXPORTS ////////////////////////////////////////////////////////////// #[wasm_bindgen(typescript_custom_section)] -const CUSTOM_TS: &'static str = r#" +const CUSTOM_TS: &'static str = + r#" export interface SnowflakeOpts { custom_epoch?: number; instance_id?: number; @@ -58,9 +59,9 @@ pub struct SnowflakeConfig { #[wasm_bindgen] #[derive(Debug)] pub struct Snowflake { - last_timestamp: u64, + last_timestamp: AtomicU64, custom_epoch: u64, - sequence: u16, + sequence: AtomicU16, instance_id: u16, } @@ -70,79 +71,82 @@ impl Snowflake { /// Constructs a Snowflake object which stores method for generation /// of a unique 64 bit time sortable ID pub fn new(opts: Option) -> Result { - match opts { - Some(opts) => { - match opts.into_serde::() { - Ok(opts) => { - let epoch = opts.custom_epoch.unwrap_or_else(||current_time(0)); - let instance_id = opts.instance_id.unwrap_or_else(||random(MAX_INSTANCE_ID as f64) as u16); - - // If passed instance ID is greater than the max then return error - if instance_id > MAX_INSTANCE_ID { - return Err(JsValue::from_str(&format!("instance_id must be between 0 and {}", MAX_INSTANCE_ID))); - } - - Ok(Self { - last_timestamp: 0, - custom_epoch: epoch, - sequence: 0, - instance_id: instance_id & MAX_INSTANCE_ID, - }) - } - Err(_) => { - Err(JsValue::from_str("[NATIVE]: failed to parse object into SnowflakeOpts")) - } - } - } - None => { - Ok(Self{ - last_timestamp: 0, - custom_epoch: current_time(0), - sequence: 0, - instance_id: random(MAX_INSTANCE_ID as f64) as u16, - }) - } + let custom_epoch = opts + .as_ref() + .and_then(|o| { o.into_serde::().ok() }) + .and_then(|cfg| cfg.custom_epoch) + .unwrap_or_else(|| current_time(0)); + + let instance_id = opts + .as_ref() + .and_then(|o| { o.into_serde::().ok() }) + .and_then(|cfg| cfg.instance_id) + .unwrap_or_else(|| random(MAX_INSTANCE_ID as f64) as u16); + + // If passed instance ID is greater than the max then return error + if instance_id > MAX_INSTANCE_ID { + return Err( + JsValue::from_str(&format!("instance_id must be between 0 and {}", MAX_INSTANCE_ID)) + ); } + + Ok(Self { + last_timestamp: AtomicU64::new(0), + custom_epoch, + sequence: AtomicU16::new(0), + instance_id: instance_id & MAX_INSTANCE_ID, + }) } #[wasm_bindgen(js_name = getUniqueID)] - /// getUniqueID generates a 64 bit unique ID + /// getUniqueID generates a 64 bit unique ID /// /// NOTE: This method is blocking in nature, the function also /// has theorotical limit of generating 1,024,000 IDs/sec - pub fn get_unique_id(&mut self) -> u64 { - let mut current_timestamp = current_time(self.custom_epoch); - - if current_timestamp == self.last_timestamp { - self.sequence = (self.sequence + 1) & MAX_SEQUENCE; - - // If we have exhausted all of the sequence number as well - if self.sequence == 0 { - // Wait for roughly a millisecond - while current_time(self.custom_epoch) - current_timestamp < 1 {} - - // Update timestamp by one - current_timestamp += 1; + pub fn get_unique_id(&self) -> u64 { + let mut current_timestamp; + loop { + current_timestamp = current_time(self.custom_epoch); + let last_timestamp = self.last_timestamp.load(Ordering::SeqCst); + + if current_timestamp == last_timestamp { + let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) & MAX_SEQUENCE; + if sequence == 0 { + // If sequence is 0, it means we've exhausted the sequence numbers for this timestamp + // Wait until the next millisecond to get a new timestamp + while current_time(self.custom_epoch) == last_timestamp {} + continue; + } + break; + } else if + self.last_timestamp + .compare_exchange( + last_timestamp, + current_timestamp, + Ordering::SeqCst, + Ordering::Relaxed + ) + .is_ok() + { + // Successfully moved to new timestamp, reset sequence + self.sequence.store(0, Ordering::SeqCst); + break; } - } else { - // Reset the sequence - self.sequence = 0; } - self.last_timestamp = current_timestamp; - - let mut id: u64 = current_timestamp << (TOTAL_BITS - EPOCH_BITS); - id |= (self.instance_id as u64) << (TOTAL_BITS - EPOCH_BITS - INSTANCE_ID_BITS); - id |= self.sequence as u64; + let id = + (current_timestamp << (TOTAL_BITS - EPOCH_BITS)) | + ((self.instance_id as u64) << (TOTAL_BITS - EPOCH_BITS - INSTANCE_ID_BITS)) | + (self.sequence.load(Ordering::SeqCst) as u64); id } #[wasm_bindgen(js_name = idFromTimestamp)] - /// idFromTimestamp takes a UNIX timestamp without any offset + /// idFromTimestamp takes a UNIX timestamp without any offset /// and returns an ID that has timestamp set to the given timestamp pub fn id_from_timestamp(&self, timestamp: f64) -> u64 { - let timestamp = timestamp.round() as u64 - self.custom_epoch; + let timestamp = (timestamp.round() as u64) - self.custom_epoch; let mut id: u64 = timestamp << (TOTAL_BITS - EPOCH_BITS); id |= u64::from(self.instance_id) << (TOTAL_BITS - EPOCH_BITS - INSTANCE_ID_BITS); @@ -151,13 +155,13 @@ impl Snowflake { } #[wasm_bindgen(js_name = instanceID)] - /// instanceID returns the current node id + /// instanceID returns the current node id pub fn instance_id(&self) -> f64 { self.instance_id as f64 } #[wasm_bindgen(js_name = customEpoch)] - /// customEpoch returns the current custom epoch + /// customEpoch returns the current custom epoch pub fn custom_epoch(&self) -> f64 { self.custom_epoch as f64 } @@ -166,19 +170,19 @@ impl Snowflake { #[wasm_bindgen] impl Snowflake { #[wasm_bindgen(js_name = timestampFromID)] - /// timestampFromID takes a unique ID and returns the timestamp + /// timestampFromID takes a unique ID and returns the timestamp /// when the Unique ID was created pub fn timestamp_from_id(unique_id: u64, epoch_offset: f64) -> f64 { ((unique_id >> (TOTAL_BITS - EPOCH_BITS)) as f64) + (epoch_offset as f64) } #[wasm_bindgen(js_name = instanceIDFromID)] - /// instanceIDFromID takes a unique ID and returns the instance + /// instanceIDFromID takes a unique ID and returns the instance /// ID where the unique ID was created /// /// NOTE: The unique ID could be created on ANY instance pub fn instance_id_from_id(unique_id: u64) -> i32 { - let bits = TOTAL_BITS - INSTANCE_ID_BITS - SEQUENCE_BITS; - ((unique_id << bits) >> (bits + SEQUENCE_BITS)) as i32 + let bits = TOTAL_BITS - EPOCH_BITS - INSTANCE_ID_BITS; + ((unique_id >> bits) & ((1 << INSTANCE_ID_BITS) - 1)) as i32 } -} \ No newline at end of file +}