From 3930d992acb85e931aaa73dda7adad2d44baa360 Mon Sep 17 00:00:00 2001 From: halgari Date: Wed, 17 Jan 2024 17:19:50 -0700 Subject: [PATCH] InMemory tests are working --- .../RocksDBEventStore.cs | 67 ++----------------- src/NexusMods.EventSourcing/AEventStore.cs | 4 -- .../InMemoryEventStore.cs | 61 +++++++++++------ .../NexusMods.EventSourcing.TestModel.csproj | 1 + 4 files changed, 46 insertions(+), 87 deletions(-) diff --git a/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs b/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs index 3ad356d0..bbcb3900 100644 --- a/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs +++ b/src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs @@ -40,72 +40,15 @@ public RocksDBEventStore(TSerializer serializer, Settings settings, ISerializati _deserializer = new SpanDeserializer(serializer); } - - public override TransactionId Add(T eventValue) + public override TransactionId Add(T eventEntity, (IIndexableAttribute, IAccumulator)[] indexed) { - lock (this) - { - _tx = _tx.Next(); - - // Write the event itself - { - Span keySpan = stackalloc byte[8]; - _tx.WriteTo(keySpan); - var span = _serializer.Serialize(eventValue); - _db.Put(keySpan, span, _eventsColumn); - } - - // Update the entity indexes to mark them as having this event - { - var ingester = new ModifiedEntitiesIngester(); - eventValue.Apply(ingester); - Span keySpan = stackalloc byte[24]; - _tx.WriteTo(keySpan.SliceFast(16..)); - foreach (var entityId in ingester.Entities) - { - entityId.TryWriteBytes(keySpan); - _db.Put(keySpan, keySpan, _entityIndexColumn); - } - } - return _tx; - } + throw new NotImplementedException(); } - public override void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) + public override void EventsForIndex(IIndexableAttribute attr, TVal value, TIngester ingester, TransactionId fromTx, + TransactionId toTx) { - Span startKey = stackalloc byte[24]; - entityId.TryWriteBytes(startKey); - BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(16), fromId.Value); - Span endKey = stackalloc byte[24]; - entityId.TryWriteBytes(endKey); - if (toId == TransactionId.Max) - BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), ulong.MaxValue); - else - BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), toId.Value + 1); - - var options = new ReadOptions(); - unsafe - { - fixed (byte* startKeyPtr = startKey) - { - fixed (byte* endKeyPtr = endKey) - { - options.SetIterateUpperBound(endKeyPtr, 24); - options.SetIterateLowerBound(startKeyPtr, 24); - using var iterator = _db.NewIterator(_entityIndexColumn, options); - - iterator.SeekToFirst(); - while (iterator.Valid()) - { - var key = iterator.GetKeySpan().SliceFast(16); - var txId = TransactionId.From(key); - var evt = _db.Get(key, _deserializer, _eventsColumn); - if (!ingester.Ingest(txId, evt)) break; - iterator.Next(); - } - } - } - } + throw new NotImplementedException(); } public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition, diff --git a/src/NexusMods.EventSourcing/AEventStore.cs b/src/NexusMods.EventSourcing/AEventStore.cs index 54d7884b..b5bbc0ed 100644 --- a/src/NexusMods.EventSourcing/AEventStore.cs +++ b/src/NexusMods.EventSourcing/AEventStore.cs @@ -98,10 +98,6 @@ protected ReadOnlySpan SerializeSnapshot(EntityId id, IDictionary(T eventEntity) where T : IEvent; - public abstract void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester; - public abstract TransactionId Add(T eventEntity, (IIndexableAttribute, IAccumulator)[] indexed) where T : IEvent; public abstract void EventsForIndex(IIndexableAttribute attr, TVal value, TIngester ingester, TransactionId fromTx, diff --git a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs index 21438360..75b0974d 100644 --- a/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs +++ b/tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs @@ -2,6 +2,7 @@ using System.Buffers.Binary; using NexusMods.EventSourcing.Abstractions; using NexusMods.EventSourcing.Abstractions.Serialization; +using NexusMods.Hashing.xxHash64; using Reloaded.Memory.Extensions; namespace NexusMods.EventSourcing.TestModel; @@ -9,8 +10,8 @@ namespace NexusMods.EventSourcing.TestModel; public class InMemoryEventStore : AEventStore where TSerializer : IEventSerializer { - private TransactionId _tx = TransactionId.From(0); - private readonly Dictionary> _events = new(); + private readonly List _events = new(); + private readonly Dictionary<(IAttribute, Hash), SortedSet> _indexes = new(); private readonly Dictionary> _snapshots = new(); private TSerializer _serializer; @@ -19,43 +20,61 @@ public InMemoryEventStore(TSerializer serializer, ISerializationRegistry seriali _serializer = serializer; } - public override TransactionId Add(T entity) + public override TransactionId Add(T entity, (IIndexableAttribute, IAccumulator)[] indexed) { lock (this) { - _tx = _tx.Next(); + // Create the new txId + var txId = TransactionId.From((ulong)_events.Count); + var data = _serializer.Serialize(entity); - var logger = new ModifiedEntitiesIngester(); - entity.Apply(logger); - foreach (var id in logger.Entities) + _events.Add(data.ToArray()); + + foreach (var (attr, accumulator) in indexed) { - if (!_events.TryGetValue(id, out var value)) + // Hash the accumulator to condense it down into a single ulong + var hash = HashAccumulator(attr, accumulator); + + if (_indexes.TryGetValue((attr, hash), out var found)) { - value = new List<(TransactionId, byte[])>(); - _events.Add(id, value); + found.Add(txId); + } + else + { + var newSet = new SortedSet { txId }; + _indexes.Add((attr, hash), newSet); } - - value.Add((_tx, data.ToArray())); } - - return _tx; + return txId; } } + private static Hash HashAccumulator(IIndexableAttribute attr, IAccumulator accumulator) + { + Span span = stackalloc byte[attr.SpanSize()]; + attr.WriteTo(span, accumulator); + var hash = span.XxHash64(); + return hash; + } - public override void EventsForEntity(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) + public override void EventsForIndex(IIndexableAttribute attr, TVal value, TIngester ingester, TransactionId fromTx, + TransactionId toTx) { - if (!_events.TryGetValue(entityId, out var events)) + Span valueSpan = stackalloc byte[attr.SpanSize()]; + attr.WriteTo(valueSpan, value); + var hash = valueSpan.XxHash64(); + + if (!_indexes.TryGetValue((attr, hash), out var found)) return; - foreach (var data in events) + foreach (var txId in found) { - if (data.TxId < fromId) continue; - if (data.TxId > toId) break; + if (txId > toTx || txId < fromTx) continue; - var @event = _serializer.Deserialize(data.Data)!; - if (!ingester.Ingest(data.TxId, @event)) break; + var eventItem = _serializer.Deserialize(_events[(int)txId.Value]); + ingester.Ingest(txId, eventItem); } + } public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition, diff --git a/tests/NexusMods.EventSourcing.TestModel/NexusMods.EventSourcing.TestModel.csproj b/tests/NexusMods.EventSourcing.TestModel/NexusMods.EventSourcing.TestModel.csproj index d7f4a94f..0131df55 100644 --- a/tests/NexusMods.EventSourcing.TestModel/NexusMods.EventSourcing.TestModel.csproj +++ b/tests/NexusMods.EventSourcing.TestModel/NexusMods.EventSourcing.TestModel.csproj @@ -13,6 +13,7 @@ +