Skip to content

Commit b4c0a45

Browse files
authored
Eng 565 messagehandler metrics (#4833)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced telemetry metrics for message handling - Added new performance tracking metrics for message processing - **Refactor** - Updated method signatures to support metric recording - Restructured metrics tracking from worker-related to message-handling focused - **Chores** - Improved observability of message processing operations - Reorganized constant attributes for better metric management <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 353b140 commit b4c0a45

File tree

3 files changed

+119
-49
lines changed

3 files changed

+119
-49
lines changed

pkg/orchestrator/message_handler.go

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88

99
"github.com/google/uuid"
1010
"github.com/rs/zerolog/log"
11+
"go.opentelemetry.io/otel/attribute"
1112

1213
"github.com/bacalhau-project/bacalhau/pkg/jobstore"
1314
"github.com/bacalhau-project/bacalhau/pkg/lib/envelope"
1415
"github.com/bacalhau-project/bacalhau/pkg/models"
1516
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
17+
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
1618
)
1719

1820
// MessageHandler base implementation of requester Endpoint
@@ -35,27 +37,38 @@ func (m *MessageHandler) ShouldProcess(ctx context.Context, message *envelope.Me
3537

3638
// HandleMessage handles incoming messages
3739
// TODO: handle messages arriving out of order gracefully
38-
func (m *MessageHandler) HandleMessage(ctx context.Context, message *envelope.Message) error {
39-
var err error
40+
func (m *MessageHandler) HandleMessage(ctx context.Context, message *envelope.Message) (err error) {
41+
metrics := telemetry.NewMetricRecorder(
42+
attribute.String(AttrMessageType, message.Metadata.Get(envelope.KeyMessageType)),
43+
attribute.String(AttrOutcomeKey, AttrOutcomeSuccess),
44+
)
45+
defer func() {
46+
metrics.Count(ctx, messageHandlerProcessCount)
47+
metrics.Done(ctx, messageHandlerProcessDuration)
48+
}()
49+
4050
switch message.Metadata.Get(envelope.KeyMessageType) {
4151
case messages.BidResultMessageType:
42-
err = m.OnBidComplete(ctx, message)
52+
err = m.OnBidComplete(ctx, metrics, message)
4353
case messages.RunResultMessageType:
44-
err = m.OnRunComplete(ctx, message)
54+
err = m.OnRunComplete(ctx, metrics, message)
4555
case messages.ComputeErrorMessageType:
46-
err = m.OnComputeFailure(ctx, message)
56+
err = m.OnComputeFailure(ctx, metrics, message)
4757
}
4858

49-
return m.handleError(ctx, message, err)
59+
return m.handleError(ctx, metrics, message, err)
5060
}
5161

5262
// handleError logs the error with context and returns nil.
5363
// In the future, this can be extended to handle different error types differently.
54-
func (m *MessageHandler) handleError(ctx context.Context, message *envelope.Message, err error) error {
64+
func (m *MessageHandler) handleError(ctx context.Context, metrics *telemetry.MetricRecorder, message *envelope.Message, err error) error {
5565
if err == nil {
5666
return nil
5767
}
5868

69+
metrics.Error(err)
70+
metrics.AddAttributes(attribute.String(AttrOutcomeKey, AttrOutcomeFailure))
71+
5972
// For now, just log the error and return nil
6073
logger := log.Ctx(ctx).Error()
6174
for key, value := range message.Metadata.ToMap() {
@@ -66,7 +79,7 @@ func (m *MessageHandler) handleError(ctx context.Context, message *envelope.Mess
6679
}
6780

6881
// OnBidComplete handles the completion of a bid request
69-
func (m *MessageHandler) OnBidComplete(ctx context.Context, message *envelope.Message) error {
82+
func (m *MessageHandler) OnBidComplete(ctx context.Context, metrics *telemetry.MetricRecorder, message *envelope.Message) error {
7083
result, ok := message.Payload.(*messages.BidResult)
7184
if !ok {
7285
return envelope.NewErrUnexpectedPayloadType("BidResult", reflect.TypeOf(message.Payload).String())
@@ -92,6 +105,7 @@ func (m *MessageHandler) OnBidComplete(ctx context.Context, message *envelope.Me
92105
}
93106

94107
txContext, err := m.store.BeginTx(ctx)
108+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartBeginTx)
95109
if err != nil {
96110
return fmt.Errorf("failed to begin transaction: %w", err)
97111
}
@@ -101,30 +115,36 @@ func (m *MessageHandler) OnBidComplete(ctx context.Context, message *envelope.Me
101115
if err = m.store.UpdateExecution(txContext, updateRequest); err != nil {
102116
return err
103117
}
118+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartUpdateExec)
104119

105120
// enqueue evaluation to allow the scheduler to either accept the bid, or find a new node
106121
err = m.enqueueEvaluation(txContext, result.JobID, result.JobType)
107122
if err != nil {
108123
return err
109124
}
125+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCreateEval)
110126

111-
return txContext.Commit()
127+
err = txContext.Commit()
128+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCommitTx)
129+
return err
112130
}
113131

114-
func (m *MessageHandler) OnRunComplete(ctx context.Context, message *envelope.Message) error {
132+
func (m *MessageHandler) OnRunComplete(ctx context.Context, metrics *telemetry.MetricRecorder, message *envelope.Message) error {
115133
result, ok := message.Payload.(*messages.RunResult)
116134
if !ok {
117135
return envelope.NewErrUnexpectedPayloadType("RunResult", reflect.TypeOf(message.Payload).String())
118136
}
119137

120138
txContext, err := m.store.BeginTx(ctx)
139+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartBeginTx)
121140
if err != nil {
122141
return fmt.Errorf("failed to begin transaction: %w", err)
123142
}
124143

125144
defer txContext.Rollback() //nolint:errcheck
126145

127146
job, err := m.store.GetJob(txContext, result.JobID)
147+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartGetJob)
128148
if err != nil {
129149
return err
130150
}
@@ -158,22 +178,27 @@ func (m *MessageHandler) OnRunComplete(ctx context.Context, message *envelope.Me
158178
if err = m.store.UpdateExecution(txContext, updateRequest); err != nil {
159179
return err
160180
}
181+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartUpdateExec)
161182

162183
// enqueue evaluation to allow the scheduler to mark the job as completed if all executions are completed
163184
if err = m.enqueueEvaluation(txContext, result.JobID, result.JobType); err != nil {
164185
return err
165186
}
187+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCreateEval)
166188

167-
return txContext.Commit()
189+
err = txContext.Commit()
190+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCommitTx)
191+
return err
168192
}
169193

170-
func (m *MessageHandler) OnComputeFailure(ctx context.Context, message *envelope.Message) error {
194+
func (m *MessageHandler) OnComputeFailure(ctx context.Context, metrics *telemetry.MetricRecorder, message *envelope.Message) error {
171195
result, ok := message.Payload.(*messages.ComputeError)
172196
if !ok {
173197
return envelope.NewErrUnexpectedPayloadType("ComputeError", reflect.TypeOf(message.Payload).String())
174198
}
175199

176200
txContext, err := m.store.BeginTx(ctx)
201+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartBeginTx)
177202
if err != nil {
178203
return fmt.Errorf("failed to begin transaction: %w", err)
179204
}
@@ -197,13 +222,17 @@ func (m *MessageHandler) OnComputeFailure(ctx context.Context, message *envelope
197222
}); err != nil {
198223
return err
199224
}
225+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartUpdateExec)
200226

