diff --git a/remote-config/examples/remote_config_fetch.rs b/remote-config/examples/remote_config_fetch.rs new file mode 100644 index 0000000000..8e59290b5a --- /dev/null +++ b/remote-config/examples/remote_config_fetch.rs @@ -0,0 +1,94 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use datadog_remote_config::fetch::{ConfigInvariants, SingleChangesFetcher}; +use datadog_remote_config::file_change_tracker::{Change, FilePath}; +use datadog_remote_config::file_storage::ParsedFileStorage; +use datadog_remote_config::RemoteConfigProduct::ApmTracing; +use datadog_remote_config::{RemoteConfigData, Target}; +use ddcommon::Endpoint; +use std::time::Duration; +use tokio::time::sleep; + +const RUNTIME_ID: &str = "23e76587-5ae1-410c-a05c-137cae600a10"; +const SERVICE: &str = "testservice"; +const ENV: &str = "testenv"; +const VERSION: &str = "1.2.3"; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + // SingleChangesFetcher is ideal for a single static (runtime_id, service, env, version) tuple + // Otherwise a SharedFetcher (or even a MultiTargetFetcher for a potentially high number of + // targets) for multiple targets is needed. These can be manually wired together with a + // ChangeTracker to keep track of changes. The SingleChangesTracker does it for you. + let mut fetcher = SingleChangesFetcher::new( + // Use SimpleFileStorage if you desire just the raw, unparsed contents + // (e.g. to do processing directly in your language) + // For more complicated use cases, like needing to store data in shared memory, a custom + // FileStorage implementation is recommended + ParsedFileStorage::default(), + Target { + service: SERVICE.to_string(), + env: ENV.to_string(), + app_version: VERSION.to_string(), + }, + RUNTIME_ID.to_string(), + ConfigInvariants { + language: "awesomelang".to_string(), + tracer_version: "99.10.5".to_string(), + endpoint: Endpoint { + url: hyper::Uri::from_static("http://localhost:8126"), + api_key: None, + }, + products: vec![ApmTracing], + capabilities: vec![], + }, + ); + + // Custom timeout, defaults to 5 seconds. + fetcher.set_timeout(2000); + + loop { + match fetcher.fetch_changes().await { + Ok(changes) => { + println!("Got {} changes:", changes.len()); + for change in changes { + match change { + Change::Add(file) => { + println!("Added file: {} (version: {})", file.path(), file.version()); + print_file_contents(&file.contents()); + } + Change::Update(file, _) => { + println!( + "Got update for file: {} (version: {})", + file.path(), + file.version() + ); + print_file_contents(&file.contents()); + } + Change::Remove(file) => { + println!("Removing file {}", file.path()); + } + } + } + } + Err(e) => { + eprintln!("Fetch failed with {e}"); + fetcher.set_last_error(e.to_string()); + } + } + + sleep(Duration::from_nanos(fetcher.get_interval()).max(Duration::from_secs(1))).await; + } +} + +fn print_file_contents(contents: &anyhow::Result) { + match contents { + Ok(data) => { + println!("File contents: {:?}", data); + } + Err(e) => { + println!("Failed parsing file: {:?}", e); + } + } +} diff --git a/remote-config/src/fetch/multitarget.rs b/remote-config/src/fetch/multitarget.rs index 0d9359ca3c..59b27e7255 100644 --- a/remote-config/src/fetch/multitarget.rs +++ b/remote-config/src/fetch/multitarget.rs @@ -334,7 +334,7 @@ where Self::remove_target(self, runtime_id, target); } - fn start_fetcher(self: &Arc, known_target: &mut KnownTarget) { + fn start_fetcher(self: &Arc, known_target: &KnownTarget) { let this = self.clone(); let fetcher = known_target.fetcher.clone(); let status = known_target.status.clone(); diff --git a/remote-config/src/fetch/single.rs b/remote-config/src/fetch/single.rs index 5138b0af1b..bcf0c5344b 100644 --- a/remote-config/src/fetch/single.rs +++ b/remote-config/src/fetch/single.rs @@ -2,15 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use crate::fetch::{ConfigFetcher, ConfigFetcherState, ConfigInvariants, FileStorage, OpaqueState}; +use crate::file_change_tracker::{Change, ChangeTracker, FilePath, UpdatedFiles}; use crate::Target; +use std::sync::atomic::Ordering; use std::sync::Arc; +/// Simple implementation pub struct SingleFetcher { fetcher: ConfigFetcher, target: Arc, runtime_id: String, - pub config_id: String, - pub last_error: Option, + config_id: String, + last_error: Option, opaque_state: OpaqueState, } @@ -26,6 +29,12 @@ impl SingleFetcher { } } + pub fn with_config_id(mut self, config_id: String) -> Self { + self.config_id = config_id; + self + } + + /// Polls the current runtime config files. pub async fn fetch_once(&mut self) -> anyhow::Result>>> { self.fetcher .fetch_once( @@ -37,4 +46,82 @@ impl SingleFetcher { ) .await } + + /// Timeout after which to report failure, in milliseconds. + pub fn set_timeout(&self, milliseconds: u32) { + self.fetcher.timeout.store(milliseconds, Ordering::Relaxed); + } + + /// Collected interval. May be zero if not provided by the remote config server or fetched yet. + /// Given in nanoseconds. + pub fn get_interval(&self) -> u64 { + self.fetcher.interval.load(Ordering::Relaxed) + } + + /// Sets the error to be reported to the backend. + pub fn set_last_error(&mut self, error: String) { + self.last_error = Some(error); + } + + pub fn get_config_id(&self) -> &String { + &self.config_id + } +} + +pub struct SingleChangesFetcher +where + S::StoredFile: FilePath, +{ + changes: ChangeTracker, + pub fetcher: SingleFetcher, +} + +impl SingleChangesFetcher +where + S::StoredFile: FilePath, +{ + pub fn new(sink: S, target: Target, runtime_id: String, invariants: ConfigInvariants) -> Self { + SingleChangesFetcher { + changes: ChangeTracker::default(), + fetcher: SingleFetcher::new(sink, target, runtime_id, invariants), + } + } + + pub fn with_config_id(mut self, config_id: String) -> Self { + self.fetcher = self.fetcher.with_config_id(config_id); + self + } + + /// Polls for new changes + pub async fn fetch_changes(&mut self) -> anyhow::Result, R>>> + where + S: UpdatedFiles, + { + Ok(match self.fetcher.fetch_once().await? { + None => vec![], + Some(files) => self + .changes + .get_changes(files, self.fetcher.fetcher.file_storage.updated()), + }) + } + + /// Timeout after which to report failure, in milliseconds. + pub fn set_timeout(&self, milliseconds: u32) { + self.fetcher.set_timeout(milliseconds) + } + + /// Collected interval. May be zero if not provided by the remote config server or fetched yet. + /// Given in nanoseconds. + pub fn get_interval(&self) -> u64 { + self.fetcher.get_interval() + } + + /// Sets the error to be reported to the backend. + pub fn set_last_error(&mut self, error: String) { + self.fetcher.set_last_error(error); + } + + pub fn get_config_id(&self) -> &String { + self.fetcher.get_config_id() + } } diff --git a/remote-config/src/file_change_tracker.rs b/remote-config/src/file_change_tracker.rs new file mode 100644 index 0000000000..9600849d2d --- /dev/null +++ b/remote-config/src/file_change_tracker.rs @@ -0,0 +1,78 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::RemoteConfigPath; +use std::collections::HashSet; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +pub trait FilePath { + fn path(&self) -> &RemoteConfigPath; +} + +pub trait UpdatedFiles { + fn updated(&self) -> Vec<(Arc, R)>; +} + +struct FilePathBasedArc(Arc); + +impl Hash for FilePathBasedArc { + fn hash(&self, state: &mut H) { + self.0.path().hash(state) + } +} + +impl PartialEq for FilePathBasedArc { + fn eq(&self, other: &Self) -> bool { + self.0.path() == other.0.path() + } +} + +impl Eq for FilePathBasedArc {} + +pub struct ChangeTracker { + last_files: HashSet>, +} + +impl Default for ChangeTracker { + fn default() -> Self { + ChangeTracker { + last_files: Default::default(), + } + } +} + +pub enum Change { + Add(S), + Update(S, R), + Remove(S), +} + +impl ChangeTracker { + pub fn get_changes( + &mut self, + files: Vec>, + updated: Vec<(Arc, R)>, + ) -> Vec, R>> { + let files = HashSet::from_iter(files.into_iter().map(FilePathBasedArc)); + let mut changes = vec![]; + + for file in files.difference(&self.last_files) { + changes.push(Change::Add(file.0.clone())); + } + + for file in self.last_files.difference(&files) { + changes.push(Change::Remove(file.0.clone())); + } + + for (updated_file, old_contents) in updated.into_iter() { + let file = FilePathBasedArc(updated_file); + if files.contains(&file) { + changes.push(Change::Update(file.0, old_contents)) + } + } + + self.last_files = files; + changes + } +} diff --git a/remote-config/src/file_storage.rs b/remote-config/src/file_storage.rs new file mode 100644 index 0000000000..62f01e2833 --- /dev/null +++ b/remote-config/src/file_storage.rs @@ -0,0 +1,124 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::fetch::FileStorage; +use crate::file_change_tracker::{FilePath, UpdatedFiles}; +use crate::{RemoteConfigData, RemoteConfigPath}; +use std::ops::Deref; +use std::sync::{Arc, Mutex, MutexGuard}; + +/// A trivial local storage for remote config files. +pub struct RawFileStorage { + updated: Mutex>, P)>>, +} + +impl Default for RawFileStorage

