diff --git a/Cargo.toml b/Cargo.toml index df0a30c2..558d40f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "json", "client", + "rpc", "integration_test", ] diff --git a/client/Cargo.toml b/client/Cargo.toml index de478d87..ee69b211 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -19,10 +19,11 @@ path = "src/lib.rs" [dependencies] bitcoincore-rpc-json = { version = "0.13.0", path = "../json" } +bitcoincore-rpc-rpc = { version = "0.13.0", path = "../rpc" } log = "0.4.5" -jsonrpc = "0.12.0" +futures = { version = "0.3" } # Used for deserialization of JSON. serde = "1" -serde_json = "1" +serde_json = { version = "1.0", features = ["raw_value"] } diff --git a/client/examples/retry_client.rs b/client/examples/retry_client.rs index 10b2fed3..b8bdd078 100644 --- a/client/examples/retry_client.rs +++ b/client/examples/retry_client.rs @@ -9,11 +9,10 @@ // extern crate bitcoincore_rpc; -extern crate jsonrpc; extern crate serde; extern crate serde_json; -use bitcoincore_rpc::{Client, Error, Result, RpcApi}; +use bitcoincore_rpc::{rpc, Client, Error, Result, RpcApi}; pub struct RetryClient { client: Client, @@ -31,7 +30,7 @@ impl RpcApi for RetryClient { for _ in 0..RETRY_ATTEMPTS { match self.client.call(cmd, args) { Ok(ret) => return Ok(ret), - Err(Error::JsonRpc(jsonrpc::error::Error::Rpc(ref rpcerr))) + Err(Error::JsonRpc(rpc::http::Error::Rpc(ref rpcerr))) if rpcerr.code == -28 => { ::std::thread::sleep(::std::time::Duration::from_millis(INTERVAL)); diff --git a/client/src/client.rs b/client/src/client.rs index 651acd96..63305d41 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -15,7 +15,7 @@ use std::path::PathBuf; use std::{fmt, result}; use bitcoin; -use jsonrpc; +use rpc; use serde; use serde_json; @@ -1082,12 +1082,12 @@ pub trait RpcApi: Sized { /// Client implements a JSON-RPC client for the Bitcoin Core daemon or compatible APIs. pub struct Client { - client: jsonrpc::client::Client, + client: rpc::rpc::RpcClient, } impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "bitcoincore_rpc::Client({:?})", self.client) + write!(f, "RpcClient({:?})", self.client) } } @@ -1097,26 +1097,29 @@ impl Client { /// Can only return [Err] when using cookie authentication. pub fn new(url: &str, auth: Auth) -> Result { let (user, pass) = auth.get_user_pass()?; - jsonrpc::client::Client::simple_http(url, user, pass) + rpc::rpc::RpcClient::from_url(user, pass, url) .map(|client| Client { client, }) - .map_err(|e| super::error::Error::JsonRpc(e.into())) + .map_err(|e| super::error::Error::Io(e.into())) } /// Create a new Client using the given [jsonrpc::Client]. - pub fn from_jsonrpc(client: jsonrpc::client::Client) -> Client { + pub fn from_jsonrpc(client: rpc::rpc::RpcClient) -> Client { Client { client, } } /// Get the underlying JSONRPC client. - pub fn get_jsonrpc_client(&self) -> &jsonrpc::client::Client { + pub fn get_jsonrpc_client(&self) -> &rpc::rpc::RpcClient { &self.client } } +// FIXME: Temporary +use futures::executor::block_on; + impl RpcApi for Client { /// Call an `cmd` rpc with given `args` list fn call serde::de::Deserialize<'a>>( @@ -1124,46 +1127,35 @@ impl RpcApi for Client { cmd: &str, args: &[serde_json::Value], ) -> Result { - let raw_args: Vec<_> = args - .iter() - .map(|a| { - let json_string = serde_json::to_string(a)?; - serde_json::value::RawValue::from_string(json_string) // we can't use to_raw_value here due to compat with Rust 1.29 - }) - .map(|a| a.map_err(|e| Error::Json(e))) - .collect::>>()?; - let req = self.client.build_request(&cmd, &raw_args); if log_enabled!(Debug) { debug!(target: "bitcoincore_rpc", "JSON-RPC request: {} {}", cmd, serde_json::Value::from(args)); } - let resp = self.client.send_request(req).map_err(Error::from); + let resp = block_on(self.client.call_method(cmd, args)).map_err(Error::from); log_response(cmd, &resp); - Ok(resp?.result()?) + Ok(resp?) } } -fn log_response(cmd: &str, resp: &Result) { +fn log_response(cmd: &str, resp: &Result) { if log_enabled!(Warn) || log_enabled!(Debug) || log_enabled!(Trace) { match resp { Err(ref e) => { if log_enabled!(Debug) { - debug!(target: "bitcoincore_rpc", "JSON-RPC failed parsing reply of {}: {:?}", cmd, e); + match e { + Error::JsonRpc(ref e) => { + debug!(target: "bitcoincore_rpc", "JSON-RPC error for {}: {:?}", cmd, e) + } + _ => { + debug!(target: "bitcoincore_rpc", "JSON-RPC failed parsing reply of {}: {:?}", cmd, e) + } + } } } Ok(ref resp) => { - if let Some(ref e) = resp.error { - if log_enabled!(Debug) { - debug!(target: "bitcoincore_rpc", "JSON-RPC error for {}: {:?}", cmd, e); - } - } else if log_enabled!(Trace) { - // we can't use to_raw_value here due to compat with Rust 1.29 - let def = serde_json::value::RawValue::from_string( - serde_json::Value::Null.to_string(), - ) - .unwrap(); - let result = resp.result.as_ref().unwrap_or(&def); - trace!(target: "bitcoincore_rpc", "JSON-RPC response for {}: {}", cmd, result); + if log_enabled!(Trace) { + // FIXME: Log this. Not sure how to make T implement Display. + //trace!(target: "bitcoincore_rpc", "JSON-RPC response for {}: {}", cmd, resp); } } } diff --git a/client/src/error.rs b/client/src/error.rs index 70676c47..c9866076 100644 --- a/client/src/error.rs +++ b/client/src/error.rs @@ -13,13 +13,13 @@ use std::{error, fmt, io}; use bitcoin; use bitcoin::hashes::hex; use bitcoin::secp256k1; -use jsonrpc; +use rpc; use serde_json; /// The error type for errors produced in this library. #[derive(Debug)] pub enum Error { - JsonRpc(jsonrpc::error::Error), + JsonRpc(rpc::http::Error), Hex(hex::Error), Json(serde_json::error::Error), BitcoinSerialization(bitcoin::consensus::encode::Error), @@ -31,8 +31,8 @@ pub enum Error { UnexpectedStructure, } -impl From for Error { - fn from(e: jsonrpc::error::Error) -> Error { +impl From for Error { + fn from(e: rpc::http::Error) -> Error { Error::JsonRpc(e) } } diff --git a/client/src/lib.rs b/client/src/lib.rs index d1dd7b37..515065e7 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -21,14 +21,16 @@ extern crate log; #[allow(unused)] #[macro_use] // `macro_use` is needed for v1.24.0 compilation. extern crate serde; +extern crate futures; extern crate serde_json; -pub extern crate jsonrpc; - pub extern crate bitcoincore_rpc_json; pub use bitcoincore_rpc_json as json; pub use json::bitcoin; +pub extern crate bitcoincore_rpc_rpc; +pub use bitcoincore_rpc_rpc as rpc; + mod client; mod error; mod queryable; @@ -36,3 +38,4 @@ mod queryable; pub use client::*; pub use error::Error; pub use queryable::*; +pub use rpc::http::HttpEndpoint; diff --git a/integration_test/src/main.rs b/integration_test/src/main.rs index 0e35ee79..9e11fee4 100644 --- a/integration_test/src/main.rs +++ b/integration_test/src/main.rs @@ -19,7 +19,7 @@ extern crate log; use std::collections::HashMap; use bitcoincore_rpc::json; -use bitcoincore_rpc::jsonrpc::error::Error as JsonRpcError; +use bitcoincore_rpc::rpc::http::Error as JsonRpcError; use bitcoincore_rpc::{Auth, Client, Error, RpcApi}; use bitcoin::consensus::encode::{deserialize, serialize}; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml new file mode 100644 index 00000000..f6616f91 --- /dev/null +++ b/rpc/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "bitcoincore-rpc-rpc" +version = "0.13.0" +authors = ["Jeffrey Czyz", "Matt Corallo", "Sergi Delgado"] +license = "CC0-1.0" +homepage = "https://github.com/rust-bitcoin/rust-bitcoincore-rpc/" +repository = "https://github.com/rust-bitcoin/rust-bitcoincore-rpc/" +keywords = [ "crypto", "bitcoin", "bitcoin-core", "rpc" ] +readme = "README.md" +edition = "2018" + +[lib] +name = "bitcoincore_rpc_rpc" +path = "src/lib.rs" + + +[dependencies] +bitcoin = "0.26" +chunked_transfer = { version = "1.4" } +base64 = "0.13.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } + + +[dev-dependencies] +tokio = { version = "1.0", features = [ "macros", "rt" ] } \ No newline at end of file diff --git a/rpc/src/http.rs b/rpc/src/http.rs new file mode 100644 index 00000000..96eb18b6 --- /dev/null +++ b/rpc/src/http.rs @@ -0,0 +1,1055 @@ +//! Simple HTTP implementation which supports both async and traditional execution environments +//! with minimal dependencies. This is used as the basis for REST and RPC clients. + +use chunked_transfer; +use serde_json; + +use std::convert::TryFrom; +#[cfg(not(feature = "tokio"))] +use std::io::Write; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::Duration; +use std::{error, fmt}; + +#[cfg(feature = "tokio")] +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +#[cfg(feature = "tokio")] +use tokio::net::TcpStream; + +#[cfg(not(feature = "tokio"))] +use std::io::BufRead; +use std::io::Read; +#[cfg(not(feature = "tokio"))] +use std::net::TcpStream; + +/// Timeout for operations on TCP streams. +const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5); + +/// Timeout for reading the first byte of a response. This is separate from the general read +/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for +/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum +/// time we allow Bitcoin Core to block for is twice this value. +const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120); + +/// Maximum HTTP message header size in bytes. +const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192; + +/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any +/// overhead for HTTP chunked transfer encoding. +const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000; + +/// The default TCP port to use for connections. +/// Set to 8332, the default RPC port for bitcoind. +pub const DEFAULT_PORT: u16 = 8332; + +/// Endpoint for interacting with an HTTP-based API. +#[derive(Debug)] +pub struct HttpEndpoint { + host: String, + port: Option, + path: String, +} + +impl HttpEndpoint { + /// Creates an endpoint for the given host and default HTTP port. + pub fn for_host(host: String) -> Self { + Self { + host, + port: None, + path: String::from("/"), + } + } + + /// Specifies a port to use with the endpoint. + pub fn with_port(mut self, port: u16) -> Self { + self.port = Some(port); + self + } + + /// Specifies a path to use with the endpoint. + pub fn with_path(mut self, path: String) -> Self { + self.path = path; + self + } + + /// Create the HttpEndpoint from an URL + /// Taken from https://github.com/apoelstra/rust-jsonrpc/blob/master/src/simple_http.rs + pub fn from_url(url: &str) -> Result { + // The fallback port in case no port was provided. + // This changes when the http or https scheme was provided. + let mut fallback_port = DEFAULT_PORT; + + let after_scheme = { + let mut split = url.splitn(2, "://"); + let s = split.next().unwrap(); + match split.next() { + None => s, // no scheme present + Some(after) => { + // Check if the scheme is http or https. + if s == "http" { + fallback_port = 80; + } else if s == "https" { + fallback_port = 443; + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("scheme should be http or https ({})", url), + )); + } + after + } + } + }; + // (2) split off path + let (before_path, path) = { + if let Some(slash) = after_scheme.find("/") { + (&after_scheme[0..slash], &after_scheme[slash..]) + } else { + (after_scheme, "/") + } + }; + + // (3) split off auth part + let after_auth = { + let mut split = before_path.splitn(2, "@"); + let s = split.next().unwrap(); + split.next().unwrap_or(s) + }; + // so now we should have : or just + let mut split = after_auth.split(":"); + let host = split.next().unwrap(); + let port: u16 = match split.next() { + Some(port_str) => match port_str.parse() { + Ok(port) => port, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid port ({})", url), + )) + } + }, + None => fallback_port, + }; + // make sure we don't have a second colon in this part + if split.next().is_some() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("unexpected extra colon ({})", url), + )); + } + + Ok(Self { + host: host.to_string(), + port: Some(port), + path: path.to_string(), + }) + } + + /// Returns the endpoint host. + pub fn host(&self) -> &str { + &self.host + } + + /// Returns the endpoint port. + pub fn port(&self) -> u16 { + match self.port { + None => 80, + Some(port) => port, + } + } + + /// Returns the endpoint path. + pub fn path(&self) -> &str { + &self.path + } +} + +impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint { + type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter; + + fn to_socket_addrs(&self) -> std::io::Result { + (self.host(), self.port()).to_socket_addrs() + } +} + +/// Client for making HTTP requests. +#[derive(Debug)] +pub(crate) struct HttpClient { + address: SocketAddr, + stream: TcpStream, +} + +impl HttpClient { + /// Opens a connection to an HTTP endpoint. + pub fn connect(endpoint: E) -> std::io::Result { + let address = match endpoint.to_socket_addrs()?.next() { + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + )); + } + Some(address) => address, + }; + let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?; + stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?; + stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?; + + #[cfg(feature = "tokio")] + let stream = { + stream.set_nonblocking(true)?; + TcpStream::from_std(stream)? + }; + + Ok(Self { + address, + stream, + }) + } + + /// Sends a `GET` request for a resource identified by `uri` at the `host`. + /// + /// Returns the response body in `F` format. + #[allow(dead_code)] + pub async fn get(&mut self, uri: &str, host: &str) -> std::io::Result + where + F: TryFrom, Error = std::io::Error>, + { + let request = format!( + "GET {} HTTP/1.1\r\n\ + Host: {}\r\n\ + Connection: keep-alive\r\n\ + \r\n", + uri, host + ); + let response_body = self.send_request_with_retry(&request).await?; + F::try_from(response_body) + } + + /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP + /// authentication credentials. + /// + /// The request body consists of the provided JSON `content`. Returns the response body in `F` + /// format. + #[allow(dead_code)] + pub async fn post( + &mut self, + uri: &str, + host: &str, + auth: &str, + content: serde_json::Value, + ) -> std::io::Result + where + F: TryFrom, Error = std::io::Error>, + { + let content = content.to_string(); + let request = format!( + "POST {} HTTP/1.1\r\n\ + Host: {}\r\n\ + Authorization: {}\r\n\ + Connection: keep-alive\r\n\ + Content-Type: application/json\r\n\ + Content-Length: {}\r\n\ + \r\n\ + {}", + uri, + host, + auth, + content.len(), + content + ); + let response_body = self.send_request_with_retry(&request).await?; + F::try_from(response_body) + } + + /// Sends an HTTP request message and reads the response, returning its body. Attempts to + /// reconnect and retry if the connection has been closed. + async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result> { + match self.send_request(request).await { + Ok(bytes) => Ok(bytes), + Err(_) => { + // Reconnect and retry on fail. This can happen if the connection was closed after + // the keep-alive limits are reached, or generally if the request timed out due to + // Bitcoin Core being stuck on a long-running operation or its RPC queue being + // full. + // Block 100ms before retrying the request as in many cases the source of the error + // may be persistent for some time. + #[cfg(feature = "tokio")] + tokio::time::sleep(Duration::from_millis(100)).await; + #[cfg(not(feature = "tokio"))] + std::thread::sleep(Duration::from_millis(100)); + *self = Self::connect(self.address)?; + self.send_request(request).await + } + } + } + + /// Sends an HTTP request message and reads the response, returning its body. + async fn send_request(&mut self, request: &str) -> std::io::Result> { + self.write_request(request).await?; + self.read_response().await + } + + /// Writes an HTTP request message. + async fn write_request(&mut self, request: &str) -> std::io::Result<()> { + #[cfg(feature = "tokio")] + { + self.stream.write_all(request.as_bytes()).await?; + self.stream.flush().await + } + #[cfg(not(feature = "tokio"))] + { + self.stream.write_all(request.as_bytes())?; + self.stream.flush() + } + } + + /// Reads an HTTP response message. + async fn read_response(&mut self) -> std::io::Result> { + #[cfg(feature = "tokio")] + let stream = self.stream.split().0; + #[cfg(not(feature = "tokio"))] + let stream = std::io::Read::by_ref(&mut self.stream); + + let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64); + + #[cfg(feature = "tokio")] + let mut reader = tokio::io::BufReader::new(limited_stream); + #[cfg(not(feature = "tokio"))] + let mut reader = std::io::BufReader::new(limited_stream); + + macro_rules! read_line { + () => { + read_line!(0) + }; + ($retry_count: expr) => {{ + let mut line = String::new(); + let mut timeout_count: u64 = 0; + let bytes_read = loop { + #[cfg(feature = "tokio")] + let read_res = reader.read_line(&mut line).await; + #[cfg(not(feature = "tokio"))] + let read_res = reader.read_line(&mut line); + match read_res { + Ok(bytes_read) => break bytes_read, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + timeout_count += 1; + if timeout_count > $retry_count { + return Err(e); + } else { + continue; + } + } + Err(e) => return Err(e), + } + }; + + match bytes_read { + 0 => None, + _ => { + // Remove trailing CRLF + if line.ends_with('\n') { + line.pop(); + if line.ends_with('\r') { + line.pop(); + } + } + Some(line) + } + } + }}; + } + + // Read and parse status line + // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT. + let status_line = + read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; + let status = HttpStatus::parse(&status_line)?; + + // Read and parse relevant headers + let mut message_length = HttpMessageLength::Empty; + loop { + let line = read_line!() + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?; + if line.is_empty() { + break; + } + + let header = HttpHeader::parse(&line)?; + if header.has_name("Content-Length") { + let length = header + .value + .parse() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + if let HttpMessageLength::Empty = message_length { + message_length = HttpMessageLength::ContentLength(length); + } + continue; + } + + if header.has_name("Transfer-Encoding") { + message_length = HttpMessageLength::TransferEncoding(header.value.into()); + continue; + } + } + + // Read message body + let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len(); + reader.get_mut().set_limit(read_limit as u64); + let contents = match message_length { + HttpMessageLength::Empty => Vec::new(), + HttpMessageLength::ContentLength(length) => { + if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "out of range", + )); + } else { + let mut content = vec![0; length]; + #[cfg(feature = "tokio")] + reader.read_exact(&mut content[..]).await?; + #[cfg(not(feature = "tokio"))] + reader.read_exact(&mut content[..])?; + content + } + } + HttpMessageLength::TransferEncoding(coding) => { + if !coding.eq_ignore_ascii_case("chunked") { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "unsupported transfer coding", + )); + } else { + let mut content = Vec::new(); + #[cfg(feature = "tokio")] + { + // Since chunked_transfer doesn't have an async interface, only use it to + // determine the size of each chunk to read. + // + // TODO: Replace with an async interface when available. + // https://github.com/frewsxcv/rust-chunked-transfer/issues/7 + loop { + // Read the chunk header which contains the chunk size. + let mut chunk_header = String::new(); + reader.read_line(&mut chunk_header).await?; + if chunk_header == "0\r\n" { + // Read the terminator chunk since the decoder consumes the CRLF + // immediately when this chunk is encountered. + reader.read_line(&mut chunk_header).await?; + } + + // Decode the chunk header to obtain the chunk size. + let mut buffer = Vec::new(); + let mut decoder = + chunked_transfer::Decoder::new(chunk_header.as_bytes()); + decoder.read_to_end(&mut buffer)?; + + // Read the chunk body. + let chunk_size = match decoder.remaining_chunks_size() { + None => break, + Some(chunk_size) => chunk_size, + }; + let chunk_offset = content.len(); + content.resize(chunk_offset + chunk_size + "\r\n".len(), 0); + reader.read_exact(&mut content[chunk_offset..]).await?; + content.resize(chunk_offset + chunk_size, 0); + } + content + } + #[cfg(not(feature = "tokio"))] + { + let mut decoder = chunked_transfer::Decoder::new(reader); + decoder.read_to_end(&mut content)?; + content + } + } + } + }; + + if !status.is_ok() { + // TODO: Handle 3xx redirection responses. + let error = HttpError { + status_code: status.code.to_string(), + contents, + }; + return Err(std::io::Error::new(std::io::ErrorKind::Other, error)); + } + + Ok(contents) + } +} + +/// HTTP error consisting of a status code and body contents. +#[derive(Debug)] +pub(crate) struct HttpError { + pub(crate) status_code: String, + pub(crate) contents: Vec, +} + +impl std::error::Error for HttpError {} + +impl fmt::Display for HttpError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let contents = String::from_utf8_lossy(&self.contents); + write!(f, "status_code: {}, contents: {}", self.status_code, contents) + } +} + +/// HTTP response status code as defined by [RFC 7231]. +/// +/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6 +struct HttpStatus<'a> { + code: &'a str, +} + +impl<'a> HttpStatus<'a> { + /// Parses an HTTP status line as defined by [RFC 7230]. + /// + /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2 + fn parse(line: &'a String) -> std::io::Result> { + let mut tokens = line.splitn(3, ' '); + + let http_version = tokens + .next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?; + if !http_version.eq_ignore_ascii_case("HTTP/1.1") + && !http_version.eq_ignore_ascii_case("HTTP/1.0") + { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid HTTP-Version", + )); + } + + let code = tokens + .next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?; + if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid Status-Code", + )); + } + + let _reason = tokens + .next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?; + + Ok(Self { + code, + }) + } + + /// Returns whether the status is successful (i.e., 2xx status class). + fn is_ok(&self) -> bool { + self.code.starts_with('2') + } +} + +/// HTTP response header as defined by [RFC 7231]. +/// +/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7 +struct HttpHeader<'a> { + name: &'a str, + value: &'a str, +} + +impl<'a> HttpHeader<'a> { + /// Parses an HTTP header field as defined by [RFC 7230]. + /// + /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2 + fn parse(line: &'a String) -> std::io::Result> { + let mut tokens = line.splitn(2, ':'); + let name = tokens + .next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?; + let value = tokens + .next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))? + .trim_start(); + Ok(Self { + name, + value, + }) + } + + /// Returns whether the header field has the given name. + fn has_name(&self, name: &str) -> bool { + self.name.eq_ignore_ascii_case(name) + } +} + +/// HTTP message body length as defined by [RFC 7230]. +/// +/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3 +enum HttpMessageLength { + Empty, + ContentLength(usize), + TransferEncoding(String), +} + +/// An HTTP response body in binary format. +pub struct BinaryResponse(pub Vec); + +/// An HTTP response body in JSON format. +pub struct JsonResponse(pub serde_json::Value); + +#[derive(Debug)] +pub enum Error { + Io(std::io::Error), + Json(serde_json::Error), + Rpc(RpcError), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Error::Io(ref e) => write!(f, "IO error: {}", e), + Error::Json(ref e) => write!(f, "JSON decode error: {}", e), + Error::Rpc(ref e) => write!(f, "RPC error response: {:?}", e), + } + } +} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Error::Io(error) + } +} + +impl error::Error for Error { + fn cause(&self) -> Option<&dyn error::Error> { + match *self { + Error::Io(ref e) => Some(e), + Error::Json(ref e) => Some(e), + _ => None, + } + } +} + +#[derive(Debug)] +pub struct RpcError { + pub code: i32, + pub message: String, +} + +impl RpcError { + pub fn new(code: i32, message: String) -> Self { + RpcError { + code, + message, + } + } +} + +/// Interprets bytes from an HTTP response body as binary data. +impl TryFrom> for BinaryResponse { + type Error = std::io::Error; + + fn try_from(bytes: Vec) -> std::io::Result { + Ok(BinaryResponse(bytes)) + } +} + +/// Interprets bytes from an HTTP response body as a JSON value. +impl TryFrom> for JsonResponse { + type Error = std::io::Error; + + fn try_from(bytes: Vec) -> std::io::Result { + Ok(JsonResponse(serde_json::from_slice(&bytes)?)) + } +} + +#[cfg(test)] +mod endpoint_tests { + use super::HttpEndpoint; + + #[test] + fn with_default_port() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.port(), 80); + } + + #[test] + fn with_custom_port() { + let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.port(), 8080); + } + + #[test] + fn with_uri_path() { + let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.path(), "/path"); + } + + #[test] + fn without_uri_path() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.path(), "/"); + } + + #[test] + fn convert_to_socket_addrs() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + let host = endpoint.host(); + let port = endpoint.port(); + + use std::net::ToSocketAddrs; + match (&endpoint).to_socket_addrs() { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(mut socket_addrs) => match socket_addrs.next() { + None => panic!("Expected socket address"), + Some(addr) => { + assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap()); + assert!(socket_addrs.next().is_none()); + } + }, + } + } +} + +#[cfg(test)] +pub(crate) mod client_tests { + use super::*; + use std::io::BufRead; + use std::io::Write; + + /// Server for handling HTTP client requests with a stock response. + pub struct HttpServer { + address: std::net::SocketAddr, + handler: std::thread::JoinHandle<()>, + shutdown: std::sync::Arc, + } + + /// Body of HTTP response messages. + pub enum MessageBody { + Empty, + Content(T), + ChunkedContent(T), + } + + impl HttpServer { + fn responding_with_body(status: &str, body: MessageBody) -> Self { + let response = match body { + MessageBody::Empty => format!("{}\r\n\r\n", status), + MessageBody::Content(body) => { + let body = body.to_string(); + format!( + "{}\r\n\ + Content-Length: {}\r\n\ + \r\n\ + {}", + status, + body.len(), + body + ) + } + MessageBody::ChunkedContent(body) => { + let mut chuncked_body = Vec::new(); + { + use chunked_transfer::Encoder; + let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8); + encoder.write_all(body.to_string().as_bytes()).unwrap(); + } + format!( + "{}\r\n\ + Transfer-Encoding: chunked\r\n\ + \r\n\ + {}", + status, + String::from_utf8(chuncked_body).unwrap() + ) + } + }; + HttpServer::responding_with(response) + } + + pub fn responding_with_ok(body: MessageBody) -> Self { + HttpServer::responding_with_body("HTTP/1.1 200 OK", body) + } + + pub fn responding_with_not_found() -> Self { + HttpServer::responding_with_body::("HTTP/1.1 404 Not Found", MessageBody::Empty) + } + + pub fn responding_with_server_error(content: T) -> Self { + let body = MessageBody::Content(content); + HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body) + } + + fn responding_with(response: String) -> Self { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap(); + + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let shutdown_signaled = std::sync::Arc::clone(&shutdown); + let handler = std::thread::spawn(move || { + for stream in listener.incoming() { + let mut stream = stream.unwrap(); + stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap(); + + let lines_read = std::io::BufReader::new(&stream) + .lines() + .take_while(|line| !line.as_ref().unwrap().is_empty()) + .count(); + if lines_read == 0 { + continue; + } + + for chunk in response.as_bytes().chunks(16) { + if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { + return; + } else { + if let Err(_) = stream.write(chunk) { + break; + } + if let Err(_) = stream.flush() { + break; + } + } + } + } + }); + + Self { + address, + handler, + shutdown, + } + } + + fn shutdown(self) { + self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst); + self.handler.join().unwrap(); + } + + pub fn endpoint(&self) -> HttpEndpoint { + HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port()) + } + } + + #[test] + fn connect_to_unresolvable_host() { + match HttpClient::connect(("example.invalid", 80)) { + Err(e) => { + assert!( + e.to_string().contains("failed to lookup address information") + || e.to_string().contains("No such host"), + "{:?}", + e + ); + } + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn connect_with_no_socket_address() { + match HttpClient::connect(&vec![][..]) { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput), + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn connect_with_unknown_server() { + match HttpClient::connect(("::", 80)) { + #[cfg(target_os = "windows")] + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable), + #[cfg(not(target_os = "windows"))] + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn connect_with_valid_endpoint() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + match HttpClient::connect(&server.endpoint()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(_) => {} + } + } + + #[tokio::test] + async fn read_empty_message() { + let server = HttpServer::responding_with("".to_string()); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no status line"); + } + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_incomplete_message() { + let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string()); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); + } + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_too_large_message_headers() { + let response = format!( + "HTTP/1.1 302 Found\r\n\ + Location: {}\r\n\ + \r\n", + "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE) + ); + let server = HttpServer::responding_with(response); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); + } + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_too_large_message_body() { + let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1); + let server = HttpServer::responding_with_ok::(MessageBody::Content(body)); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "out of range"); + } + Ok(_) => panic!("Expected error"), + } + server.shutdown(); + } + + #[tokio::test] + async fn read_message_with_unsupported_transfer_coding() { + let response = String::from( + "HTTP/1.1 200 OK\r\n\ + Transfer-Encoding: gzip\r\n\ + \r\n\ + foobar", + ); + let server = HttpServer::responding_with(response); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding"); + } + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_error() { + let server = HttpServer::responding_with_server_error("foo"); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::Other); + let http_error = e.into_inner().unwrap().downcast::().unwrap(); + assert_eq!(http_error.status_code, "500"); + assert_eq!(http_error.contents, "foo".as_bytes()); + } + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_empty_message_body() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), + } + } + + #[tokio::test] + async fn read_message_body_with_length() { + let body = "foo bar baz qux".repeat(32); + let content = MessageBody::Content(body.clone()); + let server = HttpServer::responding_with_ok::(content); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), + } + } + + #[tokio::test] + async fn read_chunked_message_body() { + let body = "foo bar baz qux".repeat(32); + let chunked_content = MessageBody::ChunkedContent(body.clone()); + let server = HttpServer::responding_with_ok::(chunked_content); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), + } + } + + #[tokio::test] + async fn reconnect_closed_connection() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + assert!(client.get::("/foo", "foo.com").await.is_ok()); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), + } + } + + #[test] + fn from_bytes_into_binary_response() { + let bytes = b"foo"; + match BinaryResponse::try_from(bytes.to_vec()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(response) => assert_eq!(&response.0, bytes), + } + } + + #[test] + fn from_invalid_bytes_into_json_response() { + let json = serde_json::json!({ "result": 42 }); + match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) { + Err(_) => {} + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn from_valid_bytes_into_json_response() { + let json = serde_json::json!({ "result": 42 }); + match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(response) => assert_eq!(response.0, json), + } + } +} diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs new file mode 100644 index 00000000..7643685b --- /dev/null +++ b/rpc/src/lib.rs @@ -0,0 +1,12 @@ +// To the extent possible under law, the author(s) have dedicated all +// copyright and related and neighboring rights to this software to +// the public domain worldwide. This software is distributed without +// any warranty. +// +// You should have received a copy of the CC0 Public Domain Dedication +// along with this software. +// If not, see . +// + +pub mod http; +pub mod rpc; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs new file mode 100644 index 00000000..aaae1c2b --- /dev/null +++ b/rpc/src/rpc.rs @@ -0,0 +1,239 @@ +//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC +//! endpoint. + +use crate::http::{Error, HttpClient, HttpEndpoint, HttpError, JsonResponse, RpcError}; + +use base64; +use serde_json; + +use std::cell::RefCell; +use std::convert::TryFrom; +use std::fmt::Debug; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// A simple RPC client for calling methods using HTTP `POST`. +#[derive(Debug)] +pub struct RpcClient { + basic_auth: String, + endpoint: HttpEndpoint, + client: Rc>, + id: AtomicUsize, +} + +impl RpcClient { + /// Creates a new RPC client connected to the given endpoint with the provided credentials. The + /// credentials should be a base64 encoding of a user name and password joined by a colon, as is + /// required for HTTP basic access authentication. + pub fn new( + user: Option, + password: Option, + endpoint: HttpEndpoint, + ) -> std::io::Result { + let client = HttpClient::connect(&endpoint)?; + + let basic_auth = match (user, password) { + (Some(u), Some(p)) => "Basic ".to_string() + &base64::encode(format!("{}:{}", u, p)), + _ => "Basic ".to_string(), + }; + + Ok(Self { + basic_auth, + endpoint, + client: Rc::new(RefCell::new(client)), + id: AtomicUsize::new(0), + }) + } + + pub fn from_url( + user: Option, + password: Option, + url: &str, + ) -> std::io::Result { + let endpoint = HttpEndpoint::from_url(url)?; + Self::new(user, password, endpoint) + } + + /// Calls a method with the response encoded in JSON format and interpreted as type `T`. + pub async fn call_method serde::de::Deserialize<'a>>( + &self, + method: &str, + params: &[serde_json::Value], + ) -> Result { + let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); + let uri = self.endpoint.path(); + let content = serde_json::json!({ + "method": method, + "params": params, + "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() + }); + + let mut response = match self + .client + .borrow_mut() + .post::(&uri, &host, &self.basic_auth, content) + .await + { + Ok(JsonResponse(response)) => response, + Err(e) if e.kind() == std::io::ErrorKind::Other => { + match e.get_ref().unwrap().downcast_ref::() { + Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) { + Ok(JsonResponse(response)) => response, + Err(_) => Err(e)?, + }, + None => Err(e)?, + } + } + Err(e) => Err(e)?, + }; + + if !response.is_object() { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "expected JSON object", + ))); + } + + let error = &response["error"]; + if !error.is_null() { + let message = error["message"].as_str().unwrap_or("unknown error").to_string(); + return Err(Error::Rpc(RpcError::new( + i32::try_from(error["code"].as_i64().unwrap()).unwrap(), + message, + ))); + } + + let result = &mut response["result"]; + if result.is_null() { + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "expected JSON result", + ))); + } + + serde_json::from_value(result.clone()).map_err(|e| Error::Json(e)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::client_tests::{HttpServer, MessageBody}; + + /// Credentials + const USER: &'static str = "user"; + const PASS: &'static str = "password"; + + #[tokio::test] + async fn call_method_returning_unknown_response() { + let server = HttpServer::responding_with_not_found(); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => match e { + Error::Io(io_e) => assert_eq!(io_e.kind(), std::io::ErrorKind::Other), + _ => panic!("Expected IO Error"), + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_malformed_response() { + let response = serde_json::json!("foo"); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => match e { + Error::Io(io_e) => { + assert_eq!(io_e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(io_e.get_ref().unwrap().to_string(), "expected JSON object"); + } + _ => panic!("Expected IO Error"), + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_error() { + let errno = -8; + let errmsg = "invalid parameter"; + + let response = serde_json::json!({ + "error": { "code": errno.clone(), "message": errmsg.clone() }, + }); + let server = HttpServer::responding_with_server_error(response); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + let invalid_block_hash = serde_json::json!("foo"); + match client.call_method::("getblock", &[invalid_block_hash]).await { + Err(e) => match e { + Error::Rpc(rpc_e) => { + assert_eq!(rpc_e.code, errno); + assert_eq!(rpc_e.message, errmsg); + } + _ => panic!("Expected RPC Error"), + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_missing_result() { + let response = serde_json::json!({ "result": null }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => match e { + Error::Io(io_e) => { + assert_eq!(io_e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(io_e.get_ref().unwrap().to_string(), "expected JSON result"); + } + _ => panic!("Expected IO Error"), + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_malformed_result() { + let data = "foo"; + let response = serde_json::json!({ "result": data.clone() }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => match e { + Error::Json(json_e) => { + assert_eq!( + json_e.to_string(), + format!("invalid type: string \"{}\", expected u64", data) + ); + } + _ => panic!("Expected IO Error"), + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_valid_result() { + let response = serde_json::json!({ "result": 654470 }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = + RpcClient::new(Some(USER.into()), Some(PASS.into()), server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(count) => assert_eq!(count, 654470), + } + } +}