Skip to content

Commit

Permalink
Event store
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianStehle committed Jan 14, 2025
1 parent 720bb38 commit 7609041
Show file tree
Hide file tree
Showing 33 changed files with 345 additions and 180 deletions.
16 changes: 8 additions & 8 deletions events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Squidex.Events.EntityFramework;

public sealed partial class EFEventStore<T> : IEventStore
{
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> eventSubscriber, StreamFilter filter, string? position = null)
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> eventSubscriber, StreamFilter filter = default, StreamPosition position = default)
{
return new PollingSubscription(this, eventSubscriber, filter, options.Value.PollingInterval, position);
return new PollingSubscription(this, eventSubscriber, filter, position, options.Value.PollingInterval);
}

public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName, long afterStreamPosition = -1,
Expand All @@ -31,7 +31,7 @@ public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName

var result = Convert(commits, afterStreamPosition);

if ((commits.Count == 0 || commits[0].EventStreamOffset != afterStreamPosition) && afterStreamPosition > EventVersion.Empty)
if ((commits.Count == 0 || commits[0].EventStreamOffset != afterStreamPosition) && afterStreamPosition > EtagVersion.Empty)
{
commits = await context.Set<EFEventCommit>()
.ByStream(StreamFilter.Name(streamName))
Expand All @@ -46,7 +46,7 @@ public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName
return result;
}

public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter filter, DateTime timestamp = default, int take = int.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter filter = default, DateTime timestamp = default, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
Expand All @@ -67,7 +67,7 @@ public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter fil
var taken = 0;
foreach (var commit in query)
{
foreach (var @event in commit.Filtered(EventVersion.Empty).Reverse())
foreach (var @event in commit.Filtered(EtagVersion.Empty).Reverse())
{
yield return @event;

Expand All @@ -80,17 +80,17 @@ public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter fil
}
}

public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter, string? position = null, int take = int.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter = default, StreamPosition position = default, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
if (take <= 0 || position.IsEnd)
{
yield break;
}

await using var context = await dbContextFactory.CreateDbContextAsync(ct);

