Skip to content

Commit 7b4ff63

Browse files
committed
add mock HTTP server
1 parent 8cffaf6 commit 7b4ff63

33 files changed

+5256
-39
lines changed

demo.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@
55
int main(int argc, char **argv) {
66
int r;
77
kflowConfig cfg = {
8-
.URL = "http://chdev:20012/chf",
8+
.URL = "http://127.0.0.1:8999/chf",
99
.API = {
10-
.email = "will@kentik.com",
11-
.token = "81b7262feceecc94eef3ddafbc2c152f",
12-
.URL = "http://chdev:8080/api/v5",
10+
.email = "test@example.com",
11+
.token = "token",
12+
.URL = "http://127.0.0.1:8999/api/v5",
1313
},
14-
.device_id = 1001,
14+
.device_id = 1,
1515
.verbose = 1,
1616
};
1717

1818
if ((r = kflowInit(&cfg)) != 0) {
19-
printf("error initializing libkflow: %d", r);
19+
printf("error initializing libkflow: %d\n", r);
2020
exit(1);
2121
};
2222

@@ -30,12 +30,12 @@ int main(int argc, char **argv) {
3030
};
3131

3232
if ((r = kflowSend(&flow)) != 0) {
33-
printf("error sending flow: %d", r);
33+
printf("error sending flow: %d\n", r);
3434
exit(1);
3535
}
3636

3737
if ((r = kflowStop(10*1000)) != 0) {
38-
printf("error stopping libkflow: %d", r);
38+
printf("error stopping libkflow: %d\n", r);
3939
exit(1);
4040
}
4141

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package server
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"log"
7+
"net"
8+
"text/tabwriter"
9+
10+
"github.com/kentik/libkflow/chf"
11+
)
12+
13+
func Print(i int, flow chf.CHF) {
14+
buf := bytes.Buffer{}
15+
w := tabwriter.NewWriter(&buf, 0, 4, 1, ' ', 0)
16+
17+
fmt.Fprintf(w, "FLOW #%02d\n", i)
18+
fmt.Fprintf(w, " timestampNano:\t%v\n", flow.TimestampNano())
19+
fmt.Fprintf(w, " dstAs:\t%v\n", flow.DstAs())
20+
fmt.Fprintf(w, " dstGeo:\t%v\n", flow.DstGeo())
21+
fmt.Fprintf(w, " dstMac:\t%v\n", flow.DstMac())
22+
fmt.Fprintf(w, " headerLen:\t%v\n", flow.HeaderLen())
23+
fmt.Fprintf(w, " inBytes:\t%v\n", flow.InBytes())
24+
fmt.Fprintf(w, " inPkts:\t%v\n", flow.InPkts())
25+
fmt.Fprintf(w, " inputPort:\t%v\n", flow.InputPort())
26+
fmt.Fprintf(w, " ipSize:\t%v\n", flow.IpSize())
27+
fmt.Fprintf(w, " ipv4DstAddr:\t%v\n", ip(flow.Ipv4DstAddr()))
28+
fmt.Fprintf(w, " ipv4SrcAddr:\t%v\n", ip(flow.Ipv4SrcAddr()))
29+
fmt.Fprintf(w, " l4DstPort:\t%v\n", flow.L4DstPort())
30+
fmt.Fprintf(w, " l4SrcPort:\t%v\n", flow.L4SrcPort())
31+
fmt.Fprintf(w, " outputPort:\t%v\n", flow.OutputPort())
32+
fmt.Fprintf(w, " protocol:\t%v\n", flow.Protocol())
33+
fmt.Fprintf(w, " sampledPacketSize:\t%v\n", flow.SampledPacketSize())
34+
fmt.Fprintf(w, " srcAs:\t%v\n", flow.SrcAs())
35+
fmt.Fprintf(w, " srcGeo:\t%v\n", flow.SrcGeo())
36+
fmt.Fprintf(w, " srcMac:\t%v\n", flow.SrcMac())
37+
fmt.Fprintf(w, " tcpFlags:\t%v\n", flow.TcpFlags())
38+
fmt.Fprintf(w, " tos:\t%v\n", flow.Tos())
39+
fmt.Fprintf(w, " vlanIn:\t%v\n", flow.VlanIn())
40+
fmt.Fprintf(w, " vlanOut:\t%v\n", flow.VlanOut())
41+
fmt.Fprintf(w, " ipv4NextHop:\t%v\n", ip(flow.Ipv4NextHop()))
42+
fmt.Fprintf(w, " mplsType:\t%v\n", flow.MplsType())
43+
fmt.Fprintf(w, " outBytes:\t%v\n", flow.OutBytes())
44+
fmt.Fprintf(w, " outPkts:\t%v\n", flow.OutPkts())
45+
fmt.Fprintf(w, " tcpRetransmit:\t%v\n", flow.TcpRetransmit())
46+
fmt.Fprintf(w, " srcFlowTags:\t%#v\n", str(flow.SrcFlowTags()))
47+
fmt.Fprintf(w, " dstFlowTags:\t%#v\n", str(flow.DstFlowTags()))
48+
fmt.Fprintf(w, " sampleRate:\t%v\n", flow.SampleRate())
49+
fmt.Fprintf(w, " deviceId:\t%v\n", flow.DeviceId())
50+
fmt.Fprintf(w, " flowTags:\t%#v\n", str(flow.FlowTags()))
51+
fmt.Fprintf(w, " timestamp:\t%v\n", flow.Timestamp())
52+
fmt.Fprintf(w, " dstBgpAsPath:\t%#v\n", str(flow.DstBgpAsPath()))
53+
fmt.Fprintf(w, " dstBgpCommunity:\t%#v\n", str(flow.DstBgpCommunity()))
54+
fmt.Fprintf(w, " srcBgpAsPath:\t%#v\n", str(flow.SrcBgpAsPath()))
55+
fmt.Fprintf(w, " srcBgpCommunity:\t%#v\n", str(flow.SrcBgpCommunity()))
56+
fmt.Fprintf(w, " srcNextHopAs:\t%v\n", flow.SrcNextHopAs())
57+
fmt.Fprintf(w, " dstNextHopAs:\t%v\n", flow.DstNextHopAs())
58+
fmt.Fprintf(w, " srcGeoRegion:\t%v\n", flow.SrcGeoRegion())
59+
fmt.Fprintf(w, " dstGeoRegion:\t%v\n", flow.DstGeoRegion())
60+
fmt.Fprintf(w, " srcGeoCity:\t%v\n", flow.SrcGeoCity())
61+
fmt.Fprintf(w, " dstGeoCity:\t%v\n", flow.DstGeoCity())
62+
fmt.Fprintf(w, " big:\t%v\n", flow.Big())
63+
fmt.Fprintf(w, " sampleAdj:\t%v\n", flow.SampleAdj())
64+
fmt.Fprintf(w, " ipv4DstNextHop:\t%v\n", ip(flow.Ipv4DstNextHop()))
65+
fmt.Fprintf(w, " ipv4SrcNextHop:\t%v\n", ip(flow.Ipv4SrcNextHop()))
66+
fmt.Fprintf(w, " srcRoutePrefix:\t%v\n", flow.SrcRoutePrefix())
67+
fmt.Fprintf(w, " dstRoutePrefix:\t%v\n", flow.DstRoutePrefix())
68+
fmt.Fprintf(w, " srcRouteLength:\t%v\n", flow.SrcRouteLength())
69+
fmt.Fprintf(w, " dstRouteLength:\t%v\n", flow.DstRouteLength())
70+
fmt.Fprintf(w, " srcSecondAsn:\t%v\n", flow.SrcSecondAsn())
71+
fmt.Fprintf(w, " dstSecondAsn:\t%v\n", flow.DstSecondAsn())
72+
fmt.Fprintf(w, " srcThirdAsn:\t%v\n", flow.SrcThirdAsn())
73+
fmt.Fprintf(w, " dstThirdAsn:\t%v\n", flow.DstThirdAsn())
74+
fmt.Fprintf(w, " ipv6DstAddr:\t%v\n", ip(flow.Ipv6DstAddr()))
75+
fmt.Fprintf(w, " ipv6SrcAddr:\t%v\n", ip(flow.Ipv6SrcAddr()))
76+
fmt.Fprintf(w, " srcEthMac:\t%v\n", flow.SrcEthMac())
77+
fmt.Fprintf(w, " dstEthMac:\t%v\n", flow.DstEthMac())
78+
w.Flush()
79+
80+
log.Output(0, buf.String())
81+
}
82+
83+
func ip(v interface{}, _ ...error) net.IP {
84+
switch v := v.(type) {
85+
case uint32:
86+
return net.IPv4(byte(v>>24), byte(v>>16), byte(v>>8), byte(v))
87+
case []byte:
88+
return net.IP(v)
89+
default:
90+
return (net.IP)(nil)
91+
}
92+
}
93+
94+
func str(v interface{}, _ error) interface{} {
95+
return v
96+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package server
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
"strconv"
9+
10+
"github.com/kentik/libkflow/api"
11+
"github.com/kentik/libkflow/chf"
12+
"zombiezen.com/go/capnproto2"
13+
)
14+
15+
type Server struct {
16+
Host net.IP
17+
Port int
18+
Email string
19+
Token string
20+
Device api.Device
21+
mux *http.ServeMux
22+
listener net.Listener
23+
}
24+
25+
func New(host string, port int) (*Server, error) {
26+
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
listener, err := net.ListenTCP("tcp", addr)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
addr = listener.Addr().(*net.TCPAddr)
37+
38+
return &Server{
39+
Host: addr.IP,
40+
Port: addr.Port,
41+
mux: http.NewServeMux(),
42+
listener: listener,
43+
}, nil
44+
}
45+
46+
func (s *Server) Serve(email, token string, dev api.Device) error {
47+
s.Email = email
48+
s.Token = token
49+
s.Device = dev
50+
s.mux.HandleFunc("/api/v5/device/", s.wrap(s.device))
51+
s.mux.HandleFunc("/chf", s.wrap(s.flow))
52+
return http.Serve(s.listener, s.mux)
53+
}
54+
55+
func (s *Server) device(w http.ResponseWriter, r *http.Request) {
56+
var did int
57+
58+
n, err := fmt.Sscanf(r.URL.Path, "/api/v5/device/%d", &did)
59+
if n != 1 || err != nil {
60+
panic(http.StatusBadRequest)
61+
}
62+
63+
if did != s.Device.ID {
64+
panic(http.StatusNotFound)
65+
}
66+
67+
w.Header().Set("Content-Type", "application/json")
68+
err = json.NewEncoder(w).Encode(&api.DeviceResponse{
69+
Device: s.Device,
70+
})
71+
72+
if err != nil {
73+
panic(http.StatusInternalServerError)
74+
}
75+
}
76+
77+
func (s *Server) flow(w http.ResponseWriter, r *http.Request) {
78+
if r.FormValue("sid") != "0" {
79+
panic(http.StatusBadRequest)
80+
}
81+
82+
if r.FormValue("sender_id") != s.Device.ClientID() {
83+
panic(http.StatusBadRequest)
84+
}
85+
86+
cid := [80]byte{}
87+
n, err := r.Body.Read(cid[:])
88+
if err != nil || n != len(cid) {
89+
panic(http.StatusBadRequest)
90+
}
91+
92+
msg, err := capnp.NewPackedDecoder(r.Body).Decode()
93+
defer r.Body.Close()
94+
if err != nil {
95+
panic(http.StatusBadRequest)
96+
}
97+
98+
root, err := chf.ReadRootPackedCHF(msg)
99+
if err != nil {
100+
panic(http.StatusBadRequest)
101+
}
102+
103+
msgs, err := root.Msgs()
104+
if err != nil {
105+
panic(http.StatusBadRequest)
106+
}
107+
108+
for i := 0; i < msgs.Len(); i++ {
109+
Print(i, msgs.At(i))
110+
}
111+
}
112+
113+
func (s *Server) wrap(f handler) handler {
114+
return func(w http.ResponseWriter, r *http.Request) {
115+
defer func() {
116+
if r := recover(); r != nil {
117+
if code, ok := r.(int); ok {
118+
http.Error(w, http.StatusText(code), code)
119+
return
120+
}
121+
panic(r)
122+
}
123+
}()
124+
125+
email := r.Header.Get("X-CH-Auth-Email")
126+
token := r.Header.Get("X-CH-Auth-API-Token")
127+
128+
if email != s.Email || token != s.Token {
129+
panic(http.StatusUnauthorized)
130+
}
131+
132+
if err := r.ParseForm(); err != nil {
133+
panic(http.StatusBadRequest)
134+
}
135+
136+
f(w, r)
137+
}
138+
}
139+
140+
type handler func(http.ResponseWriter, *http.Request)

