Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
tpaschalis committed Aug 1, 2024
1 parent 18fb637 commit fdcf676
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 47 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Main (unreleased)

- `mimir.rules.kubernetes` is now able to add extra labels to the Prometheus rules. (@psychomantys)

<<<<<<< HEAD
<<<<<<< HEAD
- Upgrade from OpenTelemetry v0.102.1 to v0.105.0.
- [`otelcol.receiver.*`] A new `compression_algorithms` attribute to configure which
Expand Down Expand Up @@ -125,7 +126,7 @@ Main (unreleased)
- Full list of changes: https://github.com/grafana/beyla/releases/tag/v1.7.0

- Enable instances connected to remotecfg-compatible servers to Register
themselves from the remote service. (@tpaschalis)
themselves to the remote service. (@tpaschalis)

### Bugfixes

Expand Down
22 changes: 2 additions & 20 deletions internal/service/remotecfg/remotecfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,8 @@ var _ service.Service = (*Service)(nil)
func (s *Service) Run(ctx context.Context, host service.Host) error {
s.ctrl = host.NewController(ServiceName)

s.registerCollector()
defer s.unregisterCollector()

s.fetch()
s.registerCollector()

// Run the service's own controller.
go func() {
Expand Down Expand Up @@ -292,6 +290,7 @@ func (s *Service) Update(newConfig any) error {

// Update the args as the last step to avoid polluting any comparisons
s.args = newArgs
s.registerCollector()
s.mut.Unlock()

// If we've already called Run, then immediately trigger an API call with
Expand All @@ -313,14 +312,12 @@ func (s *Service) fetch() {
}

func (s *Service) registerCollector() error {
s.mut.RLock()
req := connect.NewRequest(&collectorv1.RegisterCollectorRequest{
Id: s.args.ID,
Attributes: s.attrs,
Name: s.args.Name,
})
client := s.asClient
s.mut.RUnlock()

_, err := client.RegisterCollector(context.Background(), req)
if err != nil {
Expand All @@ -329,21 +326,6 @@ func (s *Service) registerCollector() error {
return nil
}

func (s *Service) unregisterCollector() error {
s.mut.RLock()
req := connect.NewRequest(&collectorv1.UnregisterCollectorRequest{
Id: s.args.ID,
})
client := s.asClient
s.mut.RUnlock()

_, err := client.UnregisterCollector(context.Background(), req)
if err != nil {
return err
}
return nil
}

func (s *Service) fetchRemote() error {
if !s.isEnabled() {
return nil
Expand Down
32 changes: 6 additions & 26 deletions internal/service/remotecfg/remotecfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ func TestOnDiskCache(t *testing.T) {
client := &collectorClient{}
env.svc.asClient = client

var registerCalled, unregisterCalled atomic.Bool
var registerCalled atomic.Bool
client.registerCollectorFunc = buildRegisterCollectorFunc(&registerCalled)
client.unregisterCollectorFunc = buildUnregisterCollectorFunc(&unregisterCalled)

// Mock client to return an unparseable response.
client.getConfigFunc = buildGetConfigHandler("unparseable config")
Expand Down Expand Up @@ -83,11 +82,10 @@ func TestAPIResponse(t *testing.T) {
env.svc.asClient = client

// Mock client to return a valid response.
var registerCalled, unregisterCalled atomic.Bool
var registerCalled atomic.Bool
client.mut.Lock()
client.getConfigFunc = buildGetConfigHandler(cfg1)
client.registerCollectorFunc = buildRegisterCollectorFunc(&registerCalled)
client.unregisterCollectorFunc = buildUnregisterCollectorFunc(&unregisterCalled)
client.mut.Unlock()

// Run the service.
Expand All @@ -114,7 +112,6 @@ func TestAPIResponse(t *testing.T) {
}, 1*time.Second, 10*time.Millisecond)

cancel()
require.Eventually(t, func() bool { return unregisterCalled.Load() }, 1*time.Second, 10*time.Millisecond)
}

func buildGetConfigHandler(in string) func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) {
Expand All @@ -137,15 +134,6 @@ func buildRegisterCollectorFunc(called *atomic.Bool) func(ctx context.Context, r
}
}

func buildUnregisterCollectorFunc(called *atomic.Bool) func(ctx context.Context, req *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error) {
return func(ctx context.Context, req *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error) {
called.Store(true)
return &connect.Response[collectorv1.UnregisterCollectorResponse]{
Msg: &collectorv1.UnregisterCollectorResponse{},
}, nil
}
}

type testEnvironment struct {
t *testing.T
svc *Service
Expand Down Expand Up @@ -216,10 +204,9 @@ func (f fakeHost) NewController(id string) service.Controller {
}

type collectorClient struct {
mut sync.RWMutex
getConfigFunc func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error)
registerCollectorFunc func(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error)
unregisterCollectorFunc func(ctx context.Context, req *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error)
mut sync.RWMutex
getConfigFunc func(context.Context, *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error)
registerCollectorFunc func(ctx context.Context, req *connect.Request[collectorv1.RegisterCollectorRequest]) (*connect.Response[collectorv1.RegisterCollectorResponse], error)
}

func (ag *collectorClient) GetConfig(ctx context.Context, req *connect.Request[collectorv1.GetConfigRequest]) (*connect.Response[collectorv1.GetConfigResponse], error) {
Expand All @@ -245,14 +232,7 @@ func (ag *collectorClient) RegisterCollector(ctx context.Context, req *connect.R
}

func (ag *collectorClient) UnregisterCollector(ctx context.Context, req *connect.Request[collectorv1.UnregisterCollectorRequest]) (*connect.Response[collectorv1.UnregisterCollectorResponse], error) {
ag.mut.RLock()
defer ag.mut.RUnlock()

if ag.unregisterCollectorFunc != nil {
return ag.unregisterCollectorFunc(ctx, req)
}

panic("unregisterCollectorFunc not set")
panic("unregisterCollector isn't wired yet")
}

type serviceController struct {
Expand Down

0 comments on commit fdcf676

Please sign in to comment.