Skip to content

Commit 1029bb7

Browse files
committed
Add status change event
Will propagated to client via SSE in ith own channel, to inform clients about CRC status change. Status modified in "status_change_stream.go", as current status implementation has no single point of change, so tracking transition between states is challenging. Signed-off-by: Yevhen Vydolob <[email protected]>
1 parent 6d61cbf commit 1029bb7

File tree

5 files changed

+116
-3
lines changed

5 files changed

+116
-3
lines changed

pkg/crc/api/events/event_server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func NewEventServer(machine machine.Client) *EventServer {
5656

5757
sseServer.CreateStream(Logs)
5858
sseServer.CreateStream(ClusterLoad)
59+
sseServer.CreateStream(StatusChange)
5960
return eventServer
6061
}
6162

@@ -69,6 +70,8 @@ func createEventStream(server *EventServer, streamID string) EventStream {
6970
return newLogsStream(server)
7071
case ClusterLoad:
7172
return newClusterLoadStream(server)
73+
case StatusChange:
74+
return newStatusChangeStream(server)
7275
}
7376
return nil
7477
}

pkg/crc/api/events/events.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package events
33
import "github.com/r3labs/sse/v2"
44

55
const (
6-
Logs = "logs" // Logs event channel, contains daemon logs
7-
ClusterLoad = "cluster_load" // status event channel, contains VM load info
6+
Logs = "logs" // Logs event channel, contains daemon logs
7+
ClusterLoad = "cluster_load" // status event channel, contains VM load info
8+
StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
89
)
910

1011
type EventPublisher interface {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/crc-org/crc/v2/pkg/crc/logging"
7+
"github.com/crc-org/crc/v2/pkg/crc/machine"
8+
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
9+
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
10+
"github.com/crc-org/crc/v2/pkg/events"
11+
"github.com/r3labs/sse/v2"
12+
)
13+
14+
type serializableEvent struct {
15+
Status *types.ClusterStatusResult `json:"status"`
16+
Error string `json:"error,omitempty"`
17+
}
18+
19+
type statusChangeListener struct {
20+
machineClient machine.Client
21+
publisher EventPublisher
22+
}
23+
24+
func newStatusChangeStream(server *EventServer) EventStream {
25+
return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer))
26+
}
27+
28+
func newStatusChangeListener(client machine.Client) EventProducer {
29+
return &statusChangeListener{
30+
machineClient: client,
31+
}
32+
}
33+
34+
func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) {
35+
logging.Debugf("State Changed Event %s", changedEvent)
36+
var event serializableEvent
37+
status, err := st.machineClient.Status()
38+
// if we cannot receive actual state, send error state with error description
39+
if err != nil {
40+
event = serializableEvent{Status: &types.ClusterStatusResult{
41+
CrcStatus: state.Error,
42+
}, Error: err.Error()}
43+
} else {
44+
// event could be fired, before actual code, which change state is called
45+
// so status could contain 'old' state, replace it with state received in event
46+
status.CrcStatus = changedEvent.State // override with actual reported state
47+
event = serializableEvent{Status: status}
48+
if changedEvent.Error != nil {
49+
event.Error = changedEvent.Error.Error()
50+
}
51+
52+
}
53+
data, err := json.Marshal(event)
54+
if err != nil {
55+
logging.Errorf("Could not serealize status changed event in to JSON: %s", err)
56+
return
57+
}
58+
st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
59+
}
60+
61+
func (st *statusChangeListener) Start(publisher EventPublisher) {
62+
st.publisher = publisher
63+
events.StatusChanged.AddListener(st)
64+
65+
}
66+
67+
func (st *statusChangeListener) Stop() {
68+
events.StatusChanged.RemoveListener(st)
69+
}

pkg/crc/machine/sync.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
1111
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
1212
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
13+
"github.com/crc-org/crc/v2/pkg/events"
1314
)
1415

1516
const startCancelTimeout = 15 * time.Second
@@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {
6970

7071
err := s.underlying.Delete()
7172
s.syncOperationDone <- Deleting
73+
74+
if err == nil {
75+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM})
76+
}
7277
return err
7378
}
7479

@@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
8085
}
8186
s.startCancel = startCancel
8287
s.currentState = Starting
88+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})
8389

8490
return nil
8591
}
@@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)
9298

9399
startResult, err := s.underlying.Start(ctx, startConfig)
94100
s.syncOperationDone <- Starting
101+
102+
if err == nil {
103+
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
104+
} else {
105+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
106+
}
107+
95108
return startResult, err
96109
}
97110

@@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
136149
if err := s.prepareStopDelete(Stopping); err != nil {
137150
return state.Error, err
138151
}
152+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})
139153

140154
st, err := s.underlying.Stop()
141155
s.syncOperationDone <- Stopping
142156

157+
if err == nil {
158+
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
159+
} else {
160+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
161+
}
143162
return st, err
144163
}
145164

@@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
160179
}
161180

162181
func (s *Synchronized) PowerOff() error {
163-
return s.underlying.PowerOff()
182+
err := s.underlying.PowerOff()
183+
if err != nil {
184+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
185+
} else {
186+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
187+
}
188+
189+
return err
164190
}
165191

166192
func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {

pkg/events/events.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package events
2+
3+
import (
4+
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
5+
)
6+
7+
type StatusChangedEvent struct {
8+
State state.State
9+
Error error
10+
}
11+
12+
var (
13+
StatusChanged = NewEvent[StatusChangedEvent]()
14+
)

0 commit comments

Comments
 (0)