Skip to content

Commit

Permalink
Merge branch 'main' into arbiter-core/db-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Autoparallel committed Jan 3, 2024
2 parents 20459d2 + 05c0cc8 commit 7a906c1
Show file tree
Hide file tree
Showing 30 changed files with 925 additions and 328 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ name: lint
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches: [main]

jobs:
fmt:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release-plz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ jobs:
git config --local user.email "[email protected]"
git config --local user.name "GitHub Action"
git add Cargo.lock
git commit -m "Update Cargo.lock" || echo "No changes to commit"
git commit -m "Update Cargo.lock" || (git add Cargo.lock && git commit -m "Update Cargo.lock") || echo "No changes to commit"
- name: Run release-plz
uses: MarcoIeni/[email protected]
uses: MarcoIeni/[email protected].33
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ proc-macro2 = { version = "1.0.70" }
tokio = { version = "1.35.0", features = ["macros", "full"] }
arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "0.5.9" }
futures-util = { version = "=0.3.29" }
async-trait = { version = "0.1.74" }
futures-util = { version = "=0.3.30" }
async-trait = { version = "0.1.76" }
tracing = "0.1.40"

# Dependencies for the release build
[dependencies]
arbiter-core.workspace = true

# Command line and config
clap = { version = "=4.4.11", features = ["derive"] }
clap = { version = "=4.4.12", features = ["derive"] }
serde.workspace = true
serde_json.workspace = true
config = { version = "=0.13.4" }
Expand All @@ -56,11 +56,10 @@ Inflector = { version = "=0.11.4" }
# Building files
quote.workspace = true
foundry-config = { version = "=0.2.0" }
tempfile = { version = "3.8.1"}
tempfile = { version = "3.9.0"}

# Errors
thiserror.workspace = true

# Dependencies for the test build and development
[dev-dependencies]
tokio.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ polars = { version = "0.35.4", features = ["parquet", "csv", "json"] }
arbiter-derive = { path = "../arbiter-derive" }
arbiter-bindings = { path = "../arbiter-bindings" }
hex = { version = "=0.4.3", default-features = false }
anyhow = { version = "1.0.76" }
anyhow = { version = "=1.0.79" }
test-log = { version = "=0.2.14" }
tracing-test = "0.2.4"

Expand Down
2 changes: 1 addition & 1 deletion arbiter-core/src/middleware/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::environment::{EventBroadcaster, InstructionSender, OutcomeReceiver, O

/// Represents a connection to the EVM contained in the corresponding
/// [`Environment`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Connection {
/// Used to send calls and transactions to the [`Environment`] to be
/// executed by `revm`.
Expand Down
40 changes: 35 additions & 5 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use ethers::{
ecdsa::SigningKey,
sha2::{Digest, Sha256},
},
JsonRpcClient, ProviderError,
ProviderError,
},
providers::{
FilterKind, FilterWatcher, Middleware, MiddlewareError, PendingTransaction, Provider,
PubsubClient, SubscriptionStream,
FilterKind, FilterWatcher, JsonRpcClient, Middleware, MiddlewareError, PendingTransaction,
Provider, PubsubClient, SubscriptionStream,
},
signers::{Signer, Wallet},
types::{
Expand All @@ -39,9 +39,11 @@ use ethers::{
},
};
use futures_timer::Delay;
use futures_util::Stream;
use rand::{rngs::StdRng, SeedableRng};
use revm::primitives::{CreateScheme, Output, TransactTo, TxEnv, U256};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::value::RawValue;
use thiserror::Error;

use super::*;
Expand Down Expand Up @@ -92,7 +94,7 @@ pub mod nonce_middleware;
/// Use a seed like `Some("test_label")` for maintaining a
/// consistent address across simulations and client labeling. Seeding is be
/// useful for debugging and post-processing.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RevmMiddleware {
provider: Provider<Connection>,
wallet: EOA,
Expand All @@ -101,9 +103,37 @@ pub struct RevmMiddleware {
pub label: Option<String>,
}

