Skip to content
This repository was archived by the owner on Jan 24, 2025. It is now read-only.

Commit 96b3a2a

Browse files
authored
Merge pull request #888 from iotaledger/fix/module-lifecycle
Fix module lifecycles
2 parents a1b42be + a92c959 commit 96b3a2a

File tree

21 files changed

+428
-345
lines changed

21 files changed

+428
-345
lines changed

pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin
4141
b := New(e.NewSubModule("BlockDAG"), e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...)
4242

4343
e.ConstructedEvent().OnTrigger(func() {
44-
b.Init(e.SyncManager.LatestCommitment)
44+
b.latestCommitmentFunc = e.SyncManager.LatestCommitment
4545

4646
wp := b.workers.CreatePool("BlockDAG.Append", workerpool.WithWorkerCount(2))
4747

@@ -59,6 +59,8 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin
5959
}, event.WithWorkerPool(wp))
6060

6161
e.Events.BlockDAG.LinkTo(b.events)
62+
63+
b.InitializedEvent().Trigger()
6264
})
6365

6466
return b
@@ -82,12 +84,6 @@ func New(subModule module.Module, workers *workerpool.Group, unsolidCommitmentBu
8284
})
8385
}
8486

85-
func (b *BlockDAG) Init(latestCommitmentFunc func() *model.Commitment) {
86-
b.latestCommitmentFunc = latestCommitmentFunc
87-
88-
b.InitializedEvent().Trigger()
89-
}
90-
9187
// Append is used to append new Blocks to the BlockDAG. It is the main function of the BlockDAG that triggers Events.
9288
func (b *BlockDAG) Append(modelBlock *model.Block) (block *blocks.Block, wasAppended bool, err error) {
9389
if block, wasAppended, err = b.append(modelBlock); wasAppended {

pkg/protocol/engine/booker/inmemorybooker/booker.go

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ func NewProvider(opts ...options.Option[Booker]) module.Provider[*engine.Engine,
4848
})
4949

5050
e.Events.Booker.LinkTo(b.events)
51+
52+
b.InitializedEvent().Trigger()
5153
})
5254

5355
return b

pkg/protocol/engine/clock/blocktime/clock.go

+55-43
Original file line numberDiff line numberDiff line change
@@ -35,52 +35,64 @@ type Clock struct {
3535
// NewProvider creates a new Clock provider with the given options.
3636
func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine, clock.Clock] {
3737
return module.Provide(func(e *engine.Engine) clock.Clock {
38-
return options.Apply(&Clock{
39-
Module: e.NewSubModule("Clock"),
40-
acceptedTime: NewRelativeTime(),
41-
confirmedTime: NewRelativeTime(),
42-
workerPool: e.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)),
43-
}, opts, func(c *Clock) {
44-
e.ConstructedEvent().OnTrigger(func() {
45-
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot()
46-
c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex))
47-
48-
latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot()
49-
c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex))
50-
51-
e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated)
52-
e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated)
53-
54-
asyncOpt := event.WithWorkerPool(c.workerPool)
55-
c.ShutdownEvent().OnTrigger(lo.Batch(
56-
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
57-
c.acceptedTime.Advance(block.IssuingTime())
58-
}, asyncOpt).Unhook,
59-
60-
e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
61-
c.confirmedTime.Advance(block.IssuingTime())
62-
}, asyncOpt).Unhook,
63-
64-
e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
65-
timeProvider := e.APIForSlot(slot).TimeProvider()
66-
slotEndTime := timeProvider.SlotEndTime(slot)
67-
68-
c.acceptedTime.Advance(slotEndTime)
69-
c.confirmedTime.Advance(slotEndTime)
70-
}, asyncOpt).Unhook,
71-
72-
func() {
73-
c.workerPool.Shutdown()
74-
75-
c.StoppedEvent().Trigger()
76-
},
77-
))
78-
79-
c.InitializedEvent().Trigger()
38+
c := New(e.NewSubModule("Clock"), e, opts...)
39+
40+
e.ConstructedEvent().OnTrigger(func() {
41+
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot()
42+
c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex))
43+
44+
latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot()
45+
c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex))
46+
47+
e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated)
48+
e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated)
49+
50+
asyncOpt := event.WithWorkerPool(c.workerPool)
51+
52+
unhook := lo.Batch(
53+
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
54+
c.acceptedTime.Advance(block.IssuingTime())
55+
}, asyncOpt).Unhook,
56+
57+
e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
58+
c.confirmedTime.Advance(block.IssuingTime())
59+
}, asyncOpt).Unhook,
60+
61+
e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
62+
timeProvider := e.APIForSlot(slot).TimeProvider()
63+
slotEndTime := timeProvider.SlotEndTime(slot)
64+
65+
c.acceptedTime.Advance(slotEndTime)
66+
c.confirmedTime.Advance(slotEndTime)
67+
}, asyncOpt).Unhook,
68+
)
69+
70+
c.ShutdownEvent().OnTrigger(func() {
71+
unhook()
72+
c.workerPool.Shutdown()
73+
74+
c.StoppedEvent().Trigger()
8075
})
8176

