@@ -66,9 +66,9 @@ import UnliftIO qualified as IO
66
66
67
67
type Stream i o = ConduitT i o StreamM ()
68
68
69
- type PullErr = SyncError SyncV2. PullError
69
+ type SyncErr = SyncError SyncV2. PullError
70
70
71
- type StreamM = (ExceptT PullErr (C. ResourceT IO ))
71
+ type StreamM = (ExceptT SyncErr (C. ResourceT IO ))
72
72
73
73
batchSize :: Int
74
74
batchSize = 5000
@@ -115,7 +115,7 @@ syncSortedStream codebase stream = do
115
115
validateAndSave codebase (catMaybes entityBatch)
116
116
C. runConduit $ stream C. .| C. chunksOf batchSize C. .| handler
117
117
118
- unpackChunk :: SyncV2. EntityChunk -> ExceptT PullErr Sqlite. Transaction (Maybe (Hash32 , TempEntity ))
118
+ unpackChunk :: SyncV2. EntityChunk -> ExceptT SyncErr Sqlite. Transaction (Maybe (Hash32 , TempEntity ))
119
119
unpackChunk = \ case
120
120
SyncV2. EntityChunk {hash, entityCBOR = entityBytes} -> do
121
121
-- Only want entities we don't already have
@@ -124,18 +124,18 @@ unpackChunk = \case
124
124
_ -> do
125
125
(Just . (hash,)) <$> unpackEntity entityBytes
126
126
where
127
- unpackEntity :: (CBORBytes TempEntity ) -> ExceptT PullErr Sqlite. Transaction TempEntity
127
+ unpackEntity :: (CBORBytes TempEntity ) -> ExceptT SyncErr Sqlite. Transaction TempEntity
128
128
unpackEntity entityBytes = do
129
129
case CBOR. deserialiseOrFailCBORBytes entityBytes of
130
130
Left err -> do throwError $ (SyncError . SyncV2. PullError'Sync $ SyncV2. SyncErrorDeserializationFailure err)
131
131
Right entity -> pure entity
132
132
133
- unpackChunks :: [SyncV2. EntityChunk ] -> ExceptT PullErr Sqlite. Transaction [(Hash32 , TempEntity )]
133
+ unpackChunks :: [SyncV2. EntityChunk ] -> ExceptT SyncErr Sqlite. Transaction [(Hash32 , TempEntity )]
134
134
unpackChunks xs = do
135
135
for xs unpackChunk
136
136
<&> catMaybes
137
137
138
- batchValidateEntities :: [(Hash32 , TempEntity )] -> ExceptT PullErr IO ()
138
+ batchValidateEntities :: [(Hash32 , TempEntity )] -> ExceptT SyncErr IO ()
139
139
batchValidateEntities entities = do
140
140
mismatches <- fmap catMaybes $ liftIO $ IO. pooledForConcurrently entities \ (hash, entity) -> do
141
141
IO. evaluate $ EV. validateTempEntity hash entity
@@ -281,7 +281,7 @@ syncToFile ::
281
281
CausalHash ->
282
282
Maybe SyncV2. BranchRef ->
283
283
FilePath ->
284
- IO (Either PullErr () )
284
+ IO (Either SyncErr () )
285
285
syncToFile codebase rootHash mayBranchRef destFilePath = do
286
286
liftIO $ Codebase. withConnection codebase \ conn -> do
287
287
liftIO . C. runResourceT . runExceptT $ withEntityStream conn rootHash mayBranchRef \ stream -> do
@@ -327,13 +327,13 @@ decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do
327
327
d <- newDecoder
328
328
loop bs d
329
329
where
330
- newDecoder :: ConduitT ByteString SyncV2. DownloadEntitiesChunk (ExceptT PullErr (ST s )) (Maybe ByteString -> ST s (CBOR. IDecode s (SyncV2. DownloadEntitiesChunk )))
330
+ newDecoder :: ConduitT ByteString SyncV2. DownloadEntitiesChunk (ExceptT SyncErr (ST s )) (Maybe ByteString -> ST s (CBOR. IDecode s (SyncV2. DownloadEntitiesChunk )))
331
331
newDecoder = do
332
332
(lift . lift) CBOR. deserialiseIncremental >>= \ case
333
333
CBOR. Done _ _ _ -> throwError . SyncError . SyncV2. PullError'Sync $ SyncV2. SyncErrorStreamFailure " Invalid initial decoder"
334
334
CBOR. Fail _ _ err -> throwError . SyncError . SyncV2. PullError'Sync $ SyncV2. SyncErrorDeserializationFailure err
335
335
CBOR. Partial k -> pure k
336
- loop :: ByteString -> (Maybe ByteString -> ST s (CBOR. IDecode s (SyncV2. DownloadEntitiesChunk ))) -> ConduitT ByteString SyncV2. DownloadEntitiesChunk (ExceptT PullErr (ST s )) ()
336
+ loop :: ByteString -> (Maybe ByteString -> ST s (CBOR. IDecode s (SyncV2. DownloadEntitiesChunk ))) -> ConduitT ByteString SyncV2. DownloadEntitiesChunk (ExceptT SyncErr (ST s )) ()
337
337
loop bs k = do
338
338
(lift . lift) (k (Just bs)) >>= \ case
339
339
CBOR. Fail _ _ err -> throwError . SyncError . SyncV2. PullError'Sync $ SyncV2. SyncErrorDeserializationFailure err
0 commit comments