Skip to content

Commit

Permalink
full content & pubsub topic support
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Feb 11, 2025
1 parent 1002828 commit 6c222cf
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 151 deletions.
3 changes: 2 additions & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ proc setupProtocols(
if conf.storeSync:
(
await node.mountStoreSync(
conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter
conf.clusterId, conf.shards, conf.contentTopics, conf.storeSyncRange,
conf.storeSyncInterval, conf.storeSyncRelayJitter,
)
).isOkOr:
return err("failed to mount waku store sync protocol: " & $error)
Expand Down
21 changes: 8 additions & 13 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -208,29 +208,24 @@ proc mountSharding*(

proc mountStoreSync*(
node: WakuNode,
cluster: uint16,
shards: seq[uint16],
contentTopics: seq[string],
storeSyncRange = 3600,
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[(SyncID, uint16)](100)
let idsChannel = newAsyncQueue[(SyncID, PubsubTopic, ContentTopic)](100)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)

var cluster: uint16
var shards: seq[uint16]
let enrRes = node.enr.toTyped()
if enrRes.isOk():
let shardingRes = enrRes.get().relaySharding()
if shardingRes.isSome():
let relayShard = shardingRes.get()
cluster = relayShard.clusterID
shards = relayShard.shardIds
let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))

let recon =
?await SyncReconciliation.new(
cluster, shards, node.peerManager, node.wakuArchive, storeSyncRange.seconds,
storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel,
needsChannel,
cluster, pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
idsChannel, wantsChannel, needsChannel,
)

node.wakuStoreReconciliation = recon
Expand Down
15 changes: 9 additions & 6 deletions waku/waku_store_sync/codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ proc deltaEncode*(value: RangesData): seq[byte] =
buf = uint64(value.cluster).toBytes(Leb128)
output &= @buf

# encode shards
buf = uint64(value.shards.len).toBytes(Leb128)
# TODO encode topics
#[ buf = uint64(value.shards.len).toBytes(Leb128)
output &= @buf
for shard in value.shards:
buf = uint64(shard).toBytes(Leb128)
output &= @buf
output &= @buf ]#

# the first range is implicit but must be explicit when encoded
let (bound, _) = value.ranges[0]
Expand Down Expand Up @@ -231,7 +231,9 @@ proc getCluster(idx: var int, buffer: seq[byte]): Result[uint16, string] =

return ok(uint16(val))

proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] =
#[ proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] =
#TODO switch to topics
if idx + VarIntLen > buffer.len:
return err("Cannot decode shards count")
Expand All @@ -251,7 +253,7 @@ proc getShards(idx: var int, buffer: seq[byte]): Result[seq[uint16], string] =
shards.add(uint16(val))
return ok(shards)
return ok(shards) ]#

proc deltaDecode*(
itemSet: var ItemSet, buffer: seq[byte], setLength: int
Expand Down Expand Up @@ -295,7 +297,8 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] =
idx = 0

payload.cluster = ?getCluster(idx, buffer)
payload.shards = ?getShards(idx, buffer)
#TODO decode topics
#payload.shards = ?getShards(idx, buffer)

lastTime = ?getTimestamp(idx, buffer)

Expand Down
3 changes: 2 additions & 1 deletion waku/waku_store_sync/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type

RangesData* = object
cluster*: uint16
shards*: seq[uint16]
pubsubTopics*: seq[PubsubTopic]
contentTopics*: seq[ContentTopic]

ranges*: seq[(Slice[SyncID], RangeType)]
fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order
Expand Down
104 changes: 57 additions & 47 deletions waku/waku_store_sync/reconciliation.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.push raises: [].}

import
std/[sequtils, options, packedsets],
std/[sequtils, options, packedsets, sets],
stew/byteutils,
results,
chronicles,
Expand All @@ -20,6 +20,7 @@ import
../waku_core/codecs,
../waku_core/time,
../waku_core/topics/pubsub_topic,
../waku_core/topics/content_topic,
../waku_core/message/digest,
../waku_core/message/message,
../node/peer_manager/peer_manager,
Expand All @@ -38,7 +39,8 @@ const DefaultStorageCap = 50_000

