Skip to content

Commit 4c5f2a3

Browse files
committed
Added more functions
1 parent 6619b48 commit 4c5f2a3

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

src/FSharp.Control.R3/Observable.fs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,33 @@ module FSharp.Control.R3.Observable
22

33
open R3
44

5+
/// Maps the given observable with the given function
56
let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f)
7+
8+
/// Maps the given observable with the given function and the index of the element
9+
let inline mapi (f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i))
10+
11+
/// Filters the observable elements of a sequence based on a predicate
12+
let inline filter (f : 't -> bool) source = ObservableExtensions.Where (source, f)
13+
14+
/// Hides the identy of an observable sequence
15+
let inline asObservable source : Observable<'Source> = ObservableExtensions.AsObservable source
16+
17+
/// Binds an observable to generate a subsequent observable.
18+
let inline bind (f : 'T -> Observable<'TNext>) source = ObservableExtensions.SelectMany (source, f)
19+
20+
/// Converts the elements of the sequence to the specified type
21+
let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source)
22+
23+
/// Concatenates the second observable sequence to the first observable sequence
24+
/// upn the successful termination of the first
25+
let inline concat source = ObservableExtensions.Concat source
26+
27+
/// Returns an observable sequence that only contains distinct elements
28+
let inline distinct source = ObservableExtensions.Distinct source
29+
30+
/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
31+
let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count)
32+
33+
/// Takes n elements (from the beginning of an observable sequence? )
34+
let inline take (count : int) (source) = ObservableExtensions.Take (source, count)

src/FSharp.Control.R3/TaskObservable.fs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ open FSharp.Control.R3
77

88
module Observable =
99

10+
/// Maps the given observable with the given function
1011
let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source =
1112
let selector x ct = ValueTask<'r> (f ct x)
1213
ObservableExtensions.SelectAwait (
@@ -20,10 +21,28 @@ module Observable =
2021

2122
let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken)
2223

24+
/// Invokes an action for each element in the observable sequence, and propagates all observer
25+
/// messages through the result sequence. This method can be used for debugging, logging, etc. of query
26+
/// behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.
2327
let inline iter cancellationToken (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken)
2428

2529
let iterAsync cancellationToken options (action : CancellationToken -> 't -> Task<unit>) source =
2630
source
2731
|> mapAsync options action
2832
|> length cancellationToken
2933
:> Task
34+
35+
/// Applies an accumulator function over an observable sequence, returning the
36+
/// result of the aggregation as a single element in the result sequence
37+
let inline aggregateAsync cancellationToken seed (f : 'r -> 't -> 'r) source =
38+
ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken)
39+
40+
/// Determines whether all elements of an observable satisfy a predicate
41+
let inline allAsync cancellationToken (f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken)
42+
43+
/// Determines whether an observable sequence contains a specified value
44+
/// which satisfies the given predicate
45+
let inline existsAsync source = ObservableExtensions.AnyAsync source
46+
47+
/// Returns the first element of an observable sequence
48+
let inline firstAsync source = ObservableExtensions.FirstAsync source

0 commit comments

Comments
 (0)