Skip to content

Commit a067a0f

Browse files
committed
Implement state change event
Signed-off-by: Yevhen Vydolob <[email protected]>
1 parent f152448 commit a067a0f

File tree

8 files changed

+122
-11
lines changed

8 files changed

+122
-11
lines changed

Diff for: pkg/crc/api/events/cluster_load_stream.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type TickListener struct {
2020
}
2121

2222
func newClusterLoadStream(server *EventServer) EventStream {
23-
return newStream(newStatusListener(server.machine), newEventPublisher(CLUSTER_LOAD, server.sseServer))
23+
return newStream(newStatusListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer))
2424
}
2525

2626
func newStatusListener(machine crcMachine.Client) EventProducer {

Diff for: pkg/crc/api/events/event_server.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer {
5454
stream.RemoveSubscriber(sub)
5555
}
5656

57-
sseServer.CreateStream(LOGS)
58-
sseServer.CreateStream(CLUSTER_LOAD)
57+
sseServer.CreateStream(Logs)
58+
sseServer.CreateStream(ClusterLoad)
59+
sseServer.CreateStream(StatusChange)
5960
return eventServer
6061
}
6162

@@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6566

6667
func createEventStream(server *EventServer, streamID string) EventStream {
6768
switch streamID {
68-
case LOGS:
69+
case Logs:
6970
return newLogsStream(server)
70-
case CLUSTER_LOAD:
71+
case ClusterLoad:
7172
return newClusterLoadStream(server)
73+
case StatusChange:
74+
return newStatusChangeStream(server)
7275
}
7376
return nil
7477
}

Diff for: pkg/crc/api/events/events.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package events
33
import "github.com/r3labs/sse/v2"
44

55
const (
6-
LOGS = "logs" // Logs event channel, contains daemon logs
7-
CLUSTER_LOAD = "cluster_load" // status event channel, contains VM load info
6+
Logs = "logs" // Logs event channel, contains daemon logs
7+
ClusterLoad = "cluster_load" // status event channel, contains VM load info
8+
StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
89
)
910

1011
type EventPublisher interface {

Diff for: pkg/crc/api/events/log_stream.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func newSSEStreamHook(server *sse.Server) *streamHook {
2323
&logrus.JSONFormatter{
2424
TimestampFormat: "",
2525
DisableTimestamp: false,
26-
DisableHTMLEscape: false,
26+
DisableHTMLEscape: true,
2727
DataKey: "",
2828
FieldMap: nil,
2929
CallerPrettyfier: nil,
@@ -56,7 +56,12 @@ func (s *streamHook) Fire(entry *logrus.Entry) error {
5656
return err
5757
}
5858

59-
s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line})
59+
// remove "Line Feed"("\n") character which add was added by json.Encoder
60+
if line[len(line)-1] == 10 {
61+
line = line[:len(line)-1]
62+
}
63+
64+
s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line})
6065
return nil
6166
}
6267

Diff for: pkg/crc/api/events/status_change_stream.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/crc-org/crc/v2/pkg/crc/logging"
7+
"github.com/crc-org/crc/v2/pkg/crc/machine"
8+
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
9+
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
10+
"github.com/crc-org/crc/v2/pkg/events"
11+
"github.com/r3labs/sse/v2"
12+
)
13+
14+
type statusChangeEvent struct {
15+
Status *types.ClusterStatusResult `json:"status"`
16+
Error string `json:"error,omitempty"`
17+
}
18+
19+
type statusChange struct {
20+
listenerDisposable events.Disposable
21+
machineClient machine.Client
22+
}
23+
24+
func newStatusChangeStream(server *EventServer) EventStream {
25+
return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer))
26+
}
27+
28+
func newStatusChangeListener(client machine.Client) EventProducer {
29+
return &statusChange{
30+
machineClient: client,
31+
}
32+
}
33+
34+
func (st *statusChange) Start(publisher EventPublisher) {
35+
st.listenerDisposable = events.StatusChanged.AddListener(func(changedEvent events.StatusChangedEvent) {
36+
logging.Debugf("State Changed Event %s", changedEvent)
37+
var event statusChangeEvent
38+
status, err := st.machineClient.Status()
39+
// if we cannot receive actual state, send error state with error description
40+
if err != nil {
41+
event = statusChangeEvent{Status: &types.ClusterStatusResult{
42+
CrcStatus: state.Error,
43+
}, Error: err.Error()}
44+
} else {
45+
status.CrcStatus = changedEvent.State // override with actual reported state
46+
event = statusChangeEvent{Status: status}
47+
if changedEvent.Error != nil {
48+
event.Error = changedEvent.Error.Error()
49+
}
50+
51+
}
52+
data, _ := json.Marshal(event)
53+
publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
54+
})
55+
}
56+
57+
func (st *statusChange) Stop() {
58+
if st.listenerDisposable != nil {
59+
st.listenerDisposable()
60+
st.listenerDisposable = nil
61+
}
62+
}

Diff for: pkg/crc/machine/sync.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
1111
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
1212
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
13+
"github.com/crc-org/crc/v2/pkg/events"
1314
)
1415

1516
const startCancelTimeout = 15 * time.Second
@@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {
6970

7071
err := s.underlying.Delete()
7172
s.syncOperationDone <- Deleting
73+
74+
if err == nil {
75+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Novm})
76+
}
7277
return err
7378
}
7479

@@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
8085
}
8186
s.startCancel = startCancel
8287
s.currentState = Starting
88+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})
8389

8490
return nil
8591
}
@@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)
9298

9399
startResult, err := s.underlying.Start(ctx, startConfig)
94100
s.syncOperationDone <- Starting
101+
102+
if err == nil {
103+
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
104+
} else {
105+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
106+
}
107+
95108
return startResult, err
96109
}
97110

@@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
136149
if err := s.prepareStopDelete(Stopping); err != nil {
137150
return state.Error, err
138151
}
152+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})
139153

140154
st, err := s.underlying.Stop()
141155
s.syncOperationDone <- Stopping
142156

157+
if err == nil {
158+
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
159+
} else {
160+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
161+
}
143162
return st, err
144163
}
145164

@@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
160179
}
161180

162181
func (s *Synchronized) PowerOff() error {
163-
return s.underlying.PowerOff()
182+
err := s.underlying.PowerOff()
183+
if err != nil {
184+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
185+
} else {
186+
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
187+
}
188+
189+
return err
164190
}
165191

166192
func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {

Diff for: pkg/events/emitter.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,6 @@ func (e *event[T]) Fire(event T) {
4949
e.eventMutex.Lock()
5050
defer e.eventMutex.Unlock()
5151
for _, l := range e.listeners {
52-
l.Listener(event)
52+
go l.Listener(event)
5353
}
5454
}

Diff for: pkg/events/events.go

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package events
2+
3+
import (
4+
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
5+
)
6+
7+
type StatusChangedEvent struct {
8+
State state.State
9+
Error error
10+
}
11+
12+
var (
13+
StatusChanged = NewEvent[StatusChangedEvent]()
14+
)

0 commit comments

Comments
 (0)