-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnet.go
224 lines (201 loc) · 5.67 KB
/
net.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package uiot
import (
"context"
"encoding/binary"
"fmt"
"log"
"net"
"strings"
"time"
proto "github.com/TrevorFarrelly/u-iot/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
const (
// Multicast information
mcastaddr = "239.0.0.0:1024"
mcastbuf = 512
mcastlen = 2
)
// barebones information about a remote device. Used for async communication between
// the multicast server and RPC client.
type remote struct {
addr string
port int
}
// Handler for all incoming and outgoing multicasts
type mcastEndpoint struct {
addr *net.UDPAddr
channel chan *remote
}
// receive incoming multicast messages
func (me *mcastEndpoint) recvMulticast() {
// set up socket
conn, err := net.ListenMulticastUDP("udp4", nil, me.addr)
if err != nil {
log.Fatalf("Could not start multicast server: %v", err)
}
conn.SetReadBuffer(mcastbuf)
buf := make([]byte, mcastlen)
// recieve packets
for {
_, src, err := conn.ReadFromUDP(buf)
if err != nil {
continue
}
// parse the data received and send it to the RPC client
addr := strings.Split(src.String(), ":")[0]
port := binary.BigEndian.Uint16(buf)
me.channel <- &remote{addr, int(port)}
}
}
// send our port to the multicast address
func (me *mcastEndpoint) sendMulticast(rpcPort int) error {
// set up socket
conn, err := net.DialUDP("udp4", nil, me.addr)
if err != nil {
return err
}
defer conn.Close()
// format and send data
buf := make([]byte, mcastlen)
binary.BigEndian.PutUint16(buf, uint16(rpcPort))
conn.Write(buf)
return nil
}
// start the multicast server and send our multicast message a moment later
func (me *mcastEndpoint) startMulticastService(rpcPort int) error {
var err error
me.addr, err = net.ResolveUDPAddr("udp4", mcastaddr)
if err != nil {
return err
}
go me.recvMulticast()
time.Sleep(100 * time.Millisecond)
if err := me.sendMulticast(rpcPort); err != nil {
return err
}
return nil
}
// Handler for all incoming and outgoing RPCs
type rpcEndpoint struct {
proto.UnimplementedDeviceServer
local *Device
network *Network
channel chan *remote
}
// RPC implementations
// Bootstrap a remote device. Add it to our network, then send our device info back
func (re *rpcEndpoint) Bootstrap(ctx context.Context, remote *proto.DevInfo) (*proto.DevInfo, error) {
// parse remote device information
device := deviceFromProto(remote)
// get address from the context
p, ok := peer.FromContext(ctx)
if ok {
addr := strings.Split(p.Addr.String(), ":")
device.addr = addr[0]
} else {
return nil, fmt.Errorf("Could not parse remote device information")
}
// add remote device info to our Network
return re.local.asProto(), re.network.addDevice(device)
}
// Call a local function, triggered by a remote device
func (re *rpcEndpoint) CallFunc(ctx context.Context, funcinfo *proto.FuncCall) (*proto.FuncRet, error) {
// get function from local device
f, ok := re.local.Funcs[funcinfo.Name]
if !ok {
return &proto.FuncRet{}, fmt.Errorf("Device does not have requested function %s", funcinfo.Name)
}
// parse parameters
var params []int
for _, p := range funcinfo.Params {
params = append(params, int(p))
}
// call function
f.F(params...)
return &proto.FuncRet{}, nil
}
// Remove a remote device from our network, and forward the message to all other knwon devices
func (re *rpcEndpoint) Quit(ctx context.Context, remote *proto.DevInfo) (*proto.Nothing, error) {
// parse remote device information
quitter := deviceFromProto(remote)
// addr field is unset, we are the first recipient.
// Parse IP before removing/forwarding
if quitter.addr == "" {
if p, ok := peer.FromContext(ctx); ok {
quitter.addr = strings.Split(p.Addr.String(), ":")[0]
} else {
return nil, fmt.Errorf("Could not parse remote device information")
}
}
// remove the device from our network
if err := re.network.removeDevice(quitter); err != nil {
return &proto.Nothing{}, nil
}
// forward the message to everyone we know about
for _, dev := range re.network.GetDevices() {
re.sendQuit(quitter, dev)
}
return &proto.Nothing{}, nil
}
func (re *rpcEndpoint) sendQuit(quitter *Device, remote *Device) error {
// construct and dial address
addr := fmt.Sprintf("%s:%d", remote.addr, remote.port)
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return err
}
// create client and send RPC
client := proto.NewDeviceClient(conn)
ctx := context.Background()
if _, err = client.Quit(ctx, quitter.asProto()); err != nil {
return err
}
return nil
}
// start the RPC server
func (re *rpcEndpoint) listenRPC(port int) error {
// set up socket
sock, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
// start RPC server
server := grpc.NewServer()
proto.RegisterDeviceServer(server, re)
server.Serve(sock)
return nil
}
// send Bootstrap RPC to devices we receive multicasts from
func (re *rpcEndpoint) sendBootstrap() {
// get remote info from multicast server
for r := range re.channel {
// connect to remote device
addr := fmt.Sprintf("%s:%d", r.addr, r.port)
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
continue
}
client := proto.NewDeviceClient(conn)
ctx := context.Background()
// send bootstrap info
remote, err := client.Bootstrap(ctx, re.local.asProto())
if err != nil {
continue
}
// add remote device info to our Network
device := deviceFromProto(remote)
device.addr = r.addr
device.port = r.port
device.remote = true
re.network.addDevice(device)
}
}
// set up the RPC server and prepare to send Bootstrap RPCs when we receive a multicast
func (re *rpcEndpoint) startRPCService(rpcPort int) error {
go re.listenRPC(rpcPort)
time.Sleep(100 * time.Millisecond)
go re.sendBootstrap()
return nil
}