#[async_trait::async_trait]
impl JsonRpcClient for RevmMiddleware {
type Error = ProviderError;
async fn request<T: Serialize + Send + Sync + Debug, R: DeserializeOwned + Send>(
&self,
method: &str,
params: T,
) -> Result<R, ProviderError> {
self.provider().as_ref().request(method, params).await
}
}

#[async_trait::async_trait]
impl PubsubClient for RevmMiddleware {
type NotificationStream = Pin<Box<dyn Stream<Item = Box<RawValue>> + Send>>;

fn subscribe<T: Into<ethers::types::U256>>(
&self,
id: T,
) -> Result<Self::NotificationStream, Self::Error> {
self.provider().as_ref().subscribe(id)
}

fn unsubscribe<T: Into<ethers::types::U256>>(&self, id: T) -> Result<(), Self::Error> {
self.provider.as_ref().unsubscribe(id)
}
}

/// A wrapper enum for the two types of accounts that can be used with the
/// middleware.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum EOA {
/// The [`Forked`] variant is used for the forked EOA,
/// allowing us to treat them as mock accounts that we can still authorize
Expand Down
2 changes: 0 additions & 2 deletions arbiter-core/src/tests/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use chrono::format::parse;

use super::*;

#[tokio::test]
Expand Down
4 changes: 1 addition & 3 deletions arbiter-core/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use std::{str::FromStr, sync::Arc};

