From b000f175551a8be524c7c7b62e3c302c5fcfc2e5 Mon Sep 17 00:00:00 2001 From: Junxiao Shi Date: Thu, 5 Dec 2024 18:28:55 +0000 Subject: [PATCH] fetch: truncate file to exact size upon stop --- app/fetch/README.md | 41 +++++ app/fetch/fetcher.go | 2 +- app/fetch/fetchtest/fetcher_test.go | 242 +++++++++++++++++++--------- app/fetch/task.go | 19 ++- app/fetch/worker.go | 11 ++ cmd/ndndpdk-ctrl/tg.go | 8 + cmd/ndndpdk-godemo/ndn6file.go | 1 + csrc/fetch/fetcher.h | 2 +- js/types/tg/fetch.ts | 1 + package.json | 2 +- 10 files changed, 245 insertions(+), 84 deletions(-) diff --git a/app/fetch/README.md b/app/fetch/README.md index afe851c6..4bac9ad3 100644 --- a/app/fetch/README.md +++ b/app/fetch/README.md @@ -3,3 +3,44 @@ This package is the congestion aware fetcher, used in the [traffic generator](../tg). It implements a consumer that follows the TCP CUBIC congestion control algorithm, simulating traffic patterns similar to bulk file transfer. It requires at least one thread, running the `FetchThread_Run` function. + +## Fetch Task Definition + +**TaskDef** defines a fetch task that retrieves one segmented object. +A *segmented object* is a group of NDN packets, which have a common name prefix and have SegmentNameComponent as the last component. +The TaskDef contains these fields: + +* Prefix: a name prefix except the last SegmentNameComponent. + * Importantly, if you are retrieving from the [file server](../fileserver), this field must end with the VersionNameComponent. +* InterestLifetime +* HopLimit +* SegmentRange: retrieve a consecutive subset of the available segments. + * If the fetcher encounters a Data packet whose FinalBlockId equals its last name component, the fetching will terminate at this segment, even if the upper bound of SegmentRange has not been reached. + +Normally, a fetch task generates traffic similar to bulk file transfer, in which contents of the received packets are discarded. +It is however possible to write the received payload into a file. 
+In this case, the TaskDef additionally contains these fields:
+
+* Filename: output file name.
+* FileSize: total file size.
+* SegmentLen: the payload length in every segment; the last segment may be shorter.
+
+## Fetcher and its Workers
+
+A **worker** is a thread running the `FetchThread_Run` function.
+It can simultaneously process zero or more fetch tasks, which are arranged in an RCU-protected linked list.
+It has an io\_uring handle in order to write payload to files when requested.
+
+A **TaskContext** stores information of an ongoing fetch task, which can be initialized from a TaskDef.
+It includes a **taskSlot** (aka **C.FetchTask**) used by C code, along with several Go objects.
+It is responsible for opening and closing the file, if the TaskDef requests to write payload to a file.
+Each taskSlot has an index number that is used as the PIT token for its Interests, which allows the reply Data packets to come back to the same taskSlot.
+
+**FetchLogic** contained within the taskSlot implements the algorithmic part of the fetch procedure.
+It includes an RTT estimator, a CUBIC-like congestion control implementation, and a retransmission queue.
+It makes decisions on when to transmit an Interest for a certain segment number, and gets notified when the Data arrives with or without a congestion mark.
+Nack packets are not considered in the congestion aware fetcher.
+
+**Fetcher** is the top level.
+It controls one or more workers and owns one or more task slots.
+An incoming TaskDef is placed into an unused task slot, and then added to the worker with the least number of ongoing fetch tasks. 
diff --git a/app/fetch/fetcher.go b/app/fetch/fetcher.go index a9cffb65..1fb5a8f9 100644 --- a/app/fetch/fetcher.go +++ b/app/fetch/fetcher.go @@ -141,7 +141,7 @@ func (fetcher *Fetcher) Reset() { w.ClearTasks() } for _, ts := range fetcher.taskSlots { - ts.closeFd() + ts.closeFd(nil) ts.worker = -1 } maps.DeleteFunc(taskContextByID, func(id int, task *TaskContext) bool { return task.fetcher == fetcher }) diff --git a/app/fetch/fetchtest/fetcher_test.go b/app/fetch/fetchtest/fetcher_test.go index 5f803f0d..bb2851e8 100644 --- a/app/fetch/fetchtest/fetcher_test.go +++ b/app/fetch/fetchtest/fetcher_test.go @@ -16,106 +16,190 @@ import ( "github.com/usnistgov/ndn-dpdk/ndn/tlv" ) +const testFetcherWindowCapacity = 512 + +type testFetcherTask struct { + fetch.TaskDef + FinalBlock int64 + Payload []byte + PInterests map[tlv.NNI]int + NInterests int +} + +func (ft *testFetcherTask) Run(t *testing.T, fetcher *fetch.Fetcher) (cnt fetch.Counters) { + t.Parallel() + assert, require := makeAR(t) + + task, e := fetcher.Fetch(ft.TaskDef) + require.NoError(e) + + t0 := time.Now() + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + for range ticker.C { + if task.Finished() { + break + } + } + task.Stop() + + cnt = task.Counters() + t.Logf("Interests %d (unique %d) in %v", ft.NInterests, len(ft.PInterests), time.Since(t0)) + t.Logf("Counters %v", cnt) + + if ft.Filename != "" { + if fd, e := os.Open(ft.Filename); assert.NoError(e) { + defer fd.Close() + written, e := io.ReadAll(fd) + assert.NoError(e) + assert.Equal(ft.Payload, written) + } + } + + return +} + +func (ft *testFetcherTask) Serve(segNum tlv.NNI, lastComp ndn.NameComponent, data *ndn.Data) bool { + ft.NInterests++ + ft.PInterests[segNum]++ + + if uint64(segNum) < ft.SegmentBegin || (ft.SegmentEnd > 0 && uint64(segNum) >= ft.SegmentEnd) { + panic(segNum) + } + + if ft.Payload != nil { + payloadOffset := int(segNum) * ft.SegmentLen + data.Content = 
ft.Payload[payloadOffset:min(int(payloadOffset+ft.SegmentLen), len(ft.Payload))] + } + if ft.FinalBlock >= 0 { + if int64(segNum) == ft.FinalBlock { + data.FinalBlock = lastComp + } else if int64(segNum) > ft.FinalBlock { + return false + } + } + return true +} + +func newTestFetcherTask(prefix rune) *testFetcherTask { + var td testFetcherTask + td.Prefix = ndn.ParseName("/" + string(prefix)) + td.FinalBlock = -1 + td.PInterests = map[tlv.NNI]int{} + return &td +} + func TestFetcher(t *testing.T) { assert, require := makeAR(t) intFace := intface.MustNew() - defer intFace.D.Close() + t.Cleanup(func() { intFace.D.Close() }) var cfg fetch.Config - cfg.NThreads = 1 - cfg.NTasks = 2 - cfg.WindowCapacity = 512 + cfg.NThreads = 2 + cfg.NTasks = 8 + cfg.WindowCapacity = testFetcherWindowCapacity fetcher, e := fetch.New(intFace.D, cfg) require.NoError(e) tgtestenv.Open(t, fetcher) - defer fetcher.Close() + t.Cleanup(func() { fetcher.Close() }) fetcher.Launch() - var defA, defB fetch.TaskDef - defA.Prefix = ndn.ParseName("/A") - defA.SegmentBegin, defA.SegmentEnd = 0, 5000 - defA.Filename, defA.SegmentLen = filepath.Join(t.TempDir(), "A.bin"), 100 - payloadA := make([]byte, int(defA.SegmentEnd)*defA.SegmentLen) - randBytes(payloadA) - defB.Prefix = ndn.ParseName("/B") - defB.SegmentBegin, defB.SegmentEnd = 1000, 4000 - const finalBlockB = 1800 - - pInterestsA, nInterestsA, pInterestsB, nInterestsB := map[tlv.NNI]int{}, 0, map[tlv.NNI]int{}, 0 + tempDir := t.TempDir() + ftByName := map[rune]*testFetcherTask{} + + t.Run("0", func(t *testing.T) { + assert, _ := makeAR(t) + + ft := newTestFetcherTask('0') + ft.SegmentBegin, ft.SegmentEnd = 1000, 1000 + ftByName['0'] = ft + // empty SegmentRange + + cnt := ft.Run(t, fetcher) + assert.Zero(cnt.NRxData) + assert.Zero(len(ft.PInterests)) + }) + + t.Run("A", func(t *testing.T) { + assert, _ := makeAR(t) + + ft := newTestFetcherTask('A') + ft.SegmentBegin, ft.SegmentEnd = 1000, 4000 + ft.FinalBlock = 1800 + ftByName['A'] = ft + // 
bounded by both SegmentRange and FinalBlock + + cnt := ft.Run(t, fetcher) + assert.EqualValues(ft.FinalBlock-int64(ft.SegmentBegin)+1, cnt.NRxData) + nUniqueInterests := int64(len(ft.PInterests)) + assert.GreaterOrEqual(nUniqueInterests, ft.FinalBlock-int64(ft.SegmentBegin)+1) + assert.Less(nUniqueInterests, ft.FinalBlock-int64(ft.SegmentBegin)+testFetcherWindowCapacity) + }) + + t.Run("H", func(t *testing.T) { + assert, _ := makeAR(t) + + ft := newTestFetcherTask('H') + ft.SegmentBegin, ft.SegmentEnd, ft.SegmentLen = 0, 5000, 100 + ft.Filename = filepath.Join(tempDir, "H.bin") + ft.Payload = make([]byte, int64(ft.SegmentLen)*int64(ft.SegmentEnd)) + randBytes(ft.Payload) + ftByName['H'] = ft + // bounded by SegmentRange, write to file + + cnt := ft.Run(t, fetcher) + assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, cnt.NRxData) + assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, len(ft.PInterests)) + assert.Zero(cnt.NInFlight) + assert.InDelta(float64(ft.NInterests), float64(cnt.NTxRetx+cnt.NRxData), testFetcherWindowCapacity) + }) + + t.Run("I", func(t *testing.T) { + assert, _ := makeAR(t) + + ft := newTestFetcherTask('I') + ft.SegmentBegin, ft.SegmentEnd, ft.SegmentLen = 0, 900, 400 + ft.Filename = filepath.Join(tempDir, "I.bin") + fileSize := int64(ft.SegmentLen)*int64(ft.SegmentEnd) - 7 + ft.FileSize = &fileSize + ft.Payload = make([]byte, fileSize) + randBytes(ft.Payload) + ftByName['I'] = ft + // bounded by SegmentRange, write to file, truncate file + + cnt := ft.Run(t, fetcher) + assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, cnt.NRxData) + assert.EqualValues(ft.SegmentEnd-ft.SegmentBegin, len(ft.PInterests)) + assert.Zero(cnt.NInFlight) + assert.InDelta(float64(ft.NInterests), float64(cnt.NTxRetx+cnt.NRxData), testFetcherWindowCapacity) + }) + go func() { for packet := range intFace.Rx { - require.NotNil(packet.Interest) + if !assert.NotNil(packet.Interest) || !assert.Len(packet.Interest.Name, 2) { + continue + } data := ndn.MakeData(packet.Interest, 
time.Millisecond) - lastComp := packet.Interest.Name.Get(-1) - assert.EqualValues(an.TtSegmentNameComponent, lastComp.Type) + comp0, comp1 := packet.Interest.Name[0], packet.Interest.Name[1] + assert.EqualValues(an.TtGenericNameComponent, comp0.Type) + assert.EqualValues(1, comp0.Length()) + assert.EqualValues(an.TtSegmentNameComponent, comp1.Type) var segNum tlv.NNI - assert.NoError(segNum.UnmarshalBinary(lastComp.Value)) - - switch { - case defA.Prefix.IsPrefixOf(packet.Interest.Name): - nInterestsA++ - pInterestsA[segNum]++ - assert.Less(uint64(segNum), defA.SegmentEnd) - payloadOffset := int(segNum) * defA.SegmentLen - data.Content = payloadA[payloadOffset : payloadOffset+defA.SegmentLen] - case defB.Prefix.IsPrefixOf(packet.Interest.Name): - nInterestsB++ - pInterestsB[segNum]++ - assert.GreaterOrEqual(uint64(segNum), defB.SegmentBegin) - assert.Less(uint64(segNum), defB.SegmentEnd) - if segNum == finalBlockB { - data.FinalBlock = lastComp - } else if segNum > finalBlockB { - continue - } - default: + assert.NoError(segNum.UnmarshalBinary(comp1.Value)) + + respond := false + ft := ftByName[rune(comp0.Value[0])] + if ft == nil { assert.Fail("unexpected Interest", packet.Interest.Name) + } else { + respond = ft.Serve(segNum, comp1, &data) } - - if rand.Float64() > 0.01 { + if respond && rand.Float64() > 0.01 { intFace.Tx <- data } } }() - - taskA, e := fetcher.Fetch(defA) - require.NoError(e) - taskB, e := fetcher.Fetch(defB) - require.NoError(e) - - t0 := time.Now() - { - ticker := time.NewTicker(time.Millisecond) - for range ticker.C { - if taskA.Finished() && taskB.Finished() { - break - } - } - ticker.Stop() - } - taskA.Stop() - taskB.Stop() - - cntA, cntB := taskA.Counters(), taskB.Counters() - assert.EqualValues(defA.SegmentEnd-defA.SegmentBegin, cntA.NRxData) - assert.EqualValues(defA.SegmentEnd-defA.SegmentBegin, len(pInterestsA)) - assert.Zero(cntA.NInFlight) - assert.InDelta(float64(nInterestsA), float64(cntA.NTxRetx+cntA.NRxData), 
float64(cfg.WindowCapacity)) - - assert.EqualValues(finalBlockB-defB.SegmentBegin+1, cntB.NRxData) - assert.GreaterOrEqual(len(pInterestsB), int(finalBlockB-defB.SegmentBegin+1)) - assert.Less(len(pInterestsB), int(finalBlockB-defB.SegmentBegin)+cfg.WindowCapacity) - - t.Logf("/A Interests %d (unique %d) and /B Interests %d (unique %d) in %v", - nInterestsA, len(pInterestsA), nInterestsB, len(pInterestsB), time.Since(t0)) - - if fA, e := os.Open(defA.Filename); assert.NoError(e) { - defer fA.Close() - writtenA, e := io.ReadAll(fA) - assert.NoError(e) - assert.Equal(payloadA, writtenA) - } } diff --git a/app/fetch/task.go b/app/fetch/task.go index cd3b8e31..dbd5c47b 100644 --- a/app/fetch/task.go +++ b/app/fetch/task.go @@ -47,7 +47,7 @@ func (task *TaskContext) Counters() Counters { func (task *TaskContext) Stop() { eal.CallMain(func() { task.w.RemoveTask(eal.MainReadSide, task.ts) - task.ts.closeFd() + task.ts.closeFd(task.d.FileSize) close(task.stopping) taskContextLock.Lock() defer taskContextLock.Unlock() @@ -78,6 +78,11 @@ type TaskDef struct { // If omitted, payload is not written to a file. Filename string `json:"filename,omitempty"` + // FileSize is total payload length. + // This is only relevant when writing to a file. + // If set, the file will be truncated to this size after fetching is completed. + FileSize *int64 `json:"fileSize"` + // SegmentLen is the payload length in each segment. // This is only needed when writing to a file. // If any segment has incorrect Content TLV-LENGTH, the output file would not contain correct payload. 
@@ -169,7 +174,7 @@ func (ts *taskSlot) Logic() *Logic { return (*Logic)(&ts.logic) } -func (ts *taskSlot) closeFd() { +func (ts *taskSlot) closeFd(fileSize *int64) { fd := int(ts.fd) if fd < 0 { return @@ -178,7 +183,17 @@ func (ts *taskSlot) closeFd() { logEntry := logger.With( zap.Int("slot-index", int(ts.index)), zap.Int("fd", fd), + zap.Int64p("file-size", fileSize), ) + + if fileSize != nil { + if e := unix.Ftruncate(fd, *fileSize); e != nil { + logEntry.Warn("unix.Ftruncate error", + zap.Error(e), + ) + } + } + if e := unix.Close(fd); e != nil { logEntry.Warn("unix.Close error", zap.Error(e), diff --git a/app/fetch/worker.go b/app/fetch/worker.go index 09570132..960f252f 100644 --- a/app/fetch/worker.go +++ b/app/fetch/worker.go @@ -15,6 +15,7 @@ import ( "github.com/usnistgov/ndn-dpdk/dpdk/ealthread" "github.com/usnistgov/ndn-dpdk/iface" "github.com/usnistgov/ndn-dpdk/ndni" + "go.uber.org/zap" ) type worker struct { @@ -56,6 +57,11 @@ func (w *worker) AddTask(rs *urcu.ReadSide, ts *taskSlot) { w.nTasks++ C.cds_hlist_add_head_rcu(&ts.fthNode, &w.c.tasksHead) + + logger.Info("task added to worker", + zap.Int("slot-index", int(ts.index)), + w.LCore().ZapField("worker-lc"), + ) } // RemoveTask removes a task. @@ -72,6 +78,11 @@ func (w *worker) RemoveTask(rs *urcu.ReadSide, ts *taskSlot) { w.nTasks-- C.cds_hlist_del_rcu(&ts.fthNode) urcu.Synchronize() + + logger.Info("task removed from worker", + zap.Int("slot-index", int(ts.index)), + w.LCore().ZapField("worker-lc"), + ) } // ClearTasks clears task list. 
diff --git a/cmd/ndndpdk-ctrl/tg.go b/cmd/ndndpdk-ctrl/tg.go index 7e8705b9..a623f55c 100644 --- a/cmd/ndndpdk-ctrl/tg.go +++ b/cmd/ndndpdk-ctrl/tg.go @@ -131,6 +131,7 @@ func init() { func init() { var fetcher, name, filename string var segmentBegin, segmentEnd uint64 + var fileSize int64 var segmentLen int defineCommand(&cli.Command{ Category: "trafficgen", @@ -167,6 +168,11 @@ func init() { DefaultText: "not saving to file", Destination: &filename, }, + &cli.Int64Flag{ + Name: "file-size", + Usage: "file size `octets`", + Destination: &fileSize, + }, &cli.IntFlag{ Name: "segment-len", Usage: "segment length `octets`", @@ -185,6 +191,7 @@ func init() { } if filename != "" { task["filename"] = filename + task["fileSize"] = fileSize task["segmentLen"] = segmentLen } return clientDoPrint(c.Context, ` @@ -198,6 +205,7 @@ func init() { segmentBegin segmentEnd filename + fileSize segmentLen } worker { diff --git a/cmd/ndndpdk-godemo/ndn6file.go b/cmd/ndndpdk-godemo/ndn6file.go index 1be5cb66..de93b033 100644 --- a/cmd/ndndpdk-godemo/ndn6file.go +++ b/cmd/ndndpdk-godemo/ndn6file.go @@ -108,6 +108,7 @@ func init() { fmt.Println(shellquote.Join( "--name", m.Name.String(), "--segment-end", strconv.FormatUint(m.SegmentEnd(), 10), + "--file-size", strconv.FormatInt(m.Size, 10), "--segment-len", strconv.Itoa(m.SegmentSize), )) return nil diff --git a/csrc/fetch/fetcher.h b/csrc/fetch/fetcher.h index 351cbdbb..6a8f9cff 100644 --- a/csrc/fetch/fetcher.h +++ b/csrc/fetch/fetcher.h @@ -27,7 +27,7 @@ typedef struct FetchTask { InterestTemplate tpl; } FetchTask; -/** @brief Fetch thread that runs several fetch procedures. */ +/** @brief Fetch thread that can simultaneously process several fetch tasks. 
*/ typedef struct FetchThread { Uring ur; ThreadCtrl ctrl; diff --git a/js/types/tg/fetch.ts b/js/types/tg/fetch.ts index a6b4a273..86a63e06 100644 --- a/js/types/tg/fetch.ts +++ b/js/types/tg/fetch.ts @@ -29,6 +29,7 @@ export interface FetchTaskDef extends InterestTemplate { segmentEnd?: Uint; filename?: string; + fileSize?: Uint; segmentLen?: Uint; } diff --git a/package.json b/package.json index ff2057d7..cc7e628e 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "it-pushable": "^3.2.3", "throat": "^6.0.2", "tslib": "^2.8.1", - "type-fest": "^4.29.0", + "type-fest": "^4.30.0", "ws": "^8.18.0" }, "devDependencies": {