Skip to content

Commit

Permalink
feat: run tvm in separate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
akostylev0 committed Mar 21, 2024
1 parent 346500d commit 338d222
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 104 deletions.
33 changes: 1 addition & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ quick_cache = "0.4"
url = { version = "2.5.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
humantime = "2.1.0"
uuid = { version = "1.8", features = ["v4", "fast-rng", "serde"] }
2 changes: 1 addition & 1 deletion tonlibjson-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ hex = { workspace = true }
async-stream = { workspace = true }
quick_cache = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
dashmap = "5.5"
uuid = { version = "1.8", features = ["v4", "fast-rng", "serde"] }
pin-project = "1.1"
reqwest = { version = "0.11", features = ["rustls-tls", "trust-dns"], default-features = false }
hickory-resolver = { version = "0.24.0", features = ["tokio-runtime"] }
Expand Down
2 changes: 0 additions & 2 deletions tonlibjson-sys/src/tonemulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ impl Drop for TransactionEmulator {
}
}

unsafe impl Send for TransactionEmulator {}

#[derive(Debug)]
pub struct TvmEmulator {
pointer: *mut c_void,
Expand Down
2 changes: 1 addition & 1 deletion tvm-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ serde = { workspace = true }
serde_json = { workspace = true }
tokio-stream = { workspace = true }
humantime = { workspace = true }
rayon = "1.9"
quick_cache = "0.4"
clap = { workspace = true }
uuid = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }
6 changes: 3 additions & 3 deletions tvm-grpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ async fn main() -> anyhow::Result<()> {
health_reporter.set_serving::<TvmEmulatorServiceServer<TvmEmulatorService>>().await;
health_reporter.set_serving::<TransactionEmulatorServiceServer<TransactionEmulatorService>>().await;

let tvm_emulator_service = TvmEmulatorServiceServer::new(TvmEmulatorService)
let tvm_emulator_service = TvmEmulatorServiceServer::new(TvmEmulatorService::default())
.accept_compressed(Gzip)
.send_compressed(Gzip);
let transaction_emulator_service = TransactionEmulatorServiceServer::new(TransactionEmulatorService)
let transaction_emulator_service = TransactionEmulatorServiceServer::new(TransactionEmulatorService::default())
.accept_compressed(Gzip)
.send_compressed(Gzip);

tracing::info!("Listening on {:?}", &args.listen);

Server::builder()
.timeout(args.timeout.into())
.timeout(args.timeout)
.tcp_keepalive(args.tcp_keepalive.into())
.http2_keepalive_interval(args.http2_keepalive_interval.into())
.http2_keepalive_timeout(args.http2_keepalive_timeout.into())
Expand Down
18 changes: 13 additions & 5 deletions tvm-grpc/src/threaded.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use tonic::Status;
use uuid::Uuid;

pub type StreamId = Uuid;

pub enum Command<T, R> {
Request { request: T, response: tokio::sync::oneshot::Sender<Result<R, Status>> },
Drop
Request { stream_id: StreamId, request: T, response: tokio::sync::oneshot::Sender<Result<R, Status>> },
Drop { stream_id: StreamId }
}

#[derive(Debug)]
pub struct Stop<T, R> { sender: tokio::sync::mpsc::UnboundedSender<Command<T, R>> }
pub struct Stop<T, R> {
stream_id: StreamId,
sender: tokio::sync::mpsc::UnboundedSender<Command<T, R>>
}

impl<T, R> Stop<T, R> {
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Command<T, R>>) -> Self { Self { sender } }
pub fn new(stream_id: StreamId, sender: tokio::sync::mpsc::UnboundedSender<Command<T, R>>) -> Self {
Self { stream_id, sender }
}
}

impl<T, R> Drop for Stop<T, R> {
fn drop(&mut self) { let _ = self.sender.send(Command::Drop); }
fn drop(&mut self) { let _ = self.sender.send(Command::Drop { stream_id: self.stream_id }); }
}
70 changes: 44 additions & 26 deletions tvm-grpc/src/transaction_emulator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
Expand All @@ -8,14 +9,17 @@ use anyhow::anyhow;
use async_stream::stream;
use tokio_stream::StreamExt;
use quick_cache::sync::Cache;
use crate::threaded::{Command, Stop};
use tokio::sync::mpsc::UnboundedSender;
use crate::threaded::{Command, Stop, StreamId};
use crate::tvm::transaction_emulator_service_server::TransactionEmulatorService as BaseTransactionEmulatorService;
use crate::tvm::{transaction_emulator_request, transaction_emulator_response, TransactionEmulatorEmulateRequest, TransactionEmulatorEmulateResponse, TransactionEmulatorPrepareRequest, TransactionEmulatorPrepareResponse, TransactionEmulatorRequest, TransactionEmulatorResponse, TransactionEmulatorSetConfigRequest, TransactionEmulatorSetConfigResponse, TransactionEmulatorSetIgnoreChksigRequest, TransactionEmulatorSetIgnoreChksigResponse, TransactionEmulatorSetLibsRequest, TransactionEmulatorSetLibsResponse, TransactionEmulatorSetLtRequest, TransactionEmulatorSetLtResponse, TransactionEmulatorSetRandSeedRequest, TransactionEmulatorSetRandSeedResponse, TransactionEmulatorSetUnixtimeRequest, TransactionEmulatorSetUnixtimeResponse, TvmResult};
use crate::tvm::transaction_emulator_request::Request::*;
use crate::tvm::transaction_emulator_response::Response::*;