src/github.com/kentik/libkflow/api/types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ type DeviceResponse struct {
1111
}
1212

1313
type Device struct {
14-
ID string `json:"id"`
14+
ID int `json:"id,string"`
1515
Name string `json:"device_name"`
16-
CompanyID string `json:"company_id"`
16+
CompanyID int `json:"company_id,string"`
1717
Custom CustomColumns `json:"custom_columns"`
1818
}
1919

2020
type CustomColumns map[string]uint64
2121

2222
func (d *Device) ClientID() string {
23-
return fmt.Sprintf("%s:%s:%s", d.CompanyID, d.Name, d.ID)
23+
return fmt.Sprintf("%d:%s:%d", d.CompanyID, d.Name, d.ID)
2424
}
2525

2626
func (c *CustomColumns) UnmarshalJSON(data []byte) error {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"os"
6+
7+
"github.com/jessevdk/go-flags"
8+
"github.com/kentik/libkflow/api"
9+
"github.com/kentik/libkflow/api/server"
10+
)
11+
12+
type Args struct {
13+
Host string `short:"h" description:"listen on host"`
14+
Port int `short:"p" description:"listen on port"`
15+
Email string `long:"email" description:"API auth email"`
16+
Token string `long:"token" description:"API auth token"`
17+
CompanyID int `long:"company-id" description:"company ID "`
18+
DeviceID int `long:"device-id" description:"device ID "`
19+
DeviceName string `long:"device-name" description:"device name "`
20+
}
21+
22+
func main() {
23+
args := Args{
24+
Host: "127.0.0.1",
25+
Port: 8999,
26+
27+
Token: "token",
28+
CompanyID: 1,
29+
DeviceID: 1,
30+
DeviceName: "dev1",
31+
}
32+
33+
parser := flags.NewParser(&args, flags.PassDoubleDash|flags.HelpFlag)
34+
if _, err := parser.Parse(); err != nil {
35+
switch err.(*flags.Error).Type {
36+
case flags.ErrHelp:
37+
parser.WriteHelp(os.Stderr)
38+
os.Exit(1)
39+
default:
40+
log.Fatal(err)
41+
}
42+
}
43+
44+
s, err := server.New(args.Host, args.Port)
45+
if err != nil {
46+
log.Fatal(err)
47+
}
48+
49+
log.Printf("listening on %s:%d", s.Host, s.Port)
50+
51+
err = s.Serve(args.Email, args.Token, api.Device{
52+
ID: args.DeviceID,
53+
Name: args.DeviceName,
54+
CompanyID: args.CompanyID,
55+
})
56+
57+
if err != nil {
58+
log.Fatal(err)
59+
}
60+
}

