Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error reporting for subject transforms in streams #5617

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
if tw[d.Cluster] > 100 {
return fmt.Errorf("total weight needs to be <= 100")
}
err := ValidateMappingDestination(d.Subject)
err := ValidateMapping(src, d.Subject)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ var (
ErrInvalidMappingDestination = errors.New("invalid mapping destination")

// ErrInvalidMappingDestinationSubject is used to error on a bad transform destination mapping
ErrInvalidMappingDestinationSubject = fmt.Errorf("%w: invalid subject", ErrInvalidMappingDestination)
ErrInvalidMappingDestinationSubject = fmt.Errorf("%w: invalid transform", ErrInvalidMappingDestination)

// ErrMappingDestinationNotUsingAllWildcards is used to error on a transform destination not using all of the token wildcards
ErrMappingDestinationNotUsingAllWildcards = fmt.Errorf("%w: not using all of the token wildcard(s)", ErrInvalidMappingDestination)
Expand Down
36 changes: 33 additions & 3 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@
"constant": "JSSourceInvalidSubjectFilter",
"code": 400,
"error_code": 10145,
"description": "source subject filter is invalid",
"description": "source transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1443,7 +1443,7 @@
"constant": "JSSourceInvalidTransformDestination",
"code": 400,
"error_code": 10146,
"description": "source transform destination is invalid",
"description": "source transform: {err}",
"comment": "",
"help": "",
"url": "",
Expand Down Expand Up @@ -1493,7 +1493,7 @@
"constant": "JSMirrorInvalidSubjectFilter",
"code": 400,
"error_code": 10151,
"description": "mirror subject filter is invalid",
"description": "mirror transform source: {err}",
"comment": "",
"help": "",
"url": "",
Expand All @@ -1518,5 +1518,35 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMirrorInvalidTransformDestination",
"code": 400,
"error_code": 10154,
"description": "mirror transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidSource",
"code": 400,
"error_code": 10155,
"description": "stream transform source: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamTransformInvalidDestination",
"code": 400,
"error_code": 10156,
"description": "stream transform: {err}",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
108 changes: 93 additions & 15 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,12 @@ const (
// JSMirrorInvalidStreamName mirrored stream name is invalid
JSMirrorInvalidStreamName ErrorIdentifier = 10142

// JSMirrorInvalidSubjectFilter mirror subject filter is invalid
// JSMirrorInvalidSubjectFilter mirror transform source: {err}
JSMirrorInvalidSubjectFilter ErrorIdentifier = 10151

// JSMirrorInvalidTransformDestination mirror transform: {err}
JSMirrorInvalidTransformDestination ErrorIdentifier = 10154

// JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source
JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030

Expand Down Expand Up @@ -308,10 +311,10 @@ const (
// JSSourceInvalidStreamName sourced stream name is invalid
JSSourceInvalidStreamName ErrorIdentifier = 10141

// JSSourceInvalidSubjectFilter source subject filter is invalid
// JSSourceInvalidSubjectFilter source transform source: {err}
JSSourceInvalidSubjectFilter ErrorIdentifier = 10145

// JSSourceInvalidTransformDestination source transform destination is invalid
// JSSourceInvalidTransformDestination source transform: {err}
JSSourceInvalidTransformDestination ErrorIdentifier = 10146

// JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target
Expand Down Expand Up @@ -446,6 +449,12 @@ const (
// JSStreamTemplateNotFoundErr template not found
JSStreamTemplateNotFoundErr ErrorIdentifier = 10068

// JSStreamTransformInvalidDestination stream transform: {err}
JSStreamTransformInvalidDestination ErrorIdentifier = 10156

// JSStreamTransformInvalidSource stream transform source: {err}
JSStreamTransformInvalidSource ErrorIdentifier = 10155

// JSStreamUpdateErrF Generic stream update error string ({err})
JSStreamUpdateErrF ErrorIdentifier = 10069

Expand Down Expand Up @@ -541,7 +550,8 @@ var (
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror subject filter is invalid"},
JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror transform source: {err}"},
JSMirrorInvalidTransformDestination: {Code: 400, ErrCode: 10154, Description: "mirror transform: {err}"},
JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"},
JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10150, Description: "mirror with multiple subject transforms cannot also have a single subject filter"},
JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10152, Description: "mirror subject filters can not overlap"},
Expand All @@ -565,8 +575,8 @@ var (
JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"},
JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"},
JSSourceInvalidStreamName: {Code: 400, ErrCode: 10141, Description: "sourced stream name is invalid"},
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source subject filter is invalid"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform destination is invalid"},
JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source transform source: {err}"},
JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform: {err}"},
JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"},
JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject transforms cannot also have a single subject filter"},
JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10147, Description: "source filters can not overlap"},
Expand Down Expand Up @@ -611,6 +621,8 @@ var (
JSStreamTemplateCreateErrF: {Code: 500, ErrCode: 10066, Description: "{err}"},
JSStreamTemplateDeleteErrF: {Code: 500, ErrCode: 10067, Description: "{err}"},
JSStreamTemplateNotFoundErr: {Code: 404, ErrCode: 10068, Description: "template not found"},
JSStreamTransformInvalidDestination: {Code: 400, ErrCode: 10156, Description: "stream transform: {err}"},
JSStreamTransformInvalidSource: {Code: 400, ErrCode: 10155, Description: "stream transform source: {err}"},
JSStreamUpdateErrF: {Code: 500, ErrCode: 10069, Description: "{err}"},
JSStreamWrongLastMsgIDErrF: {Code: 400, ErrCode: 10070, Description: "wrong last msg ID: {id}"},
JSStreamWrongLastSequenceErrF: {Code: 400, ErrCode: 10071, Description: "wrong last sequence: {seq}"},
Expand Down Expand Up @@ -1483,14 +1495,36 @@ func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMirrorInvalidStreamName]
}

// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror subject filter is invalid"
func NewJSMirrorInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror transform source: {err}"
func NewJSMirrorInvalidSubjectFilterError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSMirrorInvalidSubjectFilter]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSMirrorInvalidTransformDestinationError creates a new JSMirrorInvalidTransformDestination error: "mirror transform: {err}"
func NewJSMirrorInvalidTransformDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMirrorInvalidSubjectFilter]
e := ApiErrors[JSMirrorInvalidTransformDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source"
Expand Down Expand Up @@ -1747,24 +1781,36 @@ func NewJSSourceInvalidStreamNameError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSSourceInvalidStreamName]
}

