Skip to content

Commit 064aaf1

Browse files
committed
Add Resolve Inputs thread
1 parent ed8a586 commit 064aaf1

File tree

11 files changed

+236
-69
lines changed

11 files changed

+236
-69
lines changed

cardano-db-sync/cardano-db-sync.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ library
135135
Cardano.DbSync.Threads.Database
136136
Cardano.DbSync.Threads.EpochStake
137137
Cardano.DbSync.Threads.Stake
138+
Cardano.DbSync.Threads.TxInResolve
138139
Cardano.DbSync.Tracing.ToObjectOrphans
139140
Cardano.DbSync.Types
140141

cardano-db-sync/src/Cardano/DbSync/Api.hs

+10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ module Cardano.DbSync.Api (
3232
getTopLevelConfig,
3333
getNetwork,
3434
hasLedgerState,
35+
writePrefetch,
3536
generateNewEpochEvents,
3637
) where
3738

@@ -63,6 +64,7 @@ import Control.Concurrent.Class.MonadSTM.Strict (
6364
readTVarIO,
6465
writeTVar,
6566
)
67+
import qualified Control.Concurrent.Class.MonadSTM.Strict.TBQueue as TBQ
6668
import qualified Data.Strict.Maybe as Strict
6769
import Data.Time.Clock (getCurrentTime)
6870
import Database.Persist.Postgresql (ConnectionString)
@@ -231,6 +233,12 @@ hasLedgerState syncEnv =
231233
HasLedger _ -> True
232234
NoLedger _ -> False
233235

