Skip to content

Commit d407acc

Browse files
authored
chore: optimize stream peer task (#763)
Signed-off-by: Jim Ma <[email protected]>
1 parent 280e043 commit d407acc

File tree

2 files changed

+97
-93
lines changed

2 files changed

+97
-93
lines changed

client/daemon/peer/peertask_base.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,9 +371,10 @@ func (pt *peerTask) pullPiecesFromPeers(cleanUnfinishedFunc func()) {
371371
defer func() {
372372
cleanUnfinishedFunc()
373373
}()
374-
375-
if !pt.waitFirstPeerPacket() {
376-
// TODO 如果是客户端直接回源,这里不应该在输出错误日志
374+
if ok, backSource := pt.waitFirstPeerPacket(); !ok {
375+
if backSource {
376+
return
377+
}
377378
pt.Errorf("wait first peer packet error")
378379
return
379380
}
@@ -487,7 +488,7 @@ func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) (
487488
return pieceRequestCh, true
488489
}
489490

490-
func (pt *peerTask) waitFirstPeerPacket() bool {
491+
func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) {
491492
// wait first available peer
492493
select {
493494
case <-pt.ctx.Done():
@@ -502,13 +503,14 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
502503
// preparePieceTasksByPeer func already send piece result with error
503504
pt.Infof("new peer client ready, scheduler time cost: %dus, main peer: %s",
504505
time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer)
505-
return true
506+
return true, false
506507
}
507508
// when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady
508509
pt.Infof("start download from source due to dfcodes.SchedNeedBackSource")
509510
pt.span.AddEvent("back source due to scheduler says need back source")
510511
pt.needBackSource = true
511512
pt.backSource()
513+
return false, true
512514
case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration):
513515
if pt.schedulerOption.DisableAutoBackSource {
514516
pt.failedReason = reasonScheduleTimeout
@@ -521,9 +523,10 @@ func (pt *peerTask) waitFirstPeerPacket() bool {
521523
pt.span.AddEvent("back source due to schedule timeout")
522524
pt.needBackSource = true
523525
pt.backSource()
526+
return false, true
524527
}
525528
}
526-
return false
529+
return false, false
527530
}
528531

529532
func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) {

client/daemon/peer/peertask_stream.go

Lines changed: 88 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -277,10 +277,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
277277
firstPiece = first
278278
}
279279

280-
pr, pw := io.Pipe()
281280
attr := map[string]string{}
282-
var readCloser io.ReadCloser = pr
283-
var writer io.Writer = pw
284281
if s.contentLength.Load() != -1 {
285282
attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load())
286283
} else {
@@ -289,89 +286,9 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]s
289286
attr[config.HeaderDragonflyTask] = s.taskID
290287
attr[config.HeaderDragonflyPeer] = s.peerID
291288

292-
go func(first int32) {
293-
defer func() {
294-
s.cancel()
295-
s.span.End()
296-
}()
297-
var (
298-
desired int32
299-
cur int32
300-
wrote int64
301-
err error
302-
//ok bool
303-
cache = make(map[int32]bool)
304-
)
305-
// update first piece to cache and check cur with desired
306-
cache[first] = true
307-
cur = first
308-
for {
309-
if desired == cur {
310-
for {
311-
delete(cache, desired)
312-
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
313-
span.SetAttributes(config.AttributePiece.Int(int(desired)))
314-
wrote, err = s.writeTo(writer, desired)
315-
if err != nil {
316-
span.RecordError(err)
317-
span.End()
318-
s.Errorf("write to pipe error: %s", err)
319-
_ = pw.CloseWithError(err)
320-
return
321-
}
322-
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
323-
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
324-
span.End()
325-
desired++
326-
cached := cache[desired]
327-
if !cached {
328-
break
329-
}
330-
}
331-
} else {
332-
// not desired piece, cache it
333-
cache[cur] = true
334-
if cur < desired {
335-
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
336-
}
337-
}
338-
339-
select {
340-
case <-s.ctx.Done():
341-
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
342-
s.span.RecordError(s.ctx.Err())
343-
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
344-
s.Errorf("CloseWithError failed: %s", err)
345-
}
346-
return
347-
case <-s.streamDone:
348-
for {
349-
// all data wrote to local storage, and all data wrote to pipe write
350-
if s.readyPieces.Settled() == desired {
351-
pw.Close()
352-
return
353-
}
354-
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
355-
span.SetAttributes(config.AttributePiece.Int(int(desired)))
356-
wrote, err = s.writeTo(pw, desired)
357-
if err != nil {
358-
span.RecordError(err)
359-
span.End()
360-
s.span.RecordError(err)
361-
s.Errorf("write to pipe error: %s", err)
362-
_ = pw.CloseWithError(err)
363-
return
364-
}
365-
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
366-
span.End()
367-
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
368-
desired++
369-
}
370-
case cur = <-s.successPieceCh:
371-
continue
372-
}
373-
}
374-
}(firstPiece)
289+
pr, pw := io.Pipe()
290+
var readCloser io.ReadCloser = pr
291+
go s.writeToPipe(firstPiece, pw)
375292

