Skip to content

of and to functions, additional operators #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
* `empty`
* `singleton`
* `ofSeq`
* `ofAsync`
* `ofTask`
* `toArray`
* `toList`
* `toLookup`
* `catch`
* `chunkBy`
* `chunkBySize`
* `merge`
* `choose`

## [0.2.0] - 2024-11-23

- Added some basic functions: `filter/where`, `bind`, `concat`, `distinct`, `mapi`, `skip`, `take`, …
Expand Down
58 changes: 58 additions & 0 deletions src/FSharp.Control.R3/AsyncObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ module Observable =
options.MaxConcurrent
)

/// Creates observable sequence from a single element returned by asynchronous computation
let ofAsync (computation : Async<'T>) =
Observable.FromAsync (fun ct ->
Async.StartAsTask (computation, cancellationToken = ct)
|> ValueTask<'T>)

let inline toArray source = async {
let! ct = Async.CancellationToken
return!
ObservableExtensions.ToArrayAsync (source, ct)
|> Async.AwaitTask
}

let toList source = async {
let! ct = Async.CancellationToken
let! array =
ObservableExtensions.ToArrayAsync (source, ct)
|> Async.AwaitTask
return List.ofArray array
}

/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence, and propagates all observer
/// messages through the result sequence.
Expand All @@ -85,3 +106,40 @@ module Observable =
/// by intercepting the message stream to run arbitrary actions for messages on the pipeline.
/// </remarks>
let iterAsync options (action : 't -> Async<unit>) source = source |> mapAsync options action |> length |> Async.Ignore

[<AutoOpen>]
module Extensions =

open System.Runtime.CompilerServices
open System.Runtime.InteropServices

[<AbstractClass; Sealed; Extension>]
type Observable private () =

static member toLookup (source, keySelector : 'T -> 'Key, [<Optional>] cancellationToken) = async {
let! ct = Async.CancellationToken
return!
ObservableExtensions.ToLookupAsync (source, keySelector, ct)
|> Async.AwaitTask
}

static member toLookup (source, keySelector : 'T -> 'Key, keyComparer, [<Optional>] cancellationToken) = async {
let! ct = Async.CancellationToken
return!
ObservableExtensions.ToLookupAsync (source, keySelector, keyComparer = keyComparer, cancellationToken = ct)
|> Async.AwaitTask
}

static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, [<Optional>] cancellationToken) = async {
let! ct = Async.CancellationToken
return!
ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector = elementSelector, cancellationToken = ct)
|> Async.AwaitTask
}

static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, keyComparer, [<Optional>] cancellationToken) = async {
let! ct = Async.CancellationToken
return!
ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector, keyComparer = keyComparer, cancellationToken = ct)
|> Async.AwaitTask
}
77 changes: 75 additions & 2 deletions src/FSharp.Control.R3/Observable.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module FSharp.Control.R3.Observable

open System
open R3

/// Hides the identy of an observable sequence
Expand All @@ -11,13 +12,38 @@ let inline bind ([<InlineIfLambda>] f : 'T -> Observable<'TNext>) source = Obser
/// Converts the elements of the sequence to the specified type
let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source)

let inline catch ([<InlineIfLambda>] f : 'Exn -> Observable<'TNext>) o = ObservableExtensions.Catch (o, f)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the R3 exception handling significantly differs from Rx, this could need some xml comments as well. (R3 docs: "Stopping the pipeline at OnError is a mistake.")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added XML comment with remarks


/// Concatenates the second observable sequence to the first observable sequence
/// upn the successful termination of the first
let inline concat source = ObservableExtensions.Concat source
let inline concat first second = ObservableExtensions.Concat (first, second)

///<summary>Divides the input observable sequence into chunks of size at most <c>chunkSize</c>.</summary>
///<param name="chunkSize">The maximum size of each chunk.</param>
///<param name="source">The input observable sequence.</param>
///<returns>The observable sequence divided into chunks.</returns>
///<exception cref="T:System.ArgumentNullException">Thrown when the input sequence is null.</exception>
///<exception cref="T:System.ArgumentException">Thrown when <c>chunkSize</c> is not positive.</exception>
let inline chunkBySize (chunkSize : int) (source) = ObservableExtensions.Chunk (source, chunkSize)

