Skip to content

Commit a6f18a4

Browse files
committed
Add rewards thread
1 parent c38499b commit a6f18a4

File tree

9 files changed

+185
-23
lines changed

9 files changed

+185
-23
lines changed

cardano-db-sync/cardano-db-sync.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ library
135135
Cardano.DbSync.Threads.Database
136136
Cardano.DbSync.Threads.EpochStake
137137
Cardano.DbSync.Threads.Ledger
138+
Cardano.DbSync.Threads.Rewards
138139
Cardano.DbSync.Threads.Stake
139140
Cardano.DbSync.Threads.TxInResolve
140141
Cardano.DbSync.Tracing.ToObjectOrphans

cardano-db-sync/src/Cardano/DbSync/Block.hs

-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ applyAndInsertBlock ::
136136
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
137137
applyAndInsertBlock syncEnv ((blockId, firstAfterRollback), cblock) = do
138138
applyRessultVar <- liftIO (asyncApplyResult syncEnv cblock)
139-
-- insertNewEpochLedgerEvents syncEnv (sdEpochNo (apSlotDetails applyResult)) (apEvents applyResult)
140139
whenGeneric $ \blk ->
141140
prepareInsertBlock syncEnv (blockId, blk) applyRessultVar firstAfterRollback
142141
where

