Skip to content

Commit

Permalink
update local mcast hack logic to sendmsg()
Browse files Browse the repository at this point in the history
Better test of whether received packet was forwarded,
based on OS provided meta-data instead of peer provided
unicast flag.

Also use ORIGIN_TAG (original destination) address as
UDP source address if a local interface address.
  • Loading branch information
mdavidsaver committed Feb 16, 2025
1 parent ceb7a9e commit 1200b72
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
96 changes: 63 additions & 33 deletions src/udp_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ struct UDPCollector final : public UDPManager::Search,
bool handle_one();

enum origin_t {
Remote, // non-local sender
Local, // sent from a local interface, including loopback
OriginTag, // payload of CMD_ORIGIN_TAG
Direct, // received directly from original client
Forward, // forwarded by a local peer
OriginTag, // ... with CMD_ORIGIN_TAG
};

void process_one(const SockAddr& dest, const uint8_t* buf, size_t nrx, origin_t origin);
void process_one(const uint8_t* buf, size_t nrx, origin_t origin);
static void handle_static(evutil_socket_t fd, short ev, void *raw)
{
(void)fd;
Expand All @@ -84,7 +84,7 @@ struct UDPCollector final : public UDPManager::Search,
}
}

void forwardM(const SockAddr& origin, const uint8_t* buf, size_t len);
void forwardM(const uint8_t* buf, size_t len);

// Search interface
public:
Expand Down Expand Up @@ -226,8 +226,6 @@ static constexpr size_t cmd_origin_tag_size = 8 + 16;

bool UDPCollector::handle_one()
{
SockAddr dest;

buf.resize(cmd_origin_tag_size + 0x10000 + 1);
auto rxbuf = &buf[cmd_origin_tag_size];
auto rxlen = buf.size()-cmd_origin_tag_size-1;
Expand All @@ -236,6 +234,7 @@ bool UDPCollector::handle_one()
// Ensure one extra byte at the end of the buffer for a nil after the last PV name
recvfromx rx{sock.sock, (char*)rxbuf, rxlen, &src, &dest};
const int nrx = rx.call();
srcIface = rx.dstif;

if(nrx>=0 && rx.ndrop!=0u && prevndrop!=rx.ndrop) {
log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop));
Expand All @@ -261,16 +260,24 @@ bool UDPCollector::handle_one()
return true;
}

log_hex_printf(logio, Level::Debug, rxbuf, nrx, "UDP Rx %d, %s -> %s @%u (%s)\n",
nrx, src.tostring().c_str(), dest.tostring().c_str(), unsigned(rx.dstif), bind_addr.tostring().c_str());
origin_t origin = Direct;
if(manager->ifmap.is_lo(rx.dstif) && dest.compare(lo_mcast_addr.addr,false)==0) {
// packet forwarded by a local PVA peer (maybe us) as IPv4 local multicast
origin = Forward;
// UDP header info of forwarder not relevant to reply. Spoil...
src = dest = SockAddr(AF_INET); // IPv6 does not support local mcast :(
srcIface = 0;
}

origin_t origin = manager->ifmap.is_iface(src) ? Local : Remote;
log_hex_printf(logio, Level::Debug, rxbuf, nrx, "UDP Rx %d, %s -> %s @%u (%s) : %s\n",
nrx, src.tostring().c_str(), dest.tostring().c_str(), unsigned(rx.dstif), bind_addr.tostring().c_str(),
origin==Direct ? "Direct" : "Forward");

process_one(dest, rxbuf, nrx, origin);
process_one(rxbuf, nrx, origin);
return true;
}

