Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lite-protocol-tester receiver exit check #3187

Merged
merged 8 commits into from
Dec 7, 2024
5 changes: 3 additions & 2 deletions apps/liteprotocoltester/diagnose_connections.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ proc allPeers(pm: PeerManager): string =
var allStr: string = ""
for idx, peer in pm.wakuPeerStore.peers():
allStr.add(
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " &
$peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n"
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " &
peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " &
$peer.enr.map(getCapabilities) & "\n"
)
return allStr

Expand Down
12 changes: 10 additions & 2 deletions apps/liteprotocoltester/filter_subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ proc maintainSubscription(

if subscribeRes.isErr():
noFailedSubscribes += 1
lpt_service_peer_failure_count.inc(labelValues = ["receiver"])
lpt_service_peer_failure_count.inc(
labelValues = ["receiver", actualFilterPeer.getAgent()]
)
error "Subscribe request failed.",
err = subscribeRes.error,
peer = actualFilterPeer,
Expand Down Expand Up @@ -150,11 +152,17 @@ proc setupAndSubscribe*(
let interval = millis(20000)
var printStats: CallbackFunc

# calculate max wait after the last known message arrived before exiting
# 20% of expected messages times the expected interval but capped to 10min
let maxWaitForLastMessage: Duration =
min(conf.messageInterval.milliseconds * (conf.numMessages div 5), 10.minutes)

printStats = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =
stats.echoStats()

if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived():
if conf.numMessages > 0 and
waitFor stats.checkIfAllMessagesReceived(maxWaitForLastMessage):
waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0])
info "All messages received. Exiting."

Expand Down
2 changes: 1 addition & 1 deletion apps/liteprotocoltester/infra.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ MESSAGE_INTERVAL_MILLIS=1000
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
PUBSUB=/waku/2/rs/16/32
CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
CONTENT_TOPIC=/tester/2/light-pubsub-test-at-infra/status-prod
CLUSTER_ID=16
LIGHTPUSH_BOOTSTRAP=enr:-QEKuED9AJm2HGgrRpVaJY2nj68ao_QiPeUT43sK-aRM7sMJ6R4G11OSDOwnvVacgN1sTw-K7soC5dzHDFZgZkHU0u-XAYJpZIJ2NIJpcISnYxMvim11bHRpYWRkcnO4WgAqNiVib290LTAxLmRvLWFtczMuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfACw2JWJvb3QtMDEuZG8tYW1zMy5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEC3rRtFQSgc24uWewzXaxTY8hDAHB8sgnxr9k8Rjb5GeSDdGNwgnZfg3VkcIIjKIV3YWt1Mg0
FILTER_BOOTSTRAP=enr:-QEcuED7ww5vo2rKc1pyBp7fubBUH-8STHEZHo7InjVjLblEVyDGkjdTI9VdqmYQOn95vuQH-Htku17WSTzEufx-Wg4mAYJpZIJ2NIJpcIQihw1Xim11bHRpYWRkcnO4bAAzNi5ib290LTAxLmdjLXVzLWNlbnRyYWwxLWEuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfADU2LmJvb3QtMDEuZ2MtdXMtY2VudHJhbDEtYS5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaECxjqgDQ0WyRSOilYU32DA5k_XNlDis3m1VdXkK9xM6kODdGNwgnZfg3VkcIIjKIV3YWt1Mg0
4 changes: 3 additions & 1 deletion apps/liteprotocoltester/lightpush_publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ proc publishMessages(
continue
else:
noFailedPush += 1
lpt_service_peer_failure_count.inc(labelValues = ["publisher"])
lpt_service_peer_failure_count.inc(
labelValues = ["publisher", actualServicePeer.getAgent()]
)
if not preventPeerSwitch and noFailedPush > maxFailedPush:
info "Max push failure limit reached, Try switching peer."
let peerOpt = selectRandomServicePeer(
Expand Down
6 changes: 3 additions & 3 deletions apps/liteprotocoltester/lpt_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ declarePublicCounter lpt_publisher_failed_messages_count,
declarePublicCounter lpt_publisher_sent_bytes, "number of total bytes sent"

declarePublicCounter lpt_service_peer_failure_count,
"number of failure during using service peer [publisher/receiever]", ["role"]
"number of failure during using service peer [publisher/receiever]", ["role", "agent"]

declarePublicCounter lpt_change_service_peer_count,
"number of times [publisher/receiver] had to change service peer", ["role"]

declarePublicGauge lpt_px_peers,
"Number of peers PeerExchange discovered and can be dialed"

declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed"
declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed", ["agent"]

declarePublicGauge lpt_dial_failures, "Number of dial failures by cause"
declarePublicGauge lpt_dial_failures, "Number of dial failures by cause", ["agent"]
24 changes: 16 additions & 8 deletions apps/liteprotocoltester/service_peer_management.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,29 @@ proc tryCallAllPxPeers*(
if connOpt.value().isSome():
okPeers.add(randomPeer)
info "Dialing successful",
peer = constructMultiaddrStr(randomPeer), codec = codec
lpt_dialed_peers.inc()
peer = constructMultiaddrStr(randomPeer),
agent = randomPeer.getAgent(),
codec = codec
lpt_dialed_peers.inc(labelValues = [randomPeer.getAgent()])
else:
lpt_dial_failures.inc()
error "Dialing failed", peer = constructMultiaddrStr(randomPeer), codec = codec
lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()])
error "Dialing failed",
peer = constructMultiaddrStr(randomPeer),
agent = randomPeer.getAgent(),
codec = codec
else:
lpt_dial_failures.inc()
lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()])
error "Timeout dialing service peer",
peer = constructMultiaddrStr(randomPeer), codec = codec
peer = constructMultiaddrStr(randomPeer),
agent = randomPeer.getAgent(),
codec = codec

