Skip to content

Commit

Permalink
filter: enhancements in subscription management
Browse files Browse the repository at this point in the history
* waku_filter_v2: idiomatic way to run the periodic subscription manager
* filter subscriptions: add more debug logs
* filter: make sure the custom start and stop procs are called
* make sure filter protocol is started if it is mounted
* filter: dial push connection on subscribe only
  • Loading branch information
Ivansete-status committed Dec 5, 2024
1 parent 1b532e8 commit 3af3b2b
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 83 deletions.
9 changes: 4 additions & 5 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,10 @@ proc mountFilter*(
some(rateLimitSetting),
)

if node.started:
try:
await node.wakuFilter.start()
except CatchableError:
error "failed to start wakuFilter", error = getCurrentExceptionMsg()
try:
await node.wakuFilter.start()
except CatchableError:
error "failed to start wakuFilter", error = getCurrentExceptionMsg()

try:
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
Expand Down
27 changes: 20 additions & 7 deletions waku/waku_filter_v2/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

{.push raises: [].}

import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand
import
std/options,
chronicles,
chronos,
libp2p/protocols/protocol,
bearssl/rand,
stew/byteutils
import
../node/peer_manager,
../node/delivery_monitor/subscriptions_observer,
Expand Down Expand Up @@ -101,6 +107,7 @@ proc sendSubscribeRequest(
proc ping*(
wfc: WakuFilterClient, servicePeer: RemotePeerInfo
): Future[FilterSubscribeResult] {.async.} =
debug "sending ping", servicePeer = shortLog($servicePeer)
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)

Expand Down Expand Up @@ -170,17 +177,23 @@ proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(int(DefaultMaxPushSize))

let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():
error "Failed to decode message push", peerId = conn.peerId
let msgPush = MessagePush.decode(buf).valueOr:
error "Failed to decode message push", peerId = conn.peerId, error = $error
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return

let messagePush = decodeRes.value #TODO: toAPI() split here
trace "Received message push", peerId = conn.peerId, messagePush
let msg_hash =
computeMessageHash(msgPush.pubsubTopic, msgPush.wakuMessage).to0xHex()

debug "Received message push",
peerId = conn.peerId,
msg_hash,
payload = shortLog(msgPush.wakuMessage.payload),
pubsubTopic = msgPush.pubsubTopic,
content_topic = msgPush.wakuMessage.contentTopic

for handler in wfc.pushHandlers:
asyncSpawn handler(messagePush.pubsubTopic, messagePush.wakuMessage)
asyncSpawn handler(msgPush.pubsubTopic, msgPush.wakuMessage)

# Protocol specifies no response for now
return
Expand Down
102 changes: 52 additions & 50 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ type WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions
# a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager
maintenanceTask: TimerCallback
messageCache: TimedCache[string]
peerRequestRateLimiter*: PerPeerRateLimiter
subscriptionsManagerFut: Future[void]

proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId
debug "pinging subscriber", peerId = peerId

if not wf.subscriptions.isSubscribed(peerId):
debug "pinging peer has no subscriptions", peerId = peerId
error "pinging peer has no subscriptions", peerId = peerId
return err(FilterSubscribeError.notFound())

wf.subscriptions.refreshSubscription(peerId)
Expand All @@ -45,14 +45,16 @@ proc subscribe(
peerId: PeerID,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
): FilterSubscribeResult =
): Future[FilterSubscribeResult] {.async.} =
# TODO: check if this condition is valid???
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)

if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
Expand All @@ -61,12 +63,14 @@ proc subscribe(

let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

trace "subscribing peer to filter criteria",
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria

wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))

debug "correct subscription", peerId = peerId

ok()

proc unsubscribe(
Expand All @@ -76,11 +80,13 @@ proc unsubscribe(
contentTopics: seq[ContentTopic],
): FilterSubscribeResult =
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)

if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
Expand All @@ -93,27 +99,31 @@ proc unsubscribe(
peerId = peerId, filterCriteria = filterCriteria

wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
error "failed to remove subscription", error = $error
return err(FilterSubscribeError.notFound())

## Note: do not remove from peerRequestRateLimiter to prevent trick with subscribe/unsubscribe loop
## We remove only if peerManager removes the peer
debug "correct unsubscription", peerId = peerId

ok()

proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
proc unsubscribeAll(
wf: WakuFilter, peerId: PeerID
): Future[FilterSubscribeResult] {.async.} =
if not wf.subscriptions.isSubscribed(peerId):
debug "unsubscribing peer has no subscriptions", peerId = peerId
return err(FilterSubscribeError.notFound())

debug "removing peer subscription", peerId = peerId
wf.subscriptions.removePeer(peerId)
await wf.subscriptions.removePeer(peerId)
wf.subscriptions.cleanUp()

ok()

proc handleSubscribeRequest*(
wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest
): FilterSubscribeResponse =
): Future[FilterSubscribeResponse] {.async.} =
info "received filter subscribe request", peerId = peerId, request = request
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])