let inline chunkBy (configuration : ChunkConfiguration<'T>) (source) =
match configuration with
| ChunkCount count -> ObservableExtensions.Chunk (source, count)
| ChunkTimeSpan (timeSpan, timeProvider) -> ObservableExtensions.Chunk (source, timeSpan, timeProvider)
| ChunkTimeSpanCount (timeSpan, count, timeProvider) -> ObservableExtensions.Chunk (source, timeSpan, count, timeProvider)
| ChunkMilliseconds (milliseconds, timeProvider) ->
ObservableExtensions.Chunk (source, TimeSpan.FromMilliseconds (float milliseconds), timeProvider)
| ChunkMillisecondsCount (milliseconds, count, timeProvider) ->
ObservableExtensions.Chunk (source, TimeSpan.FromMilliseconds (float milliseconds), count, timeProvider)
| ChunkAsyncWindow (asyncWindow, configureAwait) -> ObservableExtensions.Chunk (source, asyncWindow, configureAwait)
| ChunkWindowBoundaries windowBoundaries -> ObservableExtensions.Chunk (source, windowBoundaries = windowBoundaries)

/// Returns an observable sequence that only contains distinct elements
let inline distinct source = ObservableExtensions.Distinct source

/// Returns an observable sequence that contains no elements
let inline empty () = Observable.Empty ()

/// Filters the observable elements of a sequence based on a predicate
let inline filter ([<InlineIfLambda>] f : 't -> bool) source = ObservableExtensions.Where (source, f)

Expand All @@ -27,7 +53,13 @@ let inline map ([<InlineIfLambda>] f : 't -> 'r) source = ObservableExtensions.S
/// Maps the given observable with the given function and the index of the element
let inline mapi ([<InlineIfLambda>] f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i))

/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
/// Merges two observable sequences into one observable sequence
let inline merge (source1, source2) = ObservableExtensions.Merge (source1, source2)

/// Returns an observable sequence that contains only a single element
let inline singleton item = Observable.Return<'T> item
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate this "sigleton" because it reminds people of OO GoF desgin-pattern of singleton which this is not, this is not a static instance and there can be more by just merging this. This is rather "lift my value to this computation expression", I don't know if there are better more F#pish names like Option.ofObj or Task.FromResult.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements
let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count)

