Skip to content

Commit

Permalink
Tweak combining APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
eikemeier committed Mar 7, 2024
1 parent d3628cb commit 01e6fb5
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 115 deletions.
13 changes: 4 additions & 9 deletions .buildkite/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ steps:
commands:
- golangci-lint run --timeout 10m0s

- label: ':hammer: Test'
- label: ':hammer: Test (:codecov: + :codeclimate:)'
commands:
- gotestsum --junitfile test.xml ./...
- gotestsum --junitfile test.xml -- -race -coverprofile=cover.out ./...
- sh .buildkite/upload_coverage.sh cover.out
plugins:
- test-collector#v1.10.0:
- test-collector#v1.10.1:
files: test.xml
format: junit
env:
GOEXPERIMENT: rangefunc

- label: ':codecov: + :codeclimate: Coverage'
commands:
- go test -race -coverprofile=cover.out ./...
- sh .buildkite/upload_coverage.sh cover.out
env:
GOEXPERIMENT: rangefunc
3 changes: 3 additions & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ exclude_patterns:
- "go.sum"
- "LICENSE"
- "nocopy.go"
engines:
golangci:
enabled: true
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: 🧸 golangci-lint
uses: golangci/golangci-lint-action@v4
with:
version: v1.56.1
version: v1.56.2
- name: 🔨 Test
run: go test -race ./...
env:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[![Maintainability](https://api.codeclimate.com/v1/badges/12a77c18122e2d1e1f6b/maintainability)](https://codeclimate.com/github/fillmore-labs/promise/maintainability)
[![Go Report Card](https://goreportcard.com/badge/fillmore-labs.com/promise)](https://goreportcard.com/report/fillmore-labs.com/promise)
[![License](https://img.shields.io/github/license/fillmore-labs/promise)](https://www.apache.org/licenses/LICENSE-2.0)
[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Ffillmore-labs%2Fpromise.svg?type=shield&issueType=license)](https://app.fossa.com/projects/git%2Bgithub.com%2Ffillmore-labs%2Fpromise?ref=badge_shield&issueType=license)

The `promise` package provides interfaces and utilities for writing asynchronous code in Go.

Expand Down
77 changes: 60 additions & 17 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,48 @@ package promise
import (
"context"
"fmt"
"runtime/trace"
"reflect"

"fillmore-labs.com/promise/result"
)

// List is a list of [Future], representing results of asynchronous tasks.
type List[R any] []Future[R]
// AnyFuture matches a [Future] of any type.
type AnyFuture interface {
reflect() reflect.Value
}

// AwaitAll returns a function that yields the results of all futures.
// If the context is canceled, it returns an error for the remaining futures.
func AwaitAll[R any](ctx context.Context, futures ...Future[R]) func(yield func(int, result.Result[R]) bool) {
i := newIterator(ctx, convertValue[R], futures)

// All returns a function that yields the results of all futures.
return i.yieldTo
}

// AwaitAllAny returns a function that yields the results of all futures.
// If the context is canceled, it returns an error for the remaining futures.
func (l List[R]) All(ctx context.Context) func(yield func(int, result.Result[R]) bool) {
defer trace.StartRegion(ctx, "asyncSeq").End()
s := newIterator(ctx, l)
func AwaitAllAny(ctx context.Context, futures ...AnyFuture) func(yield func(int, result.Result[any]) bool) {
i := newIterator(ctx, convertValueAny, futures)

return i.yieldTo
}

return s.yieldTo
// AwaitAllResults waits for all futures to complete and returns the results.
// If the context is canceled, it returns early with errors for the remaining futures.
func AwaitAllResults[R any](ctx context.Context, futures ...Future[R]) []result.Result[R] {
return awaitAllResults(len(futures), AwaitAll(ctx, futures...))
}

// AwaitAll waits for all futures to complete and returns the results.
// AwaitAllResultsAny waits for all futures to complete and returns the results.
// If the context is canceled, it returns early with errors for the remaining futures.
func (l List[R]) AwaitAll(ctx context.Context) []result.Result[R] {
results := make([]result.Result[R], len(l))
l.All(ctx)(func(i int, r result.Result[R]) bool {
func AwaitAllResultsAny(ctx context.Context, futures ...AnyFuture) []result.Result[any] {
return awaitAllResults(len(futures), AwaitAllAny(ctx, futures...))
}

func awaitAllResults[R any](n int, iter func(yield func(int, result.Result[R]) bool)) []result.Result[R] {
results := make([]result.Result[R], n)

iter(func(i int, r result.Result[R]) bool {
results[i] = r

return true
Expand All @@ -51,10 +71,21 @@ func (l List[R]) AwaitAll(ctx context.Context) []result.Result[R] {

// AwaitAllValues returns the values of completed futures.
// If any future fails or the context is canceled, it returns early with an error.
func (l List[R]) AwaitAllValues(ctx context.Context) ([]R, error) {
results := make([]R, len(l))
func AwaitAllValues[R any](ctx context.Context, futures ...Future[R]) ([]R, error) {
return awaitAllValues(len(futures), AwaitAll(ctx, futures...))
}

// AwaitAllValuesAny returns the values of completed futures.
// If any future fails or the context is canceled, it returns early with an error.
func AwaitAllValuesAny(ctx context.Context, futures ...AnyFuture) ([]any, error) {
return awaitAllValues(len(futures), AwaitAllAny(ctx, futures...))
}

func awaitAllValues[R any](n int, iter func(yield func(int, result.Result[R]) bool)) ([]R, error) {
results := make([]R, n)
var yieldErr error
l.All(ctx)(func(i int, r result.Result[R]) bool {

iter(func(i int, r result.Result[R]) bool {
if r.Err() != nil {
yieldErr = fmt.Errorf("list AwaitAllValues result %d: %w", i, r.Err())

Expand All @@ -70,13 +101,25 @@ func (l List[R]) AwaitAllValues(ctx context.Context) ([]R, error) {

// AwaitFirst returns the result of the first completed future.
// If the context is canceled, it returns early with an error.
func (l List[R]) AwaitFirst(ctx context.Context) (R, error) {
func AwaitFirst[R any](ctx context.Context, futures ...Future[R]) (R, error) {
return awaitFirst(AwaitAll(ctx, futures...))
}

// AwaitFirstAny returns the result of the first completed future.
// If the context is canceled, it returns early with an error.
func AwaitFirstAny(ctx context.Context, futures ...AnyFuture) (any, error) {
return awaitFirst(AwaitAllAny(ctx, futures...))
}

func awaitFirst[R any](iter func(yield func(int, result.Result[R]) bool)) (R, error) {
var v result.Result[R]
l.All(ctx)(func(_ int, r result.Result[R]) bool {

iter(func(_ int, r result.Result[R]) bool {
v = r

return false
})

if v == nil {
return *new(R), ErrNoResult
}
Expand Down
50 changes: 43 additions & 7 deletions combine_all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ func TestAll(t *testing.T) {
}

for i, v := range values {
value, err := v.value, v.err
go promises[i].Do(func() (int, error) { return value, err })
promises[i].Do(func() (int, error) { return v.value, v.err })
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// when
var results [3]result.Result[int]
for i, r := range futures.All(ctx) { //nolint:typecheck
results := make([]result.Result[int], len(futures))
for i, r := range promise.AwaitAll(ctx, futures...) { //nolint:typecheck
results[i] = r
}

Expand All @@ -74,13 +73,50 @@ func TestAllEmpty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var futures promise.List[int]

// when
allFutures := futures.All(ctx)
allFutures := promise.AwaitAllResults[int](ctx)

// then
assert.Zero(t, len(allFutures))
for _, v := range allFutures { //nolint:typecheck
t.Errorf("Invalid value %v", v)
}
}

func TestAnyAll(t *testing.T) {
t.Parallel()

// given
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p1, f1 := promise.New[int]()
p2, f2 := promise.New[string]()
p3, f3 := promise.New[struct{}]()

p1.Resolve(1)
p2.Resolve("test")
p3.Resolve(struct{}{})

// when
results := make([]result.Result[any], 3)
for i, r := range promise.AwaitAllAny(ctx, f1, f2, f3) { //nolint:typecheck
results[i] = r
}

// then
for i, r := range results {
if assert.NoError(t, r.Err()) {
switch i {
case 0:
assert.Equal(t, 1, r.Value())
case 1:
assert.Equal(t, "test", r.Value())
case 2:
assert.Equal(t, struct{}{}, r.Value())
default:
assert.Fail(t, "unexpected index")
}
}
}
}
Loading

0 comments on commit 01e6fb5

Please sign in to comment.