Skip to content

Commit 898c1d1

Browse files
committed
Commit secondsry transactions before the main one
1 parent 3373fee commit 898c1d1

File tree

6 files changed

+42
-17
lines changed

6 files changed

+42
-17
lines changed

cardano-db-sync/src/Cardano/DbSync/Api.hs

+22-7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ module Cardano.DbSync.Api (
2929
withDBSyncConnections,
3030
withScriptConnection,
3131
withDatumConnection,
32+
commitAll,
3233
mkSyncEnvFromConfig,
3334
getInsertOptions,
3435
getTrace,
@@ -44,7 +45,7 @@ import qualified Cardano.Chain.Genesis as Byron
4445
import Cardano.Crypto.ProtocolMagic (ProtocolMagicId (..))
4546
import qualified Cardano.Db as DB
4647
import Cardano.DbSync.Api.Types
47-
import Cardano.DbSync.Cache.Types (CacheCapacity (..), newEmptyCache, newMAChannels, newStakeChannels, useNoCache)
48+
import Cardano.DbSync.Cache.Types
4849
import Cardano.DbSync.Config.Cardano
4950
import Cardano.DbSync.Config.Shelley
5051
import Cardano.DbSync.Config.Types
@@ -61,14 +62,15 @@ import qualified Cardano.Ledger.Shelley.Genesis as Shelley
6162
import Cardano.Prelude
6263
import Cardano.Slotting.Slot (EpochNo (..))
6364
import Control.Concurrent.Class.MonadSTM.Strict (
65+
newEmptyTMVarIO,
6466
newTBQueueIO,
6567
newTVarIO,
6668
readTVar,
6769
readTVarIO,
70+
takeTMVar,
6871
writeTVar,
6972
)
70-
import qualified Control.Concurrent.Class.MonadSTM.Strict.TBQueue as TBQ
71-
import Control.Concurrent.MVar
73+
import qualified Control.Concurrent.STM.TBQueue as TBQ
7274
import Control.Monad.Logger (LoggingT, MonadLoggerIO)
7375
import Control.Monad.Trans.Resource (MonadUnliftIO)
7476
import qualified Data.Strict.Maybe as Strict
@@ -247,10 +249,11 @@ hasLedgerState syncEnv =
247249
NoLedger _ -> False
248250

249251
writePrefetch :: SyncEnv -> CardanoBlock -> IO ()
250-
writePrefetch syncEnv cblock = do
251-
atomically $
252-
TBQ.writeTBQueue (pTxInQueue $ envPrefetch syncEnv) $
253-
PrefetchTxIdBlock cblock
252+
writePrefetch _syncEnv _cblock = pure ()
253+
254+
-- atomically $
255+
-- TBQ.writeTBQueue (pTxInQueue $ envPrefetch syncEnv) $
256+
-- PrefetchTxIdBlock cblock
254257

255258
mkSyncEnv ::
256259
Trace IO Text ->
@@ -381,6 +384,18 @@ withGivenConnection toConn syncEnv action = do
381384
where
382385
connVar = toConn $ envBackends syncEnv
383386

387+
commitAll :: SyncEnv -> IO ()
388+
commitAll syncEnv = do
389+
maRet <- newEmptyTMVarIO
390+
stakeRet <- newEmptyTMVarIO
391+
-- queue actions are async here, so we let them run, while blocking on sync actions.
392+
atomically $ TBQ.writeTBQueue (macPriorityQueue $ envMAChans syncEnv) $ CommitMA maRet
393+
atomically $ TBQ.writeTBQueue (scPriorityQueue $ envStakeChans syncEnv) $ CommitStake stakeRet
394+
withScriptConnection syncEnv DB.transactionCommit
395+
withDatumConnection syncEnv DB.transactionCommit
396+
atomically $ takeTMVar maRet
397+
atomically $ takeTMVar stakeRet
398+
384399
mkSyncEnvFromConfig ::
385400
Trace IO Text ->
386401
DbConnections ->

cardano-db-sync/src/Cardano/DbSync/Block.hs

+10-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ import Control.Monad.Logger (LoggingT)
4545
import qualified Data.ByteString.Short as SBS
4646
import qualified Data.Strict.Maybe as Strict
4747
import Database.Persist.Sql
48-
import Database.Persist.SqlBackend.Internal
4948
import Ouroboros.Consensus.Byron.Ledger (ByronBlock (..))
5049
import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..))
5150
import qualified Ouroboros.Consensus.HardFork.Combinator as Consensus
@@ -110,8 +109,12 @@ applyAndInsertBlocks syncEnv firstAfterRollback = go
110109
let flagList = firstAfterRollback : replicate (length rest) False
111110
let zippedArgs = zip (DB.BlockKey <$> [newBlockId ..]) flagList
112111
let (byronBlocks, blocks) = takeWhileByron $ zip zippedArgs (blk : rest)
113-
DB.runDbIohkLoggingExceptT backend tracer $ mapM_ (applyAndInsertByronBlock syncEnv) byronBlocks
114-
DB.runDbIohkLoggingExceptT backend tracer $ mapM_ (applyAndInsertBlock syncEnv) blocks -- we can use this split to parallelise even further within
112+
DB.runDbIohkLoggingExceptT backend tracer $ do
113+
mapM_ (applyAndInsertByronBlock syncEnv) byronBlocks
114+
liftIO $ commitAll syncEnv
115+
DB.runDbIohkLoggingExceptT backend tracer $ do
116+
mapM_ (applyAndInsertBlock syncEnv) blocks -- we can use this split to parallelise even further within
117+
liftIO $ commitAll syncEnv -- make sure all secondary transactions are commited before the main one.
115118
backend = envBackend syncEnv
116119
tracer = getTrace syncEnv
117120