cardano-db-sync/src/Cardano/DbSync/Config.hs

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ module Cardano.DbSync.Config (
1515
SyncProtocol (..),
1616
SyncNodeConfig (..),
1717
SyncNodeParams (..),
18-
cardanoLedgerConfig,
1918
genesisProtocolMagicId,
2019
readCardanoGenesisConfig,
2120
readSyncNodeConfig,

cardano-db-sync/src/Cardano/DbSync/Config/Cardano.hs

-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
module Cardano.DbSync.Config.Cardano (
99
GenesisConfig (..),
10-
cardanoLedgerConfig,
1110
genesisProtocolMagicId,
1211
mkTopLevelConfig,
1312
mkProtocolInfoCardano,
@@ -35,7 +34,6 @@ import Ouroboros.Consensus.Cardano (Nonce (..), ProtVer (ProtVer))
3534
import qualified Ouroboros.Consensus.Cardano as Consensus
3635
import Ouroboros.Consensus.Cardano.Node
3736
import Ouroboros.Consensus.Config (TopLevelConfig (..), emptyCheckpointsMap)
38-
import Ouroboros.Consensus.Ledger.Basics (LedgerConfig)
3937
import Ouroboros.Consensus.Node.ProtocolInfo (ProtocolInfo)
4038
import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus
4139
import Ouroboros.Consensus.Shelley.Eras (StandardCrypto)
@@ -73,9 +71,6 @@ readCardanoGenesisConfig enc =
7371

7472
-- -------------------------------------------------------------------------------------------------
7573

76-
cardanoLedgerConfig :: GenesisConfig -> LedgerConfig CardanoBlock
77-
cardanoLedgerConfig = topLevelConfigLedger . mkTopLevelConfig
78-
7974
mkTopLevelConfig :: GenesisConfig -> TopLevelConfig CardanoBlock
8075
mkTopLevelConfig cfg = Consensus.pInfoConfig $ fst $ mkProtocolInfoCardano cfg []
8176

cardano-db-sync/src/Cardano/DbSync/Ledger/Async.hs

+90-6
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,25 @@
22

33
module Cardano.DbSync.Ledger.Async where
44

5+
import Cardano.DbSync.Era.Shelley.Generic.Rewards as Generic
6+
import Cardano.DbSync.Ledger.Event
57
import Cardano.DbSync.Ledger.Types
6-
import Cardano.Ledger.BaseTypes (EpochNo)
8+
import Cardano.DbSync.Types
9+
import Cardano.Ledger.BaseTypes
710
import Cardano.Ledger.Crypto (StandardCrypto)
811
import qualified Cardano.Ledger.EpochBoundary as Ledger
12+
import qualified Cardano.Ledger.Rewards as Ledger
13+
import Cardano.Ledger.Shelley.RewardUpdate as Ledger
914
import Control.Concurrent.Class.MonadSTM.Strict
1015
import qualified Control.Concurrent.STM.TBQueue as TBQ
16+
import Control.Monad.Extra (whenJust)
17+
import Data.Map (Map)
18+
import Data.Set (Set)
19+
import Data.Word (Word64)
20+
21+
--------------------------------------------------------------------------------
22+
-- EpochStake
23+
--------------------------------------------------------------------------------
1124

1225
newEpochStakeChannels :: IO EpochStakeChannels
1326
newEpochStakeChannels =
@@ -18,9 +31,9 @@ newEpochStakeChannels =
1831
<*> newTVarIO Nothing
1932

2033
-- To be used by the main thread
21-
ensureEpochDone :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> IO ()
22-
ensureEpochDone sQueue epoch snapshot = atomically $ do
23-
mLastEpochDone <- waitFinished sQueue
34+
ensureStakeDone :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> IO ()
35+
ensureStakeDone sQueue epoch snapshot = atomically $ do
36+
mLastEpochDone <- waitStakeFinished sQueue
2437
case mLastEpochDone of
2538
Just lastEpochDone | lastEpochDone == epoch -> pure ()
2639
_ -> do
@@ -29,8 +42,8 @@ ensureEpochDone sQueue epoch snapshot = atomically $ do
2942
retry
3043

3144
-- To be used by the main thread
32-
waitFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo)
33-
waitFinished sQueue = do
45+
waitStakeFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo)
46+
waitStakeFinished sQueue = do
3447
stakeThreadState <- readTVar (epochResult sQueue)
3548
case stakeThreadState of
3649
Just (lastEpoch, Done) -> pure $ Just lastEpoch -- Normal case
@@ -42,3 +55,74 @@ writeEpochStakeAction :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot Standa
4255
writeEpochStakeAction sQueue epoch snapShot checkFirst = do
4356
TBQ.writeTBQueue (estakeQueue sQueue) $ EpochStakeDBAction epoch snapShot checkFirst
4457
writeTVar (epochResult sQueue) $ Just (epoch, Running)
58+
59+
--------------------------------------------------------------------------------
60+
-- Rewards
61+
--------------------------------------------------------------------------------
62+
63+
newRewardsChannels :: IO RewardsChannels
64+
newRewardsChannels =
65+
RewardsChannels
66+
<$> TBQ.newTBQueueIO 5
67+
<*> newTVarIO Nothing
68+
69+
-- TODO: add a boolean flag that shows the start of the epoch, so that 'isNewEpoch' is more reliable
70+
asyncWriteRewards :: HasLedgerEnv -> CardanoLedgerState -> EpochNo -> Bool -> [LedgerEvent] -> IO ()
71+
asyncWriteRewards env newState currentEpochNo isNewEpoch rewardEventsEB = do
72+
rewState <- atomically $ readTVar $ rewardsResult rc
73+
if isNewEpoch
74+
then do
75+
case rewState of
76+
Just (e', RewRunning) | e' == currentEpochNo -> do
77+
waitRewardUntil rc (e', RewDone)
78+
_ -> do
79+
ensureRewardsDone rc currentEpochNo (findTotal rewardEventsEB)
80+
waitEBRewardsAction rc currentEpochNo rewardEventsEB
81+
else do
82+
case rewState of {}
83+
whenJust (Generic.getRewardsUpdate (getTopLevelconfigHasLedger env) (clsState newState)) $ \ru -> do
84+
atomically $ writeRewardsAction rc currentEpochNo currentEpochNo False (Ledger.rs ru) -- (e-1) (e+1)
85+
where
86+
rc = leRewardsChans env
87+
88+
_subFromCurrentEpoch :: Word64 -> EpochNo
89+
_subFromCurrentEpoch m =
90+
if unEpochNo currentEpochNo >= m
91+
then EpochNo $ unEpochNo currentEpochNo - m
92+
else EpochNo 0
93+
94+
findTotal :: [LedgerEvent] -> Maybe (Map StakeCred (Set (Ledger.Reward StandardCrypto)))
95+
findTotal [] = Nothing
96+
findTotal (LedgerTotalRewards _ mp : _) = Just mp
97+
findTotal (_ : rest) = findTotal rest
98+
99+
-- To be used by the main thread
100+
ensureRewardsDone :: RewardsChannels -> EpochNo -> Maybe (Map StakeCred (Set (Ledger.Reward StandardCrypto))) -> IO ()
101+
ensureRewardsDone rc epoch mmp = do
102+
whenJust mmp $ \mp -> do
103+
atomically $ writeRewardsAction rc epoch epoch True mp -- e-2 e-1
104+
waitRewardUntil rc (epoch, RewDone)
105+
106+
waitEBRewardsAction :: RewardsChannels -> EpochNo -> [LedgerEvent] -> IO ()
107+
waitEBRewardsAction rc epoch les = do
108+
atomically $ do
109+
TBQ.writeTBQueue (rQueue rc) $ RewardsEpochBoundary epoch les
110+
writeTVar (rewardsResult rc) $ Just (epoch, RewEBRunning)
111+
waitRewardUntil rc (epoch, RewEBDone)
112+
113+
-- To be used by the main thread
114+
writeRewardsAction :: RewardsChannels -> EpochNo -> EpochNo -> Bool -> Map StakeCred (Set (Ledger.Reward StandardCrypto)) -> STM IO ()
115+
writeRewardsAction rc epoch epoch' checkFirst mp = do
116+
TBQ.writeTBQueue (rQueue rc) $ RewardsDBAction epoch epoch' mp checkFirst
117+
writeTVar (rewardsResult rc) $ Just (epoch, RewRunning)
118+
119+
waitRewardUntil :: RewardsChannels -> (EpochNo, EpochRewardState) -> IO ()
120+
waitRewardUntil rc st = waitRewardUntilPred rc (== st)
121+
122+
-- blocks until the reward result satisfies a specific predicate.
123+
waitRewardUntilPred :: RewardsChannels -> ((EpochNo, EpochRewardState) -> Bool) -> IO ()
124+
waitRewardUntilPred rc prd = atomically $ do
125+
rewardsThreadState <- readTVar (rewardsResult rc)
126+
case rewardsThreadState of
127+
Just st | prd st -> pure ()
128+
_ -> retry

cardano-db-sync/src/Cardano/DbSync/Ledger/Event.hs

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ module Cardano.DbSync.Ledger.Event (
1919
convertPoolRewards,
2020
ledgerEventName,
2121
splitDeposits,
22+
splitRewardsEpochBoundary,
2223
) where
2324

2425
import Cardano.Db hiding (AdaPots, EpochNo, SyncState, TreasuryWithdrawals, epochNo)
@@ -465,3 +466,14 @@ splitDeposits les =
465466
case le of
466467
LedgerDeposits hsh coin -> Left (txHashFromSafe hsh, coin)
467468
_ -> Right le
469+
470+
splitRewardsEpochBoundary :: [LedgerEvent] -> ([LedgerEvent], [LedgerEvent])
471+
splitRewardsEpochBoundary les =
472+
partitionEithers $ eitherRewEB <$> les
473+
where
474+
eitherRewEB :: LedgerEvent -> Either LedgerEvent LedgerEvent
475+
eitherRewEB le = case le of
476+
LedgerDeltaRewards {} -> Left le
477+
LedgerRestrainedRewards {} -> Left le
478+
LedgerTotalRewards {} -> Left le
479+
_ -> Right le

cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs

+6-10
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ module Cardano.DbSync.Ledger.State (
3232
hashToAnnotation,
3333
getHeaderHash,
3434
runLedgerStateWriteThread,
35-
getSliceMeta,
3635
findProposedCommittee,
3736
) where
3837

@@ -176,6 +175,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
176175
intervar <- newTVarIO Strict.Nothing
177176
swQueue <- newTBQueueIO 5 -- Should be relatively shallow.
178177
stakeChans <- newEpochStakeChannels
178+
rewardsChans <- newRewardsChannels
179179
applyQueue <- newTBQueueIO 10
180180
pure
181181
HasLedgerEnv
@@ -194,6 +194,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
194194
, leStateWriteQueue = swQueue
195195
, leApplyQueue = applyQueue
196196
, leEpochStakeChans = stakeChans
197+
, leRewardsChans = rewardsChans
197198
}
198199

199200
initCardanoLedgerState :: Consensus.ProtocolInfo CardanoBlock -> CardanoLedgerState
@@ -203,9 +204,6 @@ initCardanoLedgerState pInfo =
203204
, clsEpochBlockNo = GenesisEpochBlockNo
204205
}
205206

