diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d88d3c..8bf378c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added +* `empty` +* `singleton` +* `ofSeq` +* `ofAsync` +* `ofTask` +* `toArray` +* `toList` +* `toLookup` +* `catch` +* `chunkBy` +* `chunkBySize` +* `merge` +* `choose` +* `ofType` + ## [0.2.0] - 2024-11-23 - Added some basic functions: `filter/where`, `bind`, `concat`, `distinct`, `mapi`, `skip`, `take`, … diff --git a/src/FSharp.Control.R3/AsyncObservable.fs b/src/FSharp.Control.R3/AsyncObservable.fs index 3b56f14..d9369df 100644 --- a/src/FSharp.Control.R3/AsyncObservable.fs +++ b/src/FSharp.Control.R3/AsyncObservable.fs @@ -5,6 +5,7 @@ open System.Threading open System.Threading.Tasks open FSharp.Control.R3 +/// Caution! All functions returning are blocking and may never return if awaited module Observable = /// Applies an accumulator function over an observable sequence, returning the @@ -76,6 +77,27 @@ module Observable = options.MaxConcurrent ) + /// Creates observable sequence from a single element returned by asynchronous computation + let ofAsync (computation : Async<'T>) = + Observable.FromAsync (fun ct -> + Async.StartImmediateAsTask (computation, cancellationToken = ct) + |> ValueTask<'T>) + + let inline toArray source = async { + let! ct = Async.CancellationToken + return! + ObservableExtensions.ToArrayAsync (source, ct) + |> Async.AwaitTask + } + + let toList source = async { + let! ct = Async.CancellationToken + let! array = + ObservableExtensions.ToArrayAsync (source, ct) + |> Async.AwaitTask + return List.ofArray array + } + /// /// Invokes an asynchronous action for each element in the observable sequence, and propagates all observer /// messages through the result sequence. @@ -85,3 +107,40 @@ module Observable = /// by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// let iterAsync options (action : 't -> Async) source = source |> mapAsync options action |> length |> Async.Ignore + +[] +module Extensions = + + open System.Runtime.CompilerServices + open System.Runtime.InteropServices + + [] + type Observable private () = + + static member toLookup (source, keySelector : 'T -> 'Key, [] cancellationToken) = async { + let! ct = Async.CancellationToken + return! + ObservableExtensions.ToLookupAsync (source, keySelector, ct) + |> Async.AwaitTask + } + + static member toLookup (source, keySelector : 'T -> 'Key, keyComparer, [] cancellationToken) = async { + let! ct = Async.CancellationToken + return! + ObservableExtensions.ToLookupAsync (source, keySelector, keyComparer = keyComparer, cancellationToken = ct) + |> Async.AwaitTask + } + + static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, [] cancellationToken) = async { + let! ct = Async.CancellationToken + return! + ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector = elementSelector, cancellationToken = ct) + |> Async.AwaitTask + } + + static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, keyComparer, [] cancellationToken) = async { + let! ct = Async.CancellationToken + return! + ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector, keyComparer = keyComparer, cancellationToken = ct) + |> Async.AwaitTask + } diff --git a/src/FSharp.Control.R3/Observable.fs b/src/FSharp.Control.R3/Observable.fs index 2f04593..b47c2ff 100644 --- a/src/FSharp.Control.R3/Observable.fs +++ b/src/FSharp.Control.R3/Observable.fs @@ -1,5 +1,6 @@ module FSharp.Control.R3.Observable +open System open R3 /// Hides the identy of an observable sequence @@ -11,33 +12,107 @@ let inline bind ([] f : 'T -> Observable<'TNext>) source = Obser /// Converts the elements of the sequence to the specified type let inline cast<'T, 'CastType> (source) = ObservableExtensions.Cast<'T, 'CastType> (source) +/// +/// Adds an error handler to an observable sequence. +/// +/// Exception does not stop further processing +let inline catch ([] f : 'Exn -> Observable<'T>) o = ObservableExtensions.Catch (o, f) + /// Concatenates the second observable sequence to the first observable sequence /// upn the successful termination of the first -let inline concat source = ObservableExtensions.Concat source +let inline concat first second = ObservableExtensions.Concat (first, second) + +///Divides the input observable sequence into chunks of size at most chunkSize. +///The maximum size of each chunk. +///The input observable sequence. +///The observable sequence divided into chunks. +///Thrown when the input sequence is null. +///Thrown when chunkSize is not positive. +let inline chunkBySize (chunkSize : int) (source) = ObservableExtensions.Chunk (source, chunkSize) + +let inline chunkBy (configuration : ChunkConfiguration<'T>) (source) = + match configuration with + | ChunkCount count -> ObservableExtensions.Chunk (source, count) + | ChunkTimeSpan (timeSpan, timeProvider) -> ObservableExtensions.Chunk (source, timeSpan, timeProvider) + | ChunkTimeSpanCount (timeSpan, count, timeProvider) -> ObservableExtensions.Chunk (source, timeSpan, count, timeProvider) + | ChunkMilliseconds (milliseconds, timeProvider) -> + ObservableExtensions.Chunk (source, TimeSpan.FromMilliseconds (float milliseconds), timeProvider) + | ChunkMillisecondsCount (milliseconds, count, timeProvider) -> + ObservableExtensions.Chunk (source, TimeSpan.FromMilliseconds (float milliseconds), count, timeProvider) + | ChunkAsyncWindow (asyncWindow, configureAwait) -> ObservableExtensions.Chunk (source, asyncWindow, configureAwait) + | ChunkWindowBoundaries windowBoundaries -> ObservableExtensions.Chunk (source, windowBoundaries = windowBoundaries) /// Returns an observable sequence that only contains distinct elements let inline distinct source = ObservableExtensions.Distinct source +/// Returns an observable sequence that contains no elements +let inline empty () = Observable.Empty () + /// Filters the observable elements of a sequence based on a predicate -let inline filter ([] f : 't -> bool) source = ObservableExtensions.Where (source, f) +let inline filter ([] f : 'T -> bool) source = ObservableExtensions.Where (source, f) /// Maps the given observable with the given function -let inline map ([] f : 't -> 'r) source = ObservableExtensions.Select (source, f) +let inline map ([] f : 'T -> 'R) source = ObservableExtensions.Select (source, f) /// Maps the given observable with the given function and the index of the element -let inline mapi ([] f : int -> 't -> 'r) source = ObservableExtensions.Select (source, (fun i x -> f x i)) +let inline mapi ([] f : int -> 'T -> 'R) source = ObservableExtensions.Select (source, (fun i x -> f x i)) + +/// Merges two observable sequences into one observable sequence +let inline merge (source1, source2) = ObservableExtensions.Merge (source1, source2) + +let inline ofType<'T, 'R> (source) = ObservableExtensions.OfType<'T, 'R> (source) + +/// Returns an observable sequence that contains only a single element +let inline singleton item = Observable.Return<'T> item -/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements. +/// Bypasses a specified number of elements in an observable sequence and then returns the remaining elements let inline skip (count : int) (source) = ObservableExtensions.Skip (source, count) /// Takes n elements (from the beginning of an observable sequence?) let inline take (count : int) (source) = ObservableExtensions.Take (source, count) /// Filters the observable elements of a sequence based on a predicate -let inline where ([] f : 't -> bool) source = ObservableExtensions.Where (source, f) +let inline where ([] f : 'T -> bool) source = ObservableExtensions.Where (source, f) + +open System.Runtime.CompilerServices +open System.Runtime.InteropServices + +[] +module Extensions = + + [] + type Observable private () = + + static member ofSeq (items : _ seq, [] cancellationToken) = Observable.ToObservable (items, cancellationToken) + +[] +module OptionExtensions = + + [] + type Observable private () = + + /// Applies the given function to each element of the observable. Returns + /// a sequence comprised of the results "x" for each element where + /// the function returns Some(x) + [] + static member choose f = map f >> where Option.isSome >> map Option.get + +[] +module ValueOptionExtensions = + + [] + type Observable private () = + + /// Applies the given function to each element of the observable. Returns + /// a sequence comprised of the results "x" for each element where + /// the function returns ValueSome(x) + [] + static member choose f = map f >> where ValueOption.isSome >> map ValueOption.get + [] module Builders = + open System /// A reactive query builder. diff --git a/src/FSharp.Control.R3/ProcessingOptions.fs b/src/FSharp.Control.R3/ProcessingOptions.fs index 747e7b5..9750d25 100644 --- a/src/FSharp.Control.R3/ProcessingOptions.fs +++ b/src/FSharp.Control.R3/ProcessingOptions.fs @@ -1,5 +1,8 @@ namespace FSharp.Control.R3 +open System +open System.Threading +open System.Threading.Tasks open R3 type AwaitOperationConfiguration = @@ -58,3 +61,26 @@ type ProcessingOptions = { | AwaitOperationConfiguration.Parallel _ -> AwaitOperation.Parallel | AwaitOperationConfiguration.SequentialParallel _ -> AwaitOperation.SequentialParallel | AwaitOperationConfiguration.ThrottleFirstLast -> AwaitOperation.ThrottleFirstLast + +type ChunkConfiguration<'T> = + | ChunkCount of WindowLength : int + | ChunkTimeSpan of WindowTime : TimeSpan * TimeProvider : TimeProvider + | ChunkTimeSpanCount of WindowTime : TimeSpan * WindowLength : int * TimeProvider : TimeProvider + | ChunkMilliseconds of WindowTime : int * TimeProvider : TimeProvider + | ChunkMillisecondsCount of WindowTime : int * WindowLength : int * TimeProvider : TimeProvider + | ChunkAsyncWindow of AsyncWindow : Func<'T, CancellationToken, ValueTask> * ConfigureAwait : bool + | ChunkWindowBoundaries of WindowBoundaries : Observable<'T> + +[] +module ChunkConfiguration = + let inline TimeSpan windowTime = ChunkTimeSpan (windowTime, ObservableSystem.DefaultTimeProvider) + let inline TimeSpanCount windowTime windowLength = + ChunkTimeSpanCount (windowTime, windowLength, ObservableSystem.DefaultTimeProvider) + let inline Milliseconds windowTime = ChunkMilliseconds (windowTime, ObservableSystem.DefaultTimeProvider) + let inline MillisecondsCount windowTime windowLength = + ChunkMillisecondsCount (windowTime, windowLength, ObservableSystem.DefaultTimeProvider) + let AsyncWindow (asyncWindow : 'T -> Async) = + let asyncWindow element ct = + Async.StartImmediateAsTask (asyncWindow element, ct) :> Task + |> ValueTask + ChunkAsyncWindow (asyncWindow, true) diff --git a/src/FSharp.Control.R3/TaskObservable.fs b/src/FSharp.Control.R3/TaskObservable.fs index 76ea8df..ba1f738 100644 --- a/src/FSharp.Control.R3/TaskObservable.fs +++ b/src/FSharp.Control.R3/TaskObservable.fs @@ -5,15 +5,16 @@ open System.Threading open System.Threading.Tasks open FSharp.Control.R3 +/// Caution! All functions returning / are blocking and may never return if awaited module Observable = /// Applies an accumulator function over an observable sequence, returning the /// result of the aggregation as a single element in the result sequence - let inline aggregate cancellationToken seed ([] f : 'r -> 't -> 'r) source = + let inline aggregate cancellationToken seed ([] f : 'R -> 'T -> 'R) source = ObservableExtensions.AggregateAsync (source, seed, f, cancellationToken) /// Determines whether all elements of an observable satisfy a predicate - let inline all cancellationToken ([] f : 't -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken) + let inline all cancellationToken ([] f : 'T -> bool) source = ObservableExtensions.AllAsync (source, f, cancellationToken) /// /// Invokes an action for each element in the observable sequence, and propagates all observer @@ -23,7 +24,7 @@ module Observable = /// This method can be used for debugging, logging, etc. of query behavior /// by intercepting the message stream to run arbitrary actions for messages on the pipeline. /// - let inline iter cancellationToken ([] action : 't -> unit) source = + let inline iter cancellationToken ([] action : 'T -> unit) source = ObservableExtensions.ForEachAsync (source, action, cancellationToken) /// Determines whether an observable sequence contains a specified value @@ -37,8 +38,8 @@ module Observable = let length cancellationToken source = ObservableExtensions.CountAsync (source, cancellationToken) /// Maps the given observable with the given asynchronous function - let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 't -> Task<'r>) source = - let selector x ct = ValueTask<'r> (f ct x) + let mapAsync (options : ProcessingOptions) (f : CancellationToken -> 'T -> Task<'R>) source = + let selector x ct = ValueTask<'R> (f ct x) ObservableExtensions.SelectAwait ( source, selector, @@ -61,3 +62,49 @@ module Observable = |> mapAsync options action |> length cancellationToken :> Task + +[] +module Extensions = + + open System.Runtime.CompilerServices + open System.Runtime.InteropServices + + [] + type Observable private () = + + /// + /// Creates observable sequence from a single element returned by cancellable + /// + static member inline ofTask (asyncFactory : CancellationToken -> ValueTask, [] configureAwait) = + Observable.FromAsync (asyncFactory, configureAwait) + + /// + /// Creates observable sequence from a single element returned by cancellable + /// + static member inline ofTask (asyncFactory : CancellationToken -> ValueTask<'T>, [] configureAwait) = + Observable.FromAsync (asyncFactory, configureAwait) + + static member inline toArray (source, [] cancellationToken) = ObservableExtensions.ToArrayAsync (source, cancellationToken) + + static member toList (source, [] cancellationToken) = task { + let! array = ObservableExtensions.ToArrayAsync (source, cancellationToken) + return List.ofArray array + } + + static member toLookup (source, keySelector : 'T -> 'Key, [] cancellationToken) = + ObservableExtensions.ToLookupAsync (source, keySelector, cancellationToken) + + static member toLookup (source, keySelector : 'T -> 'Key, keyComparer, [] cancellationToken) = + ObservableExtensions.ToLookupAsync (source, keySelector, keyComparer = keyComparer, cancellationToken = cancellationToken) + + static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, [] cancellationToken) = + ObservableExtensions.ToLookupAsync (source, keySelector, elementSelector = elementSelector, cancellationToken = cancellationToken) + + static member toLookup (source, keySelector : 'T -> 'Key, elementSelector : 'T -> 'Element, keyComparer, [] cancellationToken) = + ObservableExtensions.ToLookupAsync ( + source, + keySelector, + elementSelector, + keyComparer = keyComparer, + cancellationToken = cancellationToken + )