Skip to content

Commit f971747

Browse files
authored
Streaming events (#1808)
Changes I made originally for the `hydra-doom` project to load all events in `state` (12GB+ in that use case) with constant memory using `conduit` streams. ~~There is a big TODO on this: `IOSim s` does not have a `MonadUnliftIO` instance and it's not impossible to have one. We need to change the interface further such that we can compose `createHydraNode` and `hydrate` functions with in-memory `EventSource` variants.~~ ~~A bit of a wart: The `mkProjection` does run the conduit for each projection, instead we should run the conduit once and build the projected in-memory read/query model once.~~ Also, the change here is not constant-memory as the `ServerOutput` history is still kept fully in memory. But this is a different story and should be covered by #1618 --- * [x] CHANGELOG updated or not needed * [x] Documentation updated or not needed * [x] Haddocks updated or not needed * [x] No new TODOs introduced or explained hereafter
2 parents 18ea31d + 30c01ad commit f971747

File tree

18 files changed

+289
-180
lines changed

18 files changed

+289
-180
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ changes.
1010

1111
## [0.20.1] - UNRELEASED
1212

13+
- Stream historical data from disk in the hydra-node API server.
14+
1315
- Record used and free memory when running `bench-e2e` benchmark.
16+
1417
- Submit observations to a `hydra-explorer` via optional `--explorer` option.
1518

1619
- Tested with `cardano-node 10.2` and `cardano-cli 10.3.0.0`.

hydra-node/hydra-node.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ library
118118
, cardano-slotting
119119
, cardano-strict-containers
120120
, cborg
121+
, conduit
121122
, containers
122123
, contra-tracer
123124
, cryptonite
@@ -340,6 +341,7 @@ test-suite tests
340341
, cardano-slotting
341342
, cardano-strict-containers
342343
, cborg
344+
, conduit
343345
, containers
344346
, contra-tracer
345347
, directory

hydra-node/json-schemas/logs.yaml

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -894,21 +894,45 @@ definitions:
894894
minimum: 0
895895
input:
896896
"$ref": "logs.yaml#/definitions/Input"
897+
- title: LoadingState
898+
description: >-
899+
Loading state events from persistence.
900+
type: object
901+
required:
902+
- tag
903+
properties:
904+
tag:
905+
type: string
906+
enum: ["LoadingState"]
907+
- title: ReplayingState
908+
description: >-
909+
Replaying state events from persistence.
910+
type: object
911+
required:
912+
- tag
913+
properties:
914+
tag:
915+
type: string
916+
enum: ["ReplayingState"]
897917
- title: LoadedState
898918
description: >-
899919
Loaded state events from persistence.
900920
type: object
901921
additionalProperties: false
902922
required:
903923
- tag
904-
- numberOfEvents
924+
- lastEventId
925+
- headState
905926
properties:
906927
tag:
907928
type: string
908929
enum: ["LoadedState"]
909-
numberOfEvents:
910-
type: integer
911-
minimum: 0
930+
lastEventId:
931+
oneOf:
932+
- type: "null"
933+
- type: integer
934+
headState:
935+
$ref: "logs.yaml#/definitions/HeadState"
912936
- title: Misconfiguration
913937
description: >-
914938
Hydra node detected a difference between loaded state and the node arguments.

hydra-node/src/Hydra/API/Projection.hs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module Hydra.API.Projection where
1717

1818
import Hydra.Prelude
1919

20-
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVar)
20+
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO)
2121

2222
-- | 'Projection' type used to alter/project the API output to suit the client needs.
2323
data Projection stm event model = Projection
@@ -33,19 +33,16 @@ data Projection stm event model = Projection
3333
mkProjection ::
3434
MonadSTM m =>
3535
model ->
36-
[event] ->
3736
-- | Projection function
3837
(model -> event -> model) ->
3938
m (Projection (STM m) event model)
40-
mkProjection startingModel events project =
41-
atomically $ do
42-
tv <- newTVar startingModel
43-
mapM_ (update tv) events
44-
pure
45-
Projection
46-
{ getLatest = readTVar tv
47-
, update = update tv
48-
}
39+
mkProjection startingModel project = do
40+
tv <- newTVarIO startingModel
41+
pure
42+
Projection
43+
{ getLatest = readTVar tv
44+
, update = update tv
45+
}
4946
where
5047
update tv event =
5148
modifyTVar' tv $ \m ->

