Skip to content

Commit 1631f87

Browse files
authored
Correct event relationships (#121)
* correct events.nats.token path Signed-off-by: Mike Mason <mimason@equinix.com> * add missing loggers Signed-off-by: Mike Mason <mimason@equinix.com> * subscriber.Listen is blocking so we never capture and exit with a signal Signed-off-by: Mike Mason <mimason@equinix.com> * correct policy example Signed-off-by: Mike Mason <mimason@equinix.com> * skip nacking for errors We need to dig into the errors that may be produced and create a list of errors which should be reprocessable. Otherwise we might reprocess a message which will never be succeed, wasting resources. Signed-off-by: Mike Mason <mimason@equinix.com> * create relationships for all matched relations This allows for relations to be determined based on their type. All matched relations will be created. Signed-off-by: Mike Mason <mimason@equinix.com> --------- Signed-off-by: Mike Mason <mimason@equinix.com>
1 parent e08e849 commit 1631f87

File tree

10 files changed

+91
-35
lines changed

10 files changed

+91
-35
lines changed

chart/permissions-api/templates/deployment-worker.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ spec:
7676
{{- end }}
7777
{{- if .Values.config.events.nats.token }}
7878
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_TOKEN
79-
value: "{{ .Values.config.events.token }}"
79+
value: "{{ .Values.config.events.nats.token }}"
8080
{{- end }}
8181
{{- if .Values.config.oidc.issuer }}
8282
{{- with .Values.config.oidc.audience }}

cmd/worker.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
6262
logger.Fatalw("invalid spicedb policy", "error", err)
6363
}
6464

65-
engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy))
65+
engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))
6666

67-
subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine)
67+
subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine, pubsub.WithLogger(logger))
6868
if err != nil {
6969
logger.Fatalw("unable to initialize subscriber", "error", err)
7070
}
@@ -75,9 +75,13 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
7575
}
7676
}
7777

78-
if err := subscriber.Listen(); err != nil {
79-
logger.Fatalw("error listening for events", "error", err)
80-
}
78+
logger.Info("Listening for events")
79+
80+
go func() {
81+
if err := subscriber.Listen(); err != nil {
82+
logger.Fatalw("error listening for events", "error", err)
83+
}
84+
}()
8185

8286
// Wait until we're told to stop
8387
sig := <-sigCh

internal/pubsub/subscriber.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ func (s *Subscriber) Subscribe(topic string) error {
8181

8282
s.changeChannels = append(s.changeChannels, msgChan)
8383

84+
s.logger.Infof("Subscribing to topic %s", topic)
85+
8486
return nil
8587
}
8688

@@ -163,6 +165,13 @@ func (s *Subscriber) processEvent(msg *message.Message) error {
163165
func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Message, resource types.Resource, additionalSubjectIDs []gidx.PrefixedID) error {
164166
var relationships []types.Relationship
165167

168+
rType := s.qe.GetResourceType(resource.Type)
169+
if rType == nil {
170+
s.logger.Warnw("no resource type found for", "resource_type", resource.Type)
171+
172+
return nil
173+
}
174+
166175
// Attempt to create relationships from the message fields. If this fails, reject the message
167176
for _, id := range additionalSubjectIDs {
168177
subjResource, err := s.qe.NewResourceFromID(id)
@@ -172,13 +181,27 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
172181
continue
173182
}
174183

175-
relationship := types.Relationship{
176-
Resource: resource,
177-
Relation: subjResource.Type,
178-
Subject: subjResource,
179-
}
184+
for _, rel := range rType.Relationships {
185+
var relation string
186+
187+
for _, tName := range rel.Types {
188+
if tName == subjResource.Type {
189+
relation = rel.Relation
180190

181-
relationships = append(relationships, relationship)
191+
break
192+
}
193+
}
194+
195+
if relation != "" {
196+
relationship := types.Relationship{
197+
Resource: resource,
198+
Relation: relation,
199+
Subject: subjResource,
200+
}
201+
202+
relationships = append(relationships, relationship)
203+
}
204+
}
182205
}
183206

