Skip to content

Commit 8a04112

Browse files
committed
Simplified port detection using a Future-style abstraction
1 parent 1a85d16 commit 8a04112

File tree

2 files changed

+43
-16
lines changed

2 files changed

+43
-16
lines changed

commands/upload/upload.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"io"
2222
"path/filepath"
2323
"strings"
24-
"sync"
2524
"time"
2625

2726
"github.com/arduino/arduino-cli/arduino"
@@ -219,17 +218,11 @@ func runProgramAction(pme *packagemanager.Explorer,
219218
if watch != nil {
220219
// Run port detector
221220
uploadCompletedCtx, cancel := context.WithCancel(context.Background())
222-
var newUploadPort *rpc.Port
223-
var wg sync.WaitGroup
224-
wg.Add(1)
225-
go func() {
226-
newUploadPort = detectUploadPort(port, watch, uploadCompletedCtx)
227-
wg.Done()
228-
}()
221+
newUploadPort := f.NewFuture[*rpc.Port]()
222+
go detectUploadPort(port, watch, uploadCompletedCtx, newUploadPort)
229223
uploadCompleted = func() *rpc.Port {
230224
cancel()
231-
wg.Wait()
232-
return newUploadPort
225+
return newUploadPort.Await()
233226
}
234227
defer uploadCompleted() // defer in case of exit on error (ensures goroutine completion)
235228
}
@@ -522,13 +515,15 @@ func runProgramAction(pme *packagemanager.Explorer,
522515
return uploadCompleted(), nil
523516
}
524517

525-
func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResponse, uploadCtx context.Context) *rpc.Port {
518+
func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResponse, uploadCtx context.Context, result f.Future[*rpc.Port]) {
526519
log := logrus.WithField("task", "port_detection")
527520
log.Tracef("Detecting new board port after upload")
528521

522+
var candidate *rpc.Port
529523
defer func() {
530524
// On exit, discard all events until the watcher is closed
531525
go f.DiscardCh(watch)
526+
result.Send(candidate)
532527
}()
533528

534529
// Ignore all events during the upload
@@ -537,7 +532,7 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
537532
case ev, ok := <-watch:
538533
if !ok {
539534
log.Error("Upload port detection failed, watcher closed")
540-
return nil
535+
return
541536
}
542537
log.WithField("event", ev).Trace("Ignored watcher event before upload")
543538
continue
@@ -550,13 +545,12 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
550545
// Pick the first port that is detected after the upload
551546
desiredHwID := uploadPort.HardwareId
552547
timeout := time.After(5 * time.Second)
553-
var candidate *rpc.Port
554548
for {
555549
select {
556550
case ev, ok := <-watch:
557551
if !ok {
558552
log.Error("Upload port detection failed, watcher closed")
559-
return candidate
553+
return
560554
}
561555
if ev.EventType == "remove" && candidate != nil {
562556
if candidate.Equals(ev.Port.GetPort()) {
@@ -580,10 +574,10 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
580574
}
581575

582576
log.Trace("Found new upload port!")
583-
return candidate
577+
return
584578
case <-timeout:
585579
log.Trace("Timeout waiting for candidate port")
586-
return candidate
580+
return
587581
}
588582
}
589583
}

internal/algorithms/channels.go

+33
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,41 @@
1515

1616
package f
1717

18+
import "sync"
19+
1820
// DiscardCh consume all incoming messages from the given channel until its closed.
1921
func DiscardCh[T any](ch <-chan T) {
2022
for range ch {
2123
}
2224
}
25+
26+
// Future is an object that holds a result value. The value may be read and
27+
// written asynchronously.
28+
type Future[T any] interface {
29+
Send(T)
30+
Await() T
31+
}
32+
33+
type future[T any] struct {
34+
wg sync.WaitGroup
35+
value T
36+
}
37+
38+
// NewFuture creates a new Future[T]
39+
func NewFuture[T any]() Future[T] {
40+
res := &future[T]{}
41+
res.wg.Add(1)
42+
return res
43+
}
44+
45+
// Send a result in the Future. Threads waiting for result will be unlocked.
46+
func (f *future[T]) Send(value T) {
47+
f.value = value
48+
f.wg.Done()
49+
}
50+
51+
// Await for a result from the Future, blocks until a result is available.
52+
func (f *future[T]) Await() T {
53+
f.wg.Wait()
54+
return f.value
55+
}

0 commit comments

Comments
 (0)