@@ -239,6 +242,7 @@ insertBlockRest syncEnv blkNo applyResult tookSnapshot = do
239242
commited <-
240243
if withinTwoMin || tookSnapshot
241244
then do
245+
liftIO $ commitAll syncEnv
242246
lift DB.transactionCommit
243247
pure True
244248
else pure False
@@ -247,7 +251,9 @@ insertBlockRest syncEnv blkNo applyResult tookSnapshot = do
247251
ranIndexes <- liftIO $ getRanIndexes syncEnv
248252
lift $ addConstraintsIfNotExist syncEnv tracer
249253
unless ranIndexes $ do
250-
lift $ unless commited DB.transactionCommit
254+
lift $ unless commited $ do
255+
liftIO $ commitAll syncEnv
256+
DB.transactionCommit
251257
liftIO $ runIndexMigrations syncEnv
252258

253259
withinTwoMin = isSyncedWithinSeconds details 120 == SyncFollowing

cardano-db-sync/src/Cardano/DbSync/Cache.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ queryOrInsertSyncMultiAsset syncEnv policy aName = do
316316
Right maId -> pure maId
317317
Left _ -> liftIO $ do
318318
resultVar <- newEmptyTMVarIO
319-
atomically $ TBQ.writeTBQueue (macPriorityQueue $ maChan) $ QueryInsertMA policy aName resultVar
319+
atomically $ TBQ.writeTBQueue (macPriorityQueue maChan) $ QueryInsertMA policy aName resultVar
320320
atomically $ takeTMVar resultVar
321321
where
322322
maChan = envMAChans syncEnv

cardano-db-sync/src/Cardano/DbSync/Cache/Types.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ data StakeDBAction
272272
= QueryInsertStake RewAccount CacheAction (StrictTMVar IO DB.StakeAddressId)
273273
| CacheStake RewAccount DB.StakeAddressId Bool
274274
| BulkPrefetchStake CardanoBlock
275-
| CommitStake
275+
| CommitStake (StrictTMVar IO ())
276276

277277
data StakeChannels = StakeChannels
278278
{ scPriorityQueue :: TBQueue StakeDBAction
@@ -293,7 +293,7 @@ data MADBAction
293293
= QueryInsertMA (PolicyID StandardCrypto) AssetName (StrictTMVar IO DB.MultiAssetId)
294294
| CacheMA (PolicyID StandardCrypto) AssetName DB.MultiAssetId
295295
| BulkPrefetchMA CardanoBlock
296-
| CommitMA
296+
| CommitMA (StrictTMVar IO ())
297297

298298
data MAChannels = MAChannels
299299
{ macPriorityQueue :: TBQueue MADBAction

cardano-db-sync/src/Cardano/DbSync/Threads/MultiAsset.hs

+3-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ runMALoop syncEnv =
3939
liftIO $ atomically $ writeTMVar resVar stakeId
4040
CacheMA {} -> pure ()
4141
BulkPrefetchMA _ -> pure ()
42-
CommitMA -> DB.transactionCommit
42+
CommitMA retVar -> do
43+
DB.transactionCommit
44+
liftIO $ atomically $ writeTMVar retVar ()
4345

4446
maChan = envMAChans syncEnv
4547
trce = getTrace syncEnv

cardano-db-sync/src/Cardano/DbSync/Threads/Stake.hs

+4-2
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@ runStakeLoop syncEnv =
3737
QueryInsertStake rewardAcc ca resVar -> do
3838
stakeId <- resolveInsertRewardAccount syncEnv ca rewardAcc
3939
liftIO $ atomically $ writeTMVar resVar stakeId
40-
CacheStake _ _ _ -> pure ()
40+
CacheStake {} -> pure ()
4141
BulkPrefetchStake _ -> pure ()
42-
CommitStake -> DB.transactionCommit
42+
CommitStake retVar -> do
43+
DB.transactionCommit
44+
liftIO $ atomically $ writeTMVar retVar ()
4345

4446
stakeChan = envStakeChans syncEnv
4547
trce = getTrace syncEnv

0 commit comments

Comments
 (0)