// NewJSSourceInvalidSubjectFilterError creates a new JSSourceInvalidSubjectFilter error: "source subject filter is invalid"
func NewJSSourceInvalidSubjectFilterError(opts ...ErrorOption) *ApiError {
// NewJSSourceInvalidSubjectFilterError creates a new JSSourceInvalidSubjectFilter error: "source transform source: {err}"
func NewJSSourceInvalidSubjectFilterError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceInvalidSubjectFilter]
e := ApiErrors[JSSourceInvalidSubjectFilter]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSSourceInvalidTransformDestinationError creates a new JSSourceInvalidTransformDestination error: "source transform destination is invalid"
func NewJSSourceInvalidTransformDestinationError(opts ...ErrorOption) *ApiError {
// NewJSSourceInvalidTransformDestinationError creates a new JSSourceInvalidTransformDestination error: "source transform: {err}"
func NewJSSourceInvalidTransformDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSSourceInvalidTransformDestination]
e := ApiErrors[JSSourceInvalidTransformDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target"
Expand Down Expand Up @@ -2315,6 +2361,38 @@ func NewJSStreamTemplateNotFoundError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamTemplateNotFoundErr]
}

// NewJSStreamTransformInvalidDestinationError creates a new JSStreamTransformInvalidDestination error: "stream transform: {err}"
func NewJSStreamTransformInvalidDestinationError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSStreamTransformInvalidDestination]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamTransformInvalidSourceError creates a new JSStreamTransformInvalidSource error: "stream transform source: {err}"
func NewJSStreamTransformInvalidSourceError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

e := ApiErrors[JSStreamTransformInvalidSource]
args := e.toReplacerArgs([]interface{}{"{err}", err})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamUpdateError creates a new JSStreamUpdateErrF error: "{err}"
func NewJSStreamUpdateError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
65 changes: 63 additions & 2 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11577,7 +11577,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
Name: "MBAD",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "*", Destination: "{{wildcard(2)}}"}}},
}, ApiErrors[JSStreamCreateErrF].ErrCode)
}, ApiErrors[JSMirrorInvalidTransformDestination].ErrCode)

createStreamServerStreamConfig(&StreamConfig{
Name: "MBAD",
Expand Down Expand Up @@ -23760,6 +23760,67 @@ func TestJetStreamBadSubjectMappingStream(t *testing.T) {
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: source transform: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))

require_Error(t, err, NewJSStreamUpdateError(errors.New("unable to get subject transform for source: invalid mapping destination: too many arguments passed to the function in {{wildcard(1)}}{{split(3,1)}}")))
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Sources: []*nats.StreamSource{
{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: source transform source: invalid subject events.>.*")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Mirror: &nats.StreamSource{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.*",
Destination: "events.{{split(3,1)}}",
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: mirror transform: invalid mapping destination: wildcard index out of range in {{split(3,1)}}: [3]")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
Mirror: &nats.StreamSource{
Name: "mapping",
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
},
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: mirror transform source: invalid subject events.>.*")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
SubjectTransform: &nats.SubjectTransformConfig{
Source: "events.*",
Destination: "events.{{split(3,1)}}",
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: stream transform: invalid mapping destination: wildcard index out of range in {{split(3,1)}}: [3]")))

_, err = js.UpdateStream(&nats.StreamConfig{
Name: "test",
SubjectTransform: &nats.SubjectTransformConfig{
Source: "events.>.*",
Destination: "events.{{split(1,1)}}",
},
})
require_Error(t, err, NewJSStreamUpdateError(errors.New("nats: stream transform source: invalid subject events.>.*")))
}
Loading