Skip to content

Commit

Permalink
InMemory tests are working
Browse files Browse the repository at this point in the history
  • Loading branch information
halgari committed Jan 18, 2024
1 parent be83c4a commit 3930d99
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 87 deletions.
67 changes: 5 additions & 62 deletions src/NexusMods.EventSourcing.RocksDB/RocksDBEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,72 +40,15 @@ public RocksDBEventStore(TSerializer serializer, Settings settings, ISerializati
_deserializer = new SpanDeserializer<TSerializer>(serializer);
}


public override TransactionId Add<T>(T eventValue)
public override TransactionId Add<T>(T eventEntity, (IIndexableAttribute, IAccumulator)[] indexed)
{
lock (this)
{
_tx = _tx.Next();

// Write the event itself
{
Span<byte> keySpan = stackalloc byte[8];
_tx.WriteTo(keySpan);
var span = _serializer.Serialize(eventValue);
_db.Put(keySpan, span, _eventsColumn);
}

// Update the entity indexes to mark them as having this event
{
var ingester = new ModifiedEntitiesIngester();
eventValue.Apply(ingester);
Span<byte> keySpan = stackalloc byte[24];
_tx.WriteTo(keySpan.SliceFast(16..));
foreach (var entityId in ingester.Entities)
{
entityId.TryWriteBytes(keySpan);
_db.Put(keySpan, keySpan, _entityIndexColumn);
}
}
return _tx;
}
throw new NotImplementedException();
}

public override void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
public override void EventsForIndex<TIngester, TVal>(IIndexableAttribute<TVal> attr, TVal value, TIngester ingester, TransactionId fromTx,
TransactionId toTx)
{
Span<byte> startKey = stackalloc byte[24];
entityId.TryWriteBytes(startKey);
BinaryPrimitives.WriteUInt64BigEndian(startKey.SliceFast(16), fromId.Value);
Span<byte> endKey = stackalloc byte[24];
entityId.TryWriteBytes(endKey);
if (toId == TransactionId.Max)
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), ulong.MaxValue);
else
BinaryPrimitives.WriteUInt64BigEndian(endKey.SliceFast(16), toId.Value + 1);

var options = new ReadOptions();
unsafe
{
fixed (byte* startKeyPtr = startKey)
{
fixed (byte* endKeyPtr = endKey)
{
options.SetIterateUpperBound(endKeyPtr, 24);
options.SetIterateLowerBound(startKeyPtr, 24);
using var iterator = _db.NewIterator(_entityIndexColumn, options);

iterator.SeekToFirst();
while (iterator.Valid())
{
var key = iterator.GetKeySpan().SliceFast(16);
var txId = TransactionId.From(key);
var evt = _db.Get(key, _deserializer, _eventsColumn);
if (!ingester.Ingest(txId, evt)) break;
iterator.Next();
}
}
}
}
throw new NotImplementedException();
}

public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
Expand Down
4 changes: 0 additions & 4 deletions src/NexusMods.EventSourcing/AEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ protected ReadOnlySpan<byte> SerializeSnapshot(EntityId id, IDictionary<IAttribu
var span = _writer.GetWrittenSpan();
return span;
}

public abstract TransactionId Add<T>(T eventEntity) where T : IEvent;
public abstract void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId) where TIngester : IEventIngester;

public abstract TransactionId Add<T>(T eventEntity, (IIndexableAttribute, IAccumulator)[] indexed) where T : IEvent;

public abstract void EventsForIndex<TIngester, TVal>(IIndexableAttribute<TVal> attr, TVal value, TIngester ingester, TransactionId fromTx,
Expand Down
61 changes: 40 additions & 21 deletions tests/NexusMods.EventSourcing.TestModel/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
using System.Buffers.Binary;
using NexusMods.EventSourcing.Abstractions;
using NexusMods.EventSourcing.Abstractions.Serialization;
using NexusMods.Hashing.xxHash64;
using Reloaded.Memory.Extensions;

namespace NexusMods.EventSourcing.TestModel;

public class InMemoryEventStore<TSerializer> : AEventStore
where TSerializer : IEventSerializer
{
private TransactionId _tx = TransactionId.From(0);
private readonly Dictionary<EntityId,IList<(TransactionId TxId, byte[] Data)>> _events = new();
private readonly List<byte[]> _events = new();
private readonly Dictionary<(IAttribute, Hash), SortedSet<TransactionId>> _indexes = new();
private readonly Dictionary<EntityId, SortedDictionary<TransactionId, byte[]>> _snapshots = new();
private TSerializer _serializer;

Expand All @@ -19,43 +20,61 @@ public InMemoryEventStore(TSerializer serializer, ISerializationRegistry seriali
_serializer = serializer;
}

public override TransactionId Add<T>(T entity)
public override TransactionId Add<T>(T entity, (IIndexableAttribute, IAccumulator)[] indexed)
{
lock (this)
{
_tx = _tx.Next();
// Create the new txId
var txId = TransactionId.From((ulong)_events.Count);

var data = _serializer.Serialize(entity);
var logger = new ModifiedEntitiesIngester();
entity.Apply(logger);
foreach (var id in logger.Entities)
_events.Add(data.ToArray());

foreach (var (attr, accumulator) in indexed)
{
if (!_events.TryGetValue(id, out var value))
// Hash the accumulator to condense it down into a single ulong
var hash = HashAccumulator(attr, accumulator);

if (_indexes.TryGetValue((attr, hash), out var found))
{
value = new List<(TransactionId, byte[])>();
_events.Add(id, value);
found.Add(txId);
}
else
{
var newSet = new SortedSet<TransactionId> { txId };
_indexes.Add((attr, hash), newSet);
}

value.Add((_tx, data.ToArray()));
}

return _tx;
return txId;
}
}

private static Hash HashAccumulator(IIndexableAttribute attr, IAccumulator accumulator)
{
Span<byte> span = stackalloc byte[attr.SpanSize()];
attr.WriteTo(span, accumulator);
var hash = span.XxHash64();
return hash;
}

public override void EventsForEntity<TIngester>(EntityId entityId, TIngester ingester, TransactionId fromId, TransactionId toId)
public override void EventsForIndex<TIngester, TVal>(IIndexableAttribute<TVal> attr, TVal value, TIngester ingester, TransactionId fromTx,
TransactionId toTx)
{
if (!_events.TryGetValue(entityId, out var events))
Span<byte> valueSpan = stackalloc byte[attr.SpanSize()];
attr.WriteTo(valueSpan, value);
var hash = valueSpan.XxHash64();

if (!_indexes.TryGetValue((attr, hash), out var found))
return;

foreach (var data in events)
foreach (var txId in found)
{
if (data.TxId < fromId) continue;
if (data.TxId > toId) break;
if (txId > toTx || txId < fromTx) continue;

var @event = _serializer.Deserialize(data.Data)!;
if (!ingester.Ingest(data.TxId, @event)) break;
var eventItem = _serializer.Deserialize(_events[(int)txId.Value]);
ingester.Ingest(txId, eventItem);
}

}

public override TransactionId GetSnapshot(TransactionId asOf, EntityId entityId, out IAccumulator loadedDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="MemoryPack" Version="1.10.0" />
<PackageReference Include="NexusMods.Hashing.xxHash64" Version="1.0.1" />
</ItemGroup>

</Project>

0 comments on commit 3930d99

Please sign in to comment.