Expand All @@ -127,12 +137,13 @@ proc handleSubscribeRequest*(
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
subscribeResult =
await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult =
wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
subscribeResult = await wf.unsubscribeAll(peerId)

let
requestDuration = Moment.now() - requestStartTime
Expand All @@ -143,6 +154,7 @@ proc handleSubscribeRequest*(
)

if subscribeResult.isErr():
error "subscription request error", peerId = shortLog(peerId), request = request
return FilterSubscribeResponse(
requestId: request.requestId,
statusCode: subscribeResult.error.kind.uint32,
Expand All @@ -152,22 +164,19 @@ proc handleSubscribeRequest*(
return FilterSubscribeResponse.ok(request.requestId)

proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer_id = shortLog(peer)
debug "pushing message to subscribed peer", peerId = shortLog(peer)

if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peer_id = shortLog(peer)
error "no addresses for peer", peerId = shortLog(peer)
return

## TODO: Check if dial is necessary always???
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
if conn.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "no connection to peer", peer_id = shortLog(peer)
let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
error "could not get connection by peer id", error = $error
return

await conn.get().writeLp(buffer)
await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peer)
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
Expand All @@ -181,15 +190,17 @@ proc pushToPeers(

## it's also refresh expire of msghash, that's why update cache every time, even if it has a value.
if wf.messageCache.put(msgHash, Moment.now()):
notice "duplicate message found, not-pushing message to subscribed peers",
error "duplicate message found, not-pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
payload = shortLog(messagePush.wakuMessage.payload),
target_peer_ids = targetPeerIds,
msg_hash = msgHash
else:
notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
payload = shortLog(messagePush.wakuMessage.payload),
target_peer_ids = targetPeerIds,
msg_hash = msgHash

Expand All @@ -201,19 +212,19 @@ proc pushToPeers(
pushFuts.add(pushFut)
await allFutures(pushFuts)

proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
debug "maintaining subscriptions"

## Remove subscriptions for peers that have been removed from peer store
var peersToRemove: seq[PeerId]
for peerId in wf.subscriptions.peersSubscribed.keys:
if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
debug "peer has been removed from peer store, removing subscription",
debug "peer has been removed from peer store, we will remove subscription",
peerId = peerId
peersToRemove.add(peerId)

if peersToRemove.len > 0:
wf.subscriptions.removePeers(peersToRemove)
await wf.subscriptions.removePeers(peersToRemove)
wf.peerRequestRateLimiter.unregister(peersToRemove)

wf.subscriptions.cleanUp()
Expand All @@ -227,7 +238,7 @@ proc handleMessage*(
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

trace "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash

let handleMessageStartTime = Moment.now()

Expand All @@ -236,7 +247,7 @@ proc handleMessage*(
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len == 0:
trace "no subscribed peers found",
error "no subscribed peers found",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash
Expand Down Expand Up @@ -270,7 +281,7 @@ proc handleMessage*(

proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
debug "filter subscribe request handler triggered", peerId = shortLog(conn.peerId)

var response: FilterSubscribeResponse

Expand All @@ -290,13 +301,13 @@ proc initProtocolHandler(wf: WakuFilter) =

let request = decodeRes.value #TODO: toAPI() split here

response = wf.handleSubscribeRequest(conn.peerId, request)
response = await wf.handleSubscribeRequest(conn.peerId, request)

debug "sending filter subscribe response",
peer_id = shortLog(conn.peerId), response = response
do:
debug "filter request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wf.peerRequestRateLimiter.setting
peerId = shortLog(conn.peerId), limit = $wf.peerRequestRateLimiter.setting
response = FilterSubscribeResponse(
requestId: "N/A",
statusCode: FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32,
Expand All @@ -319,7 +330,7 @@ proc new*(
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
subscriptions: FilterSubscriptions.new(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
Expand All @@ -331,28 +342,19 @@ proc new*(
setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
return wf

const MaintainSubscriptionsInterval* = 1.minutes

proc startMaintainingSubscriptions(wf: WakuFilter, interval: Duration) =
trace "starting to maintain subscriptions"
var maintainSubs: CallbackFunc
maintainSubs = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
)

wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
proc periodicSubscriptionsMaintenance(
    wf: WakuFilter, interval: Duration = 1.minutes
) {.async.} =
  ## Endless maintenance loop for the filter protocol's subscriptions.
  ## Runs one `maintainSubscriptions` pass, then sleeps `interval` before the
  ## next pass. `interval` defaults to the previous hard-coded 1-minute period,
  ## so existing callers (`wf.periodicSubscriptionsMaintenance()`) are unchanged.
  ## Intended to be spawned as a long-lived future and cancelled on `stop`
  ## via `cancelAndWait` — the `await` points are the cancellation points.
  debug "starting to maintain subscriptions"
  while true:
    await wf.maintainSubscriptions()
    await sleepAsync(interval)

method start*(wf: WakuFilter) {.async, base.} =
proc start*(wf: WakuFilter) {.async.} =
debug "starting filter protocol"
wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval)

await procCall LPProtocol(wf).start()
wf.subscriptionsManagerFut = wf.periodicSubscriptionsMaintenance()

method stop*(wf: WakuFilter) {.async, base.} =
proc stop*(wf: WakuFilter) {.async.} =
debug "stopping filter protocol"
if not wf.maintenanceTask.isNil():
wf.maintenanceTask.clearTimer()

await wf.subscriptionsManagerFut.cancelAndWait()
await procCall LPProtocol(wf).stop()
Loading

0 comments on commit 3af3b2b

Please sign in to comment.