diff --git a/unison-cli/src/Unison/Share/SyncV2.hs b/unison-cli/src/Unison/Share/SyncV2.hs index 8f9768a7e0..d4e64e9cce 100644 --- a/unison-cli/src/Unison/Share/SyncV2.hs +++ b/unison-cli/src/Unison/Share/SyncV2.hs @@ -62,7 +62,7 @@ import Unison.Sync.Types qualified as Share import Unison.Sync.Types qualified as Sync import Unison.SyncV2.API (Routes (downloadEntitiesStream)) import Unison.SyncV2.API qualified as SyncV2 -import Unison.SyncV2.Types (CBORBytes) +import Unison.SyncV2.Types (CBORBytes, CBORStream) import Unison.SyncV2.Types qualified as SyncV2 import Unison.Util.Servant.CBOR qualified as CBOR import Unison.Util.Timing qualified as Timing @@ -440,14 +440,14 @@ type SyncAPI = ("ucm" Servant.:> "v2" Servant.:> "sync" Servant.:> SyncV2.API) syncAPI :: Proxy SyncAPI syncAPI = Proxy @SyncAPI -downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORBytes SyncV2.DownloadEntitiesChunk)) +downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.DownloadEntitiesChunk)) SyncV2.Routes { downloadEntitiesStream = downloadEntitiesStreamClientM } = Servant.client syncAPI -- | Helper for running clientM that returns a stream of entities. -- You MUST consume the stream within the callback, it will be closed when the callback returns. -withConduit :: forall r. Servant.ClientEnv -> (Stream () (SyncV2.DownloadEntitiesChunk) -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORBytes SyncV2.DownloadEntitiesChunk)) -> StreamM r +withConduit :: forall r. Servant.ClientEnv -> (Stream () (SyncV2.DownloadEntitiesChunk) -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream SyncV2.DownloadEntitiesChunk)) -> StreamM r withConduit clientEnv callback clientM = do ExceptT $ withRunInIO \runInIO -> do Servant.withClientM clientM clientEnv $ \case @@ -456,7 +456,7 @@ withConduit clientEnv callback clientM = do conduit <- liftIO $ Servant.fromSourceIO sourceT (runInIO . runExceptT $ callback (conduit C..| unpackCBORBytesStream)) -unpackCBORBytesStream :: Stream (CBORBytes SyncV2.DownloadEntitiesChunk) SyncV2.DownloadEntitiesChunk +unpackCBORBytesStream :: Stream (CBORStream SyncV2.DownloadEntitiesChunk) SyncV2.DownloadEntitiesChunk unpackCBORBytesStream = C.map (BL.toStrict . coerce @_ @BL.ByteString) C..| decodeUnframedEntities diff --git a/unison-share-api/src/Unison/SyncV2/API.hs b/unison-share-api/src/Unison/SyncV2/API.hs index ae575c1885..4aec0e6b54 100644 --- a/unison-share-api/src/Unison/SyncV2/API.hs +++ b/unison-share-api/src/Unison/SyncV2/API.hs @@ -21,7 +21,7 @@ type API = NamedRoutes Routes type DownloadEntitiesStream = -- | The causal hash the client needs. The server should provide it and all of its dependencies ReqBody '[CBOR, JSON] DownloadEntitiesRequest - :> StreamPost NoFraming CBOR (SourceIO (CBORBytes DownloadEntitiesChunk)) + :> StreamPost NoFraming OctetStream (SourceIO (CBORStream DownloadEntitiesChunk)) data Routes mode = Routes { downloadEntitiesStream :: mode :- "entities" :> "download" :> DownloadEntitiesStream diff --git a/unison-share-api/src/Unison/SyncV2/Types.hs b/unison-share-api/src/Unison/SyncV2/Types.hs index 80272de8ab..c2935110d9 100644 --- a/unison-share-api/src/Unison/SyncV2/Types.hs +++ b/unison-share-api/src/Unison/SyncV2/Types.hs @@ -7,6 +7,7 @@ module Unison.SyncV2.Types SyncError (..), DownloadEntitiesError (..), CBORBytes (..), + CBORStream(..), EntityKind (..), serialiseCBORBytes, deserialiseOrFailCBORBytes, diff --git a/unison-share-api/src/Unison/Util/Servant/CBOR.hs b/unison-share-api/src/Unison/Util/Servant/CBOR.hs index 18fd94904c..580b1a7124 100644 --- a/unison-share-api/src/Unison/Util/Servant/CBOR.hs +++ b/unison-share-api/src/Unison/Util/Servant/CBOR.hs @@ -5,6 +5,7 @@ module Unison.Util.Servant.CBOR ( CBOR, UnknownCBORBytes, CBORBytes (..), + CBORStream (..), deserialiseOrFailCBORBytes, serialiseCBORBytes, decodeCBORBytes, @@ -86,3 +87,14 @@ serialiseUnknownCBORBytes = CBORBytes . CBOR.serialise data Unknown type UnknownCBORBytes = CBORBytes Unknown + +-- | Wrapper for a stream of CBOR data. Each chunk may not be a complete CBOR value, but the concatenation of all the chunks is a valid CBOR stream. +newtype CBORStream a = CBORStream BL.ByteString + deriving (Serialise) via (BL.ByteString) + deriving (Eq, Show, Ord) + +instance MimeRender OctetStream (CBORStream a) where + mimeRender Proxy (CBORStream bs) = bs + +instance MimeUnrender OctetStream (CBORStream a) where + mimeUnrender Proxy bs = Right (CBORStream bs)