-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpublisher.go
149 lines (125 loc) · 3.58 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package ctrl
import (
"context"
"errors"
"github.com/google/uuid"
"github.com/metal-toolbox/rivets/v2/condition"
"github.com/metal-toolbox/rivets/v2/events"
"github.com/metal-toolbox/rivets/v2/events/registry"
"github.com/sirupsen/logrus"
orc "github.com/metal-toolbox/conditionorc/pkg/api/v1/orchestrator/client"
)
// Publisher wraps the Task and StatusValue publishers into one, so that a
// caller invokes Publish once and the implementation takes care of publishing
// both the status value and the Task.
//
// Eventually only the Task updates are to be published, replacing the
// StatusValue updates.
type Publisher interface {
// Publish publishes the given task and returns a non-nil error when
// publishing fails.
//
// NOTE(review): tsUpdateOnly presumably requests a timestamp-only update;
// confirm against the underlying publishers.
Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error
}
// PublisherHTTP is the Publisher implementation built on the HTTP-based
// status value publisher and task repository.
type PublisherHTTP struct {
// logger records publish failures.
logger *logrus.Logger
// statusValuePublisher publishes the task state and status value.
statusValuePublisher *HTTPConditionStatusPublisher
// taskRepository publishes the Task itself.
taskRepository *HTTPTaskRepository
}
// NewHTTPPublisher returns a Publisher backed by an HTTPConditionStatusPublisher
// and an HTTPTaskRepository, both constructed from the same arguments.
//
// The values returned by the two constructors are type-asserted to their
// concrete types; an unexpected type results in a panic.
func NewHTTPPublisher(
	appName string,
	serverID,
	conditionID uuid.UUID,
	conditionKind condition.Kind,
	orcQueryor orc.Queryor,
	logger *logrus.Logger) Publisher {
	// The status value publisher is created (and asserted) first, then the
	// task repository.
	statusPub := NewHTTPConditionStatusPublisher(
		appName,
		serverID,
		conditionID,
		conditionKind,
		orcQueryor,
		logger,
	).(*HTTPConditionStatusPublisher)

	taskRepo := NewHTTPTaskRepository(
		appName,
		serverID,
		conditionID,
		conditionKind,
		orcQueryor,
		logger,
	).(*HTTPTaskRepository)

	return &PublisherHTTP{
		logger:               logger,
		statusValuePublisher: statusPub,
		taskRepository:       taskRepo,
	}
}
// Publish publishes the Task through the task repository and then its state
// and status through the status value publisher.
//
// Both publishes are always attempted: a failure of the first does not skip
// the second. Every failure is logged at error level and the failures are
// combined with errors.Join, so the returned error is nil only when both
// publishes succeed.
//
// A nil task is rejected with an error before anything is published, since
// the task's fields are dereferenced below and a nil task would otherwise
// panic.
func (p *PublisherHTTP) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error {
	if task == nil {
		return errors.New("publish: task is nil")
	}

	var err error

	if errTask := p.taskRepository.Publish(ctx, task, tsUpdateOnly); errTask != nil {
		p.logger.WithError(errTask).Error("Task publish error")
		err = errors.Join(err, errTask)
	}

	// NOTE(review): task.Server is accessed without a check and MustMarshal
	// presumably panics when the status cannot be serialized - confirm that
	// callers always populate both.
	errSV := p.statusValuePublisher.Publish(ctx, task.Server.ID, task.State, task.Status.MustMarshal(), tsUpdateOnly)
	if errSV != nil {
		p.logger.WithError(errSV).Error("Status Value publish error")
		err = errors.Join(err, errSV)
	}

	return err
}
// PublisherNATS is the Publisher implementation built on the NATS-based
// status value publisher and task repository.
type PublisherNATS struct {
// logger records publish failures.
logger *logrus.Logger
// statusValuePublisher publishes the task state and status value.
statusValuePublisher *NatsConditionStatusPublisher
// taskRepository publishes the Task itself.
taskRepository *NatsConditionTaskRepository
}
// NewNatsPublisher returns a Publisher backed by a NatsConditionStatusPublisher
// and a NatsConditionTaskRepository that share the given stream and logger.
//
// The error from either constructor is returned as-is, together with a nil
// Publisher. The constructed values are type-asserted to their concrete
// types; an unexpected type results in a panic.
func NewNatsPublisher(
	appName,
	conditionID,
	serverID,
	facilityCode string,
	conditionKind condition.Kind,
	controllerID registry.ControllerID,
	kvReplicas int,
	stream *events.NatsJetstream,
	logger *logrus.Logger,
) (Publisher, error) {
	statusPub, err := NewNatsConditionStatusPublisher(
		appName,
		conditionID,
		facilityCode,
		conditionKind,
		controllerID,
		kvReplicas,
		stream,
		logger,
	)
	if err != nil {
		return nil, err
	}

	// Assert before the task repository is created, so the order of
	// construction and assertion stays status publisher first.
	natsStatusPub := statusPub.(*NatsConditionStatusPublisher)

	taskRepo, err := NewNatsConditionTaskRepository(
		conditionID,
		serverID,
		facilityCode,
		conditionKind,
		controllerID,
		kvReplicas,
		stream,
		logger,
	)
	if err != nil {
		return nil, err
	}

	return &PublisherNATS{
		logger:               logger,
		statusValuePublisher: natsStatusPub,
		taskRepository:       taskRepo.(*NatsConditionTaskRepository),
	}, nil
}
// Publish publishes the Task through the task repository and then its state
// and status through the status value publisher.
//
// Both publishes are always attempted: a failure of the first does not skip
// the second. Every failure is logged at error level and the failures are
// combined with errors.Join, so the returned error is nil only when both
// publishes succeed.
//
// A nil task is rejected with an error before anything is published, since
// the task's fields are dereferenced below and a nil task would otherwise
// panic.
func (p *PublisherNATS) Publish(ctx context.Context, task *condition.Task[any, any], tsUpdateOnly bool) error {
	if task == nil {
		return errors.New("publish: task is nil")
	}

	var err error

	errTask := p.taskRepository.Publish(ctx, task, tsUpdateOnly)
	if errTask != nil {
		p.logger.WithError(errTask).Error("Task publish error")
		err = errors.Join(err, errTask)
	}

	// NOTE(review): task.Server is accessed without a check and MustMarshal
	// presumably panics when the status cannot be serialized - confirm that
	// callers always populate both.
	errSV := p.statusValuePublisher.Publish(ctx, task.Server.ID, task.State, task.Status.MustMarshal(), tsUpdateOnly)
	if errSV != nil {
		p.logger.WithError(errSV).Error("Status Value publish error")
		err = errors.Join(err, errSV)
	}

	return err
}