type SyncReconciliation* = ref object of LPProtocol
cluster: uint16
shards: PackedSet[uint16]
pubsubTopics: HashSet[PubsubTopic]
contentTopics: HashSet[ContentTopic]

peerManager: PeerManager

Expand All @@ -47,7 +49,7 @@ type SyncReconciliation* = ref object of LPProtocol
storage: SyncStorage

# Receive IDs from transfer protocol for storage
idsRx: AsyncQueue[(SyncID, uint16)]
idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)]

# Send Hashes to transfer protocol for reception
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)]
Expand All @@ -72,32 +74,31 @@ proc messageIngress*(

let id = SyncID(time: msg.timestamp, hash: msgHash)

let parseRes = RelayShard.parse(pubsubTopic)

if parseRes.isErr():
return err("failed to parse pubsub topic: " & $pubsubTopic)

let shard = parseRes.get().shardId

self.storage.insert(id, shard).isOkOr:
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
return err(
"failed to insert new message: msg_hash: " & $msgHash.toHex() & " error: " & $error
)

proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage, shard: uint16
self: SyncReconciliation,
msgHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
msg: WakuMessage,
): Result[void, string] =
let id = SyncID(time: msg.timestamp, hash: msgHash)

self.storage.insert(id, shard).isOkOr:
self.storage.insert(id, pubsubTopic, msg.contentTopic).isOkOr:
return err(
"failed to insert new message: msg_hash: " & $id.hash.toHex() & " error: " & $error
)

