Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 0cb698d

Browse files
authored
Use JSON-RPC over HTTP for communication between plugins (#297)
Signed-off-by: Bill Farner <[email protected]>
1 parent e3de881 commit 0cb698d

File tree

280 files changed

+13525
-19938
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

280 files changed

+13525
-19938
lines changed

cli/serverutil.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@ import (
55

66
log "github.com/Sirupsen/logrus"
77
"github.com/docker/infrakit/discovery"
8-
"github.com/docker/infrakit/rpc"
8+
"github.com/docker/infrakit/rpc/server"
99
)
1010

1111
// RunPlugin runs a plugin server, advertising with the provided name for discovery.
12-
// THe plugin should conform to the rpc call convention as implemented in the rpc package.
12+
// The plugin should conform to the rpc call convention as implemented in the rpc package.
1313
func RunPlugin(name string, plugin interface{}) {
14-
_, stopped, err := rpc.StartPluginAtPath(path.Join(discovery.Dir(), name), plugin)
14+
stoppable, err := server.StartPluginAtPath(path.Join(discovery.Dir(), name), plugin)
1515
if err != nil {
1616
log.Error(err)
1717
}
18-
19-
<-stopped // block until done
18+
stoppable.AwaitStopped()
2019
}

cmd/cli/flavor.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ func flavorPluginCommand(plugins func() discovery.Plugins) *cobra.Command {
3131
return err
3232
}
3333

34-
flavorPlugin, err = flavor_plugin.NewClient(endpoint.Protocol, endpoint.Address)
35-
if err != nil {
36-
return err
37-
}
34+
flavorPlugin = flavor_plugin.NewClient(endpoint.Address)
3835

3936
return nil
4037
},

cmd/cli/group.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ func groupPluginCommand(plugins func() discovery.Plugins) *cobra.Command {
3535
return err
3636
}
3737

38-
groupPlugin, err = group_plugin.NewClient(endpoint.Protocol, endpoint.Address)
39-
if err != nil {
40-
return err
41-
}
38+
groupPlugin = group_plugin.NewClient(endpoint.Address)
4239

4340
return nil
4441
},

cmd/cli/instance.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ func instancePluginCommand(plugins func() discovery.Plugins) *cobra.Command {
3030
return err
3131
}
3232

33-
instancePlugin, err = instance_plugin.NewClient(endpoint.Protocol, endpoint.Address)
34-
if err != nil {
35-
return err
36-
}
33+
instancePlugin = instance_plugin.NewClient(endpoint.Address)
3734

3835
return nil
3936
},

cmd/group/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,15 @@ func main() {
4040
if err != nil {
4141
return nil, err
4242
}
43-
return instance_client.NewClient(endpoint.Protocol, endpoint.Address)
43+
return instance_client.NewClient(endpoint.Address), nil
4444
}
4545

4646
flavorPluginLookup := func(n string) (flavor.Plugin, error) {
4747
endpoint, err := plugins.Find(n)
4848
if err != nil {
4949
return nil, err
5050
}
51-
return flavor_client.NewClient(endpoint.Protocol, endpoint.Address)
51+
return flavor_client.NewClient(endpoint.Address), nil
5252
}
5353