201227
// enqueue evaluation to allow the scheduler find other nodes, or mark the job as failed
202228
if err = m.enqueueEvaluation(txContext, result.JobID, result.JobType); err != nil {
203229
return err
204230
}
231+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCreateEval)
205232

206-
return txContext.Commit()
233+
err = txContext.Commit()
234+
metrics.Latency(ctx, messageHandlerProcessPartDuration, AttrPartCommitTx)
235+
return err
207236
}
208237

209238
// enqueueEvaluation enqueues an evaluation to allow the scheduler to either accept the bid, or find a new node

pkg/orchestrator/message_handler_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (suite *MessageHandlerTestSuite) TestShouldProcess() {
4242
suite.False(suite.handler.ShouldProcess(context.Background(), envelope.NewMessage(nil).WithMetadataValue(envelope.KeyMessageType, "UnknownType")))
4343
}
4444

45-
func (suite *MessageHandlerTestSuite) TestOnBidComplete_Accepted() {
45+
func (suite *MessageHandlerTestSuite) TestHandleBidAccepted() {
4646
ctx := context.Background()
4747
bidResult := &messages.BidResult{
4848
BaseResponse: messages.BaseResponse{
@@ -60,11 +60,11 @@ func (suite *MessageHandlerTestSuite) TestOnBidComplete_Accepted() {
6060
suite.mockTx.EXPECT().Commit().Return(nil)
6161
suite.mockTx.EXPECT().Rollback().Return(nil)
6262

63-
err := suite.handler.OnBidComplete(ctx, message)
63+
err := suite.handler.HandleMessage(ctx, message)
6464
suite.NoError(err)
6565
}
6666

67-
func (suite *MessageHandlerTestSuite) TestOnBidComplete_Rejected() {
67+
func (suite *MessageHandlerTestSuite) TestHandleBidRejected() {
6868
ctx := context.Background()
6969
bidResult := &messages.BidResult{
7070
BaseResponse: messages.BaseResponse{
@@ -82,11 +82,11 @@ func (suite *MessageHandlerTestSuite) TestOnBidComplete_Rejected() {
8282
suite.mockTx.EXPECT().Commit().Return(nil)
8383
suite.mockTx.EXPECT().Rollback().Return(nil)
8484

85-
err := suite.handler.OnBidComplete(ctx, message)
85+
err := suite.handler.HandleMessage(ctx, message)
8686
suite.NoError(err)
8787
}
8888

89-
func (suite *MessageHandlerTestSuite) TestOnRunComplete() {
89+
func (suite *MessageHandlerTestSuite) TestHandleRunComplete() {
9090
ctx := context.Background()
9191
runResult := &messages.RunResult{
9292
BaseResponse: messages.BaseResponse{
@@ -106,11 +106,35 @@ func (suite *MessageHandlerTestSuite) TestOnRunComplete() {
106106
suite.mockTx.EXPECT().Commit().Return(nil)
107107
suite.mockTx.EXPECT().Rollback().Return(nil)
108108

109-
err := suite.handler.OnRunComplete(ctx, message)
109+
err := suite.handler.HandleMessage(ctx, message)
110110
suite.NoError(err)
111111
}
112112

113-
func (suite *MessageHandlerTestSuite) TestOnComputeFailure() {
113+
func (suite *MessageHandlerTestSuite) TestHandleRunCompleteForLongRunningJob() {
114+
ctx := context.Background()
115+
runResult := &messages.RunResult{
116+
BaseResponse: messages.BaseResponse{
117+
ExecutionID: "exec-1",
118+
JobID: "job-1",
119+
JobType: "service",
120+
},
121+
PublishResult: &models.SpecConfig{Type: "ipfs"},
122+
RunCommandResult: &models.RunCommandResult{ExitCode: 0},
123+
}
124+
message := envelope.NewMessage(runResult).WithMetadataValue(envelope.KeyMessageType, messages.RunResultMessageType)
125+
126+
suite.mockStore.EXPECT().BeginTx(gomock.Any()).Return(suite.mockTx, nil)
127+
suite.mockStore.EXPECT().GetJob(suite.mockTx, "job-1").Return(models.Job{Type: "service"}, nil)
128+
suite.mockStore.EXPECT().UpdateExecution(suite.mockTx, gomock.Any()).Return(nil)
129+
suite.mockStore.EXPECT().CreateEvaluation(suite.mockTx, gomock.Any()).Return(nil)
130+
suite.mockTx.EXPECT().Commit().Return(nil)
131+
suite.mockTx.EXPECT().Rollback().Return(nil)
132+
133+
err := suite.handler.HandleMessage(ctx, message)
134+
suite.NoError(err)
135+
}
136+
137+
func (suite *MessageHandlerTestSuite) TestHandleComputeFailure() {
114138
ctx := context.Background()
115139
computeError := &messages.ComputeError{
116140
BaseResponse: messages.BaseResponse{
@@ -127,11 +151,11 @@ func (suite *MessageHandlerTestSuite) TestOnComputeFailure() {
127151
suite.mockTx.EXPECT().Commit().Return(nil)
128152
suite.mockTx.EXPECT().Rollback().Return(nil)
129153

130-
err := suite.handler.OnComputeFailure(ctx, message)
154+
err := suite.handler.HandleMessage(ctx, message)
131155
suite.NoError(err)
132156
}
133157

134-
func (suite *MessageHandlerTestSuite) TestOnBidComplete_PropagatesErrors() {
158+
func (suite *MessageHandlerTestSuite) TestHandleMessagePropagatesErrors() {
135159
ctx := context.Background()
136160
bidResult := &messages.BidResult{
137161
BaseResponse: messages.BaseResponse{
@@ -148,8 +172,8 @@ func (suite *MessageHandlerTestSuite) TestOnBidComplete_PropagatesErrors() {
148172
suite.mockStore.EXPECT().UpdateExecution(suite.mockTx, gomock.Any()).Return(expectedErr)
149173
suite.mockTx.EXPECT().Rollback().Return(nil)
150174

151-
err := suite.handler.OnBidComplete(ctx, message)
152-
suite.ErrorIs(err, expectedErr)
175+
err := suite.handler.HandleMessage(ctx, message)
176+
suite.NoError(err) // HandleMessage swallows errors after logging them
153177
}
154178

155179
func TestMessageHandlerTestSuite(t *testing.T) {

pkg/orchestrator/metrics.go

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,15 @@
11
package orchestrator
22

33
import (
4-
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
54
"go.opentelemetry.io/otel"
65
"go.opentelemetry.io/otel/attribute"
76
"go.opentelemetry.io/otel/metric"
8-
)
97

10-
var (
11-
Meter = otel.GetMeterProvider().Meter("orchestrator")
8+
"github.com/bacalhau-project/bacalhau/pkg/telemetry"
129
)
1310

14-
// Metrics for monitoring worker
1511
var (
16-
WorkerDequeueFaults = telemetry.Must(Meter.Int64Counter(
17-
"worker_dequeue_faults",
18-
metric.WithDescription("Number of times a worker failed to dequeue an evaluation"),
19-
))
20-
21-
WorkerProcessFaults = telemetry.Must(Meter.Int64Counter(
22-
"worker_process_faults",
23-
metric.WithDescription("Number of times a worker failed to process an evaluation"),
24-
))
25-
26-
WorkerAckFaults = telemetry.Must(Meter.Int64Counter(
27-
"worker_ack_faults",
28-
metric.WithDescription("Number of times a worker failed to ack an evaluation back to the broker"),
29-
))
30-
31-
WorkerNackFaults = telemetry.Must(Meter.Int64Counter(
32-
"worker_nack_faults",
33-
metric.WithDescription("Number of times a worker failed to nack an evaluation back to the broker"),
34-
))
12+
Meter = otel.GetMeterProvider().Meter("orchestrator")
3513
)
3614

3715
// Metrics for monitoring evaluation broker
@@ -62,6 +40,45 @@ var (
6240
))
6341
)
6442

43+
// Message handler metrics
44+
var (
45+
// Message processing metrics
46+
messageHandlerProcessDuration = telemetry.Must(Meter.Float64Histogram(
47+
"message.handler.process.duration",
48+
metric.WithDescription("Time taken to process a single message"),
49+
metric.WithUnit("s"),
50+
metric.WithExplicitBucketBoundaries(telemetry.DurationMsBuckets...),
51+
))
52+
53+
messageHandlerProcessPartDuration = telemetry.Must(Meter.Float64Histogram(
54+
"message.handler.process.part.duration",
55+
metric.WithDescription("Time taken for sub-operations within message handling"),
56+
metric.WithUnit("s"),
57+
metric.WithExplicitBucketBoundaries(telemetry.DurationMsBuckets...),
58+
))
59+
60+
messageHandlerProcessCount = telemetry.Must(Meter.Int64Counter(
61+
"message.handler.process.count",
62+
metric.WithDescription("Number of messages processed"),
63+
metric.WithUnit("1"),
64+
))
65+
)
66+
67+
const (
68+
AttrEvalType = "eval_type"
69+
AttrMessageType = "message_type"
70+
71+
AttrPartBeginTx = "begin_transaction"
72+
AttrPartGetJob = "get_job"
73+
AttrPartCommitTx = "commit_transaction"
74+
AttrPartUpdateExec = "update_execution"
75+
AttrPartCreateEval = "create_evaluation"
76+
77+
AttrOutcomeKey = "outcome"
78+
AttrOutcomeSuccess = "success"
79+
AttrOutcomeFailure = "failure"
80+
)
81+
6582
func EvalTypeAttribute(evaluationType string) metric.MeasurementOption {
66-
return metric.WithAttributes(attribute.String("eval_type", evaluationType))
83+
return metric.WithAttributes(attribute.String(AttrEvalType, evaluationType))
6784
}

0 commit comments

Comments
 (0)