Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/link/unicast/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,13 @@ static size_t _z_f_link_read_exact_tls(const _z_link_t *self, uint8_t *ptr, size
rb = _z_read_tls(&self->_socket._tls, &ptr[n], len - n);
}

if (rb == SIZE_MAX) {
n = rb;
if (rb == 0) {
// Socket closed
n = 0;
break;
} else if (rb == SIZE_MAX) {
// WANT_READ or timeout - continue loop to retry
continue;
}
n += rb;
} while (n != len);
Expand Down
6 changes: 4 additions & 2 deletions src/system/unix/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,13 @@ size_t _z_read_tls(const _z_tls_socket_t *sock, uint8_t *ptr, size_t len) {
}

if (ret == MBEDTLS_ERR_SSL_WANT_READ || ret == MBEDTLS_ERR_SSL_WANT_WRITE) {
return 0;
// Like EAGAIN from recv() with SO_RCVTIMEO
return SIZE_MAX;
}

if (ret == 0 || ret == MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY || ret == MBEDTLS_ERR_SSL_CONN_EOF) {
return SIZE_MAX;
// Connection closed gracefully, like TCP recv() returning 0
return 0;
}

_Z_ERROR("TLS read error: %d", ret);
Expand Down
56 changes: 43 additions & 13 deletions src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,39 +74,54 @@ static z_result_t _z_unicast_process_messages(_z_transport_unicast_t *ztu, _z_tr
return _Z_RES_OK;
}

static bool _z_unicast_client_read(_z_transport_unicast_t *ztu, _z_transport_peer_unicast_t *peer, size_t *to_read) {
static int _z_unicast_client_read(_z_transport_unicast_t *ztu, _z_transport_peer_unicast_t *peer, size_t *to_read) {
switch (ztu->_common._link->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_link_socket_recv_zbuf(ztu->_common._link, &ztu->_common._zbuf, peer->_socket);
size_t rb = _z_link_socket_recv_zbuf(ztu->_common._link, &ztu->_common._zbuf, peer->_socket);
if (rb == SIZE_MAX) {
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
if (rb == 0) {
return _Z_UNICAST_PEER_READ_STATUS_SOCKET_CLOSED;
}
if (_z_zbuf_len(&ztu->_common._zbuf) < _Z_MSG_LEN_ENC_SIZE) {
_z_zbuf_compact(&ztu->_common._zbuf);
return false;
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
}
// Get stream size
*to_read = _z_read_stream_size(&ztu->_common._zbuf);
// Read data
if (_z_zbuf_len(&ztu->_common._zbuf) < *to_read) {
_z_link_socket_recv_zbuf(ztu->_common._link, &ztu->_common._zbuf, peer->_socket);
size_t rb = _z_link_socket_recv_zbuf(ztu->_common._link, &ztu->_common._zbuf, peer->_socket);
if (rb == SIZE_MAX) {
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
if (rb == 0) {
return _Z_UNICAST_PEER_READ_STATUS_SOCKET_CLOSED;
}
if (_z_zbuf_len(&ztu->_common._zbuf) < *to_read) {
_z_zbuf_set_rpos(&ztu->_common._zbuf, _z_zbuf_get_rpos(&ztu->_common._zbuf) - _Z_MSG_LEN_ENC_SIZE);
_z_zbuf_compact(&ztu->_common._zbuf);
return false;
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
}
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
_z_zbuf_compact(&ztu->_common._zbuf);
*to_read = _z_link_socket_recv_zbuf(ztu->_common._link, &ztu->_common._zbuf, peer->_socket);
if (*to_read == SIZE_MAX) {
return false;
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
if (*to_read == 0) {
return _Z_UNICAST_PEER_READ_STATUS_SOCKET_CLOSED;
}
break;
default:
break;
}
return true;
return _Z_UNICAST_PEER_READ_STATUS_OK;
}

z_result_t _zp_unicast_read(_z_transport_unicast_t *ztu, bool single_read) {
Expand All @@ -128,11 +143,15 @@ z_result_t _zp_unicast_read(_z_transport_unicast_t *ztu, bool single_read) {
_z_zbuf_reset(&ztu->_common._zbuf);
size_t to_read = 0;
// Retrieve data if any
if (_z_unicast_client_read(ztu, curr_peer, &to_read)) {
int read_status = _z_unicast_client_read(ztu, curr_peer, &to_read);
if (read_status == _Z_UNICAST_PEER_READ_STATUS_OK) {
// Process data
_Z_RETURN_IF_ERR(_z_unicast_process_messages(ztu, curr_peer, to_read))
} else {
} else if (read_status == _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA) {
return _Z_NO_DATA_PROCESSED;
} else {
// Socket closed or critical error
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_RX_FAILED);
}
}
return _Z_RES_OK;
Expand Down Expand Up @@ -257,6 +276,9 @@ static int _z_unicast_peer_read(_z_transport_unicast_t *ztu, _z_transport_peer_u
if (*to_read == SIZE_MAX) {
return _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA;
}
if (*to_read == 0) {
return _Z_UNICAST_PEER_READ_STATUS_SOCKET_CLOSED;
}
break;
default:
break;
Expand Down Expand Up @@ -360,11 +382,19 @@ void *_zp_unicast_read_task(void *ztu_arg) {
{
size_t to_read = 0;
// Retrieve data
if (!_z_unicast_client_read(ztu, curr_peer, &to_read)) {
int read_status = _z_unicast_client_read(ztu, curr_peer, &to_read);
if (read_status == _Z_UNICAST_PEER_READ_STATUS_OK) {
// Process data
if (_z_unicast_process_messages(ztu, curr_peer, to_read) != _Z_RES_OK) {
ztu->_common._read_task_running = false;
continue;
}
} else if (read_status == _Z_UNICAST_PEER_READ_STATUS_PENDING_DATA) {
// No data available yet, continue loop
continue;
}
// Process data
if (_z_unicast_process_messages(ztu, curr_peer, to_read) != _Z_RES_OK) {
} else {
// Socket closed or critical error - exit task to allow reconnection
_Z_ERROR("Connection closed or error detected, exiting read task");
ztu->_common._read_task_running = false;
continue;
}
Expand Down
Loading