1
1
//! [`jsonrpsee`] transport adapter implementation for IPC.
2
2
3
3
use crate :: stream_codec:: StreamCodec ;
4
- use futures:: StreamExt ;
5
- use interprocess:: local_socket:: tokio:: { LocalSocketStream , OwnedReadHalf , OwnedWriteHalf } ;
4
+ use futures:: { StreamExt , TryFutureExt } ;
5
+ use interprocess:: local_socket:: {
6
+ tokio:: { prelude:: * , RecvHalf , SendHalf } ,
7
+ GenericFilePath ,
8
+ } ;
6
9
use jsonrpsee:: {
7
10
async_client:: { Client , ClientBuilder } ,
8
11
core:: client:: { ReceivedMessage , TransportReceiverT , TransportSenderT } ,
9
12
} ;
10
13
use std:: io;
11
14
use tokio:: io:: AsyncWriteExt ;
12
- use tokio_util:: {
13
- codec:: FramedRead ,
14
- compat:: { Compat , FuturesAsyncReadCompatExt , FuturesAsyncWriteCompatExt } ,
15
- } ;
15
+ use tokio_util:: codec:: FramedRead ;
16
16
17
17
/// Sending end of IPC transport.
18
18
#[ derive( Debug ) ]
19
19
pub ( crate ) struct Sender {
20
- inner : Compat < OwnedWriteHalf > ,
20
+ inner : SendHalf ,
21
21
}
22
22
23
23
#[ async_trait:: async_trait]
@@ -44,7 +44,7 @@ impl TransportSenderT for Sender {
44
44
/// Receiving end of IPC transport.
45
45
#[ derive( Debug ) ]
46
46
pub ( crate ) struct Receiver {
47
- pub ( crate ) inner : FramedRead < Compat < OwnedReadHalf > , StreamCodec > ,
47
+ pub ( crate ) inner : FramedRead < RecvHalf , StreamCodec > ,
48
48
}
49
49
50
50
#[ async_trait:: async_trait]
@@ -63,20 +63,17 @@ impl TransportReceiverT for Receiver {
63
63
pub ( crate ) struct IpcTransportClientBuilder ;
64
64
65
65
impl IpcTransportClientBuilder {
66
- pub ( crate ) async fn build (
67
- self ,
68
- endpoint : impl AsRef < str > ,
69
- ) -> Result < ( Sender , Receiver ) , IpcError > {
70
- let endpoint = endpoint. as_ref ( ) . to_string ( ) ;
71
- let conn = LocalSocketStream :: connect ( endpoint. clone ( ) )
66
+ pub ( crate ) async fn build ( self , path : & str ) -> Result < ( Sender , Receiver ) , IpcError > {
67
+ let conn = async { path. to_fs_name :: < GenericFilePath > ( ) }
68
+ . and_then ( LocalSocketStream :: connect)
72
69
. await
73
- . map_err ( |err| IpcError :: FailedToConnect { path : endpoint , err } ) ?;
70
+ . map_err ( |err| IpcError :: FailedToConnect { path : path . to_string ( ) , err } ) ?;
74
71
75
- let ( rhlf , whlf ) = conn. into_split ( ) ;
72
+ let ( recv , send ) = conn. split ( ) ;
76
73
77
74
Ok ( (
78
- Sender { inner : whlf . compat_write ( ) } ,
79
- Receiver { inner : FramedRead :: new ( rhlf . compat ( ) , StreamCodec :: stream_incoming ( ) ) } ,
75
+ Sender { inner : send } ,
76
+ Receiver { inner : FramedRead :: new ( recv , StreamCodec :: stream_incoming ( ) ) } ,
80
77
) )
81
78
}
82
79
}
@@ -92,14 +89,14 @@ impl IpcClientBuilder {
92
89
/// ```
93
90
/// use jsonrpsee::{core::client::ClientT, rpc_params};
94
91
/// use reth_ipc::client::IpcClientBuilder;
92
+ ///
95
93
/// # async fn run_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
96
94
/// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
97
95
/// let response: String = client.request("say_hello", rpc_params![]).await?;
98
- /// # Ok(())
99
- /// # }
96
+ /// # Ok(()) }
100
97
/// ```
101
- pub async fn build ( self , path : impl AsRef < str > ) -> Result < Client , IpcError > {
102
- let ( tx, rx) = IpcTransportClientBuilder :: default ( ) . build ( path ) . await ?;
98
+ pub async fn build ( self , name : & str ) -> Result < Client , IpcError > {
99
+ let ( tx, rx) = IpcTransportClientBuilder :: default ( ) . build ( name ) . await ?;
103
100
Ok ( self . build_with_tokio ( tx, rx) )
104
101
}
105
102
@@ -139,20 +136,24 @@ pub enum IpcError {
139
136
140
137
#[ cfg( test) ]
141
138
mod tests {
142
- use crate :: server:: dummy_endpoint;
143
- use interprocess:: local_socket:: tokio:: LocalSocketListener ;
139
+ use interprocess:: local_socket:: ListenerOptions ;
144
140
145
141
use super :: * ;
142
+ use crate :: server:: dummy_name;
146
143
147
144
#[ tokio:: test]
148
145
async fn test_connect ( ) {
149
- let endpoint = dummy_endpoint ( ) ;
150
- let binding = LocalSocketListener :: bind ( endpoint. clone ( ) ) . unwrap ( ) ;
146
+ let name = & dummy_name ( ) ;
147
+
148
+ let binding = ListenerOptions :: new ( )
149
+ . name ( name. as_str ( ) . to_fs_name :: < GenericFilePath > ( ) . unwrap ( ) )
150
+ . create_tokio ( )
151
+ . unwrap ( ) ;
151
152
tokio:: spawn ( async move {
152
153
let _x = binding. accept ( ) . await ;
153
154
} ) ;
154
155
155
- let ( tx, rx) = IpcTransportClientBuilder :: default ( ) . build ( endpoint ) . await . unwrap ( ) ;
156
+ let ( tx, rx) = IpcTransportClientBuilder :: default ( ) . build ( name ) . await . unwrap ( ) ;
156
157
let _ = IpcClientBuilder :: default ( ) . build_with_tokio ( tx, rx) ;
157
158
}
158
159
}
0 commit comments