Skip to content

Commit

Permalink
Make remote config usable in process, without manual FileStorage inte…
Browse files Browse the repository at this point in the history
…raction

Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Jun 25, 2024
1 parent 98cef5d commit 95c7998
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 3 deletions.
94 changes: 94 additions & 0 deletions remote-config/examples/remote_config_fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use datadog_remote_config::fetch::{ConfigInvariants, SingleChangesFetcher};
use datadog_remote_config::file_change_tracker::{Change, FilePath};
use datadog_remote_config::file_storage::ParsedFileStorage;
use datadog_remote_config::RemoteConfigProduct::ApmTracing;
use datadog_remote_config::{RemoteConfigData, Target};
use ddcommon::Endpoint;
use std::time::Duration;
use tokio::time::sleep;

const RUNTIME_ID: &str = "23e76587-5ae1-410c-a05c-137cae600a10";
const SERVICE: &str = "testservice";
const ENV: &str = "testenv";
const VERSION: &str = "1.2.3";

#[tokio::main(flavor = "current_thread")]
async fn main() {
// SingleChangesFetcher is ideal for a single static (runtime_id, service, env, version) tuple
// Otherwise a SharedFetcher (or even a MultiTargetFetcher for a potentially high number of
// targets) for multiple targets is needed. These can be manually wired together with a
// ChangeTracker to keep track of changes. The SingleChangesTracker does it for you.
let mut fetcher = SingleChangesFetcher::new(
// Use SimpleFileStorage if you desire just the raw, unparsed contents
// (e.g. to do processing directly in your language)
// For more complicated use cases, like needing to store data in shared memory, a custom
// FileStorage implementation is recommended
ParsedFileStorage::default(),
Target {
service: SERVICE.to_string(),
env: ENV.to_string(),
app_version: VERSION.to_string(),
},
RUNTIME_ID.to_string(),
ConfigInvariants {
language: "awesomelang".to_string(),
tracer_version: "99.10.5".to_string(),
endpoint: Endpoint {
url: hyper::Uri::from_static("http://localhost:8126"),
api_key: None,
},
products: vec![ApmTracing],
capabilities: vec![],
},
);

// Custom timeout, defaults to 5 seconds.
fetcher.set_timeout(2000);

loop {
match fetcher.fetch_changes().await {
Ok(changes) => {
println!("Got {} changes:", changes.len());
for change in changes {
match change {
Change::Add(file) => {
println!("Added file: {} (version: {})", file.path(), file.version());
print_file_contents(&file.contents());
}
Change::Update(file, _) => {
println!(
"Got update for file: {} (version: {})",
file.path(),
file.version()
);
print_file_contents(&file.contents());
}
Change::Remove(file) => {
println!("Removing file {}", file.path());
}
}
}
}
Err(e) => {
eprintln!("Fetch failed with {e}");
fetcher.set_last_error(e.to_string());
}
}

sleep(Duration::from_nanos(fetcher.get_interval()).max(Duration::from_secs(1))).await;
}
}

fn print_file_contents(contents: &anyhow::Result<RemoteConfigData>) {
match contents {
Ok(data) => {
println!("File contents: {:?}", data);
}
Err(e) => {
println!("Failed parsing file: {:?}", e);
}
}
}
2 changes: 1 addition & 1 deletion remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ where
Self::remove_target(self, runtime_id, target);
}

