Skip to content

Commit 74f70a9

Browse files
authored
feat: split to Observable, Async.Observable and Task.Observable modules (#2)
1 parent cec396b commit 74f70a9

File tree

6 files changed

+133
-27
lines changed

6 files changed

+133
-27
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
module FSharp.Control.R3.Async
2+
3+
open R3
4+
open System.Threading
5+
open System.Threading.Tasks
6+
open FSharp.Control.R3
7+
8+
module Observable =
9+
10+
let mapAsync (options : ProcessingOptions) (f : 't -> Async<'r>) source =
11+
let selector x ct = ValueTask<'r> (Async.StartImmediateAsTask (f x, ct))
12+
ObservableExtensions.SelectAwait (
13+
source,
14+
selector,
15+
options.AwaitOperation,
16+
options.ConfigureAwait,
17+
options.CancelOnCompleted,
18+
options.MaxConcurrent
19+
)
20+
21+
let length source = async {
22+
let! ct = Async.CancellationToken
23+
return!
24+
ObservableExtensions.CountAsync (source, ct)
25+
|> Async.AwaitTask
26+
}
27+
28+
let inline iter (action : 't -> unit) source = async {
29+
let! ct = Async.CancellationToken
30+
return!
31+
ObservableExtensions.ForEachAsync (source, action, ct)
32+
|> Async.AwaitTask
33+
}
34+
35+
let iterAsync options (action : 't -> Async<unit>) source = source |> mapAsync options action |> length |> Async.Ignore

src/FSharp.Control.R3/FSharp.Control.R3.fsproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
<ItemGroup>
1818
<Compile Include="AssemblyInfo.fs" />
19+
<Compile Include="ProcessingOptions.fs" />
1920
<Compile Include="Observable.fs" />
21+
<Compile Include="AsyncObservable.fs" />
22+
<Compile Include="TaskObservable.fs" />
2023
</ItemGroup>
2124

2225
<ItemGroup>

src/FSharp.Control.R3/Observable.fs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,5 @@
1-
module FSharp.Control.Observable
1+
module FSharp.Control.R3.Observable
22

33
open R3
4-
open System.Threading
5-
open System.Threading.Tasks
6-
7-
let asyncMap (f : 't -> Async<'r>) source =
8-
let selector x ct = ValueTask<'r> (Async.StartImmediateAsTask (f x, ct))
9-
ObservableExtensions.SelectAwait (source, selector)
10-
11-
let mapAsync (f : CancellationToken -> 't -> Task<'r>) source =
12-
let selector x ct = ValueTask<'r> (f ct x)
13-
ObservableExtensions.SelectAwait (source, selector)
144

155
let inline map (f : 't -> 'r) source = ObservableExtensions.Select (source, f)
16-
17-
let length source = async {
18-
let! ct = Async.CancellationToken
19-
return!
20-
ObservableExtensions.CountAsync (source, ct)
21-
|> Async.AwaitTask
22-
}
23-
24-
let inline iter (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action)
25-
26-
let asyncIter (action : 't -> Async<unit>) source = source |> asyncMap action |> length |> Async.Ignore
27-
28-
let iterAsync (action : CancellationToken -> 't -> Task<unit>) source = source |> mapAsync action |> length |> Async.Ignore
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
namespace FSharp.Control.R3
2+
3+
open R3
4+
5+
type AwaitOperationConfiguration =
6+
/// <summary>All values are queued, and the next value waits for the completion of the asynchronous method.</summary>
7+
| Sequential
8+
/// <summary>Drop new value when async operation is running.</summary>
9+
| Drop
10+
/// <summary>If the previous asynchronous method is running, it is cancelled and the next asynchronous method is executed.</summary>
11+
| Switch
12+
/// <summary>All values are sent immediately to the asynchronous method.</summary>
13+
| Parallel of
14+
/// If set to -1, there is no limit.
15+
MaxConcurrent : int
16+
/// <summary>All values are sent immediately to the asynchronous method, but the results are queued and passed to the next operator in order.</summary>
17+
| SequentialParallel of
18+
/// If set to -1, there is no limit.
19+
MaxConcurrent : int
20+
/// <summary>Send the first value and the last value while the asynchronous method is running.</summary>
21+
| ThrottleFirstLast
22+
23+
type ProcessingOptions = {
24+
AwaitOperationConfiguration : AwaitOperationConfiguration
25+
ConfigureAwait : bool
26+
CancelOnCompleted : bool
27+
} with
28+
29+
static let ``default`` = {
30+
AwaitOperationConfiguration = AwaitOperationConfiguration.Sequential
31+
ConfigureAwait = true
32+
CancelOnCompleted = false
33+
}
34+
35+
static let ``parallel`` = {
36+
AwaitOperationConfiguration = AwaitOperationConfiguration.Parallel -1
37+
ConfigureAwait = true
38+
CancelOnCompleted = false
39+
}
40+
41+
static member Default = ``default``
42+
static member Parallel = ``parallel``
43+
44+
member this.MaxConcurrent =
45+
match this.AwaitOperationConfiguration with
46+
| AwaitOperationConfiguration.Sequential -> -1
47+
| AwaitOperationConfiguration.Drop -> -1
48+
| AwaitOperationConfiguration.Switch -> -1
49+
| AwaitOperationConfiguration.Parallel maxConcurrent -> maxConcurrent
50+
| AwaitOperationConfiguration.SequentialParallel maxConcurrent -> maxConcurrent
51+
| AwaitOperationConfiguration.ThrottleFirstLast -> -1
52+
53+
member this.AwaitOperation =
54+
match this.AwaitOperationConfiguration with
55+
| AwaitOperationConfiguration.Sequential -> AwaitOperation.Sequential
56+
| AwaitOperationConfiguration.Drop -> AwaitOperation.Drop
57+
| AwaitOperationConfiguration.Switch -> AwaitOperation.Switch
58+
| AwaitOperationConfiguration.Parallel _ -> AwaitOperation.Parallel
59+
| AwaitOperationConfiguration.SequentialParallel _ -> AwaitOperation.SequentialParallel
60+
| AwaitOperationConfiguration.ThrottleFirstLast -> AwaitOperation.ThrottleFirstLast
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
module FSharp.Control.R3.Task
2+
3+
open R3
4+
open System.Threading
5+
open System.Threading.Tasks
6+
open FSharp.Control.R3
7+
8+
module Observable =
9+
10+
let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source =
11+
let selector x ct = ValueTask<'r> (f ct x)
12+
ObservableExtensions.SelectAwait (
13+
source,
14+
selector,
15+
options.AwaitOperation,
16+
options.ConfigureAwait,
17+
options.CancelOnCompleted,
18+
options.MaxConcurrent
19+
)
20+
21+
let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken)
22+
23+
let inline iter cancellationToken (action : 't -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken)
24+
25+
let iterAsync cancellationToken options (action : CancellationToken -> 't -> Task<unit>) source =
26+
source
27+
|> mapAsync options action
28+
|> length cancellationToken
29+
:> Task

tests/FSharp.Control.R3.Tests/Tests.fs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
namespace FSharp.Control.R3.Tests
22

33
open System
4-
open FSharp.Control
4+
open System.Threading.Tasks
5+
open FSharp.Control.R3.Async
56
open Microsoft.VisualStudio.TestTools.UnitTesting
67
open Swensen.Unquote
78

89
[<TestClass>]
910
type ObservableTests () =
1011

1112
[<TestMethod>]
12-
member _.``Test length`` () =
13+
member _.``Test length`` () : Task =
1314

1415
async {
1516
use r3Bus = new R3.Subject<int> ()
@@ -27,4 +28,5 @@ type ObservableTests () =
2728
Assert.AreEqual<int> (0, res)
2829

2930
}
30-
|> Async.RunSynchronously
31+
|> Async.StartImmediateAsTask
32+
:> Task

0 commit comments

Comments
 (0)