Skip to content

Commit c3b1ba9

Browse files
committed
feat: move async functions to specific modules and add where, XML comments and lambda inlining
1 parent b8c007b commit c3b1ba9

File tree

3 files changed

+137
-68
lines changed

3 files changed

+137
-68
lines changed

src/FSharp.Control.R3/AsyncObservable.fs

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,81 @@ open FSharp.Control.R3
77

88
module Observable =
99

10-
let mapAsync (options : ProcessingOptions) (f : 't -> Async<'r>) source =
11-
let selector x ct = ValueTask<'r> (Async.StartImmediateAsTask (f x, ct))
12-
ObservableExtensions.SelectAwait (
13-
source,
14-
selector,
15-
options.AwaitOperation,
16-
options.ConfigureAwait,
17-
options.CancelOnCompleted,
18-
options.MaxConcurrent
19-
)
10+
/// Applies an accumulator function over an observable sequence, returning the
11+
/// result of the aggregation as a single element in the result sequence
12+
let aggregate seed (f : 'r -> 't -> 'r) source = async {
13+
let! ct = Async.CancellationToken
14+
return!
15+
ObservableExtensions.AggregateAsync (source, seed, f, ct)
16+
|> Async.AwaitTask
17+
}
2018

21-
let length source = async {
19+
/// Determines whether all elements of an observable satisfy a predicate
20+
let all (f : 't -> bool) source = async {
2221
let! ct = Async.CancellationToken
2322
return!
24-
ObservableExtensions.CountAsync (source, ct)
23+
ObservableExtensions.AllAsync (source, f, ct)
24+
|> Async.AwaitTask
25+
}
26+
27+
/// Determines whether an observable sequence contains a specified value
28+
/// which satisfies the given predicate
29+
let existsAsync source = async {
30+
let! ct = Async.CancellationToken
31+
return!
32+
ObservableExtensions.AnyAsync (source, ct)
33+
|> Async.AwaitTask
34+
}
35+
36+
/// Returns the first element of an observable sequence
37+
let firstAsync source = async {
38+
let! ct = Async.CancellationToken
39+
return!
40+
ObservableExtensions.FirstAsync (source, ct)
2541
|> Async.AwaitTask
2642
}
2743

28-
let inline iter (action : 't -> unit) source = async {
44+
/// <summary>
45+
/// Invokes an action for each element in the observable sequence, and propagates all observer
46+
/// messages through the result sequence.
47+
/// </summary>
48+
/// <remarks>
49+
/// This method can be used for debugging, logging, etc. of query behavior
50+
/// by intercepting the message stream to run arbitrary actions for messages on the pipeline.
51+
/// </remarks>
52+
let iter (action : 't -> unit) source = async {
2953
let! ct = Async.CancellationToken
3054
return!
3155
ObservableExtensions.ForEachAsync (source, action, ct)
3256
|> Async.AwaitTask
3357
}
3458

59+
/// Returns the last element of an observable sequence till its completion or cancellation
60+
let length source = async {
61+
let! ct = Async.CancellationToken
62+
return!
63+
ObservableExtensions.CountAsync (source, ct)
64+
|> Async.AwaitTask
65+
}
66+
67+
/// Maps the given observable with the given asynchronous function
68+
let mapAsync (options : ProcessingOptions) (f : 't -> Async<'r>) source =
69+
let selector x ct = ValueTask<'r> (Async.StartImmediateAsTask (f x, ct))
70+
ObservableExtensions.SelectAwait (
71+
source,
72+
selector,
73+
options.AwaitOperation,
74+
options.ConfigureAwait,
75+
options.CancelOnCompleted,
76+
options.MaxConcurrent
77+
)
78+
79+
/// <summary>
80+
/// Invokes an asynchronous action for each element in the observable sequence, and propagates all observer
81+
/// messages through the result sequence.
82+
/// </summary>
83+
/// <remarks>
84+
/// This method can be used for debugging, logging, etc. of query behavior
85+
/// by intercepting the message stream to run arbitrary actions for messages on the pipeline.
86+
/// </remarks>
3587
let iterAsync options (action : 't -> Async<unit>) source = source |> mapAsync options action |> length |> Async.Ignore

src/FSharp.Control.R3/Observable.fs

Lines changed: 34 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,11 @@ module FSharp.Control.R3.Observable
22

33
open R3
44

5-
/// Applies an accumulator function over an observable sequence, returning the
6-
/// result of the aggregation as a single element in the result sequence
7-
let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source =
8-
ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken)
9-
10-
/// Determines whether all elements of an observable satisfy a predicate
11-
let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken)
12-
135
/// Hides the identy of an observable sequence
146
let inline asObservable source : Observable<'Source> = ObservableExtensions.AsObservable source
157

168
/// Binds an observable to generate a subsequent observable.
17-
let inline bind (f : 'T -> Observable<'TNext>) source = ObservableExtensions.SelectMany (source, f)
9+
let inline bind ([<InlineIfLambda>] f : 'T -> Observable<'TNext>) source = ObservableExtensions.SelectMany (source, f)
1810

1911
/// Converts the elements of the sequence to the specified type
2012
let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source)
@@ -26,28 +18,23 @@ let inline concat source = ObservableExtensions.Concat source
2618
/// Returns an observable sequence that only contains distinct elements
2719
let inline distinct source = ObservableExtensions.Distinct source
2820

