diff --git a/node/cmd/transfer-verifier/transfer-verifier-sui.go b/node/cmd/transfer-verifier/transfer-verifier-sui.go index 3669962d07..2846b9242a 100644 --- a/node/cmd/transfer-verifier/transfer-verifier-sui.go +++ b/node/cmd/transfer-verifier/transfer-verifier-sui.go @@ -11,7 +11,6 @@ package transferverifier import ( // "bytes" "context" - "crypto/rand" "encoding/binary" "encoding/json" "fmt" @@ -27,7 +26,6 @@ import ( ipfslog "github.com/ipfs/go-log/v2" "github.com/spf13/cobra" "go.uber.org/zap" - "nhooyr.io/websocket" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -348,7 +346,7 @@ func init() { // envStr = TransferVerifierCmd.Flags().String("env", "", `environment (may be "testnet" or "mainnet")`) // TODO - fix the flag handling - suiRPC = *TransferVerifierCmdSui.Flags().String("suiRPC", "https://rpc.ankr.com/sui/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403", "Sui RPC url") + suiRPC = *TransferVerifierCmdSui.Flags().String("suiRPC", "", "Sui RPC url") logLevel = TransferVerifierCmdSui.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)") suiCoreContract = *TransferVerifierCmdSui.Flags().String("suiCoreContract", "0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a", "Event to listen to in Sui") suiTokenBridgeEmitter = *TransferVerifierCmdSui.Flags().String("suiTokenBridgeEmitter", "0xccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5", "Token bridge emitter on Sui. Tied to the token bridge package.") @@ -381,64 +379,55 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) { //processDigest(*logger) // Process ALL of the incoming ones - processAllEvents(*logger) - //processEventsLive(*logger) + //processAllEvents(*logger) + processEventsLive(*logger) } +// https://github.com/wormhole-foundation/wormhole/blob/e297d96d101857f98e0fbba10168b6dc7b55d9c0/node/pkg/watchers/sui/watcher.go#L449 func processEventsLive(logger zap.Logger) { - nBig, _ := rand.Int(rand.Reader, big.NewInt(27)) - subId := nBig.Int64() + cursor := "null" + prevFirstDigest := "" - ctx := context.Background() - ws, _, err := websocket.Dial(ctx, "wss://rpc.ankr.com/sui/ws/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403", nil) - if err != nil { - logger.Error("couldn't connect to websocket Sui") - return - } - defer ws.Close(websocket.StatusNormalClosure, "") + for true { + time.Sleep(10 * time.Second) // Sleep a little bit to let things get processed - subscription := fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "suix_subscribeEvent", "params": {"All" : [{"MoveEventType": "%s"}, {"Package":"0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a"},]}}`, subId, suiMoveEventType) + queryEventsCmd := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "suix_queryEvents", "params": [{ "MoveEventType": "%s" }, %s, %d, %t]}`, + suiMoveEventType, cursor, 10, true) - err = ws.Write(ctx, websocket.MessageText, []byte(subscription)) - if err != nil { - logger.Error("couldn't create subscription with websocket") - } + res, err := suiQueryEvents(suiRPC, queryEventsCmd) - _, p, err := ws.Read(ctx) - if err != nil { - logger.Error("Failed to read websocket response to event subscription", zap.Error(err)) - } + if err != nil { + logger.Error(fmt.Sprintf("suiQueryEvents failed: %s", err)) + return + } + //cursor = fmt.Sprintf(`{"txDigest":"%s", "eventSeq":"%s"}`, res.Result.NextCursor.TxDigest, res.Result.NextCursor.EventSeq) - var subRes map[string]any - err = json.Unmarshal(p, &subRes) - if err != nil { - logger.Error("Failed to unmarshal req in subscription request", zap.Error(err)) - return - } - logger.Debug("Unmarshalled json", zap.Any("subRes", subRes)) - actualResult := subRes["result"] - logger.Debug("actualResult", zap.Any("res", actualResult)) + if len(res.Result.Data) == 0 { // Empty query + continue + } - if actualResult == nil { - logger.Error("Failed to request filter in subscription request", zap.Error(err)) - return - } + txDigestCurrent := res.Result.Data[0].ID.TxDigest + if prevFirstDigest == *txDigestCurrent { // No new data + logger.Info(fmt.Sprintf("No new events for hash %s", *txDigestCurrent)) + continue + } - for { - select { - case <-ctx.Done(): - logger.Error("sui_data_pump context done") + entries := res.Result.Data - default: - _, msg, err := ws.Read(ctx) - //var res SuiEventMsg + for _, entry := range entries { + + if prevFirstDigest == *entry.ID.TxDigest { // Already seen the TX. Don't need to process again. Should be sequential so we can leave this loop. + break + } + fmt.Println("============================================") + logger.Info("", zap.String("Hash", *entry.ID.TxDigest)) + err = processIncomingEvent(*entry.ID.TxDigest, logger) if err != nil { - continue + logger.Error(fmt.Sprintf("Unable to process event: %s", err.Error())) } - //err = json.Unmarshal(msg, &res) - - fmt.Println(string(msg[:])) } + + prevFirstDigest = *txDigestCurrent } } diff --git a/scripts/sui-transfer-verifier.sh b/scripts/sui-transfer-verifier.sh index 3a0d32649d..7a146638ff 100755 --- a/scripts/sui-transfer-verifier.sh +++ b/scripts/sui-transfer-verifier.sh @@ -12,8 +12,7 @@ TOKEN_BRIDGE_CONTRACT="0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0 TOKEN_BRIDGE_EMITTER="0xccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5" -# RPC="${ALCHEMY_RPC}" -RPC=https://rpc.ankr.com/sui/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403 +RPC= LOG_LEVEL="info"