184207
if len(relationships) == 0 {
@@ -190,9 +213,7 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
190213
// Attempt to create the relationships in SpiceDB. If this fails, nak the message for reprocessing
191214
_, err := s.qe.CreateRelationships(ctx, relationships)
192215
if err != nil {
193-
s.logger.Errorw("error creating relationships - will reprocess", "error", err.Error())
194-
195-
return err
216+
s.logger.Errorw("error creating relationships - will not reprocess", "error", err.Error())
196217
}
197218

198219
return nil
@@ -201,9 +222,7 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
201222
func (s *Subscriber) deleteRelationships(ctx context.Context, msg *message.Message, resource types.Resource) error {
202223
_, err := s.qe.DeleteRelationships(ctx, resource)
203224
if err != nil {
204-
s.logger.Errorw("error deleting relationships - will reprocess", "error", err.Error())
205-
206-
return err
225+
s.logger.Errorw("error deleting relationships - will not reprocess", "error", err.Error())
207226
}
208227

209228
return nil

internal/pubsub/subscriber_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func TestNATS(t *testing.T) {
151151
return context.WithValue(ctx, contextKeyEngine, &engine)
152152
},
153153
CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) {
154-
require.ErrorIs(t, result.Err, eventtools.ErrNack)
154+
require.NoError(t, result.Err)
155155
},
156156
},
157157
{

internal/query/errors.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,13 @@ var (
99

1010
// ErrInvalidReference represents an error condition where a given SpiceDB object reference is for some reason invalid.
1111
ErrInvalidReference = errors.New("invalid reference")
12+
13+
// ErrInvalidNamespace represents an error when the id prefix is not found in the resource schema
14+
ErrInvalidNamespace = errors.New("invalid namespace")
15+
16+
// ErrInvalidType represents an error when a resource type is not found in the resource schema
17+
ErrInvalidType = errors.New("invalid type")
18+
19+
// ErrInvalidRelationship represents an error when no matching relationship was found
20+
ErrInvalidRelationship = errors.New("invalid relationship")
1221
)

internal/query/mock/mock.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ func (e *Engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {
104104
return out, nil
105105
}
106106

107+
// GetResourceType returns the resource type by name
108+
func (e *Engine) GetResourceType(name string) *types.ResourceType {
109+
if e.schema == nil {
110+
e.schema = iapl.DefaultPolicy().Schema()
111+
}
112+
113+
for _, resourceType := range e.schema {
114+
if resourceType.Name == name {
115+
return &resourceType
116+
}
117+
}
118+
119+
return nil
120+
}
121+
107122
// SubjectHasPermission returns nil to satisfy the Engine interface.
108123
func (e *Engine) SubjectHasPermission(ctx context.Context, subject types.Resource, action string, resource types.Resource, queryToken string) error {
109124
e.Called()

internal/query/relations.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package query
22

33
import (
44
"context"
5-
"errors"
65
"io"
76
"strings"
87

@@ -13,20 +12,14 @@ import (
1312

1413
var roleSubjectRelation = "subject"
1514

16-
var (
17-
errorInvalidNamespace = errors.New("invalid namespace")
18-
errorInvalidType = errors.New("invalid type")
19-
errorInvalidRelationship = errors.New("invalid relationship")
20-
)
21-
2215
func (e *engine) getTypeForResource(res types.Resource) (types.ResourceType, error) {
2316
for _, resType := range e.schema {
2417
if res.Type == resType.Name {
2518
return resType, nil
2619
}
2720
}
2821

29-
return types.ResourceType{}, errorInvalidType
22+
return types.ResourceType{}, ErrInvalidType
3023
}
3124

3225
func (e *engine) validateRelationship(rel types.Relationship) error {
@@ -40,6 +33,8 @@ func (e *engine) validateRelationship(rel types.Relationship) error {
4033
return err
4134
}
4235

36+
e.logger.Infow("validation relationship", "sub", subjType.Name, "rel", rel.Relation, "res", resType.Name)
37+
4338
for _, typeRel := range resType.Relationships {
4439
// If we find a relation with a name and type that matches our relationship,
4540
// return
@@ -53,7 +48,7 @@ func (e *engine) validateRelationship(rel types.Relationship) error {
5348
}
5449

5550
// No matching relationship was found, so we should return an error
56-
return errorInvalidRelationship
51+
return ErrInvalidRelationship
5752
}
5853

5954
func resourceToSpiceDBRef(namespace string, r types.Resource) *pb.ObjectReference {
@@ -441,7 +436,7 @@ func (e *engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {
441436

442437
rType, ok := e.schemaPrefixMap[prefix]
443438
if !ok {
444-
return types.Resource{}, errorInvalidNamespace
439+
return types.Resource{}, ErrInvalidNamespace
445440
}
446441

447442
out := types.Resource{
@@ -451,3 +446,13 @@ func (e *engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {
451446

452447
return out, nil
453448
}
449+
450+
// GetResourceType returns the resource type by name
451+
func (e *engine) GetResourceType(name string) *types.ResourceType {
452+
rType, ok := e.schemaTypeMap[name]
453+
if !ok {
454+
return nil
455+
}
456+
457+
return &rType
458+
}

internal/query/relations_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func TestRelationships(t *testing.T) {
224224
Subject: parentRes,
225225
},
226226
CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[[]types.Relationship]) {
227-
assert.ErrorIs(t, res.Err, errorInvalidRelationship)
227+
assert.ErrorIs(t, res.Err, ErrInvalidRelationship)
228228
},
229229
},
230230
{

internal/query/service.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Engine interface {
2121
ListRoles(ctx context.Context, resource types.Resource, queryToken string) ([]types.Role, error)
2222
DeleteRelationships(ctx context.Context, resource types.Resource) (string, error)
2323
NewResourceFromID(id gidx.PrefixedID) (types.Resource, error)
24+
GetResourceType(name string) *types.ResourceType
2425
SubjectHasPermission(ctx context.Context, subject types.Resource, action string, resource types.Resource, queryToken string) error
2526
}
2627

@@ -30,13 +31,16 @@ type engine struct {
3031
client *authzed.Client
3132
schema []types.ResourceType
3233
schemaPrefixMap map[string]types.ResourceType
34+
schemaTypeMap map[string]types.ResourceType
3335
}
3436

35-
func (e *engine) cacheSchemaPrefixes() {
37+
func (e *engine) cacheSchemaResources() {
3638
e.schemaPrefixMap = make(map[string]types.ResourceType, len(e.schema))
39+
e.schemaTypeMap = make(map[string]types.ResourceType, len(e.schema))
3740

3841
for _, res := range e.schema {
3942
e.schemaPrefixMap[res.IDPrefix] = res
43+
e.schemaTypeMap[res.Name] = res
4044
}
4145
}
4246

@@ -55,7 +59,7 @@ func NewEngine(namespace string, client *authzed.Client, options ...Option) Engi
5559
if e.schema == nil {
5660
e.schema = iapl.DefaultPolicy().Schema()
5761

58-
e.cacheSchemaPrefixes()
62+
e.cacheSchemaResources()
5963
}
6064

6165
return e
@@ -76,6 +80,6 @@ func WithPolicy(policy iapl.Policy) Option {
7680
return func(e *engine) {
7781
e.schema = policy.Schema()
7882

79-
e.cacheSchemaPrefixes()
83+
e.cacheSchemaResources()
8084
}
8185
}

policy.example.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ actionbindings:
7171
- relationshipaction:
7272
relation: parent
7373
actionname: loadbalancer_delete
74-
- actionname: loadbalancer_create
74+
- actionname: loadbalancer_get
7575
typename: loadbalancer
7676
conditions:
7777
- rolebinding: {}
7878
- relationshipaction:
7979
relation: owner
80-
actionname: loadbalancer_create
80+
actionname: loadbalancer_get
8181
- actionname: loadbalancer_update
8282
typename: loadbalancer
8383
conditions:

0 commit comments

Comments
 (0)