Skip to content

Commit

Permalink
foldClient with ledger state #135 (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
the-headless-ghost authored Apr 22, 2024
1 parent f48f375 commit 98e6a37
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 52 deletions.
1 change: 1 addition & 0 deletions src/node-client/convex-node-client.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ library
-- cardano dependencies
build-depends:
cardano-api,
cardano-api:internal,
cardano-slotting,
ouroboros-consensus,
ouroboros-network-protocols,
Expand Down
133 changes: 93 additions & 40 deletions src/node-client/lib/Convex/NodeClient/Fold.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
Expand All @@ -11,6 +12,9 @@
Unlike 'foldBlocks' from 'Cardano.Api', this one supports rollbacks.
-}
module Convex.NodeClient.Fold(
LedgerStateArgs (..),
LedgerStateUpdate (..),
LedgerStateMode (..),
CatchingUp(..),
catchingUpWithNode,
caughtUpWithNode,
Expand All @@ -31,8 +35,12 @@ import Cardano.Api (Block (.
ChainTip (..),
Env,
SlotNo,
LedgerState (..),
ValidationMode (..),
chainTipToChainPoint,
envSecurityParam)
envSecurityParam,
applyBlock)
import Cardano.Api.LedgerEvents.LedgerEvent (LedgerEvent)
import Cardano.Slotting.Slot (WithOrigin (At))
import Convex.NodeClient.ChainTip (JSONBlockNo (..),
JSONChainPoint (..),
Expand All @@ -46,6 +54,7 @@ import Data.Aeson (FromJSON
ToJSON)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Functor ((<&>))
import GHC.Generics (Generic)
import Network.TypedProtocol.Pipelined (Nat (..))
import Ouroboros.Consensus.Block.Abstract (WithOrigin (..))
Expand All @@ -55,6 +64,22 @@ import qualified Ouroboros.Network.Protocol.ChainSync.ClientPipelined as CSP
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision (PipelineDecision (..),
pipelineDecisionMax)

-- | Whether to keep track of the full ledger state on the client
data LedgerStateMode = FullLedgerState | NoLedgerState

-- | Whether we have the initial ledger state on the client
data LedgerStateArgs mode where
NoLedgerStateArgs :: LedgerStateArgs 'NoLedgerState
LedgerStateArgs :: LedgerState -> ValidationMode -> LedgerStateArgs 'FullLedgerState

-- | Whether we have the current ledger state for the client folding function
data LedgerStateUpdate mode where
NoLedgerStateUpdate :: LedgerStateUpdate 'NoLedgerState
LedgerStateUpdate :: LedgerState -> [LedgerEvent] -> LedgerStateUpdate 'FullLedgerState

-- | A history of the last @k@ states
type History mode a = Seq (SlotNo, LedgerStateUpdate mode, a)

{-| Whether we have fully caught up with the node
-}
data CatchingUp =
Expand Down Expand Up @@ -105,49 +130,54 @@ resumingFrom = \case
{-| Run the client until 'Nothing' is returned
-}
foldClient ::
forall s.
forall mode s.
-- | Initial state
s ->
-- | Initial ledger state arguments
LedgerStateArgs mode ->
-- | Node connection data
Env ->
-- | Fold
(CatchingUp -> s -> BlockInMode -> IO (Maybe s)) ->
(CatchingUp -> s -> LedgerStateUpdate mode -> BlockInMode -> IO (Maybe s)) ->
PipelinedLedgerStateClient
foldClient initialState env applyBlock =
foldClient' @s @()
foldClient initialState initialLedgerState env accumulate =
foldClient' @mode @s @()
initialState
initialLedgerState
env
(\_ _ !s -> pure ((), s))
(\c !s -> fmap (fmap pure) . applyBlock c s)
(\c !s !args -> fmap (fmap pure) . accumulate c s args)

{-| A variant of 'foldClient' with more detailed control over rollbacks.
-}
foldClient' ::
forall s w.
forall mode s w.
Monoid w =>
-- | Initial state
s ->
-- | ^ Initial ledger state arguments
LedgerStateArgs mode ->
-- | Node connection data
Env ->
-- | Rollback
(ChainPoint -> w -> s -> IO (w, s)) ->
-- | Fold
(CatchingUp -> s -> BlockInMode -> IO (Maybe (w, s))) -> -- ^ Fold
(CatchingUp -> s -> LedgerStateUpdate mode -> BlockInMode -> IO (Maybe (w, s))) ->
PipelinedLedgerStateClient
foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClient $ CSP.ChainSyncClientPipelined $ do
foldClient' initialState ledgerStateArgs env applyRollback accumulate = PipelinedLedgerStateClient $ CSP.ChainSyncClientPipelined $ do

-- NB: The code below was adapted from https://input-output-hk.github.io/cardano-node/cardano-api/src/Cardano.Api.LedgerState.html#foldBlocks

let
pipelineSize = 10 -- TODO: Configurable

initialHistory = initialStateHistory (mempty, initialState)
initialHistory = initialStateHistory ledgerStateArgs (mempty, initialState)

clientIdle_RequestMoreN
:: forall n. WithOrigin BlockNo
-> WithOrigin BlockNo
-> Nat n -- Number of requests inflight.
-> History (w, s)
-> History mode (w, s)
-> CSP.ClientPipelinedStIdle n BlockInMode ChainPoint ChainTip IO ()
clientIdle_RequestMoreN clientTip_ serverTip_ n history
= case pipelineDecisionMax pipelineSize n clientTip_ serverTip_ of
Expand All @@ -156,42 +186,66 @@ foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClie
_ -> CSP.SendMsgRequestNextPipelined (pure ()) (clientIdle_RequestMoreN clientTip_ serverTip_ (Succ n) history)

clientNextN
:: Nat n
-> History (w, s)
:: forall n. Nat n
-> History mode (w, s)
-> ClientStNext n BlockInMode ChainPoint ChainTip IO ()
clientNextN n history =
ClientStNext {
recvMsgRollForward = \newBlock serverChainTip -> do
let BlockInMode _ (Block bh@(BlockHeader slotNo _blockHash currBlockNo) _) = newBlock
newClientTip = At currBlockNo
recvMsgRollForward = \newBlock@(BlockInMode _ bim@(Block bh@(BlockHeader slotNo _blockHash currBlockNo) _)) serverChainTip -> do
let newClientTip = At currBlockNo
newServerTip = fromChainTip serverChainTip
cu = if newClientTip == newServerTip
then caughtUpWithNode serverChainTip
else catchingUpWithNode (blockHeaderPoint bh) (Just currBlockNo) (Just $ chainTipToChainPoint serverChainTip)
currentState =

update :: LedgerStateUpdate mode -> s -> IO (Maybe (LedgerStateUpdate mode, (w, s)))
update NoLedgerStateUpdate currentState = do
state <- accumulate
cu
currentState
NoLedgerStateUpdate
newBlock
return $ state <&> (,) NoLedgerStateUpdate
update (LedgerStateUpdate currentLedgerState _) currentState = do
let
LedgerStateArgs _ validationMode = ledgerStateArgs
newLedgerStateE = applyBlock env currentLedgerState validationMode bim

case newLedgerStateE of
Left _ -> return Nothing
Right (newLedgerState, newLedgerEvents) -> do
let ledgerStateUpdate = LedgerStateUpdate newLedgerState newLedgerEvents
state <- accumulate
cu
currentState
ledgerStateUpdate
newBlock
return $ state <&> (,) ledgerStateUpdate

(currentLedgerStateUpdate, currentState') =
case Seq.viewl history of
(_, (_, x)) Seq.:< _ -> x
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history!"
(_, ledgerState, (_, s)) Seq.:< _ -> (ledgerState, s)
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history!"

newState <- applyBlock cu currentState newBlock
newState <- update currentLedgerStateUpdate currentState'
case newState of
Nothing -> do
clientIdle_DoneN n
Just !s' -> do
let (newHistory, _) = pushHistoryState env history slotNo s'
Just (!ledgerStateUpdate, !s') -> do
let (newHistory, _) = pushHistoryState env history slotNo ledgerStateUpdate s'
return (clientIdle_RequestMoreN newClientTip newServerTip n newHistory)
, recvMsgRollBackward = \chainPoint serverChainTip -> do
let newClientTip = Origin
newServerTip = fromChainTip serverChainTip
(rolledBack, truncatedHistory) = case chainPoint of
ChainPointAtGenesis -> (Seq.empty, initialHistory)
ChainPoint slotNo _ -> rollbackStateHistory history slotNo
(lastSlotNo, currentState) =
(lastSlotNo, lastLedgerState, currentState) =
case Seq.viewl truncatedHistory of
(n', (_, x)) Seq.:< _ -> (n', x)
(n', state, (_, x)) Seq.:< _ -> (n', state, x)
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history after rollback!"
!rolledBackState <- applyRollback chainPoint (foldMap (fst . snd) rolledBack) currentState
let (newHistory, _) = pushHistoryState env truncatedHistory lastSlotNo rolledBackState
!rolledBackState <- applyRollback chainPoint (foldMap (\(_, _, (s, _)) -> s) rolledBack) currentState
let (newHistory, _) = pushHistoryState env truncatedHistory lastSlotNo lastLedgerState rolledBackState
return (clientIdle_RequestMoreN newClientTip newServerTip n newHistory)
}

Expand All @@ -215,34 +269,33 @@ foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClie

return (clientIdle_RequestMoreN Origin Origin Zero initialHistory)

-- | A history of the last @k@ states
type History a = Seq (SlotNo, a)

-- | Add a new state to the history
pushHistoryState
:: -- | Environement used to get the security param, k.
pushHistoryState :: forall mode a.
-- | Environement used to get the security param, k.
Env
-- | History of k items.
-> History a
-> History mode a
-- | Slot number of the new item.
-> SlotNo
-> LedgerStateUpdate mode
-- | New item to add to the history
-> a
-- | ( The new history with the new item appended
-- , Any exisiting items that are now past the security parameter
-- and hence can no longer be rolled back.
-- )
-> (History a, History a)
-> (History mode a, History mode a)

pushHistoryState env hist ix st
pushHistoryState env hist ix ledgerStateUpdate st
= Seq.splitAt
(fromIntegral $ envSecurityParam env + 1)
((ix, st) Seq.:<| hist)
((ix, ledgerStateUpdate, st) Seq.:<| hist)

initialStateHistory :: forall mode a. LedgerStateArgs mode -> a -> History mode a
initialStateHistory (LedgerStateArgs ledgerState0 _) a = Seq.singleton (0, LedgerStateUpdate ledgerState0 [], a)
initialStateHistory NoLedgerStateArgs a = Seq.singleton (0, NoLedgerStateUpdate, a)

-- | Split the history into bits that have been rolled back (1st elemnt) and
-- bits that have not been rolled back (2nd element)
rollbackStateHistory :: History a -> SlotNo -> (History a, History a)
rollbackStateHistory hist maxInc = Seq.spanl ((> maxInc) . (\(x,_) -> x)) hist

initialStateHistory :: a -> History a
initialStateHistory a = Seq.singleton (0, a)
rollbackStateHistory :: forall mode a. History mode a -> SlotNo -> (History mode a, History mode a)
rollbackStateHistory hist maxInc = Seq.spanl ((> maxInc) . (\(x,_,_) -> x)) hist
24 changes: 15 additions & 9 deletions src/node-client/lib/Convex/NodeClient/WaitForTxnClient.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE UndecidableInstances #-}
{-| A node client that waits for a transaction to appear on the chain
Expand All @@ -11,7 +12,8 @@ module Convex.NodeClient.WaitForTxnClient(
) where

import Cardano.Api (BlockInMode, ChainPoint, Env,
LocalNodeConnectInfo, TxId)
LocalNodeConnectInfo, TxId,
LedgerState)
import qualified Cardano.Api as C
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVar,
Expand All @@ -22,7 +24,11 @@ import Control.Monad.Reader (MonadTrans, ReaderT (..), ask,
lift)
import Convex.Class (MonadBlockchain (..))
import Convex.MonadLog (MonadLog (..), logInfoS)
import Convex.NodeClient.Fold (CatchingUp (..), foldClient)
import Convex.NodeClient.Fold (CatchingUp (..),
LedgerStateArgs (..),
LedgerStateUpdate,
LedgerStateMode (..),
foldClient)
import Convex.NodeClient.Resuming (resumingClient)
import Convex.NodeClient.Types (PipelinedLedgerStateClient,
protocols)
Expand All @@ -43,10 +49,10 @@ runWaitForTxn connectInfo env txi = do
waitForTxnClient :: TMVar BlockInMode -> ChainPoint -> TxId -> Env -> PipelinedLedgerStateClient
waitForTxnClient tmv cp txId env =
resumingClient [cp] $ \_ ->
foldClient () env (applyBlock tmv txId)
foldClient () NoLedgerStateArgs env (applyBlock tmv txId)

applyBlock :: TMVar BlockInMode -> TxId -> CatchingUp -> () -> BlockInMode -> IO (Maybe ())
applyBlock tmv txi _ () block = do
applyBlock :: TMVar BlockInMode -> TxId -> CatchingUp -> () -> LedgerStateUpdate 'NoLedgerState -> BlockInMode -> IO (Maybe ())
applyBlock tmv txi _ () _ block = do
case block of
C.BlockInMode C.BabbageEra blck ->
if checkTxIds txi blck
Expand All @@ -62,11 +68,11 @@ checkTxIds txi ((C.Block _ txns)) = any (checkTxId txi) txns
checkTxId :: TxId -> C.Tx C.BabbageEra -> Bool
checkTxId txi tx = txi == C.getTxId (C.getTxBody tx)

newtype MonadBlockchainWaitingT m a = MonadBlockchainWaitingT{unMonadBlockchainWaitingT :: ReaderT (LocalNodeConnectInfo, Env) m a }
newtype MonadBlockchainWaitingT m a = MonadBlockchainWaitingT{unMonadBlockchainWaitingT :: ReaderT (LocalNodeConnectInfo, LedgerState, Env) m a }
deriving newtype (Functor, Applicative, Monad, MonadIO, MonadFail)

runMonadBlockchainWaitingT :: LocalNodeConnectInfo -> Env -> MonadBlockchainWaitingT m a -> m a
runMonadBlockchainWaitingT connectInfo env (MonadBlockchainWaitingT action) = runReaderT action (connectInfo, env)
runMonadBlockchainWaitingT :: LocalNodeConnectInfo -> LedgerState -> Env -> MonadBlockchainWaitingT m a -> m a
runMonadBlockchainWaitingT connectInfo initialLedgerState env (MonadBlockchainWaitingT action) = runReaderT action (connectInfo, initialLedgerState, env)

instance MonadError e m => MonadError e (MonadBlockchainWaitingT m) where
throwError = lift . throwError
Expand All @@ -83,7 +89,7 @@ instance (MonadLog m) => MonadLog (MonadBlockchainWaitingT m) where
instance (MonadIO m, MonadBlockchain m, MonadLog m) => MonadBlockchain (MonadBlockchainWaitingT m) where
sendTx tx = MonadBlockchainWaitingT $ do
let txi = C.getTxId (C.getTxBody tx)
(info, env) <- ask
(info, _ledgerState0, env) <- ask
tmv <- liftIO (runWaitForTxn info env txi)
k <- sendTx tx
logInfoS $ "MonadBlockchainWaitingT.sendTx: Waiting for " <> show txi <> " to appear on the chain"
Expand Down
11 changes: 8 additions & 3 deletions src/wallet/lib/Convex/Wallet/NodeClient/BalanceClient.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE DataKinds #-}
{-| A node client that shows the balance of the wallet
-}
module Convex.Wallet.NodeClient.BalanceClient(
Expand All @@ -19,7 +20,10 @@ import Control.Monad.Trans.Maybe (runMaybeT)
import Convex.MonadLog (MonadLogKatipT (..), logInfo,
logInfoS)
import Convex.NodeClient.Fold (CatchingUp (..), catchingUp,
catchingUpWithNode, foldClient)
catchingUpWithNode, foldClient,
LedgerStateArgs (..),
LedgerStateUpdate,
LedgerStateMode (..))
import Convex.NodeClient.Resuming (resumingClient)
import Convex.NodeClient.Types (PipelinedLedgerStateClient)
import Convex.Utils (toShelleyPaymentCredential)
Expand Down Expand Up @@ -48,13 +52,14 @@ balanceClient logEnv ns clientEnv walletState wallet env =
in resumingClient [cp] $ \_ ->
foldClient
(i, utxoSet walletState)
NoLedgerStateArgs
env
(applyBlock logEnv ns clientEnv wallet)

{-| Apply a new block
-}
applyBlock :: K.LogEnv -> K.Namespace -> BalanceClientEnv -> C.PaymentCredential -> CatchingUp -> (CatchingUp, UtxoSet C.CtxTx ()) -> BlockInMode -> IO (Maybe (CatchingUp, UtxoSet C.CtxTx ()))
applyBlock logEnv ns BalanceClientEnv{bceFile, bceState} wallet c (oldC, state) block = K.runKatipContextT logEnv () ns $ runMonadLogKatipT $ runMaybeT $ do
applyBlock :: K.LogEnv -> K.Namespace -> BalanceClientEnv -> C.PaymentCredential -> CatchingUp -> (CatchingUp, UtxoSet C.CtxTx ()) -> LedgerStateUpdate 'NoLedgerState -> BlockInMode -> IO (Maybe (CatchingUp, UtxoSet C.CtxTx ()))
applyBlock logEnv ns BalanceClientEnv{bceFile, bceState} wallet c (oldC, state) _ block = K.runKatipContextT logEnv () ns $ runMonadLogKatipT $ runMaybeT $ do
let change = Utxos.extract_ (toShelleyPaymentCredential wallet) state block
newUTxOs = apply state change
C.BlockInMode _ (C.getBlockHeader -> header) = block
Expand Down

0 comments on commit 98e6a37

Please sign in to comment.