{ + fn default() -> Self { + RawFileStorage { + updated: Mutex::default(), + } + } +} + +pub trait ParseFile +where + Self: Sized, +{ + fn parse(path: &RemoteConfigPath, contents: Vec) -> Self; +} + +impl UpdatedFiles, P> for RawFileStorage

{ + fn updated(&self) -> Vec<(Arc>, P)> { + std::mem::take(&mut *self.updated.lock().unwrap()) + } +} + +/// Mutable data: version and contents. +struct RawFileData

{ + version: u64, + contents: P, +} + +/// File contents and file metadata +pub struct RawFile

{ + path: RemoteConfigPath, + data: Mutex>, +} + +pub struct RawFileContentsGuard<'a, P>(MutexGuard<'a, RawFileData

>); + +impl<'a, P> Deref for RawFileContentsGuard<'a, P> { + type Target = P; + + fn deref(&self) -> &Self::Target { + &self.0.contents + } +} + +impl

RawFile

{ + /// Gets the contents behind a Deref impl (guarding a Mutex). + pub fn contents(&self) -> RawFileContentsGuard

{ + RawFileContentsGuard(self.data.lock().unwrap()) + } + + pub fn version(&self) -> u64 { + self.data.lock().unwrap().version + } +} + +impl

FilePath for RawFile