hydra-node/src/Hydra/API/Server.hs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
module Hydra.API.Server where
44

5-
import Hydra.Prelude hiding (TVar, readTVar, seq)
5+
import Hydra.Prelude hiding (TVar, mapM_, readTVar, seq)
66

77
import Cardano.Ledger.Core (PParams)
8+
import Conduit (runConduitRes, sinkList, (.|))
89
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
910
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
1011
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
1112
import Control.Exception (IOException)
13+
import Data.Conduit.Combinators (iterM)
1214
import Hydra.API.APIServerLog (APIServerLog (..))
1315
import Hydra.API.ClientInput (ClientInput)
1416
import Hydra.API.HTTPServer (httpApp)
@@ -89,18 +91,28 @@ withAPIServer ::
8991
withAPIServer config env party persistence tracer chain pparams serverOutputFilter callback action =
9092
handle onIOException $ do
9193
responseChannel <- newBroadcastTChanIO
92-
timedOutputEvents <- loadAll
93-
94-
-- Intialize our read model from stored events
95-
headStatusP <- mkProjection Idle (output <$> timedOutputEvents) projectHeadStatus
96-
snapshotUtxoP <- mkProjection Nothing (output <$> timedOutputEvents) projectSnapshotUtxo
97-
commitInfoP <- mkProjection CannotCommit (output <$> timedOutputEvents) projectCommitInfo
98-
headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId
99-
pendingDepositsP <- mkProjection [] (output <$> timedOutputEvents) projectPendingDeposits
100-
94+
-- Initialize our read models from stored events
95+
-- NOTE: we do not keep the stored events around in memory
96+
headStatusP <- mkProjection Idle projectHeadStatus
97+
snapshotUtxoP <- mkProjection Nothing projectSnapshotUtxo
98+
commitInfoP <- mkProjection CannotCommit projectCommitInfo
99+
headIdP <- mkProjection Nothing projectInitializingHeadId
100+
pendingDepositsP <- mkProjection [] projectPendingDeposits
101+
loadedHistory <-
102+
runConduitRes $
103+
source
104+
-- .| mapC output
105+
.| iterM (lift . atomically . update headStatusP . output)
106+
.| iterM (lift . atomically . update snapshotUtxoP . output)
107+
.| iterM (lift . atomically . update commitInfoP . output)
108+
.| iterM (lift . atomically . update headIdP . output)
109+
.| iterM (lift . atomically . update pendingDepositsP . output)
110+
-- FIXME: don't load whole history into memory
111+
.| sinkList
112+
113+
history <- newTVarIO loadedHistory
101114
-- NOTE: we need to reverse the list because we store history in a reversed
102115
-- list in memory but in order on disk
103-
history <- newTVarIO (reverse timedOutputEvents)
104116
(notifyServerRunning, waitForServerRunning) <- setupServerNotification
105117

106118
let serverSettings =
@@ -138,7 +150,7 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
138150
where
139151
APIServerConfig{host, port, tlsCertPath, tlsKeyPath} = config
140152

141-
PersistenceIncremental{loadAll, append} = persistence
153+
PersistenceIncremental{source, append} = persistence
142154

143155
startServer settings app =
144156
case (tlsCertPath, tlsKeyPath) of

hydra-node/src/Hydra/Events.hs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Hydra.Events where
1515

1616
import Hydra.Prelude
1717

18+
import Conduit (ConduitT, MonadUnliftIO, ResourceT, runResourceT, sourceToList)
1819
import Hydra.Chain.ChainState (IsChainState)
1920
import Hydra.HeadLogic.Outcome (StateChanged)
2021
import Hydra.Tx.IsTx (ArbitraryIsTx)
@@ -25,10 +26,14 @@ class HasEventId a where
2526
getEventId :: a -> EventId
2627

