Skip to content

Commit d6bc3b3

Browse files
committed
concurrent-merge: try using two threads
1 parent e5e59fe commit d6bc3b3

File tree

1 file changed

+49
-15
lines changed

1 file changed

+49
-15
lines changed

concurrent-merge/Main.hs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,66 @@
11
#! /usr/bin/env nix-shell
22
#! nix-shell -i runhaskell
3-
#! nix-shell -p "haskellPackages.ghcWithPackages (p: [ p.base ])"
3+
#! nix-shell -p "haskellPackages.ghcWithPackages (p: [ p.base p.containers ])"
44

55
module Main where
66

7-
import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, takeMVar, putMVar)
7+
import Control.Concurrent.MVar
88
import Control.Concurrent (forkIO)
99
import Data.Maybe (isJust, fromJust)
10-
import Data.List (minimumBy)
11-
import Data.Ord (comparing)
10+
import Data.List (minimumBy, maximumBy)
11+
import Data.Ord (Ordering, comparing)
12+
import qualified Data.Sequence as Seq
1213

13-
input1 = [ 1, 1, 1, 2 ]
14+
input1 = [ 1, 1, 1, 5 ]
1415
input2 = [ 2, 3, 4, 4 ]
1516

16-
worker :: [Int] -> MVar (Maybe Int) -> IO ()
17-
worker [] slot = putMVar slot Nothing
18-
worker (x:xs) slot = putMVar slot (Just x) >> worker xs slot
17+
frontWorker :: MVar (Seq.Seq Int) -> MVar (Maybe Int) -> IO ()
18+
frontWorker queueVar slot = do
19+
queue <- takeMVar queueVar
20+
case queue of
21+
Seq.Empty -> do
22+
putMVar queueVar Seq.Empty
23+
putMVar slot Nothing
24+
h Seq.:<| t -> do
25+
putMVar queueVar t
26+
putMVar slot (Just h)
27+
frontWorker queueVar slot
1928

20-
merge :: [MVar (Maybe Int)] -> [Int] -> IO [Int]
21-
merge slots output = traverse readMVar slots >>= \values ->
29+
backWorker :: MVar (Seq.Seq Int) -> MVar (Maybe Int) -> IO ()
30+
backWorker queueVar slot = do
31+
queue <- takeMVar queueVar
32+
case queue of
33+
Seq.Empty -> do
34+
putMVar queueVar Seq.Empty
35+
putMVar slot Nothing
36+
i Seq.:|> l -> do
37+
putMVar queueVar i
38+
putMVar slot (Just l)
39+
backWorker queueVar slot
40+
41+
type Pair = (Int, Maybe Int)
42+
43+
merge
44+
:: ((Pair -> Pair -> Ordering)
45+
-> [Pair] -> Pair)
46+
-> [MVar (Maybe Int)]
47+
-> [Int]
48+
-> IO [Int]
49+
merge cmp slots output = traverse readMVar slots >>= \values ->
2250
case filter (isJust . snd) $ zip [0..] values of
2351
[] -> return output
2452
ls -> let
25-
(ix, Just el) = minimumBy (comparing (fromJust . snd)) ls
26-
in takeMVar (slots !! ix) >> merge slots (output ++ [el])
53+
(ix, Just el) = cmp (comparing (fromJust . snd)) ls
54+
in takeMVar (slots !! ix) >> merge cmp slots (output ++ [el])
2755

2856
main :: IO ()
2957
main = do
30-
slots <- sequenceA $ replicate 2 newEmptyMVar
31-
traverse (forkIO . uncurry worker) $ zip [input1, input2] slots
32-
print =<< merge slots []
58+
frontSlots <- sequenceA $ replicate 2 newEmptyMVar
59+
backSlots <- sequenceA $ replicate 2 newEmptyMVar
60+
i1 <- newMVar $ Seq.fromList input1
61+
i2 <- newMVar $ Seq.fromList input2
62+
traverse (forkIO . uncurry frontWorker) $ zip [i1, i2] frontSlots
63+
traverse (forkIO . uncurry backWorker) $ zip [i1, i2] backSlots
64+
l1 <- merge minimumBy frontSlots []
65+
l2 <- merge maximumBy backSlots []
66+
print $ l1 ++ (reverse l2)

0 commit comments

Comments
 (0)