diff --git a/waku/nwaku_test_utils.go b/waku/nwaku_test_utils.go index 5b1bc96..4d80b1c 100644 --- a/waku/nwaku_test_utils.go +++ b/waku/nwaku_test_utils.go @@ -12,6 +12,7 @@ import ( "time" "github.com/cenkalti/backoff/v3" + "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/waku-go-bindings/waku/common" "google.golang.org/protobuf/proto" @@ -215,3 +216,42 @@ func SubscribeNodesToTopic(nodes []*WakuNode, topic string) error { } return nil } + +func (n *WakuNode) GetStoredMessages(storeNode *WakuNode, storeRequest *common.StoreQueryRequest) (*common.StoreQueryResponse, error) { + Debug("Starting store query request") + + if storeRequest == nil { + Debug("Using DefaultStoreQueryRequest") + storeRequest = &DefaultStoreQueryRequest + } + + storeMultiaddr, err := storeNode.ListenAddresses() + if err != nil { + Error("Failed to retrieve listen addresses for store node: %v", err) + return nil, err + } + + if len(storeMultiaddr) == 0 { + Error("Store node has no available listen addresses") + return nil, fmt.Errorf("store node has no available listen addresses") + } + + storeNodeAddrInfo, err := peer.AddrInfoFromString(storeMultiaddr[0].String()) + if err != nil { + Error("Failed to convert store node address to AddrInfo: %v", err) + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + Debug("Querying store node for messages") + res, err := n.StoreQuery(ctx, storeRequest, *storeNodeAddrInfo) + if err != nil { + Error("StoreQuery failed: %v", err) + return nil, err + } + + Debug("Store query successful, retrieved %d messages", len(*res.Messages)) + return res, nil +} diff --git a/waku/store_test.go b/waku/store_test.go new file mode 100644 index 0000000..4e3ce68 --- /dev/null +++ b/waku/store_test.go @@ -0,0 +1,1028 @@ +package waku + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + + "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" +) + +func TestStoreQuery3Nodes(t *testing.T) { + Debug("Starting test to verify store query from a peer using direct peer connections") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + Debug("Publishing message from Node1 using RelayPublish") + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte("test-message"), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + queryTimestamp := proto.Int64(time.Now().UnixNano()) + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + Debug("Waiting for message delivery to Node2") + time.Sleep(2 * time.Second) + + Debug("Verifying that Node2 received the message") + err = node2.VerifyMessageReceived(message, msgHash) + require.NoError(t, err, "Node2 should have received the message") + + Debug("Node3 querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + res, err := node3.GetStoredMessages(node2, storeQueryRequest) + var storedMessages = *res.Messages + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotEmpty(t, storedMessages, "Expected at least one stored message") + Debug("Verifying stored message matches the published message") + require.Equal(t, message.Payload, storedMessages[0].WakuMessage.Payload, "Stored message payload does not match") + Debug("Test successfully verified store query from a peer using direct peer connections") +} + +func TestStoreQueryMultipleMessages(t *testing.T) { + Debug("Starting test to verify store query with multiple messages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 50 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2") + res, err := node3.GetStoredMessages(node2, nil) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, numMessages, "Expected to retrieve exactly 50 messages") + + Debug("Verifying stored message hashes match sent message hashes") + var receivedHashes []common.MessageHash + for _, storedMsg := range storedMessages { + receivedHashes = append(receivedHashes, storedMsg.MessageHash) + } + + require.ElementsMatch(t, sentHashes, receivedHashes, "Sent and received message hashes do not match") + + Debug("Test successfully verified store query with multiple messages") +} + +func TestStoreQueryWith5Pagination(t *testing.T) { + Debug("Starting test to verify store query with pagination") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 10 + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + _, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying stored messages from Node2 with PaginationLimit = 5") + storeRequest := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(5)), + PaginationForward: true, + } + + res, err := node3.GetStoredMessages(node2, &storeRequest) + require.NoError(t, err, "Failed to retrieve stored messages from Node2") + require.NotNil(t, res.Messages, "Expected stored messages but received nil") + + storedMessages := *res.Messages + + require.Len(t, storedMessages, 5, "Expected to retrieve exactly 5 messages due to pagination limit") + + Debug("Test successfully verified store query with pagination limit") +} + +func TestStoreQueryWithPaginationMultiplePages(t *testing.T) { + Debug("Starting test to verify store query with pagination across multiple pages") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: true, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[i+5], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination across multiple pages") +} + +func TestStoreQueryWithPaginationReverseOrder(t *testing.T) { + Debug("Starting test to verify store query with pagination in reverse order") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 (Relay enabled)") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 (Relay & Store enabled)") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = false + + Debug("Creating Node3 (Peer connected to Node2)") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node1 to Node2") + err = node1.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node1 to Node2") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Waiting for peer connections to stabilize") + time.Sleep(2 * time.Second) + + numMessages := 8 + var sentHashes []common.MessageHash + defaultPubsubTopic := DefaultPubsubTopic + + Debug("Publishing %d messages from Node1 using RelayPublish", numMessages) + for i := 0; i < numMessages; i++ { + message := node1.CreateMessage(&pb.WakuMessage{ + Payload: []byte(fmt.Sprintf("message-%d", i)), + ContentTopic: "test-content-topic", + Timestamp: proto.Int64(time.Now().UnixNano()), + }) + + msgHash, err := node1.RelayPublishNoCTX(defaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message from Node1") + + sentHashes = append(sentHashes, msgHash) + } + + Debug("Waiting for message delivery to Node2") + time.Sleep(5 * time.Second) + + Debug("Node3 querying first page of stored messages from Node2 (Newest first)") + storeRequest1 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + } + + res1, err := node3.GetStoredMessages(node2, &storeRequest1) + require.NoError(t, err, "Failed to retrieve first page of stored messages from Node2") + require.NotNil(t, res1.Messages, "Expected stored messages but received nil") + + storedMessages1 := *res1.Messages + require.Len(t, storedMessages1, 5, "Expected to retrieve exactly 5 messages from first query") + + for i := 0; i < 5; i++ { + require.Equal(t, sentHashes[numMessages-1-i], storedMessages1[i].MessageHash, "Message order mismatch in first query") + } + + Debug("Node3 querying second page of stored messages from Node2") + storeRequest2 := common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(5), + PaginationForward: false, + PaginationCursor: &res1.PaginationCursor, + } + + res2, err := node3.GetStoredMessages(node2, &storeRequest2) + require.NoError(t, err, "Failed to retrieve second page of stored messages from Node2") + require.NotNil(t, res2.Messages, "Expected stored messages but received nil") + + storedMessages2 := *res2.Messages + require.Len(t, storedMessages2, 3, "Expected to retrieve exactly 3 messages from second query") + + for i := 0; i < 3; i++ { + require.Equal(t, sentHashes[numMessages-6-i], storedMessages2[i].MessageHash, "Message order mismatch in second query") + } + + Debug("Test successfully verified store query pagination in reverse order") +} + +func TestQueryFailwhenNoStorePeer(t *testing.T) { + Debug("Starting test to verify store query failure when node2 has no store") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = false + + Debug("Creating Node2 with Relay enabled but Store disabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages since Node2 has store disabled") + storedMessages, err := node3.GetStoredMessages(node2, nil) + require.Error(t, err, "Expected Node3's store query to fail because Node2 has store disabled") + require.Empty(t, storedMessages, "Expected no messages in store for Node3") + + Debug("Test successfully verified that store query fails when Node2 does not store messages") +} + +func TestQueryFailWithIncorrectStaticNode(t *testing.T) { + Debug("Starting test to verify store query failure when Node3 has an incorrect static node address") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node1Address, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.Staticnodes = []string{node1Address[0].String()} + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node2Address, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listening address for Node2") + + var incorrectAddress = node2Address[0].String()[:len(node2Address[0].String())-10] + node3Config := DefaultWakuConfig + node3Config.Relay = true + node3Config.Staticnodes = []string{incorrectAddress} + + Debug("Original Node2 Address: %s", node2Address[0].String()) + Debug("Modified Node2 Address: %s", incorrectAddress) + + Debug("Creating Node3 with an incorrect static node address") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Sender Node1 is publishing a message") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Verifying that Node3 fails to retrieve stored messages due to incorrect static node") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Expected Node3's store query to fail due to incorrect static node") + require.Nil(t, (*storedmsgs.Messages)[0].WakuMessage, "Expected no messages in store for Node3") + + Debug("Test successfully verified store query failure due to incorrect static node configuration") +} + +func TestStoreQueryWithoutData(t *testing.T) { + Debug("Starting test to verify store query returns only message hashes when IncludeData is false") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true // Enable store on Node2 + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with IncludeData = false") + storeQueryRequest := &common.StoreQueryRequest{ + IncludeData: false, + } + + storedmsgs, err := node3.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.NotNil(t, storedmsgs.Messages, "Expected store response to contain message hashes") + //require.Len(t, *storedmsgs.Messages, 1, "Expected exactly one stored message") + + // Access the first message + // firstMessage := (*storedmsgs.Messages)[0] + + firstMessage := (*storedmsgs.Messages)[0] + require.Nil(t, firstMessage.WakuMessage, "Expected message payload to be empty when IncludeData is false") + require.NotEmpty(t, (*storedmsgs.Messages)[0].MessageHash, "Expected message hash to be present") + Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + + Debug("Test successfully verified that store query returns only message hashes when IncludeData is false") +} + +func TestStoreQueryWithWrongContentTopic(t *testing.T) { + Debug("Starting test to verify store query fails when using an incorrect content topic and an old timestamp") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node3Config := DefaultWakuConfig + node3Config.Relay = true + + Debug("Creating Node3") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Connecting Node3 to Node2") + err = node3.ConnectPeer(node2) + require.NoError(t, err, "Failed to connect Node3 to Node2") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message with a correct content topic") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node3 with an incorrect content topic and an old timestamp") + storeQueryRequest := &common.StoreQueryRequest{ + ContentTopics: &[]string{"incorrect-content-topic"}, + TimeStart: queryTimestamp, + } + + storedmsgs, _ := node3.GetStoredMessages(node2, storeQueryRequest) + //require.Error(t, err, "Expected error when querying with an incorrect content topic and old timestamp") + require.Nil(t, (*storedmsgs.Messages)[0], "Expected no messages to be returned for incorrect content topic and timestamp") + //Debug("Queried message hash: %s", (*storedmsgs.Messages)[0].MessageHash) + Debug("Test successfully verified that store query fails when using an incorrect content topic and an old timestamp") +} + +func TestCheckStoredMSGsEphemeralTrue(t *testing.T) { + Debug("Starting test to verify ephemeral messages are not stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing an ephemeral message") + message := node1.CreateMessage() + ephemeralTrue := true + message.Ephemeral = &ephemeralTrue + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 0, len(*storedmsgs.Messages), "Expected no stored messages for ephemeral messages") + + Debug("Test successfully verified that ephemeral messages are not stored") +} + +func TestCheckStoredMSGsEphemeralFalse(t *testing.T) { + Debug("Starting test to verify non-ephemeral messages are stored") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 with Relay enabled") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 with Store enabled") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + Debug("Recording timestamp before message publication") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a non-ephemeral message") + message := node1.CreateMessage() + ephemeralFalse := false + message.Ephemeral = &ephemeralFalse + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test finished successfully ") +} + +func TestCheckLegacyStore(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1 ") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + node2Config.LegacyStore = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing a message") + message := node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err) + require.NotEmpty(t, msgHash) + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Equal(t, 1, len(*storedmsgs.Messages), "Expected exactly one stored message") + + Debug("Test finished successfully ") + +} + +func TestStoredMessagesWithVDifferentPayloads(t *testing.T) { + Debug("Starting test ") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2 ") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pLoad := range SAMPLE_INPUTS { + queryTimestamp := proto.Int64(time.Now().UnixNano()) + + Debug("Sender Node1 is publishing message with payload: %s", pLoad.Value) + message := node1.CreateMessage() + message.Payload = []byte(pLoad.Value) + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + retrievedMessage := (*storedmsgs.Messages)[0] + require.Equal(t, pLoad.Value, string(retrievedMessage.WakuMessage.Payload), "Expected WakuMessage but got nil") + Debug("Payload matches expected %s", string(retrievedMessage.WakuMessage.Payload)) + } + + Debug("Test finished successfully ") +} + +func TestStoredMessagesWithDifferentContentTopics(t *testing.T) { + Debug("Starting test for different content topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, contentTopic := range CONTENT_TOPICS_DIFFERENT_SHARDS { + + Debug("Node1 is publishing message with content topic: %s", contentTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + message := node1.CreateMessage() + message.ContentTopic = contentTopic + + msgHash, err := node1.RelayPublishNoCTX(DefaultPubsubTopic, message) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + ContentTopics: &[]string{contentTopic}, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, contentTopic, (*storedmsgs.Messages)[0].WakuMessage.ContentTopic, "Stored message content topic does not match expected") + Debug("Veified content topic %s ", (*storedmsgs.Messages)[0].WakuMessage.ContentTopic) + } + + Debug("Test finished successfully") +} + +func TestStoredMessagesWithDifferentPubsubTopics(t *testing.T) { + Debug("Starting test for different pubsub topics") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + node2Config := DefaultWakuConfig + node2Config.Relay = true + node2Config.Store = true + + Debug("Creating Node2") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + defer func() { + Debug("Stopping and destroying both nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + Debug("Connecting Node2 to Node1") + err = node2.ConnectPeer(node1) + require.NoError(t, err, "Failed to connect Node2 to Node1") + + for _, pubsubTopic := range PUBSUB_TOPICS_STORE { + + Debug("Node1 is publishing message on pubsub topic: %s", pubsubTopic) + node1.RelaySubscribe(pubsubTopic) + node2.RelaySubscribe(pubsubTopic) + queryTimestamp := proto.Int64(time.Now().UnixNano()) + var msg = node1.CreateMessage() + msgHash, err := node1.RelayPublishNoCTX(pubsubTopic, msg) + require.NoError(t, err, "Failed to publish message") + require.NotEmpty(t, msgHash, "Message hash is empty") + + Debug("Querying stored messages from Node2 using Node1") + storeQueryRequest := &common.StoreQueryRequest{ + TimeStart: queryTimestamp, + IncludeData: true, + PubsubTopic: pubsubTopic, + } + + storedmsgs, err := node1.GetStoredMessages(node2, storeQueryRequest) + require.NoError(t, err, "Failed to query store messages from Node2") + require.Greater(t, len(*storedmsgs.Messages), 0, "Expected at least one stored message") + require.Equal(t, pubsubTopic, (*storedmsgs.Messages)[0].PubsubTopic, "Stored message pubsub topic does not match expected") + } + + Debug("Test finished successfully") +} diff --git a/waku/test_data.go b/waku/test_data.go index 67125d1..7552e23 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -1,27 +1,39 @@ package waku import ( + "fmt" "time" "github.com/waku-org/waku-go-bindings/waku/common" + "google.golang.org/protobuf/proto" ) var DefaultWakuConfig common.WakuConfig +var DefaultStoreQueryRequest common.StoreQueryRequest +var DEFAULT_CLUSTER_ID = 16 func init() { - DefaultWakuConfig = WakuConfig{ + DefaultWakuConfig = common.WakuConfig{ Relay: false, LogLevel: "DEBUG", Discv5Discovery: true, ClusterID: 16, - Shards: []uint16{64}, - PeerExchange: false, - Store: false, - Filter: false, - Lightpush: false, - Discv5UdpPort: 0, - TcpPort: 0, + //Shards: []uint16{64}, + PeerExchange: false, + Store: false, + Filter: false, + Lightpush: false, + Discv5UdpPort: 0, + TcpPort: 0, + } + + DefaultStoreQueryRequest = common.StoreQueryRequest{ + IncludeData: true, + ContentTopics: &[]string{"test-content-topic"}, + PaginationLimit: proto.Uint64(uint64(50)), + PaginationForward: true, + TimeStart: proto.Int64(time.Now().Add(-5 * time.Minute).UnixNano()), // 5 mins before now } } @@ -33,3 +45,73 @@ var ( MinPort = 1024 // Minimum allowable port (exported) MaxPort = 65535 // Maximum allowable port (exported) ) + +var SAMPLE_INPUTS = []struct { + Description string + Value string +}{ + {"A simple string", "Hello World!"}, + {"An integer", "1234567890"}, + {"A dictionary", `{"key": "value"}`}, + {"Chinese characters", "这是一些中文"}, + {"Emojis", "🚀🌟✨"}, + {"Lorem ipsum text", "Lorem ipsum dolor sit amet"}, + {"HTML content", "Hello"}, + {"Cyrillic characters", "\u041f\u0440\u0438\u0432\u0435\u0442"}, + {"Base64 encoded string", "Base64==dGVzdA=="}, + {"Binary data", "d29ya2luZyB3aXRoIGJpbmFyeSBkYXRh: \x50\x51"}, + {"Special characters with whitespace", "\t\nSpecial\tCharacters\n"}, + {"Boolean false as a string", "False"}, + {"A float number", "3.1415926535"}, + {"A list", "[1, 2, 3, 4, 5]"}, + {"Hexadecimal number as a string", "0xDEADBEEF"}, + {"Email format", "user@example.com"}, + {"URL format", "http://example.com"}, + {"Date and time in ISO format", "2023-11-01T12:00:00Z"}, + {"String with escaped quotes", `"Escaped" \"quotes\"`}, + {"A regular expression", "Regular expression: ^[a-z0-9_-]{3,16}$"}, + {"A very long string", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}, + {"A JSON string", `{"name": "John", "age": 30, "city": "New York"}`}, + {"A Unix path", "/usr/local/bin"}, + {"A Windows path", "C:\\Windows\\System32"}, + {"An SQL query", "SELECT * FROM users WHERE id = 1;"}, + {"JavaScript code snippet", "function test() { console.log('Hello World'); }"}, + {"A CSS snippet", "body { background-color: #fff; }"}, + {"A Python one-liner", "print('Hello World')"}, + {"An IP address", "192.168.1.1"}, + {"A domain name", "www.example.com"}, + {"A user agent string", "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}, + {"A credit card number", "1234-5678-9012-3456"}, + {"A phone number", "+1234567890"}, + {"A UUID", "123e4567-e89b-12d3-a456-426614174000"}, + {"A hashtag", "#helloWorld"}, + {"A Twitter handle", "@username"}, + {"A password", "P@ssw0rd!"}, + {"A date in common format", "01/11/2023"}, + {"A time string", "12:00:00"}, + {"A mathematical equation", "E = mc^2"}, +} + +var PUBSUB_TOPICS_STORE = []string{ + + fmt.Sprintf("/waku/2/rs/%d/0", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/1", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/2", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/3", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/4", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/5", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/6", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/7", DEFAULT_CLUSTER_ID), + fmt.Sprintf("/waku/2/rs/%d/8", DEFAULT_CLUSTER_ID), +} + +var CONTENT_TOPICS_DIFFERENT_SHARDS = []string{ + "/myapp/1/latest/proto", + "/waku/2/content/test.js", + "/app/22/sometopic/someencoding", + "/toychat/2/huilong/proto", + "/statusim/1/community/cbor", + "/app/27/sometopic/someencoding", + "/app/29/sometopic/someencoding", + "/app/20/sometopic/someencoding", +}