/// Takes n elements (from the beginning of an observable sequence?)
Expand All @@ -36,8 +68,49 @@ let inline take (count : int) (source) = ObservableExtensions.Take (source, coun
/// Filters the observable elements of a sequence based on a predicate
let inline where ([<InlineIfLambda>] f : 't -> bool) source = ObservableExtensions.Where (source, f)

open System.Runtime.CompilerServices
open System.Runtime.InteropServices

[<AutoOpen>]
module Extensions =

[<AbstractClass; Sealed; Extension>]
type Observable private () =

static member ofSeq (items : _ seq, [<Optional>] cancellationToken) = Observable.ToObservable (items, cancellationToken)

[<AutoOpen>]
module OptionExtensions =

[<AbstractClass; Sealed; Extension>]
type Observable private () =

/// Applies the given function to each element of the observable. Returns
/// a sequence comprised of the results "x" for each element where
/// the function returns Some(x)
[<Extension>]
static member choose f o = o |> map f |> where Option.isSome |> map Option.get

[<AutoOpen>]
module ValueOptionExtensions =

[<AbstractClass; Sealed; Extension>]
type Observable private () =

/// Applies the given function to each element of the observable. Returns
/// a sequence comprised of the results "x" for each element where
/// the function returns ValueSome(x)
[<Extension>]
static member choose f o =
o
|> map f
|> where ValueOption.isSome
|> map ValueOption.get


[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Builders =

open System

/// A reactive query builder.
Expand Down
26 changes: 26 additions & 0 deletions src/FSharp.Control.R3/ProcessingOptions.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
namespace FSharp.Control.R3

open System
open System.Threading
open System.Threading.Tasks
open R3

type AwaitOperationConfiguration =
Expand Down Expand Up @@ -58,3 +61,26 @@ type ProcessingOptions = {
| AwaitOperationConfiguration.Parallel _ -> AwaitOperation.Parallel
| AwaitOperationConfiguration.SequentialParallel _ -> AwaitOperation.SequentialParallel
| AwaitOperationConfiguration.ThrottleFirstLast -> AwaitOperation.ThrottleFirstLast

type ChunkConfiguration<'T> =
| ChunkCount of WindowLength : int
| ChunkTimeSpan of WindowTime : TimeSpan * TimeProvider : TimeProvider
| ChunkTimeSpanCount of WindowTime : TimeSpan * WindowLength : int * TimeProvider : TimeProvider
| ChunkMilliseconds of WindowTime : int * TimeProvider : TimeProvider
| ChunkMillisecondsCount of WindowTime : int * WindowLength : int * TimeProvider : TimeProvider
| ChunkAsyncWindow of AsyncWindow : Func<'T, CancellationToken, ValueTask> * ConfigureAwait : bool
| ChunkWindowBoundaries of WindowBoundaries : Observable<'T>

[<AutoOpen>]
module ChunkConfiguration =
let inline TimeSpan windowTime = ChunkTimeSpan (windowTime, ObservableSystem.DefaultTimeProvider)
let inline TimeSpanCount windowTime windowLength =
ChunkTimeSpanCount (windowTime, windowLength, ObservableSystem.DefaultTimeProvider)
let inline Milliseconds windowTime = ChunkMilliseconds (windowTime, ObservableSystem.DefaultTimeProvider)
let inline MillisecondsCount windowTime windowLength =
ChunkMillisecondsCount (windowTime, windowLength, ObservableSystem.DefaultTimeProvider)
let AsyncWindow (asyncWindow : 'T -> Async<unit>) =
let asyncWindow element ct =
Async.StartImmediateAsTask (asyncWindow element, ct) :> Task
|> ValueTask
ChunkAsyncWindow (asyncWindow, true)
46 changes: 46 additions & 0 deletions src/FSharp.Control.R3/TaskObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,49 @@ module Observable =
|> mapAsync options action
|> length cancellationToken
:> Task

[<AutoOpen>]
module Extensions =

open System.Runtime.CompilerServices
open System.Runtime.InteropServices

[<AbstractClass; Sealed; Extension>]
type Observable private () =

/// <summary>
/// Creates observable sequence from a single element returned by cancellable <see cref="ValueTask"/>
/// </summary>
static member inline ofTask (asyncFactory : CancellationToken -> ValueTask, [<Optional>] configureAwait) =
Observable.FromAsync (asyncFactory, configureAwait)

/// <summary>
/// Creates observable sequence from a single element returned by cancellable <see cref="ValueTask"/>
/// </summary>
static member inline ofTask (asyncFactory : CancellationToken -> ValueTask<'T>, [<Optional>] configureAwait) =
Observable.FromAsync (asyncFactory, configureAwait)

static member inline toArray (source, [<Optional>] cancellationToken) = ObservableExtensions.ToArrayAsync (source, cancellationToken)

static member toList (source, [<Optional>] cancellationToken) = task {
let! array = ObservableExtensions.ToArrayAsync (source, cancellationToken)
return List.ofArray array
}

static member toLookup (source, keySelector : 'T -> 'Key, [<Optional>] cancellationToken) =
ObservableExtensions.ToLookupAsync (source, keySelector, cancellationToken)

static member toLookup (source, keySelector : 'T -> 'Key, keyComparer, [<Optional>] cancellationToken) =
ObservableExtensions.ToLookupAsync (source, keySelector, keyComparer = keyComparer, cancellationToken = cancellationToken)

static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, [<Optional>] cancellationToken) =
ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector = elementSelector, cancellationToken = cancellationToken)

static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, keyComparer, [<Optional>] cancellationToken) =
ObservableExtensions.ToLookupAsync (
source,
keySelector,
elementSelector,
keyComparer = keyComparer,
cancellationToken = cancellationToken
)