Skip to content

Commit 26b9ab9

Browse files
committed
common: moved UnixMillis to flow and graph packages
1 parent 4b654fc commit 26b9ab9

File tree

19 files changed

+59
-70
lines changed

19 files changed

+59
-70
lines changed

common/types.go

-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package common
1919

2020
import (
2121
"errors"
22-
"time"
2322
)
2423

2524
var (
@@ -45,11 +44,6 @@ const (
4544
SortDescending SortOrder = "DESC"
4645
)
4746

48-
// UnixMillis returns the current time in miliseconds
49-
func UnixMillis(t time.Time) int64 {
50-
return t.UTC().UnixNano() / 1000000
51-
}
52-
5347
// Metric defines a common metric interface
5448
type Metric interface {
5549
// part of the Getter interface

flow/ebpf.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ import (
2828
"syscall"
2929
"time"
3030
"unsafe"
31-
32-
"github.com/skydive-project/skydive/common"
3331
)
3432

3533
// #cgo CFLAGS: -I../ebpf
@@ -58,7 +56,7 @@ func tcpFlagTime(currFlagTime C.__u64, startKTimeNs int64, start time.Time) int6
5856
if currFlagTime == 0 {
5957
return 0
6058
}
61-
return common.UnixMillis(start.Add(time.Duration(int64(currFlagTime) - startKTimeNs)))
59+
return UnixMilli(start.Add(time.Duration(int64(currFlagTime) - startKTimeNs)))
6260
}
6361

6462
func kernLayersPath(kernFlow *C.struct_flow) (string, bool) {
@@ -114,8 +112,8 @@ func (ft *Table) newFlowFromEBPF(ebpfFlow *EBPFFlow, key uint64) ([]uint64, []*F
114112
var keys []uint64
115113

116114
f := NewFlow()
117-
f.Init(common.UnixMillis(ebpfFlow.Start), "", &ft.uuids)
118-
f.Last = common.UnixMillis(ebpfFlow.Last)
115+
f.Init(UnixMilli(ebpfFlow.Start), "", &ft.uuids)
116+
f.Last = UnixMilli(ebpfFlow.Last)
119117

120118
f.Metric = &FlowMetric{
121119
ABBytes: int64(ebpfFlow.KernFlow.metrics.ab_bytes),
@@ -194,8 +192,8 @@ func (ft *Table) newFlowFromEBPF(ebpfFlow *EBPFFlow, key uint64) ([]uint64, []*F
194192

195193
// inner layer
196194
f = NewFlow()
197-
f.Init(common.UnixMillis(ebpfFlow.Start), parent.UUID, &ft.uuids)
198-
f.Last = common.UnixMillis(ebpfFlow.Last)
195+
f.Init(UnixMilli(ebpfFlow.Start), parent.UUID, &ft.uuids)
196+
f.Last = UnixMilli(ebpfFlow.Last)
199197
f.LayersPath = innerLayerPath
200198
}
201199

@@ -319,7 +317,7 @@ func isABPacket(ebpfFlow *EBPFFlow, f *Flow) bool {
319317
}
320318

321319
func (ft *Table) updateFlowFromEBPF(ebpfFlow *EBPFFlow, f *Flow) bool {
322-
last := common.UnixMillis(ebpfFlow.Last)
320+
last := UnixMilli(ebpfFlow.Last)
323321
if last == f.Last {
324322
return false
325323
}

flow/flow.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/pierrec/xxHash/xxHash64"
3434
"github.com/spf13/cast"
3535

36-
"github.com/skydive-project/skydive/common"
3736
"github.com/skydive-project/skydive/config"
3837
fl "github.com/skydive-project/skydive/flow/layers"
3938
"github.com/skydive-project/skydive/graffiti/getter"
@@ -69,6 +68,12 @@ type flowState struct {
6968
ipv6 *layers.IPv6
7069
}
7170

71+
// UnixMilli returns t as a Unix time, the number of milliseconds elapsed
72+
// since January 1, 1970 UTC.
73+
func UnixMilli(t time.Time) int64 {
74+
return t.UTC().UnixNano() / 1000000
75+
}
76+
7277
// Packet describes one packet
7378
type Packet struct {
7479
GoPacket gopacket.Packet // orignal gopacket
@@ -583,7 +588,7 @@ func (f *Flow) Init(now int64, parentUUID string, uuids *UUIDs) {
583588

584589
// initFromPacket initializes the flow based on packet data, flow key and ids
585590
func (f *Flow) initFromPacket(key, l2Key, l3Key uint64, packet *Packet, parentUUID string, uuids *UUIDs, opts *Opts) {
586-
now := common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
591+
now := UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
587592
f.Init(now, parentUUID, uuids)
588593

589594
f.newLinkLayer(packet)
@@ -607,7 +612,7 @@ func (f *Flow) initFromPacket(key, l2Key, l3Key uint64, packet *Packet, parentUU
607612

608613
// Update a flow metrics and latency
609614
func (f *Flow) Update(packet *Packet, opts *Opts) {
610-
now := common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
615+
now := UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp)
611616
f.Last = now
612617
f.Metric.Last = now
613618

@@ -956,7 +961,7 @@ func (f *Flow) updateTCPMetrics(packet *Packet) error {
956961
return nil
957962
}
958963

959-
captureTime := common.UnixMillis(metadata.CaptureInfo.Timestamp)
964+
captureTime := UnixMilli(metadata.CaptureInfo.Timestamp)
960965

961966
switch {
962967
case tcpPacket.SYN:

flow/probes/targets/netflow_v5.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/google/gopacket"
3030

3131
"github.com/skydive-project/skydive/api/types"
32-
"github.com/skydive-project/skydive/common"
3332
"github.com/skydive-project/skydive/config"
3433
"github.com/skydive-project/skydive/flow"
3534
"github.com/skydive-project/skydive/graffiti/graph"
@@ -255,7 +254,7 @@ func NewNetFlowV5Target(g *graph.Graph, n *graph.Node, capture *types.Capture, u
255254
nf := &NetFlowV5Target{
256255
target: capture.Target,
257256
sysBoot: now,
258-
sysBootMs: common.UnixMillis(now),
257+
sysBootMs: flow.UnixMilli(now),
259258
}
260259

261260
nf.table = flow.NewTable(updateEvery, expireAfter, nf, uuids, tableOptsFromCapture(capture))

flow/table.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (ft *Table) expire(expireBefore int64) {
276276
}
277277

278278
func (ft *Table) updateAt(now time.Time) {
279-
updateTime := common.UnixMillis(now)
279+
updateTime := UnixMilli(now)
280280
ft.update(ft.lastUpdate, updateTime)
281281
ft.lastUpdate = updateTime
282282
ft.updateVersion++
@@ -350,7 +350,7 @@ func (ft *Table) expireNow() {
350350

351351
func (ft *Table) expireAt(now time.Time) {
352352
ft.expire(ft.lastExpire)
353-
ft.lastExpire = common.UnixMillis(now)
353+
ft.lastExpire = UnixMilli(now)
354354
}
355355

356356
func (ft *Table) onQuery(tq *TableQuery) []byte {
@@ -427,7 +427,7 @@ func (ft *Table) packetToFlow(packet *Packet, parentUUID string) *Flow {
427427
flow.RawPacketsCaptured++
428428
linkType, _ := flow.LinkType()
429429
data := &RawPacket{
430-
Timestamp: common.UnixMillis(packet.GoPacket.Metadata().CaptureInfo.Timestamp),
430+
Timestamp: UnixMilli(packet.GoPacket.Metadata().CaptureInfo.Timestamp),
431431
Index: flow.RawPacketsCaptured,
432432
Data: packet.Data,
433433
LinkType: linkType,

flow/table_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/google/gopacket/layers"
25+
2526
"github.com/skydive-project/skydive/common"
2627
"github.com/skydive-project/skydive/config"
2728
"github.com/skydive-project/skydive/graffiti/filters"
@@ -144,7 +145,7 @@ func TestUpdate(t *testing.T) {
144145
flow1.XXX_state.updateVersion = table.updateVersion + 1
145146

146147
// check that LastUpdateMetric is filled after a expire before an update
147-
table.expire(common.UnixMillis(time.Now()))
148+
table.expire(UnixMilli(time.Now()))
148149

149150
if flow1.LastUpdateMetric.ABBytes != 1 {
150151
t.Errorf("Flow should have been updated by expire : %+v", flow1)
@@ -217,11 +218,11 @@ func TestAppSpecificTimeout(t *testing.T) {
217218
flowsTime := time.Now()
218219

219220
arpFlow, _ := table.getOrCreateFlow(123)
220-
arpFlow.Last = common.UnixMillis(flowsTime)
221+
arpFlow.Last = UnixMilli(flowsTime)
221222
arpFlow.Application = "ARP"
222223

223224
dnsFlow, _ := table.getOrCreateFlow(456)
224-
dnsFlow.Last = common.UnixMillis(flowsTime)
225+
dnsFlow.Last = UnixMilli(flowsTime)
225226
dnsFlow.Application = "DNS"
226227

227228
table.updateAt(flowsTime.Add(time.Duration(15) * time.Second))
@@ -241,7 +242,7 @@ func TestHold(t *testing.T) {
241242
flowTime := time.Now()
242243

243244
flow1, _ := table.getOrCreateFlow(123)
244-
flow1.Last = common.UnixMillis(flowTime)
245+
flow1.Last = UnixMilli(flowTime)
245246
flow1.FinishType = FlowFinishType_TCP_FIN
246247

247248
table.updateAt(flowTime.Add(time.Duration(5) * time.Second))
@@ -254,7 +255,7 @@ func TestHold(t *testing.T) {
254255
}
255256

256257
flow2, _ := table.getOrCreateFlow(456)
257-
flow2.Last = common.UnixMillis(flowTime)
258+
flow2.Last = UnixMilli(flowTime)
258259
flow2.FinishType = FlowFinishType_TCP_FIN
259260
table.updateAt(flowTime.Add(time.Duration(5) * time.Second))
260261
flow2.FinishType = FlowFinishType_NOT_FINISHED

graffiti/graph/elasticsearch.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,14 @@ func graphElementToRaw(typ string, e *graphElement) (*rawData, error) {
134134
ID: string(e.ID),
135135
Host: e.Host,
136136
Origin: e.Origin,
137-
CreatedAt: e.CreatedAt.Unix(),
138-
UpdatedAt: e.UpdatedAt.Unix(),
137+
CreatedAt: e.CreatedAt.UnixMilli(),
138+
UpdatedAt: e.UpdatedAt.UnixMilli(),
139139
Metadata: json.RawMessage(data),
140140
Revision: e.Revision,
141141
}
142142

143143
if !e.DeletedAt.IsZero() {
144-
raw.DeletedAt = e.DeletedAt.Unix()
144+
raw.DeletedAt = e.DeletedAt.UnixMilli()
145145
}
146146

147147
return raw, nil
@@ -162,7 +162,7 @@ func edgeToRaw(e *Edge) (*rawData, error) {
162162
}
163163

164164
func (b *ElasticSearchBackend) archive(raw *rawData, at Time) error {
165-
raw.ArchivedAt = at.Unix()
165+
raw.ArchivedAt = at.UnixMilli()
166166

167167
data, err := json.Marshal(raw)
168168
if err != nil {
@@ -512,7 +512,7 @@ func (b *ElasticSearchBackend) flushGraph() error {
512512
script := elastic.NewScript("ctx._source.DeletedAt = params.now; ctx._source.ArchivedAt = params.now;")
513513
script.Lang("painless")
514514
script.Params(map[string]interface{}{
515-
"now": TimeUTC().Unix(),
515+
"now": TimeUTC().UnixMilli(),
516516
})
517517

518518
return b.client.UpdateByScript(query, script, b.liveIndex.Alias(), b.archiveIndex.IndexWildcard())

graffiti/graph/filters.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
package graph
1919

2020
import (
21-
"time"
22-
23-
"github.com/skydive-project/skydive/common"
2421
"github.com/skydive-project/skydive/graffiti/filters"
2522
"github.com/skydive-project/skydive/graffiti/getter"
2623
)
@@ -74,8 +71,8 @@ func NewTimeSlice(s, l int64) *TimeSlice {
7471
// startName and endName. time.Now() is used as reference if t == nil
7572
func filterForTimeSlice(t *TimeSlice, startName, endName string) *filters.Filter {
7673
if t == nil {
77-
u := common.UnixMillis(time.Now())
78-
t = NewTimeSlice(u, u)
74+
now := TimeNow().UnixMilli()
75+
t = NewTimeSlice(now, now)
7976
}
8077

8178
return filters.NewAndFilter(

graffiti/graph/graph.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,11 @@ func (e *graphElement) GetFieldBool(field string) (_ bool, err error) {
336336
func (e *graphElement) GetFieldInt64(field string) (_ int64, err error) {
337337
switch field {
338338
case "CreatedAt":
339-
return e.CreatedAt.Unix(), nil
339+
return e.CreatedAt.UnixMilli(), nil
340340
case "UpdatedAt":
341-
return e.UpdatedAt.Unix(), nil
341+
return e.UpdatedAt.UnixMilli(), nil
342342
case "DeletedAt":
343-
return e.DeletedAt.Unix(), nil
343+
return e.DeletedAt.UnixMilli(), nil
344344
case "Revision":
345345
return e.Revision, nil
346346
default:

graffiti/graph/orientdb.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ func graphElementToOrientDBSetString(e graphElement) (s string) {
4646
fmt.Sprintf("ID = \"%s\"", string(e.ID)),
4747
fmt.Sprintf("Host = \"%s\"", e.Host),
4848
fmt.Sprintf("Origin = \"%s\"", e.Origin),
49-
fmt.Sprintf("CreatedAt = %d", e.CreatedAt.Unix()),
50-
fmt.Sprintf("UpdatedAt = %d", e.UpdatedAt.Unix()),
49+
fmt.Sprintf("CreatedAt = %d", e.CreatedAt.UnixMilli()),
50+
fmt.Sprintf("UpdatedAt = %d", e.UpdatedAt.UnixMilli()),
5151
fmt.Sprintf("Revision = %d", e.Revision),
5252
}
5353
s = strings.Join(properties, ", ")
@@ -89,7 +89,7 @@ func metadataToOrientDBSelectString(m ElementMatcher) string {
8989
func (o *OrientDBBackend) updateTimes(e string, id string, events ...eventTime) error {
9090
attrs := []string{}
9191
for _, event := range events {
92-
attrs = append(attrs, fmt.Sprintf("%s = %d", event.name, event.t.Unix()))
92+
attrs = append(attrs, fmt.Sprintf("%s = %d", event.name, event.t.UnixMilli()))
9393
}
9494
query := fmt.Sprintf("UPDATE %s SET %s WHERE ID = '%s' AND DeletedAt IS NULL AND ArchivedAt IS NULL", e, strings.Join(attrs, ", "), id)
9595
result, err := o.client.SQL(query)
@@ -326,7 +326,7 @@ func (o *OrientDBBackend) IsHistorySupported() bool {
326326
func (o *OrientDBBackend) flushGraph() error {
327327
o.logger.Info("Flush graph elements")
328328

329-
now := TimeUTC().Unix()
329+
now := TimeUTC().UnixMilli()
330330

331331
query := fmt.Sprintf("UPDATE Node SET DeletedAt = %d, ArchivedAt = %d WHERE DeletedAt IS NULL", now, now)
332332
if _, err := o.client.SQL(query); err != nil {

graffiti/graph/time.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,22 @@ import (
2121
"encoding/json"
2222
"strconv"
2323
"time"
24-
25-
"github.com/skydive-project/skydive/common"
2624
)
2725

2826
// Time describes time type used in the graph
2927
type Time time.Time
3028

31-
// Unix returns the time in millisecond
32-
func (t Time) Unix() int64 {
33-
return common.UnixMillis(time.Time(t))
29+
// UnixMilli returns the time in milliseconds since January 1, 1970
30+
func (t Time) UnixMilli() int64 {
31+
return time.Time(t).UnixNano() / int64(time.Millisecond)
3432
}
3533

3634
// MarshalJSON custom marshalling function
3735
func (t *Time) MarshalJSON() ([]byte, error) {
3836
if t.IsZero() {
3937
return []byte("null"), nil
4038
}
41-
return json.Marshal(t.Unix())
39+
return json.Marshal(t.UnixMilli())
4240
}
4341

4442
// UnmarshalJSON custom unmarshalling function
@@ -61,6 +59,11 @@ func (t Time) IsZero() bool {
6159
return time.Time(t).IsZero()
6260
}
6361

62+
// TimeNow creates a Time with now local time
63+
func TimeNow() Time {
64+
return Time(time.Now())
65+
}
66+
6467
// TimeUTC creates a Time with now UTC
6568
func TimeUTC() Time {
6669
return Time(time.Now().UTC())

graffiti/graph/traversal/traversal.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ func (t *GraphTraversal) Context(s ...interface{}) *GraphTraversal {
556556

557557
g, err := t.Graph.CloneWithContext(graph.Context{
558558
TimePoint: len(s) == 1,
559-
TimeSlice: graph.NewTimeSlice(common.UnixMillis(at.Add(-duration)), common.UnixMillis(at)),
559+
TimeSlice: graph.NewTimeSlice(graph.Time(at.Add(-duration)).UnixMilli(), graph.Time(at).UnixMilli()),
560560
})
561561
if err != nil {
562562
return &GraphTraversal{error: err}

gremlin/query.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"fmt"
2222
"strings"
2323
"time"
24-
25-
"github.com/skydive-project/skydive/common"
2624
)
2725

2826
// QueryString used to construct string representation of query
@@ -117,7 +115,7 @@ func (q QueryString) Context(list ...interface{}) QueryString {
117115
if t.IsZero() {
118116
return q
119117
}
120-
newQ = newQ.appends(fmt.Sprintf("%d", common.UnixMillis(t)))
118+
newQ = newQ.appends(fmt.Sprintf("%d", t.UnixNano()/int64(time.Millisecond)))
121119
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
122120
newQ = newQ.appends(fmt.Sprintf("%d", t))
123121
default:

netflow/agent.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/google/gopacket/layers"
2929
"github.com/safchain/insanelock"
3030

31-
"github.com/skydive-project/skydive/common"
3231
"github.com/skydive-project/skydive/config"
3332
"github.com/skydive-project/skydive/flow"
3433
"github.com/skydive-project/skydive/graffiti/logging"
@@ -88,7 +87,7 @@ func (nfa *Agent) feedFlowTable(extFlowChan chan *flow.ExtFlow) {
8887
continue
8988
}
9089

91-
bootTime := common.UnixMillis(time.Now()) - int64(msg.Header.SysUpTimeMSecs)
90+
bootTime := flow.UnixMilli(time.Now()) - int64(msg.Header.SysUpTimeMSecs)
9291

9392
LOOP:
9493
for _, nf := range msg.Flows {

0 commit comments

Comments
 (0)