#[derive(Debug, Default)]
pub struct TransactionEmulatorService;
#[derive(Debug)]
pub struct TransactionEmulatorService {
tx: UnboundedSender<Command<transaction_emulator_request::Request, transaction_emulator_response::Response>>
}

#[derive(Default)]
struct State {
Expand All @@ -28,52 +32,66 @@ fn lru_cache() -> &'static Cache<String, Arc<String>> {
LRU_CACHE.get_or_init(|| Cache::new(32))
}

#[async_trait]
impl BaseTransactionEmulatorService for TransactionEmulatorService {
type ProcessStream = Pin<Box<dyn Stream<Item=Result<TransactionEmulatorResponse, Status>> + Send>>;

async fn process(&self, request: Request<Streaming<TransactionEmulatorRequest>>) -> Result<Response<Self::ProcessStream>, Status> {
let stream = request.into_inner();

impl Default for TransactionEmulatorService {
fn default() -> Self {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Command<transaction_emulator_request::Request, transaction_emulator_response::Response>>();
let stop = Stop::new(tx.clone());

rayon::spawn(move || {
let mut state = State::default();
tokio::task::spawn_blocking(move || {
let mut states: HashMap<StreamId, State> = HashMap::default();

while let Some(command) = rx.blocking_recv() {
match command {
Command::Request { request, response: oneshot } => {
Command::Request { stream_id, request, response: oneshot } => {
let state = states.entry(stream_id).or_default();

let response = match request {
Prepare(req) => prepare(&mut state, req).map(PrepareResponse),
Emulate(req) => emulate(&mut state, req).map(EmulateResponse),
SetUnixtime(req) => set_unixtime(&mut state, req).map(SetUnixtimeResponse),
SetLt(req) => set_lt(&mut state, req).map(SetLtResponse),
SetRandSeed(req) => set_rand_seed(&mut state, req).map(SetRandSeedResponse),
SetIgnoreChksig(req) => set_ignore_chksig(&mut state, req).map(SetIgnoreChksigResponse),
SetConfig(req) => set_config(&mut state, req).map(SetConfigResponse),
SetLibs(req) => set_libs(&mut state, req).map(SetLibsResponse),
Prepare(req) => prepare(state, req).map(PrepareResponse),
Emulate(req) => emulate(state, req).map(EmulateResponse),
SetUnixtime(req) => set_unixtime(state, req).map(SetUnixtimeResponse),
SetLt(req) => set_lt(state, req).map(SetLtResponse),
SetRandSeed(req) => set_rand_seed(state, req).map(SetRandSeedResponse),
SetIgnoreChksig(req) => set_ignore_chksig(state, req).map(SetIgnoreChksigResponse),
SetConfig(req) => set_config(state, req).map(SetConfigResponse),
SetLibs(req) => set_libs(state, req).map(SetLibsResponse),
};

if let Err(e) = oneshot.send(response) {
tracing::error!(error = ?e, "failed to send response");
break;
states.remove(&stream_id);
}
}
Command::Drop => { break; }
Command::Drop { stream_id } => {
states.remove(&stream_id);
}
}
}
});

Self { tx }
}
}

#[async_trait]
impl BaseTransactionEmulatorService for TransactionEmulatorService {
type ProcessStream = Pin<Box<dyn Stream<Item=Result<TransactionEmulatorResponse, Status>> + Send>>;

async fn process(&self, request: Request<Streaming<TransactionEmulatorRequest>>) -> Result<Response<Self::ProcessStream>, Status> {
let stream_id = StreamId::new_v4();
let stream = request.into_inner();
let stream = stream.timeout(Duration::from_secs(3));
let stop = Stop::new(stream_id, self.tx.clone());
let tx = self.tx.clone();

let output = stream! {
for await msg in stream {
match msg {
Ok(Ok(TransactionEmulatorRequest { request_id, request: Some(req) })) => {
let (to, ro) = tokio::sync::oneshot::channel();

let _ = tx.send(Command::Request { request: req, response: to });
let response = ro.await.expect("failed to receive response");
let _ = tx.send(Command::Request { stream_id, request: req, response: to });
let Ok(response) = ro.await else {
break
};

yield response.map(|r| TransactionEmulatorResponse { request_id, response: Some(r) })
},
Expand Down
87 changes: 53 additions & 34 deletions tvm-grpc/src/tvm_emulator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
Expand All @@ -8,14 +9,55 @@ use tokio_stream::StreamExt;
use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::instrument;
use quick_cache::sync::Cache;
use crate::threaded::{Command, Stop};
use tokio::sync::mpsc::UnboundedSender;
use crate::threaded::{Command, Stop, StreamId};
use crate::tvm::tvm_emulator_request::Request::{Prepare, RunGetMethod, SendExternalMessage, SendInternalMessage, SetC7, SetGasLimit, SetLibraries};
use crate::tvm::tvm_emulator_response::Response::{PrepareResponse, RunGetMethodResponse, SendExternalMessageResponse, SendInternalMessageResponse, SetC7Response, SetGasLimitResponse, SetLibrariesResponse};
use crate::tvm::tvm_emulator_service_server::TvmEmulatorService as BaseTvmEmulatorService;
use crate::tvm::{tvm_emulator_request, tvm_emulator_response, TvmEmulatorPrepareRequest, TvmEmulatorPrepareResponse, TvmEmulatorRequest, TvmEmulatorResponse, TvmEmulatorRunGetMethodRequest, TvmEmulatorRunGetMethodResponse, TvmEmulatorSendExternalMessageRequest, TvmEmulatorSendExternalMessageResponse, TvmEmulatorSendInternalMessageRequest, TvmEmulatorSendInternalMessageResponse, TvmEmulatorSetC7Request, TvmEmulatorSetC7Response, TvmEmulatorSetGasLimitRequest, TvmEmulatorSetGasLimitResponse, TvmEmulatorSetLibrariesRequest, TvmEmulatorSetLibrariesResponse, TvmResult};

#[derive(Debug, Default)]
pub struct TvmEmulatorService;
#[derive(Debug)]
pub struct TvmEmulatorService {
tx: UnboundedSender<Command<tvm_emulator_request::Request, tvm_emulator_response::Response>>
}

impl Default for TvmEmulatorService {
fn default() -> Self {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Command<tvm_emulator_request::Request, tvm_emulator_response::Response>>();

tokio::task::spawn_blocking(move || {
let mut states: HashMap<StreamId, State> = HashMap::default();

while let Some(command) = rx.blocking_recv() {
match command {
Command::Request { stream_id, request, response: oneshot } => {
let state = states.entry(stream_id).or_default();

let response = match request {
Prepare(req) => prepare_emu(state, req).map(PrepareResponse),
RunGetMethod(req) => run_get_method(state, req).map(RunGetMethodResponse),
SendExternalMessage(req) => send_external_message(state, req).map(SendExternalMessageResponse),
SendInternalMessage(req) => send_internal_message(state, req).map(SendInternalMessageResponse),
SetLibraries(req) => set_libraries(state, req).map(SetLibrariesResponse),
SetGasLimit(req) => set_gas_limit(state, req).map(SetGasLimitResponse),
SetC7(req) => set_c7(state, req).map(SetC7Response),
};

if let Err(e) = oneshot.send(response) {
tracing::error!(error = ?e, "failed to send response");
states.remove(&stream_id);
}
},
Command::Drop { stream_id } => {
states.remove(&stream_id);
}
}
}
});

Self { tx }
}
}

#[derive(Default)]
struct State {
Expand All @@ -33,45 +75,22 @@ impl BaseTvmEmulatorService for TvmEmulatorService {
type ProcessStream = Pin<Box<dyn Stream<Item=Result<TvmEmulatorResponse, Status>> + Send>>;

async fn process(&self, request: Request<Streaming<TvmEmulatorRequest>>) -> Result<Response<Self::ProcessStream>, Status> {
let stream_id = StreamId::new_v4();
let stream = request.into_inner();

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Command<tvm_emulator_request::Request, tvm_emulator_response::Response>>();
let stop = Stop::new(tx.clone());

rayon::spawn(move || {
let mut state = State::default();
while let Some(command) = rx.blocking_recv() {
match command {
Command::Request { request, response: oneshot } => {
let response = match request {
Prepare(req) => prepare_emu(&mut state, req).map(PrepareResponse),
RunGetMethod(req) => run_get_method(&mut state, req).map(RunGetMethodResponse),
SendExternalMessage(req) => send_external_message(&mut state, req).map(SendExternalMessageResponse),
SendInternalMessage(req) => send_internal_message(&mut state, req).map(SendInternalMessageResponse),
SetLibraries(req) => set_libraries(&mut state, req).map(SetLibrariesResponse),
SetGasLimit(req) => set_gas_limit(&mut state, req).map(SetGasLimitResponse),
SetC7(req) => set_c7(&mut state, req).map(SetC7Response),
};

if let Err(e) = oneshot.send(response) {
tracing::error!(error = ?e, "failed to send response");
break;
}
},
Command::Drop => { break; }
}
}
});

let stream = stream.timeout(Duration::from_secs(3));
let stop = Stop::new(stream_id, self.tx.clone());
let tx = self.tx.clone();

let output = stream! {
for await msg in stream {
match msg {
Ok(Ok(TvmEmulatorRequest { request_id, request: Some(req)})) => {
let (to, ro) = tokio::sync::oneshot::channel();

let _ = tx.send(Command::Request { request: req, response: to });
let response = ro.await.expect("failed to receive response");
let _ = tx.send(Command::Request { stream_id, request: req, response: to });
let Ok(response) = ro.await else {
break
};

yield response.map(|r| TvmEmulatorResponse { request_id, response: Some(r) })
},
Expand Down

0 comments on commit 338d222

Please sign in to comment.