Skip to content

Commit 3adef8f

Browse files
committed
add workerQueue
1 parent 442e776 commit 3adef8f

File tree

6 files changed

+65
-24
lines changed

6 files changed

+65
-24
lines changed

ghcide/session-loader/Development/IDE/Session.hs

+3-3
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ import Data.Void
9494

9595
import Control.Concurrent.STM.Stats (atomically, modifyTVar',
9696
readTVar, writeTVar)
97-
import Control.Concurrent.STM.TQueue
9897
import Control.DeepSeq
9998
import Control.Exception (evaluate)
10099
import Control.Monad.IO.Unlift (MonadUnliftIO)
@@ -105,7 +104,8 @@ import Data.HashSet (HashSet)
105104
import qualified Data.HashSet as Set
106105
import Database.SQLite.Simple
107106
import Development.IDE.Core.Tracing (withTrace)
108-
import Development.IDE.Core.WorkerThread (awaitRunInThread,
107+
import Development.IDE.Core.WorkerThread (WorkerQueue,
108+
awaitRunInThread,
109109
withWorkerQueue)
110110
import Development.IDE.Session.Diagnostics (renderCradleError)
111111
import Development.IDE.Types.Shake (WithHieDb,
@@ -438,7 +438,7 @@ getHieDbLoc dir = do
438438
-- components mapping to the same hie.yaml file are mapped to the same
439439
-- HscEnv which is updated as new components are discovered.
440440

441-
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
441+
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> WorkerQueue (IO ()) -> IO (Action IdeGhcSession)
442442
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
443443
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
444444
cradle_files <- newIORef []

ghcide/src/Development/IDE/Core/Compile.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ import GHC.Driver.Config.CoreToStg.Prep
129129
#if MIN_VERSION_ghc(9,7,0)
130130
import Data.Foldable (toList)
131131
import GHC.Unit.Module.Warnings
132+
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
132133
#else
133134
import Development.IDE.Core.FileStore (shareFilePath)
134135
#endif
@@ -899,7 +900,7 @@ indexHieFile se mod_summary srcPath !hash hf = do
899900
-- hiedb doesn't use the Haskell src, so we clear it to avoid unnecessarily keeping it around
900901
let !hf' = hf{hie_hs_src = mempty}
901902
modifyTVar' indexPending $ HashMap.insert srcPath hash
902-
writeTQueue indexQueue $ \withHieDb -> do
903+
writeWorkerQueue indexQueue $ \withHieDb -> do
903904
-- We are now in the worker thread
904905
-- Check if a newer index of this file has been scheduled, and if so skip this one
905906
newerScheduled <- atomically $ do

ghcide/src/Development/IDE/Core/FileStore.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ module Development.IDE.Core.FileStore(
2222
) where
2323

2424
import Control.Concurrent.STM.Stats (STM, atomically)
25-
import Control.Concurrent.STM.TQueue (writeTQueue)
2625
import Control.Exception
2726
import Control.Monad.Extra
2827
import Control.Monad.IO.Class
@@ -40,6 +39,7 @@ import Development.IDE.Core.IdeConfiguration (isWorkspaceFile)
4039
import Development.IDE.Core.RuleTypes
4140
import Development.IDE.Core.Shake hiding (Log)
4241
import qualified Development.IDE.Core.Shake as Shake
42+
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
4343
import Development.IDE.GHC.Orphans ()
4444
import Development.IDE.Graph
4545
import Development.IDE.Import.DependencyInformation
@@ -247,7 +247,7 @@ typecheckParentsAction recorder nfp = do
247247
setSomethingModified :: VFSModified -> IdeState -> String -> IO [Key] -> IO ()
248248
setSomethingModified vfs state reason actionBetweenSession = do
249249
-- Update database to remove any files that might have been renamed/deleted
250-
atomically $ writeTQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
250+
atomically $ writeWorkerQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
251251
void $ restartShakeSession (shakeExtras state) vfs reason [] actionBetweenSession
252252

253253
registerFileWatches :: [String] -> LSP.LspT Config IO Bool

ghcide/src/Development/IDE/Core/Shake.hs

+5-5
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,12 @@ data HieDbWriter
262262
-- | Actions to queue up on the index worker thread
263263
-- The inner `(HieDb -> IO ()) -> IO ()` wraps `HieDb -> IO ()`
264264
-- with (currently) retry functionality
265-
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())
265+
type IndexQueue = WorkerQueue (((HieDb -> IO ()) -> IO ()) -> IO ())
266266

267267
data ThreadQueue = ThreadQueue {
268268
tIndexQueue :: IndexQueue
269-
, tRestartQueue :: TQueue (IO ())
270-
, tLoaderQueue :: TQueue (IO ())
269+
, tRestartQueue :: WorkerQueue (IO ())
270+
, tLoaderQueue :: WorkerQueue (IO ())
271271
}
272272

273273
-- Note [Semantic Tokens Cache Location]
@@ -342,9 +342,9 @@ data ShakeExtras = ShakeExtras
342342
-- ^ Default HLS config, only relevant if the client does not provide any Config
343343
, dirtyKeys :: TVar KeySet
344344
-- ^ Set of dirty rule keys since the last Shake run
345-
, restartQueue :: TQueue (IO ())
345+
, restartQueue :: WorkerQueue (IO ())
346346
-- ^ Queue of restart actions to be run.
347-
, loaderQueue :: TQueue (IO ())
347+
, loaderQueue :: WorkerQueue (IO ())
348348
-- ^ Queue of loader actions to be run.
349349
}
350350

Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1+
{-
2+
Module : Development.IDE.Core.WorkerThread
3+
Author : @soulomoon
4+
5+
Description : This module provides an API for managing worker threads in the IDE.
6+
see Note [Serializing runs in separate thread]
7+
-}
18
module Development.IDE.Core.WorkerThread
2-
(withWorkerQueue, awaitRunInThread)
9+
(withWorkerQueue, awaitRunInThread, withWorkerQueueOfOne, WorkerQueue, writeWorkerQueue)
310
where
411