fn start_fetcher(self: &Arc<Self>, known_target: &mut KnownTarget) {
fn start_fetcher(self: &Arc<Self>, known_target: &KnownTarget) {
let this = self.clone();
let fetcher = known_target.fetcher.clone();
let status = known_target.status.clone();
Expand Down
91 changes: 89 additions & 2 deletions remote-config/src/fetch/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
// SPDX-License-Identifier: Apache-2.0

use crate::fetch::{ConfigFetcher, ConfigFetcherState, ConfigInvariants, FileStorage, OpaqueState};
use crate::file_change_tracker::{Change, ChangeTracker, FilePath, UpdatedFiles};
use crate::Target;
use std::sync::atomic::Ordering;
use std::sync::Arc;

/// Simple implementation
pub struct SingleFetcher<S: FileStorage> {
fetcher: ConfigFetcher<S>,
target: Arc<Target>,
runtime_id: String,
pub config_id: String,
pub last_error: Option<String>,
config_id: String,
last_error: Option<String>,
opaque_state: OpaqueState,
}

Expand All @@ -26,6 +29,12 @@ impl<S: FileStorage> SingleFetcher<S> {
}
}

pub fn with_config_id(mut self, config_id: String) -> Self {
self.config_id = config_id;
self
}

/// Polls the current runtime config files.
pub async fn fetch_once(&mut self) -> anyhow::Result<Option<Vec<Arc<S::StoredFile>>>> {
self.fetcher
.fetch_once(
Expand All @@ -37,4 +46,82 @@ impl<S: FileStorage> SingleFetcher<S> {
)
.await
}

/// Timeout after which to report failure, in milliseconds.
pub fn set_timeout(&self, milliseconds: u32) {
self.fetcher.timeout.store(milliseconds, Ordering::Relaxed);
}

/// Collected interval. May be zero if not provided by the remote config server or fetched yet.
/// Given in nanoseconds.
pub fn get_interval(&self) -> u64 {
self.fetcher.interval.load(Ordering::Relaxed)
}

/// Sets the error to be reported to the backend.
pub fn set_last_error(&mut self, error: String) {
self.last_error = Some(error);
}

pub fn get_config_id(&self) -> &String {
&self.config_id
}
}

pub struct SingleChangesFetcher<S: FileStorage>
where
S::StoredFile: FilePath,
{
changes: ChangeTracker<S::StoredFile>,
pub fetcher: SingleFetcher<S>,
}

impl<S: FileStorage> SingleChangesFetcher<S>
where
S::StoredFile: FilePath,
{
pub fn new(sink: S, target: Target, runtime_id: String, invariants: ConfigInvariants) -> Self {
SingleChangesFetcher {
changes: ChangeTracker::default(),
fetcher: SingleFetcher::new(sink, target, runtime_id, invariants),
}
}

pub fn with_config_id(mut self, config_id: String) -> Self {
self.fetcher = self.fetcher.with_config_id(config_id);
self
}

/// Polls for new changes
pub async fn fetch_changes<R>(&mut self) -> anyhow::Result<Vec<Change<Arc<S::StoredFile>, R>>>
where
S: UpdatedFiles<S::StoredFile, R>,
{
Ok(match self.fetcher.fetch_once().await? {
None => vec![],
Some(files) => self
.changes
.get_changes(files, self.fetcher.fetcher.file_storage.updated()),
})
}

/// Timeout after which to report failure, in milliseconds.
pub fn set_timeout(&self, milliseconds: u32) {
self.fetcher.set_timeout(milliseconds)
}

/// Collected interval. May be zero if not provided by the remote config server or fetched yet.
/// Given in nanoseconds.
pub fn get_interval(&self) -> u64 {
self.fetcher.get_interval()
}

/// Sets the error to be reported to the backend.
pub fn set_last_error(&mut self, error: String) {
self.fetcher.set_last_error(error);
}

pub fn get_config_id(&self) -> &String {
self.fetcher.get_config_id()
}
}
78 changes: 78 additions & 0 deletions remote-config/src/file_change_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::RemoteConfigPath;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

pub trait FilePath {
fn path(&self) -> &RemoteConfigPath;
}

pub trait UpdatedFiles<S: FilePath, R> {
fn updated(&self) -> Vec<(Arc<S>, R)>;
}

struct FilePathBasedArc<S: FilePath>(Arc<S>);

impl<S: FilePath> Hash for FilePathBasedArc<S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.path().hash(state)
}
}

impl<S: FilePath> PartialEq for FilePathBasedArc<S> {
fn eq(&self, other: &Self) -> bool {
self.0.path() == other.0.path()
}
}

impl<S: FilePath> Eq for FilePathBasedArc<S> {}

pub struct ChangeTracker<S: FilePath> {
last_files: HashSet<FilePathBasedArc<S>>,
}

impl<S: FilePath> Default for ChangeTracker<S> {
fn default() -> Self {
ChangeTracker {
last_files: Default::default(),
}
}
}

pub enum Change<S, R> {
Add(S),
Update(S, R),
Remove(S),
}

impl<S: FilePath> ChangeTracker<S> {
pub fn get_changes<R>(
&mut self,
files: Vec<Arc<S>>,
updated: Vec<(Arc<S>, R)>,
) -> Vec<Change<Arc<S>, R>> {
let files = HashSet::from_iter(files.into_iter().map(FilePathBasedArc));
let mut changes = vec![];

for file in files.difference(&self.last_files) {
changes.push(Change::Add(file.0.clone()));
}

for file in self.last_files.difference(&files) {
changes.push(Change::Remove(file.0.clone()));
}

for (updated_file, old_contents) in updated.into_iter() {
let file = FilePathBasedArc(updated_file);
if files.contains(&file) {
changes.push(Change::Update(file.0, old_contents))
}
}

self.last_files = files;
changes
}
}
Loading

0 comments on commit 95c7998

Please sign in to comment.