Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

foldClient with ledger state #135

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 94 additions & 40 deletions src/node-client/lib/Convex/NodeClient/Fold.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
Expand All @@ -11,6 +12,9 @@
Unlike 'foldBlocks' from 'Cardano.Api', this one supports rollbacks.
-}
module Convex.NodeClient.Fold(
LedgerStateArgs (..),
LedgerStateUpdate (..),
LedgerStateMode (..),
CatchingUp(..),
catchingUpWithNode,
caughtUpWithNode,
Expand All @@ -32,8 +36,12 @@ import Cardano.Api (Block (.
ChainTip (..),
Env,
SlotNo,
LedgerEvent,
LedgerState (..),
ValidationMode (..),
chainTipToChainPoint,
envSecurityParam)
envSecurityParam,
applyBlock)
import Cardano.Slotting.Slot (WithOrigin (At))
import Convex.NodeClient.ChainTip (JSONBlockNo (..),
JSONChainPoint (..),
Expand All @@ -48,6 +56,7 @@ import Data.Aeson (FromJSON
ToJSON)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Data.Functor ((<&>))
import GHC.Generics (Generic)
import Network.TypedProtocol.Pipelined (Nat (..))
import Ouroboros.Consensus.Block.Abstract (WithOrigin (..))
Expand All @@ -65,6 +74,23 @@ data CatchingUp =
deriving stock (Eq, Show, Generic)
deriving anyclass (FromJSON, ToJSON)

-- | Whether to keep track of the full ledger state on the client
data LedgerStateMode = FullLedgerState | NoLedgerState
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some haddocks explaining what this does (switching between ledger state / no ledger state) would be nice


-- | Whether we have the initial ledger state on the client
data LedgerStateArgs mode where
NoLedgerStateArgs :: LedgerStateArgs 'NoLedgerState
LedgerStateArgs :: LedgerState -> ValidationMode -> LedgerStateArgs 'FullLedgerState

-- | Whether we have the current ledger state for the client folding function
data LedgerStateUpdate mode where
NoLedgerStateUpdate :: LedgerStateUpdate 'NoLedgerState
LedgerStateUpdate :: LedgerState -> [LedgerEvent] -> LedgerStateUpdate 'FullLedgerState

-- | A history of the last @k@ states
type History mode a = Seq (SlotNo, LedgerStateUpdate mode, a)


getClientPoint :: CatchingUp -> JSONChainPoint
getClientPoint = \case
CatchingUpWithNode{clientPoint} -> clientPoint
Expand Down Expand Up @@ -107,45 +133,49 @@ resumingFrom = \case
{-| Run the client until 'Nothing' is returned
-}
foldClient ::
forall s.
forall mode s.
-- | Initial state
s ->
-- | Initial ledger state arguments
LedgerStateArgs mode ->
-- | Node connection data
Env ->
-- | Fold
(CatchingUp -> s -> BlockInMode CardanoMode -> IO (Maybe s)) ->
(CatchingUp -> s -> LedgerStateUpdate mode -> BlockInMode CardanoMode -> IO (Maybe s)) ->
PipelinedLedgerStateClient
foldClient initialState env applyBlock =
foldClient' @s @()
foldClient initialState initialLedgerState env accumulate =
foldClient' @mode @s @()
initialState
initialLedgerState
env
(\_ _ !s -> pure ((), s))
(\c !s -> fmap (fmap pure) . applyBlock c s)
(\c !s !args -> fmap (fmap pure) . accumulate c s args)

{-| A variant of 'foldClient' with more detailed control over rollbacks.
-}
foldClient' ::
forall s w.
forall mode s w.
Monoid w =>
s -- ^ Initial state
-> LedgerStateArgs mode -- ^ Initial ledger state arguments
-> Env -- ^ Node connection data
-> (ChainPoint -> w -> s -> IO (w, s)) -- ^ Rollback
-> (CatchingUp -> s -> BlockInMode CardanoMode -> IO (Maybe (w, s))) -- ^ Fold
-> (CatchingUp -> s -> LedgerStateUpdate mode -> BlockInMode CardanoMode -> IO (Maybe (w, s))) -- ^ Fold
-> PipelinedLedgerStateClient
foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClient $ CSP.ChainSyncClientPipelined $ do
foldClient' initialState ledgerStateArgs env applyRollback accumulate = PipelinedLedgerStateClient $ CSP.ChainSyncClientPipelined $ do

-- NB: The code below was adapted from https://input-output-hk.github.io/cardano-node/cardano-api/src/Cardano.Api.LedgerState.html#foldBlocks

let
pipelineSize = 10 -- TODO: Configurable

initialHistory = initialStateHistory (mempty, initialState)
initialHistory = initialStateHistory ledgerStateArgs (mempty, initialState)

clientIdle_RequestMoreN
:: forall n. WithOrigin BlockNo
-> WithOrigin BlockNo
-> Nat n -- Number of requests inflight.
-> History (w, s)
-> History mode (w, s)
-> CSP.ClientPipelinedStIdle n ClientBlock ChainPoint ChainTip IO ()
clientIdle_RequestMoreN clientTip_ serverTip_ n history
= case pipelineDecisionMax pipelineSize n clientTip_ serverTip_ of
Expand All @@ -154,42 +184,67 @@ foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClie
_ -> CSP.SendMsgRequestNextPipelined (clientIdle_RequestMoreN clientTip_ serverTip_ (Succ n) history)

clientNextN
:: Nat n
-> History (w, s)
:: forall n. Nat n
-> History mode (w, s)
-> ClientStNext n ClientBlock ChainPoint ChainTip IO ()
clientNextN n history =
ClientStNext {
recvMsgRollForward = \newBlock serverChainTip -> do
let BlockInMode (Block bh@(BlockHeader slotNo _blockHash currBlockNo) _) _ = newBlock
newClientTip = At currBlockNo
recvMsgRollForward = \newBlock@(BlockInMode bim@(Block bh@(BlockHeader slotNo _blockHash currBlockNo) _) _era ) serverChainTip -> do
let newClientTip = At currBlockNo
newServerTip = fromChainTip serverChainTip
cu = if newClientTip == newServerTip
then caughtUpWithNode serverChainTip
else catchingUpWithNode (blockHeaderPoint bh) (Just currBlockNo) (Just $ chainTipToChainPoint serverChainTip)
currentState =

update :: LedgerStateUpdate mode -> s -> IO (Maybe (LedgerStateUpdate mode, (w, s)))
update NoLedgerStateUpdate currentState = do
state <- accumulate
cu
currentState
NoLedgerStateUpdate
newBlock
return $ state <&> (,) NoLedgerStateUpdate
update (LedgerStateUpdate currentLedgerState _) currentState = do
let
LedgerStateArgs _ validationMode = ledgerStateArgs
newLedgerStateE = applyBlock env currentLedgerState validationMode bim

case newLedgerStateE of
Left _ -> return Nothing
Right (newLedgerState, newLedgerEvents) -> do
let ledgerStateUpdate = LedgerStateUpdate newLedgerState newLedgerEvents
state <- accumulate
cu
currentState
ledgerStateUpdate
newBlock
return $ state <&> (,) ledgerStateUpdate

(currentLedgerStateUpdate, currentState') =
case Seq.viewl history of
(_, (_, x)) Seq.:< _ -> x
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history!"
(_, ledgerState, (_, s)) Seq.:< _ -> (ledgerState, s)
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history!"

newState <- applyBlock cu currentState newBlock
newState <- update currentLedgerStateUpdate currentState'
case newState of
Nothing -> do
clientIdle_DoneN n
Just !s' -> do
let (newHistory, _) = pushHistoryState env history slotNo s'
Just (!ledgerStateUpdate, !s') -> do
let (newHistory, _) = pushHistoryState env history slotNo ledgerStateUpdate s'
return (clientIdle_RequestMoreN newClientTip newServerTip n newHistory)

, recvMsgRollBackward = \chainPoint serverChainTip -> do
let newClientTip = Origin
newServerTip = fromChainTip serverChainTip
(rolledBack, truncatedHistory) = case chainPoint of
ChainPointAtGenesis -> (Seq.empty, initialHistory)
ChainPoint slotNo _ -> rollbackStateHistory history slotNo
(lastSlotNo, currentState) =
(lastSlotNo, lastLedgerState, currentState) =
case Seq.viewl truncatedHistory of
(n', (_, x)) Seq.:< _ -> (n', x)
(n', state, (_, x)) Seq.:< _ -> (n', state, x)
Seq.EmptyL -> error "foldClient: clientNextN: Impossible - empty history after rollback!"
!rolledBackState <- applyRollback chainPoint (foldMap (fst . snd) rolledBack) currentState
let (newHistory, _) = pushHistoryState env truncatedHistory lastSlotNo rolledBackState
!rolledBackState <- applyRollback chainPoint (foldMap (\(_, _, (s, _)) -> s) rolledBack) currentState
let (newHistory, _) = pushHistoryState env truncatedHistory lastSlotNo lastLedgerState rolledBackState
return (clientIdle_RequestMoreN newClientTip newServerTip n newHistory)
}

Expand All @@ -213,30 +268,29 @@ foldClient' initialState env applyRollback applyBlock = PipelinedLedgerStateClie

return (clientIdle_RequestMoreN Origin Origin Zero initialHistory)

-- | A history of the last @k@ states
type History a = Seq (SlotNo, a)

-- | Add a new state to the history
pushHistoryState ::
pushHistoryState :: forall mode a.
Env -- ^ Environement used to get the security param, k.
-> History a -- ^ History of k items.
-> History mode a -- ^ History of k items.
-> SlotNo -- ^ Slot number of the new item.
-> LedgerStateUpdate mode
-> a -- ^ New item to add to the history
-> (History a, History a)
-> (History mode a, History mode a)
-- ^ ( The new history with the new item appended
-- , Any exisiting items that are now past the security parameter
-- and hence can no longer be rolled back.
-- )

pushHistoryState env hist ix st
pushHistoryState env hist ix ledgerStateUpdate st
= Seq.splitAt
(fromIntegral $ envSecurityParam env + 1)
((ix, st) Seq.:<| hist)
((ix, ledgerStateUpdate, st) Seq.:<| hist)

initialStateHistory :: forall mode a. LedgerStateArgs mode -> a -> History mode a
initialStateHistory (LedgerStateArgs ledgerState0 _) a = Seq.singleton (0, LedgerStateUpdate ledgerState0 [], a)
initialStateHistory NoLedgerStateArgs a = Seq.singleton (0, NoLedgerStateUpdate, a)


-- | Split the history into bits that have been rolled back (1st elemnt) and
-- bits that have not been rolled back (2nd element)
rollbackStateHistory :: History a -> SlotNo -> (History a, History a)
rollbackStateHistory hist maxInc = Seq.spanl ((> maxInc) . (\(x,_) -> x)) hist

initialStateHistory :: a -> History a
initialStateHistory a = Seq.singleton (0, a)
rollbackStateHistory :: forall mode a. History mode a -> SlotNo -> (History mode a, History mode a)
rollbackStateHistory hist maxInc = Seq.spanl ((> maxInc) . (\(x,_,_) -> x)) hist
7 changes: 4 additions & 3 deletions src/node-client/lib/Convex/NodeClient/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import Cardano.Api (BlockInMo
LocalNodeClientProtocols (..),
LocalNodeClientProtocolsInMode,
LocalNodeConnectInfo (..),
connectToLocalNode)
connectToLocalNode, LedgerState, initialLedgerState)
import Cardano.Slotting.Slot (WithOrigin (At, Origin))
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Trans.Class (lift)
Expand All @@ -44,11 +44,12 @@ newtype PipelinedLedgerStateClient =
runNodeClient ::
FilePath -- ^ Path to the cardano-node config file (e.g. <path to cardano-node project>/configuration/cardano/mainnet-config.json)
-> FilePath -- ^ Path to local cardano-node socket. This is the path specified by the @--socket-path@ command line option when running the node.
-> (LocalNodeConnectInfo CardanoMode -> Env -> IO PipelinedLedgerStateClient) -- ^ Client
-> (LocalNodeConnectInfo CardanoMode -> LedgerState -> Env -> IO PipelinedLedgerStateClient) -- ^ Client
-> ExceptT InitialLedgerStateError IO () -- ^ Final state
runNodeClient nodeConfigFilePath socketPath client = do
(connectInfo, env) <- loadConnectInfo nodeConfigFilePath socketPath
c <- liftIO (client connectInfo env)
ledgerState0 <- snd <$> initialLedgerState nodeConfigFilePath
c <- liftIO (client connectInfo ledgerState0 env)
lift $ connectToLocalNode connectInfo (protocols c)

protocols :: PipelinedLedgerStateClient -> LocalNodeClientProtocolsInMode CardanoMode
Expand Down
32 changes: 21 additions & 11 deletions src/node-client/lib/Convex/NodeClient/WaitForTxnClient.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE UndecidableInstances #-}
{-| A node client that waits for a transaction to appear on the chain
Expand All @@ -12,7 +13,7 @@ module Convex.NodeClient.WaitForTxnClient(

import Cardano.Api (BlockInMode, CardanoMode,
ChainPoint, Env,
LocalNodeConnectInfo, TxId)
LocalNodeConnectInfo, TxId, LedgerState)
import qualified Cardano.Api as C
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVar,
Expand All @@ -23,7 +24,7 @@ import Control.Monad.Reader (MonadTrans, ReaderT (..), ask,
lift)
import Convex.Class (MonadBlockchain (..))
import Convex.MonadLog (MonadLog (..), logInfoS)
import Convex.NodeClient.Fold (CatchingUp (..), foldClient)
import Convex.NodeClient.Fold (CatchingUp (..), foldClient, LedgerStateArgs (..), LedgerStateUpdate, LedgerStateMode (..))
import Convex.NodeClient.Resuming (resumingClient)
import Convex.NodeClient.Types (PipelinedLedgerStateClient,
protocols)
Expand All @@ -32,7 +33,11 @@ import qualified Convex.NodeQueries as NodeQueries
{-| Start a 'waitForTxnClient' in a separate thread. Returns a TMVar that will contain the block that has the given
transaction.
-}
runWaitForTxn :: LocalNodeConnectInfo CardanoMode -> Env -> TxId -> IO (TMVar (BlockInMode CardanoMode))
runWaitForTxn ::
LocalNodeConnectInfo CardanoMode
-> Env
-> TxId
-> IO (TMVar (BlockInMode CardanoMode))
runWaitForTxn connectInfo env txi = do
tip' <- NodeQueries.queryTip connectInfo
tmv <- atomically newEmptyTMVar
Expand All @@ -41,13 +46,18 @@ runWaitForTxn connectInfo env txi = do

{-| Scan the new blocks until the transaction appears
-}
waitForTxnClient :: TMVar (BlockInMode CardanoMode) -> ChainPoint -> TxId -> Env -> PipelinedLedgerStateClient
waitForTxnClient ::
TMVar (BlockInMode CardanoMode)
-> ChainPoint
-> TxId
-> Env
-> PipelinedLedgerStateClient
waitForTxnClient tmv cp txId env =
resumingClient [cp] $ \_ ->
foldClient () env (applyBlock tmv txId)
foldClient () NoLedgerStateArgs env (applyBlock tmv txId)

applyBlock :: TMVar (BlockInMode CardanoMode) -> TxId -> CatchingUp -> () -> BlockInMode CardanoMode -> IO (Maybe ())
applyBlock tmv txi _ () block = do
applyBlock :: TMVar (BlockInMode CardanoMode) -> TxId -> CatchingUp -> () -> LedgerStateUpdate 'NoLedgerState -> BlockInMode CardanoMode -> IO (Maybe ())
applyBlock tmv txi _ () _ block = do
case block of
C.BlockInMode blck C.BabbageEraInCardanoMode ->
if checkTxIds txi blck
Expand All @@ -63,11 +73,11 @@ checkTxIds txi ((C.Block _ txns)) = any (checkTxId txi) txns
checkTxId :: TxId -> C.Tx C.BabbageEra -> Bool
checkTxId txi tx = txi == C.getTxId (C.getTxBody tx)

newtype MonadBlockchainWaitingT m a = MonadBlockchainWaitingT{unMonadBlockchainWaitingT :: ReaderT (LocalNodeConnectInfo CardanoMode, Env) m a }
newtype MonadBlockchainWaitingT m a = MonadBlockchainWaitingT{unMonadBlockchainWaitingT :: ReaderT (LocalNodeConnectInfo CardanoMode, LedgerState, Env) m a }
deriving newtype (Functor, Applicative, Monad, MonadIO, MonadFail)

runMonadBlockchainWaitingT :: LocalNodeConnectInfo CardanoMode -> Env -> MonadBlockchainWaitingT m a -> m a
runMonadBlockchainWaitingT connectInfo env (MonadBlockchainWaitingT action) = runReaderT action (connectInfo, env)
runMonadBlockchainWaitingT :: LocalNodeConnectInfo CardanoMode -> LedgerState -> Env -> MonadBlockchainWaitingT m a -> m a
runMonadBlockchainWaitingT connectInfo initialLedgerState env (MonadBlockchainWaitingT action) = runReaderT action (connectInfo, initialLedgerState, env)

instance MonadError e m => MonadError e (MonadBlockchainWaitingT m) where
throwError = lift . throwError
Expand All @@ -84,7 +94,7 @@ instance (MonadLog m) => MonadLog (MonadBlockchainWaitingT m) where
instance (MonadIO m, MonadBlockchain m, MonadLog m) => MonadBlockchain (MonadBlockchainWaitingT m) where
sendTx tx = MonadBlockchainWaitingT $ do
let txi = C.getTxId (C.getTxBody tx)
(info, env) <- ask
(info, _ledgerState0, env) <- ask
tmv <- liftIO (runWaitForTxn info env txi)
k <- sendTx tx
logInfoS $ "MonadBlockchainWaitingT.sendTx: Waiting for " <> show txi <> " to appear on the chain"
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/lib/Convex/Wallet/Cli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ runWallet logEnv port Config{cardanoNodeConfigFile, cardanoNodeSocket, walletFil
e <- liftIO (NC.balanceClientEnv walletFile initialState)
logInfoS $ "Starting wallet server on port " <> show port
_ <- liftIO $ forkIO (API.startServer (NC.bceState e) port)
let client _ env = do
let client _ _ env = do
pure (NC.balanceClient logEnv "wallet" e initialState (operatorPaymentCredential op) env)
result <- liftIO $ runExceptT (runNodeClient cardanoNodeConfigFile cardanoNodeSocket client)
case result of
Expand Down
Loading
Loading