Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance(arbiter-engine): remove use of Artemis in favor of our own agent design #789

Merged
merged 14 commits into from
Jan 11, 2024
990 changes: 77 additions & 913 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ name = "arbiter"
path = "bin/main.rs"

[workspace.dependencies]
arbiter-bindings = { path = "./arbiter-bindings" }
arbiter-core = { path = "./arbiter-core" }
ethers = { version = "2.0.11" }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { version = "=1.0.108" }
Expand All @@ -31,11 +33,11 @@ syn = { version = "2.0.43" }
quote = { version = "=1.0.33" }
proc-macro2 = { version = "1.0.76" }
tokio = { version = "1.35.1", features = ["macros", "full"] }
arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "0.5.11" }
futures-util = { version = "=0.3.30" }
async-trait = { version = "0.1.76" }
tracing = "0.1.40"
async-stream = "0.3.5"

# Dependencies for the release build
[dependencies]
Expand Down
5 changes: 4 additions & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ hashbrown = "0.14.3"
bytes = { version = "=1.5.0" }
serde.workspace = true
serde_json.workspace = true
hex = { version = "=0.4.3", default-features = false }

# Concurrency/async
tokio.workspace = true
async-trait.workspace = true
async-stream.workspace = true
crossbeam-channel.workspace = true
futures-timer = { version = "=3.0.2" }
futures-locks = { version = "=0.7.1" }


# Randomness
rand = { version = "=0.8.5" }
rand_distr = { version = "=0.4.3" }
Expand All @@ -47,10 +50,10 @@ polars = { version = "0.36.2", features = ["parquet", "csv", "json"] }
[dev-dependencies]
arbiter-derive = { path = "../arbiter-derive" }
arbiter-bindings = { path = "../arbiter-bindings" }
hex = { version = "=0.4.3", default-features = false }
anyhow = { version = "=1.0.79" }
test-log = { version = "=0.2.14" }
tracing-test = "0.2.4"
tracing-subscriber = "0.3.18"

polars = "0.36.2"
cargo_metadata = "0.18.1"
Expand Down
63 changes: 55 additions & 8 deletions arbiter-core/src/data_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

use std::{
collections::BTreeMap, fmt::Debug, io::BufWriter, marker::PhantomData, mem::transmute,
sync::Arc,
pin::Pin, sync::Arc,
};

use ethers::{
abi::RawLog,
contract::{builders::Event, EthLogDecode},
core::k256::sha2::{Digest, Sha256},
providers::Middleware,
types::{Filter, FilteredParams},
};
use futures_util::Stream;
use polars::{
io::parquet::ParquetWriter,
prelude::{CsvWriter, DataFrame, NamedFrom, SerWriter},
Expand All @@ -43,7 +45,7 @@ use crate::{
middleware::{cast::revm_logs_to_ethers_logs, errors::RevmMiddlewareError, RevmMiddleware},
};

type FilterDecoder =
pub(crate) type FilterDecoder =
BTreeMap<String, (FilteredParams, Box<dyn Fn(&RawLog) -> String + Send + Sync>)>;
/// `EventLogger` is a struct that logs events from the Ethereum network.
///
Expand Down Expand Up @@ -149,6 +151,25 @@ impl EventLogger {
debug!("`EventLogger` now provided with event labeled: {:?}", name);
self
}

/// Adds an event to the `EventLogger` and generates a unique ID for the
/// event since we don't need to name events that are solely streamed and
/// not stored.
pub fn add_stream<D: EthLogDecode + Debug + Serialize + 'static>(
self,
event: Event<Arc<RevmMiddleware>, RevmMiddleware, D>,
) -> Self {
let mut hasher = Sha256::new();
hasher.update(
serde_json::to_string(&event.filter)
.map_err(RevmMiddlewareError::Json)
.unwrap(),
);
let hash = hasher.finalize();
let id = hex::encode(hash);
self.add(event, id)
}

/// Sets the directory for the `EventLogger`.
///
/// # Arguments
Expand Down Expand Up @@ -342,6 +363,35 @@ impl EventLogger {
});
Ok(())
}

/// Returns a stream of the serialized events.
pub fn stream(self) -> Pin<Box<dyn Stream<Item = String> + Send + 'static>> {
let receiver = self.receiver.clone().unwrap();