206-
getTopLevelconfigHasLedger :: HasLedgerEnv -> TopLevelConfig CardanoBlock
207-
getTopLevelconfigHasLedger = Consensus.pInfoConfig . leProtocolInfo
208-
209207
readCurrentStateUnsafe :: HasLedgerEnv -> IO (ExtLedgerState CardanoBlock)
210208
readCurrentStateUnsafe hle = atomically (clsState . ledgerDbCurrent <$> readStateUnsafe hle)
211209

@@ -234,13 +232,15 @@ applyBlock env blk = do
234232
let oldState = ledgerDbCurrent ledgerDB
235233
!result <- throwLeftIO $ tickThenReapplyCheckHash (ExtLedgerCfg (getTopLevelconfigHasLedger env)) blk (clsState oldState)
236234
let ledgerEventsFull = mapMaybe (convertAuxLedgerEvent (leHasRewards env)) (lrEvents result)
237-
let (ledgerEvents, deposits) = splitDeposits ledgerEventsFull
235+
let (ledgerEvents', deposits) = splitDeposits ledgerEventsFull
236+
let (rewardsEpochBoundary, ledgerEvents) = splitRewardsEpochBoundary ledgerEvents'
238237
let !newLedgerState = finaliseDrepDistr (lrResult result)
239238
!details <- atomically $ getSlotDetails env (ledgerState newLedgerState) time (cardanoBlockSlotNo blk)
240239
!newEpoch <- throwLeftIO $ mkOnNewEpoch (clsState oldState) newLedgerState (findAdaPots ledgerEvents)
241240
let !newEpochBlockNo = applyToEpochBlockNo (isJust $ blockIsEBB blk) (isJust newEpoch) (clsEpochBlockNo oldState)
242241
let !newState = CardanoLedgerState newLedgerState newEpochBlockNo
243242
asyncWriteStakeSnapShot env oldState newState
243+
asyncWriteRewards env newState (sdEpochNo details) (isJust newEpoch) rewardsEpochBoundary
244244
let !ledgerDB' = pushLedgerDB ledgerDB newState
245245
atomically $ writeTVar (leStateVar env) (Strict.Just ledgerDB')
246246
let !appResult =
@@ -321,13 +321,9 @@ asyncWriteStakeSnapShot env oldState newState =
321321
EpochBlockNo n
322322
| n == 0
323323
, Just (snapshot, epoch) <- Generic.getSnapShot (clsState oldState) -> do
324-
ensureEpochDone (leEpochStakeChans env) epoch snapshot
324+
ensureStakeDone (leEpochStakeChans env) epoch snapshot
325325
_ -> pure ()
326326

327-
getSliceMeta :: Generic.StakeSliceRes -> Maybe (Bool, EpochNo)
328-
getSliceMeta (Generic.Slice (Generic.StakeSlice epochNo _) isFinal) = Just (isFinal, epochNo)
329-
getSliceMeta _ = Nothing
330-
331327
storeSnapshotAndCleanupMaybe ::
332328
HasLedgerEnv ->
333329
CardanoLedgerState ->

cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs

+19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Cardano.DbSync.Types (
2121
CardanoPoint,
2222
PoolKeyHash,
2323
SlotDetails,
24+
StakeCred,
2425
)
2526
import Cardano.Ledger.Alonzo.Scripts (Prices)
2627
import qualified Cardano.Ledger.BaseTypes as Ledger
@@ -29,6 +30,7 @@ import Cardano.Ledger.Conway.Governance
2930
import Cardano.Ledger.Credential (Credential (..))
3031
import qualified Cardano.Ledger.EpochBoundary as Ledger
3132
import Cardano.Ledger.Keys (KeyRole (..))
33+
import qualified Cardano.Ledger.Rewards as Ledger
3234
import Cardano.Ledger.Shelley.LedgerState (NewEpochState ())
3335
import Cardano.Prelude hiding (atomically)
3436
import Cardano.Slotting.Slot (
@@ -48,6 +50,7 @@ import qualified Data.Strict.Maybe as Strict
4850
import Lens.Micro (Traversal')
4951
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..))
5052
import Ouroboros.Consensus.Cardano.Block hiding (CardanoBlock, CardanoLedgerState)
53+
import Ouroboros.Consensus.Config (TopLevelConfig (..))
5154
import Ouroboros.Consensus.HardFork.Combinator.Basics (LedgerState (..))
5255
import Ouroboros.Consensus.Ledger.Abstract (getTipSlot)
5356
import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..))
@@ -76,6 +79,7 @@ data HasLedgerEnv = HasLedgerEnv
7679
, leStateWriteQueue :: !(TBQueue (FilePath, CardanoLedgerState))
7780
, leApplyQueue :: TBQueue LedgerAction
7881
, leEpochStakeChans :: EpochStakeChannels
82+
, leRewardsChans :: RewardsChannels
7983
}
8084