82-
c.ConstructedEvent().Trigger()
77+
c.InitializedEvent().Trigger()
78+
})
79+
80+
return c
81+
})
82+
}
83+
84+
func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Clock]) *Clock {
85+
return options.Apply(&Clock{
86+
Module: subModule,
87+
acceptedTime: NewRelativeTime(),
88+
confirmedTime: NewRelativeTime(),
89+
workerPool: engine.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)),
90+
}, opts, func(c *Clock) {
91+
c.ShutdownEvent().OnTrigger(func() {
92+
c.workerPool.Shutdown()
8393
})
94+
95+
c.ConstructedEvent().Trigger()
8496
})
8597
}
8698

pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
6262
return e.SyncManager.LatestCommitment().Slot()
6363
}
6464
s.blockCache = e.BlockCache
65-
e.Events.Scheduler.LinkTo(s.events)
6665
e.SybilProtection.InitializedEvent().OnTrigger(func() {
6766
s.seatManager = e.SybilProtection.SeatManager()
6867
})
@@ -103,7 +102,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
103102
return 1 + Deficit(mana), nil
104103
}
105104
})
106-
s.ConstructedEvent().Trigger()
107105
e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
108106
s.AddBlock(block)
109107
s.selectBlockToScheduleWithLocking()
@@ -122,22 +120,28 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
122120
})
123121

124122
e.InitializedEvent().OnTrigger(s.Start)
123+
124+
e.Events.Scheduler.LinkTo(s.events)
125+
126+
s.InitializedEvent().Trigger()
125127
})
126128

127129
return s
128130
})
129131
}
130132

131-
func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler {
133+
func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler {
132134
return options.Apply(
133135
&Scheduler{
134-
Module: module,
136+
Module: subModule,
135137
events: scheduler.NewEvents(),
136138
deficits: shrinkingmap.New[iotago.AccountID, Deficit](),
137139
apiProvider: apiProvider,
138140
validatorBuffer: NewValidatorBuffer(),
139141
}, opts, func(s *Scheduler) {
140142
s.ShutdownEvent().OnTrigger(s.shutdown)
143+
144+
s.ConstructedEvent().Trigger()
141145
},
142146
)
143147
}

pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package passthrough
22

33
import (
44
"github.com/iotaledger/hive.go/runtime/module"
5+
"github.com/iotaledger/hive.go/runtime/options"
56
"github.com/iotaledger/iota-core/pkg/protocol/engine"
67
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
78
"github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler"
@@ -19,21 +20,29 @@ func NewProvider() module.Provider[*engine.Engine, scheduler.Scheduler] {
1920
s := New(e.NewSubModule("Scheduler"))
2021

2122
e.ConstructedEvent().OnTrigger(func() {
22-
e.Events.Scheduler.LinkTo(s.events)
23-
2423
e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
2524
s.AddBlock(block)
2625
})
26+
27+
e.Events.Scheduler.LinkTo(s.events)
28+
29+
s.InitializedEvent().Trigger()
2730
})
2831

2932
return s
3033
})
3134
}
3235

3336
func New(subModule module.Module) *Scheduler {
34-
return module.InitSimpleLifecycle(&Scheduler{
37+
return options.Apply(&Scheduler{
3538
Module: subModule,
3639
events: scheduler.NewEvents(),
40+
}, nil, func(s *Scheduler) {
41+
s.ShutdownEvent().OnTrigger(func() {
42+
s.StoppedEvent().Trigger()
43+
})
44+
45+
s.ConstructedEvent().Trigger()
3746
})
3847
}
3948

pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget/gadget.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,21 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
3636
return module.Provide(func(e *engine.Engine) blockgadget.Gadget {
3737
g := New(e.NewSubModule("ThresholdBlockGadget"), e.BlockCache, e.SybilProtection.SeatManager(), e.ErrorHandler("gadget"), opts...)
3838

39-
wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1))
40-
e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp))
39+
e.ConstructedEvent().OnTrigger(func() {
40+
wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1))
41+
e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp))
4142

42-
e.Events.BlockGadget.LinkTo(g.events)
43+
e.Events.BlockGadget.LinkTo(g.events)
44+
45+
g.InitializedEvent().Trigger()
46+
})
4347

4448
return g
4549
})
4650
}
4751

4852
func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatmanager.SeatManager, errorHandler func(error), opts ...options.Option[Gadget]) *Gadget {
49-
return module.InitSimpleLifecycle(options.Apply(&Gadget{
53+
return options.Apply(&Gadget{
5054
Module: subModule,
5155
events: blockgadget.NewEvents(),
5256
seatManager: seatManager,
@@ -56,7 +60,13 @@ func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatman
5660
optsAcceptanceThreshold: 0.67,
5761
optsConfirmationThreshold: 0.67,
5862
optsConfirmationRatificationThreshold: 2,
59-
}, opts))
63+
}, opts, func(g *Gadget) {
64+
g.ShutdownEvent().OnTrigger(func() {
65+
g.StoppedEvent().Trigger()
66+
})
67+
68+
g.ConstructedEvent().Trigger()
69+
})
6070
}
6171

