Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 120 additions & 148 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,52 +209,6 @@ impl TcpConnection {
}
}

/// Open connection to remote peer at `address`.
// TODO: https://github.com/paritytech/litep2p/issues/347 this function can be removed
pub(super) async fn open_connection(
connection_id: ConnectionId,
keypair: Keypair,
stream: TcpStream,
address: AddressType,
peer: Option<PeerId>,
yamux_config: crate::yamux::Config,
max_read_ahead_factor: usize,
max_write_buffer_size: usize,
connection_open_timeout: Duration,
substream_open_timeout: Duration,
) -> Result<NegotiatedConnection, NegotiationError> {
tracing::debug!(
target: LOG_TARGET,
?address,
?peer,
"open connection to remote peer",
);

match tokio::time::timeout(connection_open_timeout, async move {
Self::negotiate_connection(
stream,
peer,
connection_id,
keypair,
Role::Dialer,
address,
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
)
.await
})
.await
{
Err(_) => {
tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation");
Err(NegotiationError::Timeout)
}
Ok(result) => result,
}
}

/// Open substream for `protocol`.
pub(super) async fn open_substream(
mut control: crate::yamux::Control,
Expand Down Expand Up @@ -317,26 +271,20 @@ impl TcpConnection {
) -> Result<NegotiatedConnection, NegotiationError> {
tracing::debug!(target: LOG_TARGET, ?address, "accept connection");

match tokio::time::timeout(connection_open_timeout, async move {
Self::negotiate_connection(
stream,
None,
connection_id,
keypair,
Role::Listener,
AddressType::Socket(address),
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
)
.await
})
Self::negotiate_connection(
stream,
None,
connection_id,
keypair,
Role::Listener,
AddressType::Socket(address),
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
connection_open_timeout,
substream_open_timeout,
)
.await
{
Err(_) => Err(NegotiationError::Timeout),
Ok(result) => result,
}
}

/// Accept substream.
Expand Down Expand Up @@ -410,89 +358,107 @@ impl TcpConnection {
yamux_config: crate::yamux::Config,
max_read_ahead_factor: usize,
max_write_buffer_size: usize,
connection_open_timeout: Duration,
substream_open_timeout: Duration,
) -> Result<NegotiatedConnection, NegotiationError> {
tracing::trace!(
target: LOG_TARGET,
?address,
?dialed_peer,
?role,
"negotiate connection",
"negotiate connection to remote peer",
);

let stream = TokioAsyncReadCompatExt::compat(stream).into_inner();
let stream = TokioAsyncWriteCompatExt::compat_write(stream);
let negotiate_task = async move {
let stream = TokioAsyncReadCompatExt::compat(stream).into_inner();
let stream = TokioAsyncWriteCompatExt::compat_write(stream);

// negotiate `noise`
let (stream, _) =
Self::negotiate_protocol(stream, &role, vec!["/noise"], substream_open_timeout).await?;
// negotiate `noise`
let (stream, _) =
Self::negotiate_protocol(stream, &role, vec!["/noise"], substream_open_timeout)
.await?;

tracing::trace!(
target: LOG_TARGET,
"`multistream-select` and `noise` negotiated",
);
tracing::trace!(
target: LOG_TARGET,
"`multistream-select` and `noise` negotiated",
);

// perform noise handshake
let (stream, peer) = noise::handshake(
stream.inner(),
&keypair,
role,
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
noise::HandshakeTransport::Tcp,
)
.await?;
// perform noise handshake
let (stream, peer) = noise::handshake(
stream.inner(),
&keypair,
role,
max_read_ahead_factor,
max_write_buffer_size,
substream_open_timeout,
noise::HandshakeTransport::Tcp,
)
.await?;

if let Some(dialed_peer) = dialed_peer {
if dialed_peer != peer {
tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch");
return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer));
if let Some(dialed_peer) = dialed_peer {
if dialed_peer != peer {
tracing::debug!(target: LOG_TARGET, ?dialed_peer, ?peer, "peer id mismatch");
return Err(NegotiationError::PeerIdMismatch(dialed_peer, peer));
}
}
}

tracing::trace!(target: LOG_TARGET, "noise handshake done");
let stream: NoiseSocket<Compat<TcpStream>> = stream;

// negotiate `yamux`
let (stream, _) =
Self::negotiate_protocol(stream, &role, vec!["/yamux/1.0.0"], substream_open_timeout)
.await?;
tracing::trace!(target: LOG_TARGET, "`yamux` negotiated");
tracing::trace!(target: LOG_TARGET, "noise handshake done");
let stream: NoiseSocket<Compat<TcpStream>> = stream;

let connection = crate::yamux::Connection::new(stream.inner(), yamux_config, role.into());
let (control, connection) = crate::yamux::Control::new(connection);
// negotiate `yamux`
let (stream, _) = Self::negotiate_protocol(
stream,
&role,
vec!["/yamux/1.0.0"],
substream_open_timeout,
)
.await?;
tracing::trace!(target: LOG_TARGET, "`yamux` negotiated");

let connection =
crate::yamux::Connection::new(stream.inner(), yamux_config, role.into());
let (control, connection) = crate::yamux::Control::new(connection);

let address = match address {
AddressType::Socket(address) => Multiaddr::empty()
.with(Protocol::from(address.ip()))
.with(Protocol::Tcp(address.port())),
AddressType::Dns {
address,
port,
dns_type,
} => match dns_type {
DnsType::Dns => Multiaddr::empty()
.with(Protocol::Dns(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
DnsType::Dns4 => Multiaddr::empty()
.with(Protocol::Dns4(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
DnsType::Dns6 => Multiaddr::empty()
.with(Protocol::Dns6(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
},
};
let endpoint = match role {
Role::Dialer => Endpoint::dialer(address, connection_id),
Role::Listener => Endpoint::listener(address, connection_id),
};

let address = match address {
AddressType::Socket(address) => Multiaddr::empty()
.with(Protocol::from(address.ip()))
.with(Protocol::Tcp(address.port())),
AddressType::Dns {
address,
port,
dns_type,
} => match dns_type {
DnsType::Dns => Multiaddr::empty()
.with(Protocol::Dns(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
DnsType::Dns4 => Multiaddr::empty()
.with(Protocol::Dns4(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
DnsType::Dns6 => Multiaddr::empty()
.with(Protocol::Dns6(Cow::Owned(address)))
.with(Protocol::Tcp(port)),
},
Ok(NegotiatedConnection {
peer,
control,
connection,
endpoint,
substream_open_timeout,
})
};
let endpoint = match role {
Role::Dialer => Endpoint::dialer(address, connection_id),
Role::Listener => Endpoint::listener(address, connection_id),

let Ok(result) = tokio::time::timeout(connection_open_timeout, negotiate_task).await else {
tracing::trace!(target: LOG_TARGET, ?connection_id, "connection timed out during negotiation");
return Err(NegotiationError::Timeout);
};

Ok(NegotiatedConnection {
peer,
control,
connection,
endpoint,
substream_open_timeout,
})
result
}

/// Handles the yamux substream.
Expand Down Expand Up @@ -787,12 +753,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down Expand Up @@ -889,12 +856,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down Expand Up @@ -1038,12 +1006,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down Expand Up @@ -1091,12 +1060,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down Expand Up @@ -1271,12 +1241,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down Expand Up @@ -1406,12 +1377,13 @@ mod tests {
.await
.unwrap();

match TcpConnection::open_connection(
match TcpConnection::negotiate_connection(
stream,
None,
ConnectionId::from(0usize),
Keypair::generate(),
stream,
Role::Dialer,
AddressType::Socket(address),
None,
Default::default(),
5,
2,
Expand Down
8 changes: 5 additions & 3 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,13 @@ impl Transport for TcpTransport {
.await
.map_err(|error| (connection_id, error))?;

TcpConnection::open_connection(
TcpConnection::negotiate_connection(
stream,
peer,
connection_id,
keypair,
stream,
Role::Dialer,
socket_address,
peer,
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
Expand Down Expand Up @@ -530,6 +531,7 @@ impl Transport for TcpTransport {
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
connection_open_timeout,
substream_open_timeout,
)
.await
Expand Down