src/github.com/kentik/libkflow/kflow.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ typedef struct {
2525
uint32_t dstAs; // destination AS
2626
uint32_t dstGeo; // IGNORE
2727
uint32_t dstMac; // IGNORE
28-
uint32_t headerLen; //
28+
uint32_t headerLen; // IGNORE
2929
uint64_t inBytes; // number of bytes in
3030
uint64_t inPkts; // number of packets in
3131
uint32_t inputPort; // input interface identifier
32-
uint32_t ipSize; //
32+
uint32_t ipSize; // IGNORE
3333
uint32_t ipv4DstAddr; // IPv4 dst address
34-
uint32_t ipv4SrcAddr; // IPv6 src address
34+
uint32_t ipv4SrcAddr; // IPv4 src address
3535
uint32_t l4DstPort; // layer 4 dst port
3636
uint32_t l4SrcPort; // layer 4 src port
3737
uint32_t outputPort; // output interface identifier
3838
uint32_t protocol; // IP protocol number
39-
uint32_t sampledPacketSize; //
39+
uint32_t sampledPacketSize; // IGNORE
4040
uint32_t srcAs; // source AS
4141
uint32_t srcGeo; // IGNORE
4242
uint32_t srcMac; // IGNORE
@@ -45,7 +45,7 @@ typedef struct {
4545
uint32_t vlanIn; // input VLAN number
4646
uint32_t vlanOut; // output VLAN number
4747
uint32_t ipv4NextHop; // IPv4 next-hop address
48-
uint32_t mplsType; //
48+
uint32_t mplsType; // IGNORE
4949
uint64_t outBytes; // number of bytes out
5050
uint64_t outPkts; // number of packets out
5151
uint32_t tcpRetransmit; // number of packets retransmitted

0 commit comments

Comments
 (0)