8185
data CardanoLedgerState = CardanoLedgerState
@@ -188,6 +192,9 @@ updatedCommittee membersToRemove membersToAdd newQuorum committee =
188192
newCommitteeMembers
189193
newQuorum
190194

195+
getTopLevelconfigHasLedger :: HasLedgerEnv -> TopLevelConfig CardanoBlock
196+
getTopLevelconfigHasLedger = Consensus.pInfoConfig . leProtocolInfo
197+
191198
newtype LedgerDB = LedgerDB
192199
{ ledgerDbCheckpoints :: AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState
193200
}
@@ -213,6 +220,18 @@ data EpochStakeChannels = EpochStakeChannels
213220
, epochResult :: StrictTVar IO (Maybe (EpochNo, EpochState))
214221
}
215222

223+
data EpochRewardState = RewRunning | RewDone | RewEBRunning | RewEBDone
224+
deriving (Eq, Ord, Show)
225+
226+
data RewardsDBAction
227+
= RewardsDBAction EpochNo EpochNo (Map StakeCred (Set (Ledger.Reward StandardCrypto))) Bool
228+
| RewardsEpochBoundary EpochNo [LedgerEvent]
229+
230+
data RewardsChannels = RewardsChannels
231+
{ rQueue :: TBQueue RewardsDBAction
232+
, rewardsResult :: StrictTVar IO (Maybe (EpochNo, EpochRewardState))
233+
}
234+
216235
-- | Per-era pure getters and setters on @NewEpochState@. Note this is a bit of an abuse
217236
-- of the cardano-ledger/ouroboros-consensus public APIs, because ledger state is not
218237
-- designed to be updated this way. We are only replaying the chain, so this should be
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
4+
module Cardano.DbSync.Threads.Rewards where
5+
6+
import Cardano.BM.Trace (logInfo)
7+
import qualified Cardano.Db as DB
8+
import Cardano.DbSync.Api
9+
import Cardano.DbSync.Api.Types
10+
import Cardano.DbSync.Era.Shelley.Generic.Rewards
11+
import Cardano.DbSync.Era.Universal.Epoch
12+
import Cardano.DbSync.Era.Universal.Insert.LedgerEvent
13+
import Cardano.DbSync.Error
14+
import Cardano.DbSync.Ledger.Event
15+
import Cardano.DbSync.Ledger.Types
16+
import Cardano.DbSync.Util
17+
import Control.Concurrent.Class.MonadSTM.Strict
18+
import qualified Control.Concurrent.STM.TBQueue as TBQ
19+
import Control.Monad.IO.Class (liftIO)
20+
import Control.Monad.Trans.Except.Extra (runExceptT)
21+
import qualified Data.Map as Map
22+
import Database.Persist.Postgresql (IsolationLevel (..), runSqlConnWithIsolation, withPostgresqlConn)
23+
24+
runRewardsThread ::
25+
SyncEnv ->
26+
IO ()
27+
runRewardsThread syncEnv =
28+
case envLedgerEnv syncEnv of
29+
NoLedger _ -> pure ()
30+
HasLedger le -> do
31+
logInfo trce "Running Rewards thread"
32+
logException trce "runRewardsThread: " (runRewLoop syncEnv le)
33+
logInfo trce "Shutting Rewards thread"
34+
where
35+
trce = getTrace syncEnv
36+
37+
runRewLoop :: SyncEnv -> HasLedgerEnv -> IO ()
38+
runRewLoop syncEnv lenv =
39+
DB.runIohkLogging trce $
40+
withPostgresqlConn (envConnectionString syncEnv) loop
41+
where
42+
loop backend = do
43+
runOrThrowIO $ runSqlConnWithIsolation (runExceptT loopAction) backend Serializable
44+
loop backend
45+
46+
loopAction = do
47+
rAction <- liftIO $ atomically $ TBQ.readTBQueue (rQueue rc)
48+
case rAction of
49+
RewardsDBAction epoch epoch' mp _shouldCheck -> do
50+
insertRewards syncEnv epoch epoch' (Map.toList $ unRewards $ convertPoolRewards mp)
51+
liftIO $ atomically $ writeTVar (rewardsResult rc) $ Just (epoch', RewDone)
52+
RewardsEpochBoundary epoch events -> do
53+
insertNewEpochLedgerEvents syncEnv epoch events
54+
liftIO $ atomically $ writeTVar (rewardsResult rc) $ Just (epoch, RewEBDone)
55+
56+
rc = leRewardsChans lenv
57+
trce = getTrace syncEnv

0 commit comments

Comments
 (0)