Skip to content

Commit

Permalink
Set log filters + improve logs.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Oct 22, 2024
1 parent 4f81dc4 commit d622b2d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
40 changes: 22 additions & 18 deletions src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::traits::PrintFullError;
static SERVERS: OnceCell<RwLock<Vec<Server>>> = OnceCell::const_new();

struct Server {
host: String,
server: String,
uplink_only: bool,
gateway_id_prefixes: Vec<lrwn_filters::EuiPrefix>,
downlink_tx: UnboundedSender<(GatewayId, Vec<u8>)>,
Expand All @@ -43,21 +43,21 @@ impl Server {
// Check if we already have a socket for the given Gateway ID to the
// server and if not, we create it.
if let std::collections::hash_map::Entry::Vacant(e) = self.sockets.entry(gateway_id) {
info!(gateway_id = %gateway_id, server = %self.host, "Initializing forwarder to server");
info!(gateway_id = %gateway_id, server = %self.server, "Initializing forwarder to server");

let socket = UdpSocket::bind("0.0.0.0:0")
.await
.context("UDP socket bind")?;
socket
.connect(&self.host)
.connect(&self.server)
.await
.context("UDP socket connect")?;

let socket = Arc::new(socket);
let (stop_tx, stop_rx) = oneshot::channel::<()>();

tokio::spawn(handle_downlink(
self.host.clone(),
self.server.clone(),
stop_rx,
self.uplink_only,
socket.clone(),
Expand Down Expand Up @@ -142,23 +142,29 @@ async fn handle_uplink_packet(gateway_id: GatewayId, data: &[u8]) -> Result<()>
let socket = server.get_server_socket(gateway_id).await?;
socket.last_uplink = SystemTime::now();

let span = tracing::info_span!("", addr = %socket.socket.peer_addr().unwrap());
let _enter = span.enter();

match packet_type {
PacketType::PushData => {
info!(packet_type = %packet_type, "Sending UDP packet");
socket.push_data_token = Some(random_token);
socket.socket.send(data).await.context("Send UDP packet")?;
inc_server_udp_sent_count(&server.host, packet_type).await;
inc_server_udp_sent_count(&server.server, packet_type).await;
}
PacketType::PullData => {
info!(packet_type = %packet_type, "Sending UDP packet");
socket.pull_data_token = Some(random_token);
socket.socket.send(data).await.context("Send UDP packet")?;
inc_server_udp_sent_count(&server.host, packet_type).await;
inc_server_udp_sent_count(&server.server, packet_type).await;
}
PacketType::TxAck => {
if let Some(pull_resp_token) = socket.pull_resp_token {
if pull_resp_token == random_token {
info!(packet_type = %packet_type, "Sending UDP packet");
socket.pull_resp_token = None;
socket.socket.send(data).await.context("Send UDP packet")?;
inc_server_udp_sent_count(&server.host, packet_type).await;
inc_server_udp_sent_count(&server.server, packet_type).await;
}
}
}
Expand Down Expand Up @@ -243,12 +249,10 @@ async fn handle_downlink_packet(
}
}
PacketType::PullAck => {
let token = get_random_token(data)?;
info!(token = token, "PULL_DATA acknowledged");
// TODO: keep ack stats
}
PacketType::PushAck => {
let token = get_random_token(data)?;
info!(token = token, "PUSH_DATA acknowledged");
// TODO: keep ack stats
}

_ => {}
Expand All @@ -271,13 +275,13 @@ async fn handle_pull_resp(
}

async fn add_server(
host: String,
server: String,
uplink_only: bool,
gateway_id_prefixes: Vec<lrwn_filters::EuiPrefix>,
downlink_tx: UnboundedSender<(GatewayId, Vec<u8>)>,
) -> Result<()> {
info!(
host = host,
server = server,
uplink_only = uplink_only,
gateway_id_prefixes = ?gateway_id_prefixes,
"Adding server"
Expand All @@ -289,7 +293,7 @@ async fn add_server(

let mut servers = servers.write().await;
servers.push(Server {
host,
server,
uplink_only,
gateway_id_prefixes,
downlink_tx,
Expand All @@ -316,26 +320,26 @@ async fn cleanup_sockets() {
if duration < Duration::from_secs(60) {
true
} else {
warn!(server = server.host, gateway_id = %k, "Cleaning up inactive socket");
warn!(server = server.server, gateway_id = %k, "Cleaning up inactive socket");
false
}
} else {
warn!(server = server.host, gateway_id = %k, "Cleaning up inactive socket");
warn!(server = server.server, gateway_id = %k, "Cleaning up inactive socket");
false
}
});
}
}
}

async fn set_pull_resp_token(host: &str, gateway_id: GatewayId, token: u16) -> Result<()> {
async fn set_pull_resp_token(srv: &str, gateway_id: GatewayId, token: u16) -> Result<()> {
let servers = SERVERS
.get_or_init(|| async { RwLock::new(Vec::new()) })
.await;
let mut servers = servers.write().await;

for server in servers.iter_mut() {
if server.host.eq(host) {
if server.server.eq(srv) {
if let Some(v) = server.sockets.get_mut(&gateway_id) {
v.pull_resp_token = Some(token);
}
Expand Down
12 changes: 10 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::str::FromStr;

use clap::{Parser, Subcommand};
use signal_hook::{consts::SIGINT, consts::SIGTERM, iterator::Signals};
use tracing::info;
use tracing_subscriber::prelude::*;
use tracing::{info, Level};
use tracing_subscriber::{filter, prelude::*};

use chirpstack_packet_multiplexer::{cmd, config, forwarder, listener, monitoring};

Expand Down Expand Up @@ -31,8 +33,14 @@ async fn main() {
return;
}

let filter = filter::Targets::new().with_targets(vec![(
"chirpstack_packet_multiplexer",
Level::from_str(&config.logging.level).unwrap(),
)]);

tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(filter)
.init();

info!(
Expand Down

0 comments on commit d622b2d

Please sign in to comment.