6272
func (g *Gadget) Events() *blockgadget.Events {

pkg/protocol/engine/consensus/slotgadget/totalweightslotgadget/gadget.go

+25-17
Original file line numberDiff line numberDiff line change
@@ -37,27 +37,21 @@ type Gadget struct {
3737

3838
func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, slotgadget.Gadget] {
3939
return module.Provide(func(e *engine.Engine) slotgadget.Gadget {
40-
return options.Apply(&Gadget{
41-
Module: e.NewSubModule("TotalWeightSlotGadget"),
42-
events: slotgadget.NewEvents(),
43-
slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](),
44-
optsSlotFinalizationThreshold: 0.67,
45-
errorHandler: e.ErrorHandler("slotgadget"),
46-
}, opts, func(g *Gadget) {
47-
e.Events.SlotGadget.LinkTo(g.events)
48-
49-
e.ConstructedEvent().OnTrigger(func() {
50-
g.seatManager = e.SybilProtection.SeatManager()
40+
g := New(e.NewSubModule("TotalWeightSlotGadget"), e, opts...)
5141

52-
e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes)
53-
})
42+
e.ConstructedEvent().OnTrigger(func() {
43+
g.seatManager = e.SybilProtection.SeatManager()
5444

5545
g.storeLastFinalizedSlotFunc = func(slot iotago.SlotIndex) {
5646
if err := e.Storage.Settings().SetLatestFinalizedSlot(slot); err != nil {
5747
g.errorHandler(ierrors.Wrap(err, "failed to set latest finalized slot"))
5848
}
5949
}
6050

51+
g.ConstructedEvent().Trigger()
52+
53+
e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes)
54+
6155
e.InitializedEvent().OnTrigger(func() {
6256
// Can't use setter here as it has a side effect.
6357
g.mutex.Lock()
@@ -67,11 +61,25 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
6761
g.InitializedEvent().Trigger()
6862
})
6963

70-
g.ShutdownEvent().OnTrigger(func() {
71-
g.StoppedEvent().Trigger()
72-
})
64+
e.Events.SlotGadget.LinkTo(g.events)
7365

74-
g.ConstructedEvent().Trigger()
66+
g.InitializedEvent().Trigger()
67+
})
68+
69+
return g
70+
})
71+
}
72+
73+
func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Gadget]) *Gadget {
74+
return options.Apply(&Gadget{
75+
Module: subModule,
76+
events: slotgadget.NewEvents(),
77+
slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](),
78+
optsSlotFinalizationThreshold: 0.67,
79+
errorHandler: engine.ErrorHandler("slotgadget"),
80+
}, opts, func(g *Gadget) {
81+
g.ShutdownEvent().OnTrigger(func() {
82+
g.StoppedEvent().Trigger()
7583
})
7684
})
7785
}

pkg/protocol/engine/filter/postsolidfilter/postsolidblockfilter/post_solid_block_filter.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,17 @@ func NewProvider(opts ...options.Option[PostSolidBlockFilter]) module.Provider[*
3838
e.Ledger.InitializedEvent().OnTrigger(func() {
3939
c.Init(e.Ledger.Account, e.BlockCache.Block, e.Ledger.RMCManager().RMC)
4040
})
41+
42+
c.InitializedEvent().Trigger()
4143
})
4244

4345
return c
4446
})
4547
}
4648

47-
func New(module module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter {
49+
func New(subModule module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter {
4850
return options.Apply(&PostSolidBlockFilter{
49-
Module: module,
51+
Module: subModule,
5052
events: postsolidfilter.NewEvents(),
5153
}, opts, func(p *PostSolidBlockFilter) {
5254
p.ShutdownEvent().OnTrigger(func() {

pkg/protocol/engine/filter/presolidfilter/presolidblockfilter/pre_solid_block_filter.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,18 @@ func NewProvider(opts ...options.Option[PreSolidBlockFilter]) module.Provider[*e
3939
})
4040

4141
e.Events.PreSolidFilter.LinkTo(f.events)
42+
43+
f.InitializedEvent().Trigger()
4244
})
4345

4446
return f
4547
})
4648
}
4749

4850
// New creates a new PreSolidBlockFilter.
49-
func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter {
51+
func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter {
5052
return options.Apply(&PreSolidBlockFilter{
51-
Module: module,
53+
Module: subModule,
5254
events: presolidfilter.NewEvents(),
5355
apiProvider: apiProvider,
5456
}, opts, func(p *PreSolidBlockFilter) {

0 commit comments

Comments
 (0)