29-
/// Determines whether an observable sequence contains a specified value
30-
/// which satisfies the given predicate
31-
let inline existsAsync source = ObservableExtensions.AnyAsync source
32-
33-
/// Returns the first element of an observable sequence
34-
let inline firstAsync source = ObservableExtensions.FirstAsync source
35-
3621
/// Filters the observable elements of a sequence based on a predicate
37-
let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f)
22+
let inline filter ([<InlineIfLambda>] f : 't -> bool) source = ObservableExtensions.Where (source, f)
3823

3924
/// Maps the given observable with the given function
40-
let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f)
25+
let inline map ([<InlineIfLambda>] f : 't -> 'r) source = ObservableExtensions.Select (source, f)
4126

4227
/// Maps the given observable with the given function and the index of the element
43-
let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i))
28+
let inline mapi ([<InlineIfLambda>] f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i))
4429

4530
/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
4631
let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count)
4732

48-
/// Takes n elements (from the beginning of an observable sequence? )
33+
/// Takes n elements (from the beginning of an observable sequence?)
4934
let inline take (count : int) (source) = ObservableExtensions.Take (source, count)
5035

36+
/// Filters the observable elements of a sequence based on a predicate
37+
let inline where ([<InlineIfLambda>] f : 't -> bool) source = ObservableExtensions.Where (source, f)
5138

5239
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
5340
module Builders =
@@ -56,59 +43,59 @@ module Builders =
5643
/// A reactive query builder.
5744
/// See http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html
5845
type RxQueryBuilder () =
59-
member __.For (s : Observable<_>, body : _ -> Observable<_>) = s.SelectMany (body)
46+
member _.For (s : Observable<_>, body : _ -> Observable<_>) = s.SelectMany (body)
6047
[<CustomOperation("select", AllowIntoPattern = true)>]
61-
member __.Select (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.Select (selector)
48+
member _.Select (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.Select (selector)
6249
[<CustomOperation("where", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
63-
member __.Where (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.Where (predicate)
50+
member _.Where (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.Where (predicate)
6451
[<CustomOperation("takeWhile", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
65-
member __.TakeWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.TakeWhile (predicate)
52+
member _.TakeWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.TakeWhile (predicate)
6653
[<CustomOperation("take", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
67-
member __.Take (s : Observable<_>, count : int) = s.Take (count)
54+
member _.Take (s : Observable<_>, count : int) = s.Take (count)
6855
[<CustomOperation("skipWhile", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
69-
member __.SkipWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.SkipWhile (predicate)
56+
member _.SkipWhile (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.SkipWhile (predicate)
7057
[<CustomOperation("skip", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
71-
member __.Skip (s : Observable<_>, count : int) = s.Skip (count)
72-
member __.Zero () = Observable.Empty (TimeProvider.System)
73-
member __.Yield (value) = Observable.Return (value, TimeProvider.System)
58+
member _.Skip (s : Observable<_>, count : int) = s.Skip (count)
59+
member _.Zero () = Observable.Empty (TimeProvider.System)
60+
member _.Yield (value) = Observable.Return (value, TimeProvider.System)
7461
[<CustomOperation("count")>]
75-
member __.Count (s : Observable<_>) = ObservableExtensions.CountAsync (s)
62+
member _.Count (s : Observable<_>) = ObservableExtensions.CountAsync (s)
7663
[<CustomOperation("all")>]
77-
member __.All (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.AllAsync (new Func<_, bool> (predicate))
64+
member _.All (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.AllAsync (new Func<_, bool> (predicate))
7865
[<CustomOperation("contains")>]
79-
member __.Contains (s : Observable<_>, key) = s.ContainsAsync (key)
66+
member _.Contains (s : Observable<_>, key) = s.ContainsAsync (key)
8067
[<CustomOperation("distinct", MaintainsVariableSpace = true, AllowIntoPattern = true)>]
81-
member __.Distinct (s : Observable<_>) = s.Distinct ()
68+
member _.Distinct (s : Observable<_>) = s.Distinct ()
8269
[<CustomOperation("exactlyOne")>]
83-
member __.ExactlyOne (s : Observable<_>) = s.SingleAsync ()
70+
member _.ExactlyOne (s : Observable<_>) = s.SingleAsync ()
8471
[<CustomOperation("exactlyOneOrDefault")>]
85-
member __.ExactlyOneOrDefault (s : Observable<_>) = s.SingleOrDefaultAsync ()
72+
member _.ExactlyOneOrDefault (s : Observable<_>) = s.SingleOrDefaultAsync ()
8673
[<CustomOperation("find")>]
87-
member __.Find (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.FirstAsync (new Func<_, bool> (predicate))
74+
member _.Find (s : Observable<_>, [<ProjectionParameter>] predicate : _ -> bool) = s.FirstAsync (new Func<_, bool> (predicate))
8875
[<CustomOperation("head")>]
89-
member __.Head (s : Observable<_>) = s.FirstAsync ()
76+
member _.Head (s : Observable<_>) = s.FirstAsync ()
9077
[<CustomOperation("headOrDefault")>]
91-
member __.HeadOrDefault (s : Observable<_>) = s.FirstOrDefaultAsync ()
78+
member _.HeadOrDefault (s : Observable<_>) = s.FirstOrDefaultAsync ()
9279
[<CustomOperation("last")>]
93-
member __.Last (s : Observable<_>) = s.LastAsync ()
80+
member _.Last (s : Observable<_>) = s.LastAsync ()
9481
[<CustomOperation("lastOrDefault")>]
95-
member __.LastOrDefault (s : Observable<_>) = s.LastOrDefaultAsync ()
82+
member _.LastOrDefault (s : Observable<_>) = s.LastOrDefaultAsync ()
9683
[<CustomOperation("maxBy")>]
97-
member __.MaxBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MaxByAsync (new Func<'a, 'b> (valueSelector))
84+
member _.MaxBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MaxByAsync (new Func<'a, 'b> (valueSelector))
9885
[<CustomOperation("minBy")>]
99-
member __.MinBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MinByAsync (new Func<'a, 'b> (valueSelector))
86+
member _.MinBy (s : Observable<'a>, [<ProjectionParameter>] valueSelector : 'a -> 'b) = s.MinByAsync (new Func<'a, 'b> (valueSelector))
10087

10188
[<CustomOperation("sumBy")>]
102-
member inline __.SumBy (s : Observable<_>, [<ProjectionParameter>] valueSelector : _ -> _) =
89+
member inline _.SumBy (s : Observable<_>, [<ProjectionParameter>] valueSelector : _ -> _) =
10390
s
104-
.Select(valueSelector)
105-
.AggregateAsync (Unchecked.defaultof<_>, new Func<_, _, _> (fun a b -> a + b))
91+
|> _.Select(valueSelector)
92+
|> _.AggregateAsync(Unchecked.defaultof<_>, new Func<_, _, _> (fun a b -> a + b))
10693

10794
[<CustomOperation("zip", IsLikeZip = true)>]
108-
member __.Zip (s1 : Observable<_>, s2 : Observable<_>, [<ProjectionParameter>] resultSelector : _ -> _) =
95+
member _.Zip (s1 : Observable<_>, s2 : Observable<_>, [<ProjectionParameter>] resultSelector : _ -> _) =
10996
s1.Zip (s2, new Func<_, _, _> (resultSelector))
11097

11198
[<CustomOperation("iter")>]
112-
member __.Iter (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.ForEachAsync (selector)
99+
member _.Iter (s : Observable<_>, [<ProjectionParameter>] selector : _ -> _) = s.ForEachAsync (selector)
113100

114101
let rxquery = RxQueryBuilder ()

src/FSharp.Control.R3/TaskObservable.fs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,36 @@ open FSharp.Control.R3
77

88
module Observable =
99

10-
/// Maps the given observable with the given function
10+
/// Applies an accumulator function over an observable sequence, returning the
11+
/// result of the aggregation as a single element in the result sequence
12+
let inline aggregate cancellationToken seed ([<InlineIfLambda>] f : 'r -> 't -> 'r) source =
13+
ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken)
14+
15+
/// Determines whether all elements of an observable satisfy a predicate
16+
let inline all cancellationToken ([<InlineIfLambda>] f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken)
17+
18+
/// <summary>
19+
/// Invokes an action for each element in the observable sequence, and propagates all observer
20+
/// messages through the result sequence.
21+
/// </summary>
22+
/// <remarks>
23+
/// This method can be used for debugging, logging, etc. of query behavior
24+
/// by intercepting the message stream to run arbitrary actions for messages on the pipeline.
25+
/// </remarks>
26+
let inline iter cancellationToken ([<InlineIfLambda>] action : 't -> unit) source =
27+
ObservableExtensions.ForEachAsync (source, action, cancellationToken)
28+
29+
/// Determines whether an observable sequence contains a specified value
30+
/// which satisfies the given predicate
31+
let inline existsAsync cancellationToken source = ObservableExtensions.AnyAsync (source, cancellationToken)
32+
33+
/// Returns the first element of an observable sequence
34+
let inline firstAsync cancellationToken source = ObservableExtensions.FirstAsync (source, cancellationToken)
35+
36+
/// Returns the length of the observable sequence till its completion or cancellation
37+
let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken)
38+
39+
/// Maps the given observable with the given asynchronous function
1140
let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source =
1241
let selector x ct = ValueTask<'r> (f ct x)
1342
ObservableExtensions.SelectAwait (
@@ -19,13 +48,14 @@ module Observable =
1948
options.MaxConcurrent
2049
)
2150

22-
let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken)
23-
24-
/// Invokes an action for each element in the observable sequence, and propagates all observer
25-
/// messages through the result sequence. This method can be used for debugging, logging, etc. of query
26-
/// behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
27-
let inline iter cancellationToken (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken)
28-
51+
/// <summary>
52+
/// Invokes an asynchronous action for each element in the observable sequence, and propagates all observer
53+
/// messages through the result sequence.
54+
/// </summary>
55+
/// <remarks>
56+
/// This method can be used for debugging, logging, etc. of query behavior
57+
/// by intercepting the message stream to run arbitrary actions for messages on the pipeline.
58+
/// </remarks>
2959
let iterAsync cancellationToken options (action : CancellationToken -> 't -> Task<unit>) source =
3060
source
3161
|> mapAsync options action

0 commit comments

Comments
 (0)