Skip to content

Commit 1343e4d

Browse files
whynowyyhl25
authored andcommitted
feat: add sdk infomation metrics (#2208)
Signed-off-by: Derek Wang <[email protected]>
1 parent 1abb5ed commit 1343e4d

File tree

17 files changed

+375
-282
lines changed

17 files changed

+375
-282
lines changed

examples/21-simple-mono-vertex.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ spec:
1414
sink:
1515
udsink:
1616
container:
17-
image: quay.io/numaio/numaflow-rs/sink-log:stable
17+
image: quay.io/numaio/numaflow-rs/sink-log:stable

pkg/metrics/metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,22 @@ const (
3333
LabelPartitionName = "partition_name"
3434
LabelMonoVertexName = "mvtx_name"
3535

36+
LabelComponent = "component"
37+
LabelComponentName = "component_name"
38+
LabelSDKLanguage = "language"
39+
LabelSDKVersion = "version"
40+
LabelSDKType = "type" // container type, e.g sourcer, sourcetransformer, sinker, etc. see serverinfo.ContainerType
41+
3642
LabelReason = "reason"
3743
)
3844

45+
var (
46+
SDKInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
47+
Name: "sdk_info",
48+
Help: "A metric with a constant value '1', labeled by SDK information such as version, language, and type",
49+
}, []string{LabelComponent, LabelComponentName, LabelSDKType, LabelSDKVersion, LabelSDKLanguage})
50+
)
51+
3952
// Generic forwarder metrics
4053
var (
4154
// ReadMessagesCount is used to indicate the number of total messages read

pkg/sdkclient/serverinfo/serverinfo.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err
6767
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
6868
sdkLanguage := serverInfo.Language
6969
numaflowVersion := numaflow.GetVersion().Version
70-
containerType, err := getContainerType(filePath)
71-
if err != nil {
72-
return nil, fmt.Errorf("failed to get container type: %w", err)
70+
containerType := getContainerType(filePath)
71+
if containerType == ContainerTypeUnknown {
72+
return nil, fmt.Errorf("unknown container type")
7373
}
7474

7575
// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
@@ -221,11 +221,15 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, containerTyp
221221

222222
// getContainerType returns the container type from the server info file path
223223
// serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info"
224-
func getContainerType(serverInfoFilePath string) (ContainerType, error) {
224+
func getContainerType(serverInfoFilePath string) ContainerType {
225225
splits := strings.Split(serverInfoFilePath, "/")
226-
if containerType := strings.TrimSuffix(splits[len(splits)-1], "-server-info"); containerType == "" {
227-
return "", fmt.Errorf("failed to get container type from server info file path: %s", serverInfoFilePath)
228-
} else {
229-
return ContainerType(containerType), nil
226+
containerType := ContainerType(strings.TrimSuffix(splits[len(splits)-1], "-server-info"))
227+
switch containerType {
228+
case ContainerTypeSourcer, ContainerTypeSourcetransformer, ContainerTypeSinker, ContainerTypeMapper,
229+
ContainerTypeReducer, ContainerTypeReducestreamer, ContainerTypeSessionreducer,
230+
ContainerTypeSideinput, ContainerTypeFbsinker:
231+
return containerType
232+
default:
233+
return ContainerTypeUnknown
230234
}
231235
}

pkg/sdkclient/serverinfo/serverinfo_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,16 +186,16 @@ func Test_CheckNumaflowCompatibility(t *testing.T) {
186186
func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
187187
var testMinimumSupportedSDKVersions = sdkConstraints{
188188
Python: map[ContainerType]string{
189-
sourcer: "0.6.0rc100",
189+
ContainerTypeSourcer: "0.6.0rc100",
190190
},
191191
Go: map[ContainerType]string{
192-
sourcer: "0.6.0-z",
192+
ContainerTypeSourcer: "0.6.0-z",
193193
},
194194
Java: map[ContainerType]string{
195-
sourcer: "0.6.0-z",
195+
ContainerTypeSourcer: "0.6.0-z",
196196
},
197197
Rust: map[ContainerType]string{
198-
sourcer: "0.1.0-z",
198+
ContainerTypeSourcer: "0.1.0-z",
199199
},
200200
}
201201
tests := []struct {
@@ -283,7 +283,7 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
283283
}
284284
for _, tt := range tests {
285285
t.Run(tt.name, func(t *testing.T) {
286-
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
286+
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
287287
if tt.shouldErr {
288288
assert.Error(t, err, "Expected error")
289289
assert.Contains(t, err.Error(), tt.errMessage)
@@ -298,16 +298,16 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
298298
func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
299299
var testMinimumSupportedSDKVersions = sdkConstraints{
300300
Python: map[ContainerType]string{
301-
sourcer: "0.6.0b1",
301+
ContainerTypeSourcer: "0.6.0b1",
302302
},
303303
Go: map[ContainerType]string{
304-
sourcer: "0.6.0-rc2",
304+
ContainerTypeSourcer: "0.6.0-rc2",
305305
},
306306
Java: map[ContainerType]string{
307-
sourcer: "0.6.0-rc2",
307+
ContainerTypeSourcer: "0.6.0-rc2",
308308
},
309309
Rust: map[ContainerType]string{
310-
sourcer: "0.1.0-rc3",
310+
ContainerTypeSourcer: "0.1.0-rc3",
311311
},
312312
}
313313
tests := []struct {
@@ -395,7 +395,7 @@ func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
395395
}
396396
for _, tt := range tests {
397397
t.Run(tt.name, func(t *testing.T) {
398-
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
398+
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
399399
if tt.shouldErr {
400400
assert.Error(t, err, "Expected error")
401401
assert.Contains(t, err.Error(), tt.errMessage)

pkg/sdkclient/serverinfo/types.go

Lines changed: 49 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,16 @@ type ContainerType string
3232
// the string content matches the corresponding server info file name.
3333
// DO NOT change it unless the server info file name is changed.
3434
const (
35-
sourcer ContainerType = "sourcer"
36-
sourcetransformer ContainerType = "sourcetransformer"
37-
sinker ContainerType = "sinker"
38-
mapper ContainerType = "mapper"
39-
reducer ContainerType = "reducer"
40-
reducestreamer ContainerType = "reducestreamer"
41-
sessionreducer ContainerType = "sessionreducer"
42-
sideinput ContainerType = "sideinput"
43-
fbsinker ContainerType = "fb-sinker"
35+
ContainerTypeSourcer ContainerType = "sourcer"
36+
ContainerTypeSourcetransformer ContainerType = "sourcetransformer"
37+
ContainerTypeSinker ContainerType = "sinker"
38+
ContainerTypeMapper ContainerType = "mapper"
39+
ContainerTypeReducer ContainerType = "reducer"
40+
ContainerTypeReducestreamer ContainerType = "reducestreamer"
41+
ContainerTypeSessionreducer ContainerType = "sessionreducer"
42+
ContainerTypeSideinput ContainerType = "sideinput"
43+
ContainerTypeFbsinker ContainerType = "fb-sinker"
44+
ContainerTypeUnknown ContainerType = "unknown"
4445
)
4546

4647
type sdkConstraints map[Language]map[ContainerType]string
@@ -87,51 +88,51 @@ More details about version comparison can be found in the PEP 440 and semver doc
8788
var minimumSupportedSDKVersions = sdkConstraints{
8889
Python: map[ContainerType]string{
8990
// meaning the minimum supported python SDK version is 0.9.0
90-
sourcer: "0.9.0rc100",
91-
sourcetransformer: "0.9.0rc100",
92-
sinker: "0.9.0rc100",
93-
mapper: "0.9.0rc100",
94-
reducer: "0.9.0rc100",
95-
reducestreamer: "0.9.0rc100",
96-
sessionreducer: "0.9.0rc100",
97-
sideinput: "0.9.0rc100",
98-
fbsinker: "0.9.0rc100",
91+
ContainerTypeSourcer: "0.9.0rc100",
92+
ContainerTypeSourcetransformer: "0.9.0rc100",
93+
ContainerTypeSinker: "0.9.0rc100",
94+
ContainerTypeMapper: "0.9.0rc100",
95+
ContainerTypeReducer: "0.9.0rc100",
96+
ContainerTypeReducestreamer: "0.9.0rc100",
97+
ContainerTypeSessionreducer: "0.9.0rc100",
98+
ContainerTypeSideinput: "0.9.0rc100",
99+
ContainerTypeFbsinker: "0.9.0rc100",
99100
},
100101
Go: map[ContainerType]string{
101-
// meaning the minimum supported go SDK version is 0.8.0
102-
sourcer: "0.9.0-z",
103-
sourcetransformer: "0.9.0-z",
104-
sinker: "0.9.0-z",
105-
mapper: "0.9.0-z",
106-
reducer: "0.9.0-z",
107-
reducestreamer: "0.9.0-z",
108-
sessionreducer: "0.9.0-z",
109-
sideinput: "0.9.0-z",
110-
fbsinker: "0.9.0-z",
102+
// meaning the minimum supported go SDK version is 0.9.0
103+
ContainerTypeSourcer: "0.9.0-z",
104+
ContainerTypeSourcetransformer: "0.9.0-z",
105+
ContainerTypeSinker: "0.9.0-z",
106+
ContainerTypeMapper: "0.9.0-z",
107+
ContainerTypeReducer: "0.9.0-z",
108+
ContainerTypeReducestreamer: "0.9.0-z",
109+
ContainerTypeSessionreducer: "0.9.0-z",
110+
ContainerTypeSideinput: "0.9.0-z",
111+
ContainerTypeFbsinker: "0.9.0-z",
111112
},
112113
Java: map[ContainerType]string{
113-
// meaning the minimum supported java SDK version is 0.8.0
114-
sourcer: "0.9.0-z",
115-
sourcetransformer: "0.9.0-z",
116-
sinker: "0.9.0-z",
117-
mapper: "0.9.0-z",
118-
reducer: "0.9.0-z",
119-
reducestreamer: "0.9.0-z",
120-
sessionreducer: "0.9.0-z",
121-
sideinput: "0.9.0-z",
122-
fbsinker: "0.9.0-z",
114+
// meaning the minimum supported go SDK version is 0.9.0
115+
ContainerTypeSourcer: "0.9.0-z",
116+
ContainerTypeSourcetransformer: "0.9.0-z",
117+
ContainerTypeSinker: "0.9.0-z",
118+
ContainerTypeMapper: "0.9.0-z",
119+
ContainerTypeReducer: "0.9.0-z",
120+
ContainerTypeReducestreamer: "0.9.0-z",
121+
ContainerTypeSessionreducer: "0.9.0-z",
122+
ContainerTypeSideinput: "0.9.0-z",
123+
ContainerTypeFbsinker: "0.9.0-z",
123124
},
124125
Rust: map[ContainerType]string{
125-
// meaning the minimum supported rust SDK version is 0.2.0
126-
sourcer: "0.1.0-z",
127-
sourcetransformer: "0.1.0-z",
128-
sinker: "0.1.0-z",
129-
mapper: "0.1.0-z",
130-
reducer: "0.1.0-z",
131-
reducestreamer: "0.1.0-z",
132-
sessionreducer: "0.1.0-z",
133-
sideinput: "0.1.0-z",
134-
fbsinker: "0.1.0-z",
126+
// meaning the minimum supported go SDK version is 0.1.0
127+
ContainerTypeSourcer: "0.1.0-z",
128+
ContainerTypeSourcetransformer: "0.1.0-z",
129+
ContainerTypeSinker: "0.1.0-z",
130+
ContainerTypeMapper: "0.1.0-z",
131+
ContainerTypeReducer: "0.1.0-z",
132+
ContainerTypeReducestreamer: "0.1.0-z",
133+
ContainerTypeSessionreducer: "0.1.0-z",
134+
ContainerTypeSideinput: "0.1.0-z",
135+
ContainerTypeFbsinker: "0.1.0-z",
135136
},
136137
}
137138

pkg/sideinputs/manager/manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
2929
"github.com/numaproj/numaflow/pkg/isbsvc"
30+
"github.com/numaproj/numaflow/pkg/metrics"
3031
"github.com/numaproj/numaflow/pkg/sdkclient"
3132
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
3233
"github.com/numaproj/numaflow/pkg/sdkclient/sideinput"
@@ -87,6 +88,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
8788
if err != nil {
8889
return err
8990
}
91+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentSideInputManager, fmt.Sprintf("%s-%s", sim.pipelineName, sim.sideInput.Name), string(serverinfo.ContainerTypeSideinput), serverInfo.Version, string(serverInfo.Language)).Set(1)
9092

9193
// Create a new gRPC client for Side Input
9294
sideInputClient, err := sideinput.New(serverInfo)

pkg/sinks/sink.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
154154
if err != nil {
155155
return err
156156
}
157+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSinker), serverInfo.Version, string(serverInfo.Language)).Set(1)
157158

158159
sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
159160
if err != nil {
@@ -183,6 +184,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
183184
if err != nil {
184185
return err
185186
}
187+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeFbsinker), serverInfo.Version, string(serverInfo.Language)).Set(1)
186188

187189
sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.FbSinkAddr))
188190
if err != nil {

pkg/sources/source.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
199199
if err != nil {
200200
return err
201201
}
202+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSourcer), serverInfo.Version, string(serverInfo.Language)).Set(1)
202203

203204
srcClient, err := sourceclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
204205
if err != nil {
@@ -238,6 +239,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
238239
if err != nil {
239240
return err
240241
}
242+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSourcetransformer), serverInfo.Version, string(serverInfo.Language)).Set(1)
241243

242244
srcTransformerClient, err := sourcetransformer.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
243245
if err != nil {

pkg/udf/map_udf.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
139139
if err != nil {
140140
return err
141141
}
142+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeMapper), serverInfo.Version, string(serverInfo.Language)).Set(1)
142143

143144
// track all the resources that need to be closed
144145
var resourcesToClose []io.Closer

pkg/udf/reduce_udf.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,15 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
101101
if err != nil {
102102
return err
103103
}
104+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeReducestreamer), serverInfo.Version, string(serverInfo.Language)).Set(1)
104105
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.ReduceStreamAddr))
105106
} else {
106107
// Wait for server info to be ready
107108
serverInfo, err = serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.ReduceServerInfoFile))
108109
if err != nil {
109110
return err
110111
}
112+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeReducer), serverInfo.Version, string(serverInfo.Language)).Set(1)
111113
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
112114
}
113115
if err != nil {
@@ -134,6 +136,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
134136
if err != nil {
135137
return err
136138
}
139+
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSessionreducer), serverInfo.Version, string(serverInfo.Language)).Set(1)
137140

138141
client, err := sessionreducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
139142
if err != nil {

rust/Cargo.lock

Lines changed: 2 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)