let stream = async_stream::stream! {
while let Ok(broadcast) = receiver.recv() {
match broadcast {
Broadcast::StopSignal => {
trace!("`EventLogger` has seen a stop signal");
break;
}
Broadcast::Event(event) => {
trace!("`EventLogger` received an event");
let ethers_logs = revm_logs_to_ethers_logs(event);
for log in ethers_logs {
for (_id, (filter, decoder)) in self.decoder.iter() {
if filter.filter_address(&log) && filter.filter_topics(&log) {
yield decoder(&log.clone().into());
}
}
}
}
}
}
};

Box::pin(stream)
}
}

fn flatten_to_data_frame(events: BTreeMap<String, BTreeMap<String, Vec<Value>>>) -> DataFrame {
Expand All @@ -361,17 +411,14 @@ fn flatten_to_data_frame(events: BTreeMap<String, BTreeMap<String, Vec<Value>>>)
}

// 2. Convert the vectors into a DataFrame
let df = DataFrame::new(vec![
DataFrame::new(vec![
Series::new("contract_name", contract_names),
Series::new("event_name", event_names),
Series::new("event_value", event_values),
])
.unwrap();
println!("{:?}", df);

df
.unwrap()
}
struct EventTransmuted<B, M, D> {
pub(crate) struct EventTransmuted<B, M, D> {
/// The event filter's state
pub filter: Filter,
pub(crate) provider: B,
Expand Down
74 changes: 50 additions & 24 deletions arbiter-core/src/tests/data_collection_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,32 @@ struct MockMetadata {
pub name: String,
}

async fn generate_events(
arbx: ArbiterToken<RevmMiddleware>,
arby: ArbiterToken<RevmMiddleware>,
lex: LiquidExchange<RevmMiddleware>,
client: Arc<RevmMiddleware>,
) -> Result<(), RevmMiddlewareError> {
for _ in 0..5 {
arbx.approve(client.address(), U256::from(1))
.send()
.await
.unwrap()
.await?;
arby.approve(client.address(), U256::from(1))
.send()
.await
.unwrap()
.await?;
lex.set_price(U256::from(10u128.pow(18)))
.send()
.await
.unwrap()
.await?;
}
Ok(())
}

#[tokio::test]
async fn data_capture() {
let (env, client) = startup_user_controlled().unwrap();
Expand Down Expand Up @@ -70,28 +96,28 @@ async fn data_capture() {
std::fs::remove_dir_all("./data").unwrap();
}

async fn generate_events(
arbx: ArbiterToken<RevmMiddleware>,
arby: ArbiterToken<RevmMiddleware>,
lex: LiquidExchange<RevmMiddleware>,
client: Arc<RevmMiddleware>,
) -> Result<(), RevmMiddlewareError> {
for _ in 0..5 {
arbx.approve(client.address(), U256::from(1))
.send()
.await
.unwrap()
.await?;
arby.approve(client.address(), U256::from(1))
.send()
.await
.unwrap()
.await?;
lex.set_price(U256::from(10u128.pow(18)))
.send()
.await
.unwrap()
.await?;
}
Ok(())
#[tokio::test]
async fn data_stream() {
std::env::set_var("RUST_LOG", "trace");
tracing_subscriber::fmt::init();
let (env, client) = startup_user_controlled().unwrap();
let (arbx, arby, lex) = deploy_liquid_exchange(client.clone()).await.unwrap();
println!("Deployed contracts");

// default_listener
let streamer = EventLogger::builder()
.add_stream(arbx.events())
.add_stream(arby.events())
.add_stream(lex.events())
.stream();

generate_events(arbx, arby, lex, client.clone())
.await
.unwrap_or_else(|e| {
panic!("Error generating events: {}", e);
});
panic!("This test is not complete");
let stream_buffer = streamer.enumerate().collect::<Vec<_>>().await;
println!("Buffer: {:?}", stream_buffer);
let _ = env.stop();
}
7 changes: 4 additions & 3 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ readme = "../README.md"

[dependencies]
ethers.workspace = true
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
# artemis-core = { path = "../../../artemis/crates/artemis-core" }
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
async-stream.workspace = true
anyhow = { version = "=1.0.79" }
tracing.workspace = true
async-stream = "0.3.5"
tokio-stream = "0.1.14"
async-broadcast = "0.6.0"
futures = "0.3.30"
crossbeam-channel.workspace = true
arbiter-core.workspace = true
arbiter-bindings.workspace = true

[dev-dependencies]
arbiter-core.workspace = true
Expand Down
Loading
Loading