var okPeersStr: string = ""
for idx, peer in okPeers:
okPeersStr.add(
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " &
$peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n"
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " &
peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " &
$peer.enr.map(getCapabilities) & "\n"
)
echo "PX returned peers found callable for " & codec & " / " & $capability & ":\n"
echo okPeersStr
Expand Down
39 changes: 38 additions & 1 deletion apps/liteprotocoltester/statistics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ proc addMessage*(

lpt_receiver_sender_peer_count.set(value = self.len)

proc lastMessageArrivedAt*(self: Statistics): Result[Moment, void] =
  ## Yields `helper.prevArrivedAt` wrapped in `ok` once `receivedMessages`
  ## is above zero; yields `err()` while nothing has been counted.
  if self.receivedMessages <= 0:
    # Nothing received so far, so there is no arrival time to report.
    return err()

  return ok(self.helper.prevArrivedAt)

proc lossCount*(self: Statistics): uint32 =
  ## Returns `helper.maxIndex` minus `receivedMessages`.
  # NOTE(review): presumably both operands are unsigned, in which case the
  # subtraction would wrap around if `receivedMessages` ever exceeded
  # `helper.maxIndex` — confirm that cannot happen.
  let
    highestIndex = self.helper.maxIndex
    receivedSoFar = self.receivedMessages
  return highestIndex - receivedSoFar

Expand Down Expand Up @@ -274,16 +279,48 @@ proc jsonStats*(self: PerPeerStatistics): string =
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}"

proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} =
proc lastMessageArrivedAt*(self: PerPeerStatistics): Result[Moment, void] =
  ## Scans every per-peer statistics entry and returns the latest arrival
  ## moment any of them reports. Returns `err()` when no entry yields an
  ## arrival later than the zero moment used as the "nothing found" marker.
  let nothingSeen = Moment.init(0, Millisecond)
  var newest = nothingSeen

  for peerStats in self.values:
    let arrival = peerStats.lastMessageArrivedAt()
    # Entries without any arrival are ignored; otherwise keep the maximum.
    if arrival.isOk() and arrival.get() > newest:
      newest = arrival.get()

  # `newest` still equal to the marker means no entry contributed a value.
  if newest != nothingSeen:
    return ok(newest)

  return err()

proc checkIfAllMessagesReceived*(
self: PerPeerStatistics, maxWaitForLastMessage: Duration
): Future[bool] {.async.} =
# if no peers have sent messages yet, assume we have only just started.
if self.len == 0:
return false

# check if numerically all messages are received.
# this suggest we received at least one message already from one peer
var isAlllMessageReceived = true
for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
stat.helper.maxIndex < stat.allMessageCount:
isAlllMessageReceived = false
break

if not isAlllMessageReceived:
# if not all messages have been received, we still need to check whether the last message arrived within a time frame,
# to avoid waiting endlessly when the publishers have already quit.
let lastMessageAt = self.lastMessageArrivedAt().valueOr:
return false

# the last message must have arrived within the time limit
if Moment.now() - lastMessageAt < maxWaitForLastMessage:
return false
else:
info "No message since max wait time", maxWait = $maxWaitForLastMessage

## Ok, we see last message arrived from all peers,
## lets check if all messages are received
## and if not let's wait another 20 secs to give chance the system will send them.
Expand Down
7 changes: 7 additions & 0 deletions waku/waku_core/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,10 @@ func hasUdpPort*(peer: RemotePeerInfo): bool =

let typedEnr = typedEnrRes.get()
typedEnr.udp.isSome() or typedEnr.udp6.isSome()

proc getAgent*(peer: RemotePeerInfo): string =
  ## Returns the peer's `agent` string, substituting "unknown" when that
  ## string is empty or consists only of whitespace.
  if not peer.agent.isEmptyOrWhitespace():
    return peer.agent

  return "unknown"
Loading