StreamPosition streamPosition = position;
ParsedStreamPosition streamPosition = position;
var query = context.Set<EFEventCommit>()
.ByStream(filter)
.ByPosition(streamPosition)
Expand Down
12 changes: 6 additions & 6 deletions events/Squidex.Events.EntityFramework/FilterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static IQueryable<EFEventCommit> ByTimestamp(this IQueryable<EFEventCommi

public static IQueryable<EFEventCommit> ByBeforeOffset(this IQueryable<EFEventCommit> q, long offset)
{
if (offset <= EventVersion.Empty)
if (offset <= EtagVersion.Empty)
{
return q;
}
Expand All @@ -40,15 +40,15 @@ public static IQueryable<EFEventCommit> ByBeforeOffset(this IQueryable<EFEventCo

public static IQueryable<EFEventCommit> ByOffset(this IQueryable<EFEventCommit> q, long offset)
{
if (offset <= EventVersion.Empty)
if (offset <= EtagVersion.Empty)
{
return q;
}

return q.Where(x => x.EventStreamOffset >= offset);
}

public static IQueryable<EFEventCommit> ByPosition(this IQueryable<EFEventCommit> q, StreamPosition position)
public static IQueryable<EFEventCommit> ByPosition(this IQueryable<EFEventCommit> q, ParsedStreamPosition position)
{
if (position.IsEndOfCommit)
{
Expand Down Expand Up @@ -83,7 +83,7 @@ public static IQueryable<EFEventCommit> ByStream(this IQueryable<EFEventCommit>
return q.Where(x => filter.Prefixes.Contains(x.EventStream));
}

public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, StreamPosition position)
public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, ParsedStreamPosition position)
{
var eventStreamOffset = commit.EventStreamOffset;

Expand All @@ -97,7 +97,7 @@ public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, Strea
if (commitOffset > position.CommitOffset || commitPosition > position.Position)
{
var eventData = EventData.DeserializeFromJson(@event);
var eventPosition = new StreamPosition(commitPosition, commitOffset, commit.Events.Length);
var eventPosition = new ParsedStreamPosition(commitPosition, commitOffset, commit.Events.Length);

yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData);
}
Expand All @@ -120,7 +120,7 @@ public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, long
if (eventStreamOffset > position)
{
var eventData = EventData.DeserializeFromJson(@event);
var eventPosition = new StreamPosition(commitPosition, commitOffset, commit.Events.Length);
var eventPosition = new ParsedStreamPosition(commitPosition, commitOffset, commit.Events.Length);

yield return new StoredEvent(commit.EventStream, eventPosition, eventStreamOffset, eventData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

namespace Squidex.Events.EntityFramework;

internal record struct StreamPosition(long Position, long CommitOffset, long CommitSize)
internal record struct ParsedStreamPosition(long Position, long CommitOffset, long CommitSize)
{
public static readonly StreamPosition Start = new StreamPosition(0, -1, -1);
public static readonly ParsedStreamPosition Start = new ParsedStreamPosition(0, -1, -1);

public readonly bool IsEndOfCommit => CommitOffset == CommitSize - 1;

public static implicit operator string(StreamPosition position)
public static implicit operator StreamPosition(ParsedStreamPosition position)
{
var sb = DefaultPools.StringBuilder.Get();
try
Expand All @@ -29,22 +29,23 @@ public static implicit operator string(StreamPosition position)
sb.Append('-');
sb.Append(position.CommitSize);

return sb.ToString();
return new StreamPosition(sb.ToString(), false);
}
finally
{
DefaultPools.StringBuilder.Return(sb);
}
}

public static implicit operator StreamPosition(string? value)
public static implicit operator ParsedStreamPosition(StreamPosition value)
{
if (string.IsNullOrWhiteSpace(value))
var token = value.Token;
if (string.IsNullOrWhiteSpace(token))
{
return Start;
}

var parts = value.Split('-');
var parts = token.Split('-');
if (parts.Length != 3)
{
return Start;
Expand All @@ -58,6 +59,6 @@ public static implicit operator StreamPosition(string? value)
return default;
}

return new StreamPosition(position, commitOffset, commitSize);
return new ParsedStreamPosition(position, commitOffset, commitSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================

using System.Collections.Concurrent;
using EventStore.Client;
using Microsoft.Extensions.Options;
using Squidex.Text;
Expand Down Expand Up @@ -125,8 +124,9 @@ private async Task CreateProjectionCoreAsync(string name, string query, bool wai
throw new InvalidOperationException("Projection is not running.");
}

if (status?.Progress == 100)
if (status?.Progress >= options.Value.ProgressDone)
{
await Task.Delay(100, ct);
break;
}

Expand Down
17 changes: 9 additions & 8 deletions events/Squidex.Events.GetEventStore/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Globalization;
using System.Runtime.CompilerServices;
using EventStore.Client;
using ESStreamPosition = EventStore.Client.StreamPosition;

namespace Squidex.Events.GetEventStore;

Expand All @@ -30,34 +31,34 @@ public static StreamRevision ToRevision(this long version)
return StreamRevision.FromInt64(version);
}

public static StreamPosition ToPositionBefore(this long version)
public static ESStreamPosition ToPositionBefore(this long version)
{
if (version < 0)
{
return StreamPosition.Start;
return ESStreamPosition.Start;
}

return StreamPosition.FromInt64(version - 1);
return ESStreamPosition.FromInt64(version + 1);
}

public static StreamPosition ToPosition(this string? position, bool inclusive)
public static ESStreamPosition ToPosition(this StreamPosition position, bool inclusive)
{
if (string.IsNullOrWhiteSpace(position))
{
return StreamPosition.Start;
return ESStreamPosition.Start;
}

if (long.TryParse(position, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsedPosition))
if (long.TryParse(position.Token, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsedPosition))
{
if (!inclusive)
{
parsedPosition++;
}

return StreamPosition.FromInt64(parsedPosition);
return ESStreamPosition.FromInt64(parsedPosition);
}

return StreamPosition.Start;
return ESStreamPosition.Start;
}

public static async IAsyncEnumerable<StoredEvent> IgnoreNotFound(this IAsyncEnumerable<StoredEvent> source,
Expand Down
4 changes: 2 additions & 2 deletions events/Squidex.Events.GetEventStore/Formatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static class Formatter

public static StoredEvent Read(ResolvedEvent resolvedEvent, string? prefix)
{
var @event = resolvedEvent.Event;
var @event = resolvedEvent.OriginalEvent;

var eventPayload = Encoding.UTF8.GetString(@event.Data.Span);
var eventHeaders = GetHeaders(@event);
Expand All @@ -30,7 +30,7 @@ public static StoredEvent Read(ResolvedEvent resolvedEvent, string? prefix)
return new StoredEvent(
streamName,
resolvedEvent.OriginalEventNumber.ToInt64().ToString(CultureInfo.InvariantCulture),
resolvedEvent.Event.EventNumber.ToInt64(),
resolvedEvent.OriginalEvent.EventNumber.ToInt64(),
eventData);
}

Expand Down
26 changes: 14 additions & 12 deletions events/Squidex.Events.GetEventStore/GetEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Microsoft.Extensions.Options;
using Squidex.Hosting;
using Squidex.Hosting.Configuration;
using ESStreamPosition = EventStore.Client.StreamPosition;

namespace Squidex.Events.GetEventStore;

Expand Down Expand Up @@ -40,15 +41,15 @@ public async Task InitializeAsync(
}
}

public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, StreamFilter filter, string? position = null)
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, StreamFilter filter = default, StreamPosition position = default)
{
return new GetEventStoreSubscription(subscriber, client, projectionClient, position, options.Value.Prefix, filter);
return new GetEventStoreSubscription(subscriber, client, projectionClient, options.Value.Prefix, filter, position);
}

public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter, string? position = null, int take = int.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter = default, StreamPosition position = default, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
if (take <= 0 || position.IsEnd)
{
yield break;
}
Expand All @@ -62,7 +63,7 @@ public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter, st
}
}

public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter filter, DateTime timestamp = default, int take = int.MaxValue,
public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter filter = default, DateTime timestamp = default, int take = int.MaxValue,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (take <= 0)
Expand All @@ -71,30 +72,31 @@ public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter fil
}

var streamName = await projectionClient.CreateProjectionAsync(filter, true, ct);
var streamEvents = QueryReverseAsync(streamName, StreamPosition.End, take, ct);
var streamEvents = QueryReverseAsync(streamName, ESStreamPosition.End, take, ct);

await foreach (var storedEvent in streamEvents.IgnoreNotFound(ct).TakeWhile(x => x.Data.Headers.Timestamp() >= timestamp).WithCancellation(ct))
{
yield return storedEvent;
}
}