{ + fn path(&self) -> &RemoteConfigPath { + &self.path + } +} + +impl FileStorage for RawFileStorage

{ + type StoredFile = RawFile

; + + fn store( + &self, + version: u64, + path: RemoteConfigPath, + contents: Vec, + ) -> anyhow::Result> { + Ok(Arc::new(RawFile { + data: Mutex::new(RawFileData { + version, + contents: P::parse(&path, contents), + }), + path, + })) + } + + fn update( + &self, + file: &Arc, + version: u64, + contents: Vec, + ) -> anyhow::Result<()> { + let mut contents = P::parse(&file.path, contents); + let mut data = file.data.lock().unwrap(); + std::mem::swap(&mut data.contents, &mut contents); + self.updated.lock().unwrap().push((file.clone(), contents)); + data.version = version; + Ok(()) + } +} + +/// It simply stores the raw remote config file contents. +pub type SimpleFileStorage = RawFileStorage>; + +impl ParseFile for Vec { + fn parse(_path: &RemoteConfigPath, contents: Vec) -> Self { + contents + } +} + +/// Storing the remote config file contents in parsed form +pub type ParsedFileStorage = RawFileStorage>; + +impl ParseFile for anyhow::Result { + fn parse(path: &RemoteConfigPath, contents: Vec) -> Self { + RemoteConfigData::try_parse(path.product, contents.as_slice()) + } +} diff --git a/remote-config/src/lib.rs b/remote-config/src/lib.rs index f6ca1a364f..378ee1d858 100644 --- a/remote-config/src/lib.rs +++ b/remote-config/src/lib.rs @@ -3,6 +3,8 @@ pub mod dynamic_configuration; pub mod fetch; +pub mod file_change_tracker; +pub mod file_storage; mod parse; mod targets;