2728
newtype EventSource e m = EventSource
28-
{ getEvents :: HasEventId e => m [e]
29-
-- ^ Retrieve all events from the event source.
29+
{ sourceEvents :: HasEventId e => ConduitT () e (ResourceT m) ()
30+
-- ^ Stream all events from the event source.
3031
}
3132

33+
-- | Retrieve all events from the event source as a list.
34+
getEvents :: (HasEventId e, MonadUnliftIO m) => EventSource e m -> m [e]
35+
getEvents EventSource{sourceEvents} = runResourceT $ sourceToList sourceEvents
36+
3237
newtype EventSink e m = EventSink
3338
{ putEvent :: HasEventId e => e -> m ()
3439
-- ^ Send a single event to the event sink.

hydra-node/src/Hydra/Events/FileBased.hs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ module Hydra.Events.FileBased where
55

66
import Hydra.Prelude
77

8+
import Conduit (mapMC, (.|))
89
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
910
import Hydra.Chain.ChainState (IsChainState)
1011
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
@@ -29,7 +30,7 @@ eventPairFromPersistenceIncremental ::
2930
(IsChainState tx, MonadSTM m) =>
3031
PersistenceIncremental (PersistedStateChange tx) m ->
3132
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
32-
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
33+
eventPairFromPersistenceIncremental PersistenceIncremental{append, source} = do
3334
eventIdV <- newTVarIO Nothing
3435
let
3536
getLastSeenEventId = readTVar eventIdV
@@ -41,17 +42,18 @@ eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
4142
maybe 0 (+ 1) <$> readTVar eventIdV
4243

4344
-- Keep track of the last seen event id when loading
44-
getEvents = do
45-
items <- loadAll
46-
atomically . forM items $ \i -> do
47-
event <- case i of
48-
New e -> pure e
49-
Legacy sc -> do
50-
eventId <- getNextEventId
51-
pure $ StateEvent eventId sc
52-
53-
setLastSeenEventId event
54-
pure event
45+
sourceEvents =
46+
source
47+
.| mapMC
48+
( \i -> lift . atomically $ do
49+
event <- case i of
50+
New e -> pure e
51+
Legacy sc -> do
52+
eventId <- getNextEventId
53+
pure $ StateEvent eventId sc
54+
setLastSeenEventId event
55+
pure event
56+
)
5557

5658
-- Filter events that are already stored
5759
putEvent e@StateEvent{eventId} = do
@@ -65,7 +67,7 @@ eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
6567
append (New e)
6668
atomically $ setLastSeenEventId e
6769

68-
pure (EventSource{getEvents}, EventSink{putEvent})
70+
pure (EventSource{sourceEvents}, EventSink{putEvent})
6971

7072
-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
7173
-- compatible with plain usage of 'PersistenceIncremental' using plain

hydra-node/src/Hydra/HeadLogic.hs

Lines changed: 25 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import Hydra.Chain (
3434
ChainStateHistory,
3535
OnChainTx (..),
3636
PostChainTx (..),
37-
initHistory,
3837
pushNewState,
3938
rollbackHistory,
4039
)
@@ -1666,48 +1665,34 @@ aggregateState ::
16661665
Outcome tx ->
16671666
HeadState tx
16681667
aggregateState s outcome =
1669-
recoverState s $ collectStateChanged outcome
1668+
foldl' aggregate s $ collectStateChanged outcome
16701669
where
16711670
collectStateChanged = \case
16721671
Error{} -> []
16731672
Wait{stateChanges} -> stateChanges
16741673
Continue{stateChanges} -> stateChanges
16751674

1676-
recoverChainStateHistory ::
1677-
(Foldable t, IsChainState tx) =>
1678-
ChainStateType tx ->
1679-
t (StateChanged tx) ->
1680-
ChainStateHistory tx
1681-
recoverChainStateHistory initialChainState =
1682-
foldl' aggregateChainStateHistory (initHistory initialChainState)
1683-
where
1684-
aggregateChainStateHistory history = \case
1685-
HeadInitialized{chainState} -> pushNewState chainState history
1686-
CommittedUTxO{chainState} -> pushNewState chainState history
1687-
HeadAborted{chainState} -> pushNewState chainState history
1688-
HeadOpened{chainState} -> pushNewState chainState history
1689-
TransactionAppliedToLocalUTxO{} -> history
1690-
CommitRecovered{} -> history
1691-
CommitRecorded{} -> history
1692-
DecommitRecorded{} -> history
1693-
SnapshotRequestDecided{} -> history
1694-
SnapshotRequested{} -> history
1695-
TransactionReceived{} -> history
1696-
PartySignedSnapshot{} -> history
1697-
SnapshotConfirmed{} -> history
1698-
CommitFinalized{} -> history
1699-
DecommitFinalized{} -> history
1700-
HeadClosed{chainState} -> pushNewState chainState history
1701-
HeadContested{chainState} -> pushNewState chainState history
1702-
HeadIsReadyToFanout{} -> history
1703-
HeadFannedOut{chainState} -> pushNewState chainState history
1704-
ChainRolledBack{chainState} ->
1705-
rollbackHistory (chainStateSlot chainState) history
1706-
TickObserved{} -> history
1707-
1708-
recoverState ::
1709-
(Foldable t, IsChainState tx) =>
1710-
HeadState tx ->
1711-
t (StateChanged tx) ->
1712-
HeadState tx
1713-
recoverState = foldl' aggregate
1675+
aggregateChainStateHistory :: IsChainState tx => ChainStateHistory tx -> StateChanged tx -> ChainStateHistory tx
1676+
aggregateChainStateHistory history = \case
1677+
HeadInitialized{chainState} -> pushNewState chainState history
1678+
CommittedUTxO{chainState} -> pushNewState chainState history
1679+
HeadAborted{chainState} -> pushNewState chainState history
1680+
HeadOpened{chainState} -> pushNewState chainState history
1681+
TransactionAppliedToLocalUTxO{} -> history
1682+
CommitRecovered{} -> history
1683+
CommitRecorded{} -> history
1684+
DecommitRecorded{} -> history
1685+
SnapshotRequestDecided{} -> history
1686+
SnapshotRequested{} -> history
1687+
TransactionReceived{} -> history
1688+
PartySignedSnapshot{} -> history
1689+
SnapshotConfirmed{} -> history
1690+
CommitFinalized{} -> history
1691+
DecommitFinalized{} -> history
1692+
HeadClosed{chainState} -> pushNewState chainState history
1693+
HeadContested{chainState} -> pushNewState chainState history
1694+
HeadIsReadyToFanout{} -> history
1695+
HeadFannedOut{chainState} -> pushNewState chainState history
1696+
ChainRolledBack{chainState} ->
1697+
rollbackHistory (chainStateSlot chainState) history
1698+
TickObserved{} -> history

hydra-node/src/Hydra/Network/Reliability.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,10 @@ import Hydra.Logging (traceWith)
110110
import Hydra.Network (Network (..), NetworkCallback (..), NetworkComponent)
111111
import Hydra.Network.Authenticate (Authenticated (..))
112112
import Hydra.Network.Heartbeat (Heartbeat (..), isPing)
113-
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..))
113+
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), loadAll)
114114
import Hydra.Tx (Party)
115115
import Test.QuickCheck.Instances.Vector ()
116+
import UnliftIO (MonadUnliftIO)
116117

117118
data ReliableMsg msg = ReliableMsg
118119
{ knownMessageIds :: Vector Int
@@ -180,7 +181,7 @@ data MessagePersistence m msg = MessagePersistence
180181
-- NOTE: This handle is returned in the underlying context just for the sake of
181182
-- convenience.
182183
mkMessagePersistence ::
183-
(MonadThrow m, FromJSON msg, ToJSON msg) =>
184+
(MonadUnliftIO m, MonadThrow m, FromJSON msg, ToJSON msg) =>
184185
Int ->
185186
PersistenceIncremental (Heartbeat msg) m ->
186187
Persistence (Vector Int) m ->

0 commit comments

Comments
 (0)