use anyhow::Result;
use arbiter_bindings::bindings::{
arbiter_math::ArbiterMath,
arbiter_token::{self, ArbiterToken},
liquid_exchange::LiquidExchange,
arbiter_math::ArbiterMath, arbiter_token::ArbiterToken, liquid_exchange::LiquidExchange,
};
use ethers::{
prelude::{
Expand Down
16 changes: 10 additions & 6 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ readme = "../README.md"

[dependencies]
ethers.workspace = true
arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
crossbeam-channel.workspace = true
# artemis-core = { path = "../../../artemis/crates/artemis-core" }
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
anyhow = { version = "1.0.76" }
anyhow = { version = "=1.0.79" }
tracing.workspace = true
flume = "0.11.0"
async-stream = "0.3.5"
tokio-stream = "0.1.14"
async-broadcast = "0.6.0"
futures = "0.3.29"

[dev-dependencies]
tracing-subscriber = "0.3.18"
arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
tracing-subscriber = "0.3.18"
tracing-test = "0.2.4"
135 changes: 100 additions & 35 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@
// Need an init signal or something.
// We can give agents a "calculator" evm to send "Actions" to when they are just
// doing compute so they aren't blocking the main tx thread.
// Maybe by default we should give agents a messager as part of their engine so we can call a
// "start" and "stop" with them.
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

//! The agent module contains the core agent abstraction for the Arbiter Engine.
use std::{error::Error, pin::Pin};

use artemis_core::{
engine::Engine,
types::{Collector, Executor, Strategy},
};
use futures_util::Future;
use tokio::task::JoinSet;

/// An agent is an entity capable of processing events and producing actions.
/// These are the core actors in simulations or in onchain systems.
/// Agents can be connected of other agents either as a dependent, or a
/// dependency.
pub struct Agent<E, A> {
pub struct Agent {
/// Identifier for this agent.
/// Used for routing messages.
pub id: String,

/// The engine that this agent uses to process events and produce actions.
pub(crate) engine: Option<Engine<E, A>>, /* Note, agent shouldn't NEED a client as a field
* as the engine can
* handle this. */
pub(crate) behaviors: Vec<Behavior>, /* Note, agent shouldn't NEED a client as a field
* as the engine can
* handle this. */

/// Agents that this agent depends on.
pub dependencies: Vec<String>,
Expand All @@ -36,44 +42,28 @@ pub struct Agent<E, A> {
pub dependents: Vec<String>,
}

impl<E, A> Agent<E, A>
where
E: Send + Clone + 'static + std::fmt::Debug,
A: Send + Clone + 'static + std::fmt::Debug,
{
impl Agent {
#[allow(clippy::new_without_default)]
/// Produces a new agent with the given identifier.
pub fn new(id: &str) -> Self {
Self {
id: id.to_owned(),
engine: Some(Engine::new()),
behaviors: vec![],
dependencies: vec![],
dependents: vec![],
}
}

/// Adds a collector to the agent's engine.
pub fn add_collector(&mut self, collector: impl Collector<E> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_collector(Box::new(collector));
}

/// Adds an executor to the agent's engine.
pub fn add_executor(&mut self, executor: impl Executor<A> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_executor(Box::new(executor));
}

/// Adds a strategy to the agent's engine.
pub fn add_strategy(&mut self, strategy: impl Strategy<E, A> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_strategy(Box::new(strategy));
/// Does so by pushing a future onto the agent's behavior vector.
/// This future returns the `JoinSet<()>` of the engine.
pub fn add_behavior<E, A>(&mut self, engine: Engine<E, A>)
where
E: Send + Clone + 'static + std::fmt::Debug,
A: Send + Clone + 'static + std::fmt::Debug,
{
let fut = engine.run();
self.behaviors.push(Box::pin(fut));
}

/// Adds a dependency to the agent.
Expand All @@ -87,6 +77,77 @@ where
pub fn add_dependent(&mut self, dependent: &str) {
self.dependents.push(dependent.to_owned());
}

pub(crate) async fn run(&mut self) -> Vec<JoinSet<()>> {
let mut join_sets = vec![];
for behavior in self.behaviors.iter_mut() {
let joinset = behavior.await.unwrap();
join_sets.push(joinset);
}
join_sets
}
}

type Behavior = Pin<Box<dyn Future<Output = Result<JoinSet<()>, Box<dyn Error>>> + Send>>;

/// A behavior builder is used to build an agent's behavior.
/// A behavior is meant to be quite simple, and is composed of a collector, an
/// executor, and a strategy that uses those two components.
/// Agents can have multiple simple behaviors to allow for the agent to have an
/// emergent complex set of actions.
pub struct BehaviorBuilder<E, A> {
collectors: Vec<Box<dyn Collector<E>>>,
executors: Vec<Box<dyn Executor<A>>>,
strategies: Vec<Box<dyn Strategy<E, A>>>,
}

impl<E, A> BehaviorBuilder<E, A>
where
E: Send + Clone + 'static + std::fmt::Debug,
A: Send + Clone + 'static + std::fmt::Debug,
{
/// Creates a new behavior builder.
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self {
collectors: vec![],
executors: vec![],
strategies: vec![],
}
}

/// Adds a collector to the behavior.
pub fn add_collector(mut self, collector: impl Collector<E> + 'static) -> Self {
self.collectors.push(Box::new(collector));
self
}

/// Adds an executor to the behavior.
pub fn add_executor(mut self, executor: impl Executor<A> + 'static) -> Self {
self.executors.push(Box::new(executor));
self
}

/// Adds a strategy to the behavior.
pub fn add_strategy(mut self, strategy: impl Strategy<E, A> + 'static) -> Self {
self.strategies.push(Box::new(strategy));
self
}

/// Builds the behavior.
pub fn build(self) -> Engine<E, A> {
let mut engine = Engine::new();
for collector in self.collectors {
engine.add_collector(collector);
}
for executor in self.executors {
engine.add_executor(executor);
}
for strategy in self.strategies {
engine.add_strategy(strategy);
}
engine
}
}

#[cfg(test)]
Expand Down Expand Up @@ -118,10 +179,14 @@ mod tests {

// Build the agent
let mut agent = Agent::new("test");
let collector = LogCollector::new(client.clone(), arb.transfer_filter().filter);
agent.add_collector(collector);
let executor = MempoolExecutor::new(client.clone());
agent.add_executor(executor);
let behavior = BehaviorBuilder::new()
.add_collector(LogCollector::new(
client.clone(),
arb.transfer_filter().filter,
))
.add_executor(MempoolExecutor::new(client.clone()))
.build();
agent.add_behavior(behavior);

let tx = arb.mint(client.address(), U256::from(1)).tx;
let _submit_tx = SubmitTxToMempool {
Expand Down
Loading

0 comments on commit 7a906c1

Please sign in to comment.