Skip to content

Commit 49f7c9a

Browse files
authored
Merge pull request #6099 from mysteriumnetwork/control-plane
Control plane messages support
2 parents 2dfb075 + a6de045 commit 49f7c9a

File tree

4 files changed

+198
-2
lines changed

4 files changed

+198
-2
lines changed

cmd/commands/service/command.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/mysteriumnetwork/node/cmd/commands/cli/clio"
3434
"github.com/mysteriumnetwork/node/config"
3535
"github.com/mysteriumnetwork/node/config/urfavecli/clicontext"
36+
"github.com/mysteriumnetwork/node/core/control"
3637
"github.com/mysteriumnetwork/node/core/node"
3738
"github.com/mysteriumnetwork/node/metadata"
3839
"github.com/mysteriumnetwork/node/services"
@@ -80,7 +81,9 @@ func NewCommand(licenseCommandName string) *cli.Command {
8081
errorChannel: quit,
8182
}
8283
go func() {
83-
quit <- cmdService.Run(ctx)
84+
cp := control.NewControlPlane(di.BrokerConnection, cmdService.tequilapi)
85+
quit <- cmdService.Run(ctx, cp)
86+
cp.Stop()
8487
}()
8588

8689
return describeQuit(<-quit)
@@ -114,7 +117,7 @@ type serviceCommand struct {
114117
}
115118

116119
// Run runs a command
117-
func (sc *serviceCommand) Run(ctx *cli.Context) (err error) {
120+
func (sc *serviceCommand) Run(ctx *cli.Context, cp *control.ControlPlane) (err error) {
118121
serviceTypes := make([]string, 0)
119122

120123
activeServices := config.Current.GetString(config.FlagActiveServices.Name)
@@ -160,6 +163,9 @@ func (sc *serviceCommand) Run(ctx *cli.Context) (err error) {
160163
go sc.runService(startRequest)
161164
}
162165

166+
if err := cp.Start(providerID); err != nil {
167+
return err
168+
}
163169
return <-sc.errorChannel
164170
}
165171

core/control/consumer.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) 2019 The "MysteriumNetwork/node" Authors.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
18+
package control
19+
20+
import (
21+
"github.com/mysteriumnetwork/node/communication"
22+
)
23+
24+
var _ communication.MessageConsumer = (*consumer)(nil)
25+
26+
// consumer represents the pub/sub consumer of control messages
27+
type consumer struct {
28+
callback func(controlMessage) error
29+
topic communication.MessageEndpoint
30+
}
31+
32+
// GetMessageEndpoint returns endpoint where to receive messages
33+
func (c *consumer) GetMessageEndpoint() (communication.MessageEndpoint, error) {
34+
return c.topic, nil
35+
}
36+
37+
// NewMessage creates struct where message from endpoint will be serialized
38+
func (c *consumer) NewMessage() (messagePtr interface{}) {
39+
return &controlMessage{}
40+
}
41+
42+
// Consume handles messages from endpoint
43+
func (c *consumer) Consume(messagePtr interface{}) error {
44+
return c.callback(*messagePtr.(*controlMessage))
45+
}

core/control/control.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (C) 2019 The "MysteriumNetwork/node" Authors.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
18+
package control
19+
20+
import (
21+
"fmt"
22+
23+
"github.com/mysteriumnetwork/node/communication"
24+
"github.com/mysteriumnetwork/node/communication/nats"
25+
"github.com/mysteriumnetwork/node/tequilapi/client"
26+
)
27+
28+
type controlMessage []struct {
29+
Service string `json:"service"`
30+
Command string `json:"command"`
31+
}
32+
33+
// ControlPlane is a struct that represents the control plane of the node
34+
type ControlPlane struct {
35+
nats communication.Receiver
36+
api *client.Client
37+
identity string
38+
}
39+
40+
// NewControlPlane creates a new control plane
41+
func NewControlPlane(connection nats.Connection, api *client.Client) *ControlPlane {
42+
return &ControlPlane{
43+
nats: nats.NewReceiver(connection, communication.NewCodecJSON(), ""),
44+
api: api,
45+
}
46+
}
47+
48+
// Start starts the control plane
49+
func (c *ControlPlane) Start(identity string) error {
50+
c.identity = identity
51+
return c.nats.Receive(communication.MessageConsumer(&consumer{
52+
callback: c.handler,
53+
topic: communication.MessageEndpoint(fmt.Sprintf("%s.control-plane.v1", identity)),
54+
}))
55+
}
56+
57+
// Stop stops the control plane
58+
func (c *ControlPlane) Stop() {
59+
c.nats.ReceiveUnsubscribe(communication.MessageEndpoint(fmt.Sprintf("%s.control-plane.v1", c.identity)))
60+
}

core/control/handler.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (C) 2019 The "MysteriumNetwork/node" Authors.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
18+
package control
19+
20+
import (
21+
"github.com/mysteriumnetwork/node/services"
22+
"github.com/mysteriumnetwork/node/tequilapi/contract"
23+
"github.com/rs/zerolog/log"
24+
)
25+
26+
// handler is a function that handles control messages
27+
func (c *ControlPlane) handler(request controlMessage) error {
28+
currentServices, err := c.api.Services()
29+
if err != nil {
30+
return err
31+
}
32+
33+
for _, r := range request {
34+
if r.Command == "start" {
35+
log.Info().Str("service", r.Service).Msg("executing control start request")
36+
if err := c.startService(r.Service, c.identity); err != nil {
37+
log.Warn().AnErr("err", err).Msg("failed to start service")
38+
}
39+
continue
40+
}
41+
42+
for _, service := range currentServices {
43+
if service.Type != r.Service {
44+
continue
45+
}
46+
47+
if r.Command == "stop" {
48+
log.Info().Str("service", service.Type).Msg("executing control stop request")
49+
if err := c.stopService(service.ID); err != nil {
50+
log.Warn().AnErr("err", err).Msg("failed to start service")
51+
}
52+
}
53+
54+
if r.Command == "restart" {
55+
log.Info().Str("service", service.Type).Msg("executing control restart request")
56+
if err := c.stopService(service.ID); err != nil {
57+
log.Warn().AnErr("err", err).Msg("failed to stop service")
58+
}
59+
if err := c.startService(service.Type, c.identity); err != nil {
60+
log.Warn().AnErr("err", err).Msg("failed to start service")
61+
}
62+
}
63+
}
64+
}
65+
return nil
66+
}
67+
68+
func (c *ControlPlane) startService(serviceType, providerID string) error {
69+
serviceOpts, err := services.GetStartOptions(serviceType)
70+
if err != nil {
71+
return err
72+
}
73+
startRequest := contract.ServiceStartRequest{
74+
ProviderID: providerID,
75+
Type: serviceType,
76+
AccessPolicies: &contract.ServiceAccessPolicies{IDs: serviceOpts.AccessPolicyList},
77+
Options: serviceOpts,
78+
}
79+
_, err = c.api.ServiceStart(startRequest)
80+
return err
81+
}
82+
83+
func (c *ControlPlane) stopService(id string) error {
84+
return c.api.ServiceStop(id)
85+
}

0 commit comments

Comments
 (0)