5454
cli.RunPlugin(name, group_server.PluginServer(

cmd/manager/main.go

+4-13
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/docker/infrakit/discovery"
1010
"github.com/docker/infrakit/leader"
1111
"github.com/docker/infrakit/manager"
12-
"github.com/docker/infrakit/rpc"
1312
group_rpc "github.com/docker/infrakit/rpc/group"
1413
"github.com/docker/infrakit/store"
1514
"github.com/spf13/cobra"
@@ -52,28 +51,20 @@ func runMain(backend *backend) error {
5251

5352
log.Infoln("Starting up manager:", backend)
5453

55-
manager, err := manager.NewManager(backend.plugins,
54+
mgr, err := manager.NewManager(backend.plugins,
5655
backend.leader, backend.snapshot, backend.pluginName)
5756
if err != nil {
5857
return err
5958
}
6059

61-
_, err = manager.Start()
60+
_, err = mgr.Start()
6261
if err != nil {
6362
return err
6463
}
6564

66-
_, stopped, err := rpc.StartPluginAtPath(
67-
filepath.Join(discovery.Dir(), backend.id),
68-
group_rpc.PluginServer(manager),
69-
)
70-
if err != nil {
71-
return err
72-
}
73-
74-
<-stopped // block until done
65+
cli.RunPlugin(backend.id, group_rpc.PluginServer(mgr))
7566

76-
manager.Stop()
67+
mgr.Stop()
7768
log.Infoln("Manager stopped")
7869

7970
return err

discovery/dir_test.go

+9-11
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"testing"
88
"time"
99

10-
server "github.com/docker/infrakit/rpc"
1110
rpc "github.com/docker/infrakit/rpc/instance"
11+
"github.com/docker/infrakit/rpc/server"
1212
"github.com/stretchr/testify/require"
1313
)
1414

@@ -31,17 +31,15 @@ func TestDirDiscovery(t *testing.T) {
3131

3232
name1 := "server1"
3333
path1 := filepath.Join(dir, name1)
34-
stop1, errors1, err1 := server.StartPluginAtPath(path1, rpc.PluginServer(nil))
35-
require.NoError(t, err1)
36-
require.NotNil(t, stop1)
37-
require.NotNil(t, errors1)
34+
server1, err := server.StartPluginAtPath(path1, rpc.PluginServer(nil))
35+
require.NoError(t, err)
36+
require.NotNil(t, server1)
3837

3938
name2 := "server2"
4039
path2 := filepath.Join(dir, name2)
41-
stop2, errors2, err2 := server.StartPluginAtPath(path2, rpc.PluginServer(nil))
42-
require.NoError(t, err2)
43-
require.NotNil(t, stop2)
44-
require.NotNil(t, errors2)
40+
server2, err := server.StartPluginAtPath(path2, rpc.PluginServer(nil))
41+
require.NoError(t, err)
42+
require.NotNil(t, server2)
4543

4644
discover, err := newDirPluginDiscovery(dir)
4745
require.NoError(t, err)
@@ -55,7 +53,7 @@ func TestDirDiscovery(t *testing.T) {
5553
require.NotNil(t, p)
5654

5755
// Now we stop the servers
58-
close(stop1)
56+
server1.Stop()
5957
blockWhileFileExists(path1)
6058

6159
p, err = discover.Find(name1)
@@ -65,7 +63,7 @@ func TestDirDiscovery(t *testing.T) {
6563
require.NoError(t, err)
6664
require.NotNil(t, p)
6765

68-
close(stop2)
66+
server2.Stop()
6967

7068
blockWhileFileExists(path2)
7169

example/flavor/combo/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func main() {
3232
if err != nil {
3333
return nil, err
3434
}
35-
return flavor_rpc.NewClient(endpoint.Protocol, endpoint.Address)
35+
return flavor_rpc.NewClient(endpoint.Address), nil
3636
}
3737

3838
cli.SetLogLevel(logLevel)

leader/swarm/swarm_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ func TestSwarmDetector(t *testing.T) {
1919

2020
ctx := context.Background()
2121
nodeInfo := types.Info{
22-
Swarm: swarm.Info{
23-
NodeID: "node",
22+
InfoBase: &types.InfoBase{
23+
Swarm: swarm.Info{
24+
NodeID: "node",
25+
},
2426
},
2527
}
2628
node := swarm.Node{

manager/group_plugin_impl.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,8 @@ func (m *manager) proxyForGroupPlugin(name string) (group.Plugin, error) {
1818
return nil, err
1919
}
2020

21-
client, err := rpc.NewClient(endpoint.Protocol, endpoint.Address)
22-
if err != nil {
23-
return nil, err
24-
}
25-
2621
m.backendName = name
27-
return client, nil
22+
return rpc.NewClient(endpoint.Address), nil
2823
}
2924

3025
// This implements the Group Plugin interface to support single group-only operations

manager/manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (m *manager) execPlugins(config GlobalSpec, work func(group.Plugin, group.S
326326
return err
327327
}
328328

329-
gp, err := rpc.NewClient(ep.Protocol, ep.Address)
329+
gp := rpc.NewClient(ep.Address)
330330
if err != nil {
331331
log.Warningln("Cannot contact group", id, " at plugin", name, "endpoint=", ep.Address)
332332
return err

0 commit comments

Comments
 (0)