Skip to content

Commit 74ff639

Browse files
committed
adds streaming support for requests
1 parent 89ad26f commit 74ff639

File tree

5 files changed

+63
-31
lines changed

5 files changed

+63
-31
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@ A tool to fetch blocks from rpc and save in local db for better performance duri
2121

2222
![basic arch](assets/arch_v2.png)
2323

24-
## Docker Local Development
24+
## Docker
25+
26+
Build
27+
28+
```bash
29+
docker build -t oreowallet .
30+
```
2531

2632
Run node:
2733

crates/networking/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod rpc_handler;
55
pub mod server_handler;
66
pub mod socket_message;
77
pub mod web_abi;
8+
pub mod stream;
89

910
use db_handler::{DBTransaction, InnerBlock, Json};
1011
use rpc_abi::RpcBlock;

crates/networking/src/rpc_abi.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use axum::{response::IntoResponse, Json};
22
use constants::IRON_NATIVE_ASSET;
3-
use serde::{Deserialize, Serialize};
4-
use ureq::json;
3+
use serde::{ Deserialize, Serialize};
4+
use ureq::{json};
55

66
use crate::orescriptions::{get_ores, is_ores_local, Ores};
77

@@ -11,6 +11,14 @@ pub struct RpcResponse<T> {
1111
pub data: T,
1212
}
1313

14+
#[derive(Debug, Deserialize, Serialize)]
15+
pub struct RpcResponseStream<T> {
16+
pub data: T
17+
}
18+
19+
20+
21+
1422
impl<T: Serialize> IntoResponse for RpcResponse<T> {
1523
fn into_response(self) -> axum::response::Response {
1624
Json(json!({"code": 200, "data": self.data})).into_response()

crates/networking/src/rpc_handler/handler.rs

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,9 @@ use ureq::{Agent, AgentBuilder, Error, Response};
88

99
use crate::{
1010
rpc_abi::{
11-
RpcBroadcastTxRequest, RpcBroadcastTxResponse, RpcCreateTxRequest, RpcCreateTxResponse,
12-
RpcExportAccountResponse, RpcGetAccountStatusRequest, RpcGetAccountStatusResponse,
13-
RpcGetAccountTransactionRequest, RpcGetAccountTransactionResponse, RpcGetBalancesRequest,
14-
RpcGetBalancesResponse, RpcGetBlockRequest, RpcGetBlockResponse, RpcGetBlocksRequest,
15-
RpcGetBlocksResponse, RpcGetLatestBlockResponse, RpcGetTransactionsRequest,
16-
RpcGetTransactionsResponse, RpcImportAccountRequest, RpcImportAccountResponse,
17-
RpcRemoveAccountRequest, RpcRemoveAccountResponse, RpcResetAccountRequest, RpcResponse,
18-
RpcSetAccountHeadRequest, RpcSetScanningRequest, SendTransactionRequest,
19-
SendTransactionResponse,
11+
RpcBroadcastTxRequest, RpcBroadcastTxResponse, RpcCreateTxRequest, RpcCreateTxResponse, RpcExportAccountResponse, RpcGetAccountStatusRequest, RpcGetAccountStatusResponse, RpcGetAccountTransactionRequest, RpcGetAccountTransactionResponse, RpcGetBalancesRequest, RpcGetBalancesResponse, RpcGetBlockRequest, RpcGetBlockResponse, RpcGetBlocksRequest, RpcGetBlocksResponse, RpcGetLatestBlockResponse, RpcGetTransactionsRequest, RpcGetTransactionsResponse, RpcImportAccountRequest, RpcImportAccountResponse, RpcRemoveAccountRequest, RpcRemoveAccountResponse, RpcResetAccountRequest, RpcResponse, RpcSetAccountHeadRequest, RpcSetScanningRequest, SendTransactionRequest, SendTransactionResponse
2012
},
21-
rpc_handler::RpcError,
13+
rpc_handler::RpcError, stream::RequestExt,
2214
};
2315

2416
#[derive(Debug, Clone)]
@@ -137,32 +129,24 @@ impl RpcHandler {
137129
let resp = self.agent.clone().post(&path).send_json(&request);
138130
handle_response(resp)
139131
}
140-
132+
141133
pub fn get_transactions(
142134
&self,
143135
request: RpcGetTransactionsRequest,
144136
) -> Result<RpcResponse<RpcGetTransactionsResponse>, OreoError> {
145137
let path = format!("http://{}/wallet/getAccountTransactions", self.endpoint);
146138
let resp = self.agent.clone().post(&path).send_json(&request);
147-
148139
match resp {
149140
Ok(response) => {
150-
let mut buffer = Vec::new();
151-
response.into_reader().read_to_end(&mut buffer).map_err(|e| OreoError::InternalRpcError(e.to_string()))?;
152-
let transactions_response: RpcResponse<RpcGetTransactionsResponse> = if buffer.is_empty() {
153-
RpcResponse {
154-
data: RpcGetTransactionsResponse {
155-
transactions: Vec::new(),
156-
},
157-
status: 200,
158-
}
159-
} else {
160-
serde_json::from_slice(&buffer)
161-
.map_err(|e| OreoError::InternalRpcError(e.to_string()))?
162-
};
163-
Ok(transactions_response)
164-
}
165-
Err(e) => handle_response(Err(e)),
141+
let transactions = response.collect_stream();
142+
Ok(RpcResponse {
143+
status: 200,
144+
data: RpcGetTransactionsResponse {
145+
transactions: transactions?,
146+
},
147+
})
148+
},
149+
Err(e) => Err(OreoError::InternalRpcError(e.to_string())),
166150
}
167151
}
168152

crates/networking/src/stream.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::io::Read;
2+
3+
use oreo_errors::OreoError;
4+
use serde::de::DeserializeOwned;
5+
use ureq::Response;
6+
7+
use crate::rpc_abi::RpcResponseStream;
8+
9+
pub trait RequestExt {
10+
fn collect_stream<T: DeserializeOwned>(self) -> Result<Vec<T>, OreoError>;
11+
}
12+
13+
impl RequestExt for Response {
14+
fn collect_stream<T: DeserializeOwned>(self) -> Result<Vec<T>, OreoError> {
15+
let reader = self.into_reader();
16+
let mut buffered = std::io::BufReader::new(reader);
17+
let mut items = Vec::new();
18+
let mut response_str = String::new();
19+
buffered.read_to_string(&mut response_str).map_err(|e| OreoError::InternalRpcError(e.to_string()))?;
20+
let lines = response_str.split('\x0c').collect::<Vec<&str>>();
21+
22+
// Get rid of status code
23+
for line in lines[0..lines.len()-1].into_iter() {
24+
let line = *line; // Dereference to get &str
25+
if !line.trim().is_empty() {
26+
let item: RpcResponseStream<T> = serde_json::from_str(line)
27+
.map_err(|e| OreoError::InternalRpcError(e.to_string()))?;
28+
items.push(item.data);
29+
}
30+
}
31+
Ok(items)
32+
}
33+
}

0 commit comments

Comments
 (0)