236+
writePrefetch :: SyncEnv -> CardanoBlock -> IO ()
237+
writePrefetch syncEnv cblock = do
238+
atomically $
239+
TBQ.writeTBQueue (pTxInQueue $ envPrefetch syncEnv) $
240+
PrefetchTxIdBlock cblock
241+
234242
mkSyncEnv ::
235243
Trace IO Text ->
236244
SqlBackend ->
@@ -258,6 +266,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
258266
, cacheCapacityTx = 100000
259267
}
260268
else pure useNoCache
269+
prefetch <- newPrefetch
261270
consistentLevelVar <- newTVarIO Unchecked
262271
indexesVar <- newTVarIO $ enpForceIndexes syncNP
263272
bts <- getBootstrapInProgress trce (isTxOutConsumedBootstrap' syncNodeConfigFromFile) backend
@@ -295,6 +304,7 @@ mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemS
295304
{ envBackend = backend
296305
, envBootstrap = bootstrapVar
297306
, envCache = cache
307+
, envPrefetch = prefetch
298308
, envConnectionString = connectionString
299309
, envConsistentLevel = consistentLevelVar
300310
, envDbConstraints = dbCNamesVar

cardano-db-sync/src/Cardano/DbSync/Api/Types.hs

+22-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
{-# LANGUAGE GADTs #-}
22
{-# LANGUAGE ScopedTypeVariables #-}
3-
{-# LANGUAGE NoImplicitPrelude #-}
43

54
module Cardano.DbSync.Api.Types (
65
SyncEnv (..),
@@ -10,34 +9,39 @@ module Cardano.DbSync.Api.Types (
109
RunMigration,
1110
ConsistentLevel (..),
1211
CurrentEpochNo (..),
12+
Prefetch (..),
13+
PrefetchTxId (..),
14+
newPrefetch,
1315
) where
1416

1517
import qualified Cardano.Db as DB
1618
import Cardano.DbSync.Cache.Types (CacheStatus, StakeChannels)
1719
import Cardano.DbSync.Config.Types (SyncNodeConfig)
20+
import qualified Cardano.DbSync.Era.Shelley.Generic.Tx.Types as Generic
1821
import Cardano.DbSync.Ledger.Types (HasLedgerEnv)
1922
import Cardano.DbSync.LocalStateQuery (NoLedgerEnv)
2023
import Cardano.DbSync.Types (
24+
CardanoBlock,
2125
OffChainPoolResult,
2226
OffChainPoolWorkQueue,
2327
OffChainVoteResult,
2428
OffChainVoteWorkQueue,
2529
)
26-
import Cardano.Prelude (Bool, Eq, IO, Show, Word64)
2730
import Cardano.Slotting.Slot (EpochNo (..))
28-
import Control.Concurrent.Class.MonadSTM.Strict (
29-
StrictTVar,
30-
)
31+
import Control.Concurrent.Class.MonadSTM.Strict (StrictTVar, newTBQueueIO, newTVarIO)
3132
import Control.Concurrent.Class.MonadSTM.Strict.TBQueue (StrictTBQueue)
33+
import Data.Map (Map)
3234
import qualified Data.Strict.Maybe as Strict
3335
import Data.Time.Clock (UTCTime)
36+
import Data.Word (Word64)
3437
import Database.Persist.Postgresql (ConnectionString)
3538
import Database.Persist.Sql (SqlBackend)
3639
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..))
3740
import Ouroboros.Network.Magic (NetworkMagic (..))
3841

3942
data SyncEnv = SyncEnv
4043
{ envBackend :: !SqlBackend
44+
, envPrefetch :: !Prefetch
4145
, envCache :: !CacheStatus
4246
, envConnectionString :: !ConnectionString
4347
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
@@ -101,3 +105,16 @@ data ConsistentLevel = Consistent | DBAheadOfLedger | Unchecked
101105
newtype CurrentEpochNo = CurrentEpochNo
102106
{ cenEpochNo :: Strict.Maybe EpochNo
103107
}
108+
109+
data PrefetchTxId = PrefetchTxIdBlock CardanoBlock | PrefetchTxIdBlocks [CardanoBlock]
110+
111+
data Prefetch = Prefetch
112+
{ pTxInQueue :: StrictTBQueue IO PrefetchTxId
113+
, pTxIn :: StrictTVar IO (Map Generic.TxInKey (Maybe (DB.TxId, Either Generic.TxInKey DB.TxOutIdW, Maybe DB.DbLovelace)))
114+
}
115+
116+
newPrefetch :: IO Prefetch
117+
newPrefetch =
118+
Prefetch
119+
<$> newTBQueueIO 1000
120+
<*> newTVarIO mempty

cardano-db-sync/src/Cardano/DbSync/Block.hs

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ applyAndInsertBlockMaybe ::
6464
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
6565
applyAndInsertBlockMaybe syncEnv tracer cblk = do
6666
bl <- liftIO $ isConsistent syncEnv
67+
when bl $ liftIO $ writePrefetch syncEnv cblk
6768
(!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl)
6869
if bl
6970
then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block
@@ -83,6 +84,7 @@ applyAndInsertBlockMaybe syncEnv tracer cblk = do
8384
]
8485
rollbackFromBlockNo syncEnv (blockNo cblk)
8586
liftIO $ setConsistentLevel syncEnv Consistent
87+
when bl $ liftIO $ writePrefetch syncEnv cblk
8688
insertBlock syncEnv cblk applyRes True tookSnapshot
8789
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
8890
replaced <- lift $ DB.replaceAdaPots blockId $ mkAdaPots blockId slotNo epochNo adaPots

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/Tx/Shelley.hs

+6-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Cardano.DbSync.Era.Shelley.Generic.Tx.Shelley (
1212
getTxSize,
1313
mkTxIn,
1414
fromTxIn,
15+
toTxInKey,
1516
mkTxOut,
1617
mkTxWithdrawals,
1718
mkTxWithdrawal,
@@ -123,12 +124,14 @@ mkTxOut txBody = zipWith fromTxOut [0 ..] $ toList (txBody ^. Core.outputsTxBody
123124
, txOutDatum = NoDatum -- Shelley does not support plutus data
124125
}
125126

127+
toTxInKey :: Ledger.TxIn StandardCrypto -> TxInKey
128+
toTxInKey (Ledger.TxIn txId (TxIx w64)) = TxInKey txId w64
129+
126130
fromTxIn :: Ledger.TxIn StandardCrypto -> TxIn
127-
fromTxIn (Ledger.TxIn (Ledger.TxId txid) (TxIx w64)) =
131+
fromTxIn txIn =
128132
TxIn
129-
{ txInIndex = w64
133+
{ txInKey = toTxInKey txIn
130134
, txInRedeemerIndex = Nothing
131-
, txInTxId = Ledger.TxId txid
132135
}
133136

134137
txHashId :: (EraCrypto era ~ StandardCrypto, Core.EraTx era) => Core.Tx era -> ByteString

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/Tx/Types.hs

+9-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Cardano.DbSync.Era.Shelley.Generic.Tx.Types (
1212
TxCertificate (..),
1313
TxWithdrawal (..),
1414
TxIn (..),
15+
TxInKey (..),
1516
TxOut (..),
1617
TxRedeemer (..),
1718
TxScript (..),
@@ -97,9 +98,14 @@ data TxWithdrawal = TxWithdrawal
9798
, txwAmount :: !Coin
9899
}
99100

101+
data TxInKey = TxInKey
102+
{ txInTxId :: !(Ledger.TxId StandardCrypto)
103+
, txInIndex :: !Word64
104+
}
105+
deriving (Eq, Show, Ord)
106+
100107
data TxIn = TxIn
101-
{ txInIndex :: !Word64
102-
, txInTxId :: !(Ledger.TxId StandardCrypto)
108+
{ txInKey :: !TxInKey
103109
, txInRedeemerIndex :: !(Maybe Word64) -- This only has a meaning for Alonzo.
104110
}
105111
deriving (Show)
@@ -174,7 +180,7 @@ getMaybeDatumHash (Just hsh) = DatumHash hsh
174180
sumTxOutCoin :: [TxOut] -> Coin
175181
sumTxOutCoin = Coin . sum . map (unCoin . txOutAdaValue)
176182

177-
toTxHash :: TxIn -> ByteString
183+
toTxHash :: TxInKey -> ByteString
178184
toTxHash = unTxHash . txInTxId
179185

180186
class AlonzoEraTxBody era => DBScriptPurpose era where

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Query.hs

+3-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
module Cardano.DbSync.Era.Shelley.Query (
77
resolveStakeAddress,
88
resolveInputTxOutId,
9-
resolveInputValue,
109
resolveInputTxOutIdValue,
1110
queryResolveInputCredentials,
1211
) where
@@ -26,18 +25,14 @@ import Database.Esqueleto.Experimental (
2625
resolveStakeAddress :: MonadIO m => ByteString -> ReaderT SqlBackend m (Either LookupFail StakeAddressId)
2726
resolveStakeAddress addr = queryStakeAddress addr renderByteArray
2827

29-
resolveInputTxOutId :: MonadIO m => SyncEnv -> Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutIdW))
28+
resolveInputTxOutId :: MonadIO m => SyncEnv -> Generic.TxInKey -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutIdW))
3029
resolveInputTxOutId syncEnv txIn =
3130
queryTxOutId (getTxOutTableType syncEnv) (Generic.toTxHash txIn, fromIntegral (Generic.txInIndex txIn))
3231

33-
resolveInputValue :: MonadIO m => SyncEnv -> Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (TxId, DbLovelace))
34-
resolveInputValue syncEnv txIn =
35-
queryTxOutValue (getTxOutTableType syncEnv) (Generic.toTxHash txIn, fromIntegral (Generic.txInIndex txIn))
36-
37-
resolveInputTxOutIdValue :: MonadIO m => SyncEnv -> Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutIdW, DbLovelace))
32+
resolveInputTxOutIdValue :: MonadIO m => SyncEnv -> Generic.TxInKey -> ReaderT SqlBackend m (Either LookupFail (TxId, TxOutIdW, DbLovelace))
3833
resolveInputTxOutIdValue syncEnv txIn =
3934
queryTxOutIdValue (getTxOutTableType syncEnv) (Generic.toTxHash txIn, fromIntegral (Generic.txInIndex txIn))
4035

41-
queryResolveInputCredentials :: MonadIO m => SyncEnv -> Generic.TxIn -> ReaderT SqlBackend m (Either LookupFail (Maybe ByteString, Bool))
36+
queryResolveInputCredentials :: MonadIO m => SyncEnv -> Generic.TxInKey -> ReaderT SqlBackend m (Either LookupFail (Maybe ByteString, Bool))
4237
queryResolveInputCredentials syncEnv txIn = do
4338
queryTxOutCredentials (getTxOutTableType syncEnv) (Generic.toTxHash txIn, fromIntegral (Generic.txInIndex txIn))

0 commit comments

Comments
 (0)