|
| 1 | +{-# LANGUAGE FlexibleContexts #-} |
| 2 | +{-# LANGUAGE OverloadedStrings #-} |
| 3 | + |
| 4 | +module Cardano.DbSync.Threads.Stake where |
| 5 | + |
| 6 | +import Cardano.BM.Trace (logInfo) |
| 7 | +import qualified Cardano.Db as DB |
| 8 | +import Cardano.DbSync.Api |
| 9 | +import Cardano.DbSync.Api.Types |
| 10 | +import Cardano.DbSync.Util |
| 11 | +import Control.Concurrent.Class.MonadSTM.Strict |
| 12 | +import qualified Control.Concurrent.STM.TBQueue as TBQ |
| 13 | +import Control.Monad |
| 14 | +import Control.Monad.IO.Class (liftIO) |
| 15 | +import Database.Persist.Postgresql (IsolationLevel (..), runSqlConnWithIsolation, withPostgresqlConn) |
| 16 | +import Cardano.DbSync.Cache.Stake |
| 17 | +import Cardano.DbSync.Cache.Types |
| 18 | + |
| 19 | +runStakeThread :: SyncEnv -> IO () |
| 20 | +runStakeThread syncEnv = do |
| 21 | + logInfo trce "Running Event thread" |
| 22 | + logException trce "runStakeThread: " (runStakeLoop syncEnv) |
| 23 | + logInfo trce "Shutting Event thread" |
| 24 | + where |
| 25 | + trce = getTrace syncEnv |
| 26 | + |
| 27 | +runStakeLoop :: SyncEnv -> IO () |
| 28 | +runStakeLoop syncEnv = |
| 29 | + DB.runIohkLogging trce $ |
| 30 | + withPostgresqlConn (envConnectionString syncEnv) actionDB |
| 31 | + where |
| 32 | + actionDB backend = runSqlConnWithIsolation (forever loopAction) backend Serializable |
| 33 | + |
| 34 | + loopAction = do |
| 35 | + action <- liftIO $ atomically $ TBQ.readTBQueue (scPriorityQueue stakeChan) |
| 36 | + case action of |
| 37 | + QueryInsertStake rewardAcc ca resVar -> do |
| 38 | + stakeId <- resolveInsertRewardAccount syncEnv ca rewardAcc |
| 39 | + liftIO $ atomically $ writeTMVar resVar stakeId |
| 40 | + CacheStake _ _ _ -> pure () |
| 41 | + BulkPrefetch _ -> pure () |
| 42 | + CommitStake -> DB.transactionCommit |
| 43 | + |
| 44 | + stakeChan = envStakeChans syncEnv |
| 45 | + trce = getTrace syncEnv |
0 commit comments