512
import Control.Concurrent.Async
613
import Control.Concurrent.STM
714
import Control.Concurrent.Strict (newBarrier, signalBarrier,
815
waitBarrier)
16+
import Control.Exception (finally)
917
import Control.Monad (forever)
1018
import Control.Monad.Cont (ContT (ContT))
19+
import Control.Monad.IO.Class (liftIO)
1120

1221
{-
1322
Note [Serializing runs in separate thread]
@@ -18,31 +27,61 @@ Like the db writes, session loading in session loader, shake session restarts.
1827
1928
Originally we used various ways to implement this, but it was hard to maintain and error prone.
2029
Moreover, we can not stop these threads uniformly when we are shutting down the server.
21-
22-
`Development.IDE.Core.WorkerThread` module provides a simple api to implement this easily.
2330
-}
2431

25-
-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
32+
data WorkerQueue a = WorkerQueueOfOne (TMVar a) | WorkerQueueOfMany (TQueue a)
33+
34+
writeWorkerQueue :: WorkerQueue a -> a -> STM ()
35+
writeWorkerQueue (WorkerQueueOfOne tvar) action = putTMVar tvar action
36+
writeWorkerQueue (WorkerQueueOfMany tqueue) action = writeTQueue tqueue action
37+
38+
newWorkerQueue :: STM (WorkerQueue a)
39+
newWorkerQueue = WorkerQueueOfMany <$> newTQueue
40+
41+
newWorkerQueueOfOne :: STM (WorkerQueue a)
42+
newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar
43+
44+
45+
-- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
2646
-- thread which polls the queue for requests and runs the given worker
2747
-- function on them.
28-
withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t)
29-
withWorkerQueue workerAction = ContT $ \mainAction -> do
30-
q <- newTQueueIO
48+
withWorkerQueue :: (t -> IO a) -> ContT () IO (WorkerQueue t)
49+
withWorkerQueue workerAction = do
50+
q <- liftIO $ atomically newWorkerQueue
51+
runWorkerQueue q workerAction
52+
53+
-- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
54+
-- and one action can only be queued after the previous action has been done.
55+
-- this is useful when we want to cancel the action waiting in the queue, if it's thread is cancelled.
56+
-- e.g. session loading in session loader. When a shake session is restarted, we want to cancel the previous pending session loading.
57+
withWorkerQueueOfOne :: (t -> IO a) -> ContT () IO (WorkerQueue t)
58+
withWorkerQueueOfOne workerAction = do
59+
q <- liftIO $ atomically newWorkerQueueOfOne
60+
runWorkerQueue q workerAction
61+
62+
runWorkerQueue :: WorkerQueue t -> (t -> IO a) -> ContT () IO (WorkerQueue t)
63+
runWorkerQueue q workerAction = ContT $ \mainAction -> do
3164
withAsync (writerThread q) $ \_ -> mainAction q
3265
where
3366
writerThread q =
3467
forever $ do
35-
l <- atomically $ readTQueue q
36-
workerAction l
68+
case q of
69+
-- only remove the action from the queue after it has been run if it is a one-shot queue
70+
WorkerQueueOfOne tvar -> do
71+
l <- atomically $ readTMVar tvar
72+
workerAction l `finally` atomically (takeTMVar tvar)
73+
WorkerQueueOfMany q -> do
74+
l <- atomically $ readTQueue q
75+
workerAction l
3776

3877
-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
3978
-- and then blocks until the result is computed.
40-
awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result
79+
awaitRunInThread :: WorkerQueue (IO ()) -> IO result -> IO result
4180
awaitRunInThread q act = do
4281
-- Take an action from TQueue, run it and
4382
-- use barrier to wait for the result
4483
barrier <- newBarrier
45-
atomically $ writeTQueue q $ do
84+
atomically $ writeWorkerQueue q $ do
4685
res <- act
4786
signalBarrier barrier res
4887
waitBarrier barrier

ghcide/src/Development/IDE/LSP/LanguageServer.hs

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ import Control.Monad.Trans.Cont (evalContT)
3939
import Development.IDE.Core.IdeConfiguration
4040
import Development.IDE.Core.Shake hiding (Log)
4141
import Development.IDE.Core.Tracing
42-
import Development.IDE.Core.WorkerThread (withWorkerQueue)
42+
import Development.IDE.Core.WorkerThread (withWorkerQueue,
43+
withWorkerQueueOfOne)
4344
import qualified Development.IDE.Session as Session
4445
import Development.IDE.Types.Shake (WithHieDb,
4546
WithHieDbShield (..))
@@ -261,7 +262,7 @@ handleInit recorder defaultRoot getHieDbLoc getIdeState lifetime exitClientMsg c
261262
runWithWorkerThreads :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO ()
262263
runWithWorkerThreads recorder dbLoc f = evalContT $ do
263264
sessionRestartTQueue <- withWorkerQueue id
264-
sessionLoaderTQueue <- withWorkerQueue id
265+
sessionLoaderTQueue <- withWorkerQueueOfOne id
265266
(WithHieDbShield hiedb, threadQueue) <- runWithDb recorder dbLoc
266267
liftIO $ f hiedb (ThreadQueue threadQueue sessionRestartTQueue sessionLoaderTQueue)
267268

0 commit comments

Comments
 (0)