void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t nrx, origin_t origin)
void UDPCollector::process_one(const uint8_t *buf, size_t nrx, origin_t origin)
{
FixedBuf M(true, const_cast<uint8_t*>(buf), nrx);
Header head{};
Expand Down Expand Up @@ -312,17 +319,22 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
from_wire(M, port);
if(server.isAny()) {
server = src;
if(origin==OriginTag) {
log_err_printf(logio, "CMD_ORIGIN_TAG search with reply to sender never works%s", "\n");
if(origin!=Direct) {
log_err_printf(logio, "Forwarded SEARCH with reply to sender never works%s", "\n");
return;
}
}
server.setPort(port);

if(!(flags&pva_search_flags::Unicast)) {
// can't use bcast/mcast as source address when replying
dest = SockAddr(dest.family());
}

if(!M.good() || !(flags&pva_search_flags::Unicast) || dest.family()!=AF_INET) {
// invalid, bcast, or not ipv4

} else if(dest.compare(lo_mcast_addr.addr,false)!=0) {
} else if(origin==Direct) {
assert(buf==&this->buf[cmd_origin_tag_size]);
// clear unicast flag in forwarded message
*save_flags &= ~pva_search_flags::Unicast;
Expand All @@ -332,7 +344,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
to_wire(R, server);
assert(R.good());
}
forwardM(dest, buf, nrx);
forwardM(buf, nrx);
return;

} else {
Expand Down Expand Up @@ -438,18 +450,29 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
// only accept when sent to the mcast address through the loopback address
// since we only join the mcast group on loopback this will hopefully
// frustrate attempts to inject CMD_ORIGIN_TAG externally.
if(M.good() && origin==Local && dest.compare(lo_mcast_addr.addr,false)==0) {
originaddr.setPort(bind_addr.port());

process_one(originaddr, M.save(), M.size(), OriginTag);
if(!M.good()) {
log_err_printf(logio, "malformed ORIGIN_TAG from %s %c%c\n",
originaddr.tostring().c_str(),
M.good() ? 'T' : 'F',
origin==Forward ? 'T' : 'F');
return;

} else if(origin==Forward
&& (srcIface = manager->ifmap.index_of(originaddr))!=0)
{
originaddr.setPort(bind_addr.port());
dest = originaddr;
process_one(M.save(), M.size(), OriginTag);
return;

} else {
// forwarded w/ non-local ORIGIN_TAG??
log_warn_printf(logio, "Ignore originated from %s %c%c\n",
originaddr.tostring().c_str(),
M.good() ? 'T' : 'F',
origin==Forward ? 'T' : 'F');
// continue and try to process search as if directly received
}
log_debug_printf(logio, "Ignore originated from %s %c%c%c\n",
originaddr.tostring().c_str(),
M.good() ? 'T' : 'F',
origin==Local ? 'T' : 'F',
dest.compare(lo_mcast_addr.addr,false)==0 ? 'T' : 'F');

break;
}
Expand All @@ -459,10 +482,10 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
}
}

void UDPCollector::forwardM(const SockAddr& origin, const uint8_t *pbuf, size_t plen)
void UDPCollector::forwardM(const uint8_t *pbuf, size_t plen)
{
log_debug_printf(logio, "Forward as originated for %s\n",
origin.tostring().c_str());
dest.tostring().c_str());

assert(buf.size() > cmd_origin_tag_size);
assert(pbuf==&buf[cmd_origin_tag_size]);
Expand All @@ -471,11 +494,15 @@ void UDPCollector::forwardM(const SockAddr& origin, const uint8_t *pbuf, size_t
FixedBuf M(true, &buf[0], cmd_origin_tag_size);

to_wire(M, Header{CMD_ORIGIN_TAG, 0, 16u});
to_wire(M, origin);
to_wire(M, dest);
assert(M.good());
assert(M.save()==&buf[cmd_origin_tag_size]);
}

// mcast_prep_sendto() will override routing
srcIface = 0;
dest = SockAddr(src.family());

sock.mcast_prep_sendto(lo_mcast_addr);
src = lo_mcast_addr.addr;
reply(&buf[0], cmd_origin_tag_size+plen);
Expand All @@ -485,17 +512,20 @@ bool UDPCollector::reply(const void *msg, size_t msglen) const
{
manager->loop.assertInLoop();

log_hex_printf(logio, Level::Debug, msg, msglen, "Send %s -> %s\n",
bind_addr.tostring().c_str(), src.tostring().c_str());
log_hex_printf(logio, Level::Debug, msg, msglen, "Send %s -> %s, %s,%u\n",
bind_addr.tostring().c_str(), src.tostring().c_str(),
dest.tostring().c_str(), unsigned(srcIface));

auto ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size());
auto ntx = sendtox{sock.sock, (char*)msg, msglen, &src, &dest, srcIface}.call();
if(ntx<0) {
int err = evutil_socket_geterror(sock.sock);
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
// nothing to do here
} else {
log_warn_printf(logio, "UDP TX Error on %s -> %s : (%d) %s\n",
name.c_str(), src.tostring().c_str(),
log_warn_printf(logio, "UDP TX Error on %s,%s,%u -> %s : (%d) %s\n",
name.c_str(),
dest.tostring().c_str(), unsigned(srcIface),
src.tostring().c_str(),
err, evutil_socket_error_to_string(err));
}
return false; // wait for more I/O
Expand Down
4 changes: 3 additions & 1 deletion src/udp_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ struct PVXS_API UDPManager

struct PVXS_API Search {
std::vector<std::string> otherproto; // any protocols other than "tcp"
SockAddr src;
SockAddr src; // sender/client address
SockAddr dest; // destination IP used by client
SockAddr server;
uint64_t srcIface;
uint32_t searchID;
uint8_t peerVersion;
bool protoTCP = false; // included protocol "tcp"
Expand Down

0 comments on commit 1200b72

Please sign in to comment.