Skip to content

Fix API Functionality for MongoDB >=6.0 #156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 107 additions & 65 deletions Database/MongoDB/Internal/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#endif

module Database.MongoDB.Internal.Protocol (
-- * Global command arguments
FullCollection,
ReadPreferenceMode(..), setReadPreferenceMode,
-- * Pipe
Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
-- ** Notice
Expand Down Expand Up @@ -74,8 +76,7 @@ import qualified Database.MongoDB.Transport as Tr
#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, mkWeakMVar, isEmptyMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
import GHC.List (foldl', foldl1')
#else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, addMVarFinalizer)
Expand Down Expand Up @@ -206,7 +207,7 @@ pcall p@Pipeline{..} message = do
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise

pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO Response
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg p@Pipeline{..} message flagbit params = do
Expand All @@ -221,7 +222,7 @@ pcallOpMsg p@Pipeline{..} message flagbit params = do
-- put var into the response-queue so that it can
-- fetch the latest response
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
readMVar var >>= either throwIO return -- return promise

-- * Pipe

Expand Down Expand Up @@ -257,66 +258,65 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"

callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO Reply
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg pipe request flagBit params = do
requestId <- genRequestId
promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params
promise' <- promise :: IO Response
return $ snd <$> produce requestId promise'
produce requestId promise
where
-- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
-- our client to keep receiving messages after the MoreToCome flagbit was
-- set by the server until our client receives an empty flagbit. After the
-- first MoreToCome flagbit was set the responseTo field in the following
-- headers will reference the cursorId that was set in the previous message.
-- see:
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
checkFlagBit p =
case p of
(_, r) ->
case r of
ReplyOpMsg{..} -> flagBits == [MoreToCome]
-- This is called by functions using the OP_MSG protocol,
-- so this has to be ReplyOpMsg
_ -> error "Impossible"
produce reqId p = runConduit $
case p of
(rt, r) ->
case r of
ReplyOpMsg{..} ->
if flagBits == [MoreToCome]
then yieldResponses .| foldlC mergeResponses p
else return $ (rt, check reqId p)
_ -> error "Impossible" -- see comment above
yieldResponses = repeatWhileMC
(do
var <- newEmptyMVar
liftIO $ atomically $ writeTChan (responseQueue pipe) var
readMVar var >>= either throwIO return :: IO Response
)
checkFlagBit
mergeResponses p@(rt,rep) p' =
case (p, p') of
((_, r), (_, r')) ->
case (r, r') of
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
let (section, section') = (head sec, head sec')
(cur, cur') = (maybe Nothing cast $ look "cursor" section,
maybe Nothing cast $ look "cursor" section')
case (cur, cur') of
(Just doc, Just doc') -> do
let (docs, docs') =
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document])
id' = fromJust $ cast $ valueAt "id" doc' :: Int32
(rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++)
-- Since we use this to process moreToCome messages, we
-- know that there will be a nextBatch key in the document
_ -> error "Impossible"
_ -> error "Impossible" -- see comment above
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- The OP_MSG protocol expects our client to receive multiple messages if the ExhaustAllowed
-- bit is set on the request. Continue to poll for messages until the MoreToCome flagbit is
-- no longer set.
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md#moretocome
produce reqId (rt, rep)
| rt /= reqId = error $ "expected response id (" ++ show rt ++ ") to match request id (" ++ show reqId ++ ")"
| checkFlagBit rep = foldr mergeResponses rep <$> generateResponseChain
| otherwise = return rep
generateResponseChain = loop id
where
loop f = do
x <- snd <$> yieldNextResponse
if checkFlagBit x
then loop (f . (x:))
else return (f [x])
yieldNextResponse = do
var <- newEmptyMVar
liftIO $ atomically $ writeTChan (responseQueue pipe) var
readMVar var >>= either throwIO return :: IO Response
checkFlagBit rep =
case rep of
ReplyOpMsg{..} -> flagBits == [MoreToCome]
-- This is called by functions using the OP_MSG protocol,
-- so this has to be ReplyOpMsg
_ -> error "Impossible"
mergeResponses rep' rep =
case (rep, rep') of
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
-- Look for the nested data in the GetMore responses
-- https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands/find_getmore_killcursors_commands.md#getmore
let (section, section') = (head sec, head sec')
(cur, cur') = ( cast =<< look "cursor" section
, cast =<< look "cursor" section'
)
case (cur, cur') of
(Just doc, Just doc') -> do
let (docs, docs') =
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document]
)
rep { sections = docs' ++ docs } -- todo: avoid (++)
-- Extra case for the processed first batch
(Nothing, Just doc') -> do
let (docs, docs') =
( sec -- already processed
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document]
)
rep { sections = docs' ++ docs } -- todo: avoid (++)
-- Since we use this to process moreToCome messages, we
-- know that there will be a nextBatch key in the document
_ -> error "Impossible"
_ -> error "Impossible" -- see comment above

-- * Message

Expand Down Expand Up @@ -570,21 +570,23 @@ putOpMsg cmd requestId flagBit params = do
Query{..} -> do
let n = T.splitOn "." qFullCollection
db = head n
sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector]
sec0 = foldl1' merge [qReadPreference, optionsToDoc qOptions, qProjector, [ "$db" =: db ], qSelector]
putInt32 biT
putInt8 0
putDocument sec0
GetMore{..} -> do
let n = T.splitOn "." gFullCollection
(db, coll) = (head n, last n)
pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
pre = merge
gReadPreference
["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
putInt8 0
putDocument pre
Message{..} -> do
putInt32 biT
putInt8 0
putDocument $ merge [ "$db" =: mDatabase ] mParams
putDocument $ foldl1' merge [mReadPreference, [ "$db" =: mDatabase ], mParams]
Kc k -> case k of
KillC{..} -> do
let n = T.splitOn "." kFullCollection
Expand Down Expand Up @@ -649,21 +651,26 @@ bitOpMsg ExhaustAllowed = 16
-- ** Request

-- | A request is a message that is sent with a 'Reply' expected in return
-- Read preference is a global argument required for requests to secondaries using the OP_MSG protocol.
-- https://github.com/mongodb/specifications/blob/ffa75b41736f669c754dff9c0a0d60bb70eb319f/source/server-selection/server-selection.md?plain=1#L503
data Request =
Query {
qOptions :: [QueryOption],
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ @[]@ = return all documents in collection
qProjector :: Document -- ^ @[]@ = return whole document
qProjector :: Document, -- ^ @[]@ = return whole document
qReadPreference :: Document
} | GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
gCursorId :: CursorId
gCursorId :: CursorId,
gReadPreference :: Document
} | Message {
mDatabase :: Text,
mParams :: Document
mParams :: Document,
mReadPreference :: Document
}
deriving (Show, Eq)

Expand All @@ -679,6 +686,15 @@ data QueryOption =
| Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
deriving (Show, Eq)

-- https://www.mongodb.com/docs/manual/core/read-preference/
data ReadPreferenceMode
= Primary -- Default mode.
| Secondary
| PrimaryPreferred
| SecondaryPreferred -- Corresponds to slaveOk access mode.
| Nearest
deriving (Show, Eq)

-- *** Binary format

qOpcode :: Request -> Opcode
Expand Down Expand Up @@ -721,6 +737,32 @@ qBit Database.MongoDB.Internal.Protocol.Partial = bit 7
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit

optionToDoc :: QueryOption -> Document
optionToDoc qOpt =
case qOpt of
TailableCursor -> ["tailable" =: True]
NoCursorTimeout -> ["noCursorTimeout" =: True]
AwaitData -> ["awaitData" =: True]
Partial -> ["allowPartialResults" =: True]
SlaveOK -> []

optionsToDoc :: [QueryOption] -> Document
optionsToDoc = foldl' merge [] . map optionToDoc

readPreferenceModeToText :: ReadPreferenceMode -> Text
readPreferenceModeToText mode =
case mode of
Primary -> "primary"
Secondary -> "secondary"
PrimaryPreferred -> "primaryPreferred"
SecondaryPreferred -> "secondaryPreferred"
Nearest -> "nearest"

setReadPreferenceMode :: ReadPreferenceMode -> Document
setReadPreferenceMode mode =
["mode" =: (readPreferenceModeToText mode)]


-- ** Reply

-- | A reply is a message received in response to a 'Request'
Expand Down
Loading