376293
return readCloser, attr, nil
377294
}
@@ -428,7 +345,7 @@ func (s *streamPeerTask) SetTotalPieces(i int32) {
428345
s.totalPiece = i
429346
}
430347

431-
func (s *streamPeerTask) writeTo(w io.Writer, pieceNum int32) (int64, error) {
348+
func (s *streamPeerTask) writeOnePiece(w io.Writer, pieceNum int32) (int64, error) {
432349
pr, pc, err := s.pieceManager.ReadPiece(s.ctx, &storage.ReadPieceRequest{
433350
PeerTaskMetaData: storage.PeerTaskMetaData{
434351
PeerID: s.peerID,
@@ -476,3 +393,87 @@ func (s *streamPeerTask) backSource() {
476393
_ = s.finish()
477394
return
478395
}
396+
397+
func (s *streamPeerTask) writeToPipe(firstPiece int32, pw *io.PipeWriter) {
398+
defer func() {
399+
s.cancel()
400+
s.span.End()
401+
}()
402+
var (
403+
desired int32
404+
cur int32
405+
wrote int64
406+
err error
407+
cache = make(map[int32]bool)
408+
)
409+
// update first piece to cache and check cur with desired
410+
cache[firstPiece] = true
411+
cur = firstPiece
412+
for {
413+
if desired == cur {
414+
for {
415+
delete(cache, desired)
416+
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
417+
span.SetAttributes(config.AttributePiece.Int(int(desired)))
418+
wrote, err = s.writeOnePiece(pw, desired)
419+
if err != nil {
420+
span.RecordError(err)
421+
span.End()
422+
s.Errorf("write to pipe error: %s", err)
423+
_ = pw.CloseWithError(err)
424+
return
425+
}
426+
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
427+
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
428+
span.End()
429+
desired++
430+
cached := cache[desired]
431+
if !cached {
432+
break
433+
}
434+
}
435+
} else {
436+
// not desired piece, cache it
437+
cache[cur] = true
438+
if cur < desired {
439+
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur)
440+
}
441+
}
442+
443+
select {
444+
case <-s.ctx.Done():
445+
s.Errorf("ctx.PeerTaskDone due to: %s", s.ctx.Err())
446+
s.span.RecordError(s.ctx.Err())
447+
if err := pw.CloseWithError(s.ctx.Err()); err != nil {
448+
s.Errorf("CloseWithError failed: %s", err)
449+
}
450+
return
451+
case <-s.streamDone:
452+
for {
453+
// all data wrote to local storage, and all data wrote to pipe write
454+
if s.readyPieces.Settled() == desired {
455+
s.Debugf("all %d pieces wrote to pipe", desired)
456+
pw.Close()
457+
return
458+
}
459+
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
460+
span.SetAttributes(config.AttributePiece.Int(int(desired)))
461+
wrote, err = s.writeOnePiece(pw, desired)
462+
if err != nil {
463+
span.RecordError(err)
464+
span.End()
465+
s.span.RecordError(err)
466+
s.Errorf("write to pipe error: %s", err)
467+
_ = pw.CloseWithError(err)
468+
return
469+
}
470+
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
471+
span.End()
472+
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
473+
desired++
474+
}
475+
case cur = <-s.successPieceCh:
476+
continue
477+
}
478+
}
479+
}

0 commit comments

Comments
 (0)