public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName, long afterStreamPosition = EventVersion.Empty,
public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName, long afterStreamPosition = EtagVersion.Empty,
CancellationToken ct = default)
{
var result = new List<StoredEvent>();

var stream = QueryAsync(GetStreamName(streamName), afterStreamPosition.ToPositionBefore(), int.MaxValue, ct);
var streamPath = GetStreamName(streamName);
var streamEvents = QueryAsync(streamPath, afterStreamPosition.ToPositionBefore(), int.MaxValue, ct);

await foreach (var storedEvent in stream.IgnoreNotFound(ct))
await foreach (var storedEvent in streamEvents.IgnoreNotFound(ct))
{
result.Add(storedEvent);
}

return result.ToList();
}

private IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, StreamPosition start, long count,
private IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, ESStreamPosition start, long count,
CancellationToken ct = default)
{
var result = client.ReadStreamAsync(
Expand All @@ -108,7 +110,7 @@ private IAsyncEnumerable<StoredEvent> QueryAsync(string streamName, StreamPositi
return result.Select(x => Formatter.Read(x, options.Value.Prefix));
}

private IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, StreamPosition start, long count,
private IAsyncEnumerable<StoredEvent> QueryReverseAsync(string streamName, ESStreamPosition start, long count,
CancellationToken ct = default)
{
var result = client.ReadStreamAsync(
Expand Down Expand Up @@ -171,7 +173,7 @@ public async Task DeleteAsync(StreamFilter filter,
{
var streamName = await projectionClient.CreateProjectionAsync(filter, true, ct);

var events = client.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start, resolveLinkTos: true, cancellationToken: ct);
var events = client.ReadStreamAsync(Direction.Forwards, streamName, ESStreamPosition.Start, resolveLinkTos: true, cancellationToken: ct);
if (await events.ReadState == ReadState.StreamNotFound)
{
return;
Expand Down
2 changes: 2 additions & 0 deletions events/Squidex.Events.GetEventStore/GetEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public sealed class GetEventStoreOptions : IValidatableOptions

public string Prefix { get; set; } = "squidex";

public long ProgressDone { get; set; } = 95;

public IEnumerable<ConfigurationError> Validate()
{
if (string.IsNullOrWhiteSpace(Prefix))
Expand Down
Loading

0 comments on commit 7609041

Please sign in to comment.