proc messageIngress*(
self: SyncReconciliation, id: SyncID, shard: uint16
self: SyncReconciliation,
id: SyncID,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
): Result[void, string] =
self.storage.insert(id, shard).isOkOr:
self.storage.insert(id, pubsubTopic, contentTopic).isOkOr:
return err(
"failed to insert new message: msg_hash: " & $id.hash.toHex() & " error: " & $error
)
Expand All @@ -112,12 +113,21 @@ proc preProcessPayload(
if self.cluster != payload.cluster:
return none(RangesData)

let shardsIntersection = self.shards * payload.shards.toPackedSet()
if payload.pubsubTopics.len > 0:
let pubsubIntersection = self.pubsubTopics * payload.pubsubTopics.toHashSet()

if shardsIntersection.len < 1:
return none(RangesData)
if pubsubIntersection.len < 1:
return none(RangesData)

payload.pubsubTopics = pubsubIntersection.toSeq()

if payload.contentTopics.len > 0:
let contentIntersection = self.contentTopics * payload.contentTopics.toHashSet()

payload.shards = shardsIntersection.toSeq()
if contentIntersection.len < 1:
return none(RangesData)

payload.contentTopics = contentIntersection.toSeq()

let timeRange = calculateTimeRange(self.relayJitter, self.syncRange)
let selfLowerBound = timeRange.a
Expand Down Expand Up @@ -179,11 +189,16 @@ proc processRequest(
if preProcessedPayloadRes.isSome():
let preProcessedPayload = preProcessedPayloadRes.get()

sendPayload =
self.storage.processPayload(preProcessedPayload, hashToSend, hashToRecv)
sendPayload = self.storage.processPayload(
preProcessedPayload.cluster, preProcessedPayload.pubsubTopics,
preProcessedPayload.contentTopics, preProcessedPayload.ranges,
preProcessedPayload.fingerprints, preProcessedPayload.itemSets, hashToSend,
hashToRecv,
)

sendPayload.cluster = self.cluster
sendPayload.shards = self.shards.toSeq()
sendPayload.pubsubTopics = self.pubsubTopics.toSeq()
sendPayload.contentTopics = self.contentTopics.toSeq()

for hash in hashToSend:
await self.remoteNeedsTx.addLast((conn.peerId, hash))
Expand Down Expand Up @@ -228,10 +243,13 @@ proc initiate(
upper = SyncID(time: timeRange.b, hash: FullFingerprint)
bounds = lower .. upper

fingerprint = self.storage.computeFingerprint(bounds, self.shards)
fingerprint = self.storage.computeFingerprint(
bounds, self.pubsubTopics.toSeq(), self.contentTopics.toSeq()
)
initPayload = RangesData(
cluster: self.cluster,
shards: self.shards.toSeq(),
pubsubTopics: self.pubsubTopics.toSeq(),
contentTopics: self.contentTopics.toSeq(),
ranges: @[(bounds, RangeType.Fingerprint)],
fingerprints: @[fingerprint],
itemSets: @[],
Expand Down Expand Up @@ -286,7 +304,7 @@ proc storeSynchronization*(

proc initFillStorage(
syncRange: timer.Duration, wakuArchive: WakuArchive
): Future[Result[(seq[SyncID], seq[uint16]), string]] {.async.} =
): Future[Result[SeqStorage, string]] {.async.} =
if wakuArchive.isNil():
return err("waku archive unavailable")

Expand All @@ -306,8 +324,7 @@ proc initFillStorage(

debug "initial storage filling started"

var ids = newSeq[SyncID](DefaultStorageCap)
var shards = newSeq[uint16](DefaultStorageCap)
var storage = SeqStorage.new(DefaultStorageCap)

# we assume IDs are in order

Expand All @@ -318,38 +335,31 @@ proc initFillStorage(
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let msg = response.messages[i]
let topic = response.topics[i]

let parseRes = RelayShard.parse(topic)

if parseRes.isErr():
error "failed to parse pubsub topic", pubsubTopic = topic
continue

let shard = parseRes.get().shardId
let pubsubTopic = response.topics[i]

ids.add(SyncID(time: msg.timestamp, hash: hash))
shards.add(shard)
let id = SyncID(time: msg.timestamp, hash: hash)
discard storage.insert(id, pubsubTopic, msg.contentTopic)

if response.cursor.isNone():
break

query.cursor = response.cursor

debug "initial storage filling done", elements = ids.len
debug "initial storage filling done", elements = storage.length()

return ok((ids, shards))
return ok(storage)

proc new*(
T: type SyncReconciliation,
cluster: uint16,
shards: seq[uint16],
pubsubTopics: seq[PubSubTopic],
contentTopics: seq[ContentTopic],
peerManager: PeerManager,
wakuArchive: WakuArchive,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: timer.Duration = DefaultGossipSubJitter,
idsRx: AsyncQueue[(SyncID, uint16)],
idsRx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)],
remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)],
): Future[Result[T, string]] {.async.} =
Expand All @@ -359,12 +369,12 @@ proc new*(
warn "will not sync messages before this point in time", error = res.error
SeqStorage.new(DefaultStorageCap)
else:
let (ele, shar) = res.get()
SeqStorage.new(ele, shar)
res.get()

var sync = SyncReconciliation(
cluster: cluster,
shards: shards.toPackedSet(),
pubsubTopics: pubsubTopics.toHashSet(),
contentTopics: contentTopics.toHashSet(),
peerManager: peerManager,
storage: storage,
syncRange: syncRange,
Expand Down Expand Up @@ -421,9 +431,9 @@ proc periodicPrune(self: SyncReconciliation) {.async.} =

proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
while true: # infinite loop
let (id, shard) = await self.idsRx.popfirst()
let (id, pubsub, content) = await self.idsRx.popfirst()

self.messageIngress(id, shard).isOkOr:
self.messageIngress(id, pubsub, content).isOkOr:
error "message ingress failed", error = error

proc start*(self: SyncReconciliation) =
Expand Down
Loading

0 comments on commit 6c222cf

Please sign in to comment.