Skip to content

Commit

Permalink
Table of Persistance.Reference at the end of the stream (#9972)
Browse files Browse the repository at this point in the history
Fixes #9361 by delaying storing of `Persistance.Reference` instances and creating their table at the end of the stream.
  • Loading branch information
JaroslavTulach authored May 18, 2024
1 parent c5a91a6 commit fe28c23
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 71 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,8 @@ lazy val `ydoc-server` = project
lazy val `persistance` = (project in file("lib/java/persistance"))
.settings(
version := "0.1",
Test / fork := true,
commands += WithDebugCommand.withDebug,
frgaalJavaCompilerSetting,
Compile / javacOptions := ((Compile / javacOptions).value),
libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void resetDebris() {
@Test
public void locationTest() throws Exception {
var l = new Location(12, 33);
var n = serde(Location.class, l, 8);
var n = serde(Location.class, l, 16);

assertEquals(12, n.start());
assertEquals(33, n.end());
Expand All @@ -43,14 +43,14 @@ public void locationTest() throws Exception {
@Test
public void identifiedLocation() throws Exception {
var il = new IdentifiedLocation(new Location(5, 19), null);
var in = serde(IdentifiedLocation.class, il, 12);
var in = serde(IdentifiedLocation.class, il, 20);
assertEquals(il, in);
}

@Test
public void identifiedLocationWithUUID() throws Exception {
var il = new IdentifiedLocation(new Location(5, 19), UUID.randomUUID());
var in = serde(IdentifiedLocation.class, il, 33);
var in = serde(IdentifiedLocation.class, il, 41);
assertEquals("UUIDs are serialized at the moment", il, in);
}

Expand All @@ -63,7 +63,7 @@ public void identifiedLocationNoUUID() throws Exception {
case UUID any -> null;
default -> obj;
};
var in = serde(IdentifiedLocation.class, il, 12, fn);
var in = serde(IdentifiedLocation.class, il, 20, fn);
var withoutUUID = new IdentifiedLocation(il.location());
assertEquals("UUIDs are no longer serialized", withoutUUID, in);
}
Expand Down Expand Up @@ -107,7 +107,7 @@ public void refHolderNoUUID() throws Exception {
case UUID any -> null;
default -> obj;
};
var in = serde(IdHolder.class, il, 1, fn);
var in = serde(IdHolder.class, il, 9, fn);
var withoutUUID = new IdHolder(null);
assertEquals("UUIDs are no longer serialized", withoutUUID, in);
}
Expand All @@ -118,7 +118,7 @@ public void scalaMap() throws Exception {
var idLoc1 = new IdentifiedLocation(new Location(1, 5));
var in = scala.collection.immutable.Map$.MODULE$.empty().$plus(new Tuple2("Hi", idLoc1));

var out = serde(scala.collection.immutable.Map.class, in, 36);
var out = serde(scala.collection.immutable.Map.class, in, 44);

assertEquals("One element", 1, out.size());
assertEquals(in, out);
Expand All @@ -137,7 +137,7 @@ public void scalaImmutableMapIsLazy() throws Exception {
.$plus(new Tuple2("World", s2));

LazySeq.forbidden = true;
var out = (scala.collection.immutable.Map) serde(scala.collection.immutable.Map.class, in, 64);
var out = (scala.collection.immutable.Map) serde(scala.collection.immutable.Map.class, in, 72);

assertEquals("Two pairs element", 2, out.size());
assertEquals("Two keys", 2, out.keySet().size());
Expand All @@ -159,7 +159,7 @@ public void scalaHashMap() throws Exception {
(scala.collection.mutable.HashMap)
scala.collection.mutable.HashMap$.MODULE$.apply(immutable);

var out = serde(scala.collection.mutable.Map.class, in, 36);
var out = serde(scala.collection.mutable.Map.class, in, 44);

assertEquals("One element", 1, out.size());
assertEquals(in, out);
Expand All @@ -171,7 +171,7 @@ public void scalaSet() throws Exception {
var idLoc1 = new IdentifiedLocation(new Location(1, 5));
var in = scala.collection.immutable.Set$.MODULE$.empty().$plus(idLoc1);

var out = serde(scala.collection.immutable.Set.class, in, 24);
var out = serde(scala.collection.immutable.Set.class, in, 32);

assertEquals("One element", 1, out.size());
assertEquals(in, out);
Expand All @@ -183,7 +183,7 @@ public void scalaList() throws Exception {
var idLoc2 = new IdentifiedLocation(new Location(2, 4), UUID.randomUUID());
var in = join(idLoc2, join(idLoc1, nil()));

var out = serde(List.class, in, 65);
var out = serde(List.class, in, 73);

assertEquals("Two elements", 2, out.size());
assertEquals("UUIDs are serialized at the moment", idLoc2, out.head());
Expand All @@ -195,7 +195,7 @@ public void scalaListSharedRef() throws Exception {
var idLoc1 = new IdentifiedLocation(new Location(1, 5));
var in = join(idLoc1, join(idLoc1, nil()));

var out = serde(List.class, in, 32);
var out = serde(List.class, in, 40);

assertEquals("Two elements", 2, out.size());
assertEquals("Head is equal to original", idLoc1, out.head());
Expand Down Expand Up @@ -288,7 +288,7 @@ public void hashMapIsLazy() throws Exception {
in.put("World", s2);

LazySeq.forbidden = true;
var out = serde(java.util.Map.class, in, 64);
var out = serde(java.util.Map.class, in, 72);

assertEquals("Two pairs element", 2, out.size());
assertEquals("Two keys", 2, out.keySet().size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.enso.persist;

import java.io.IOException;
import org.enso.persist.PerInputImpl.InputCache;
import org.enso.persist.Persistance.Reference;

Expand All @@ -8,23 +9,44 @@ final class PerBufferReference<T> extends Persistance.Reference<T> {
private final PerInputImpl.InputCache cache;
private final int offset;

private PerBufferReference(Persistance<T> p, PerInputImpl.InputCache buffer, int offset) {
/**
* References can be cached, or loaded again every time.
*
* <p>If {@code cached} is set to {@code this}, then the caching is disabled and {@link
* #get(Class<V>)} will always load a new instance of the object. This is the mode one gets when
* using an API method {@link Persistance.Input#readReference(Class<T>)}.
*
* <p>In other cases the {@code cached} value can be {@code null} meaning <em>not yet loaded</em>
* or non-{@code null} holding the cached value to be returned from the {@link #get(Class<V>)}
* method until this reference instance is GCed.
*/
private Object cached;

private PerBufferReference(
Persistance<T> p, PerInputImpl.InputCache buffer, int offset, boolean allowCaching) {
this.p = p;
this.cache = buffer;
this.offset = offset;
this.cached = allowCaching ? null : this;
}

@SuppressWarnings(value = "unchecked")
final <T> T readObject(Class<T> clazz) {
final <T> T readObject(Class<T> clazz) throws IOException {
if (cached != this && clazz.isInstance(cached)) {
return clazz.cast(cached);
}
if (p != null) {
if (clazz.isAssignableFrom(p.clazz)) {
clazz = (Class) p.clazz;
} else {
throw new ClassCastException();
}
}
org.enso.persist.PerInputImpl in = new PerInputImpl(cache, offset);
var in = new PerInputImpl(cache, offset);
T obj = in.readInline(clazz);
if (cached != this) {
cached = obj;
}
return obj;
}

Expand All @@ -33,6 +55,10 @@ static <V> Reference<V> from(InputCache buffer, int offset) {
}

static <V> Reference<V> from(Persistance<V> p, InputCache buffer, int offset) {
return new PerBufferReference<>(p, buffer, offset);
return new PerBufferReference<>(p, buffer, offset, false);
}

static <V> Reference<V> cached(Persistance<V> p, InputCache buffer, int offset) {
return new PerBufferReference<>(p, buffer, offset, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.slf4j.Logger;

final class PerGenerator {
static final byte[] HEADER = new byte[] {0x0a, 0x0d, 0x03, 0x0f};
static final byte[] HEADER = new byte[] {0x0a, 0x0d, 0x13, 0x0f};
private final OutputStream main;
private final Map<Object, Integer> knownObjects = new IdentityHashMap<>();
private int countReferences = 1;
private final Map<Object, Integer> pendingReferences = new IdentityHashMap<>();
private final Histogram histogram;
private final PerMap map;
final Function<Object, Object> writeReplace;
private final Function<Object, Object> writeReplace;
private int position;

private PerGenerator(
Expand All @@ -39,19 +41,25 @@ static byte[] writeObject(Object obj, Function<Object, Object> writeReplace) thr
data.writeInt(g.versionStamp());
data.write(new byte[4]); // space
data.flush();
var at = g.writeObject(obj);

var at = g.writeObjectAndReferences(obj);

var arr = out.toByteArray();
arr[8] = (byte) ((at >> 24) & 0xff);
arr[9] = (byte) ((at >> 16) & 0xff);
arr[10] = (byte) ((at >> 8) & 0xff);
arr[11] = (byte) (at & 0xff);
putIntToArray(arr, 8, at);

if (histogram != null) {
histogram.dump(PerUtils.LOG, arr.length);
}
return arr;
}

private static void putIntToArray(byte[] arr, int position, int value) {
arr[position] = (byte) ((value >> 24) & 0xff);
arr[position + 1] = (byte) ((value >> 16) & 0xff);
arr[position + 2] = (byte) ((value >> 8) & 0xff);
arr[position + 3] = (byte) (value & 0xff);
}

final <T> int writeObject(T t) throws IOException {
if (t == null) {
return -1;
Expand Down Expand Up @@ -106,6 +114,63 @@ final int versionStamp() {
return map.versionStamp;
}

private int registerReference(Persistance.Reference<?> ref) {
var obj = ref.get(Object.class);
var existingId = pendingReferences.get(obj);
if (existingId == null) {
var currentSize = countReferences++;
pendingReferences.put(obj, currentSize);
return currentSize;
} else {
return existingId;
}
}

/**
* Writes an object into the buffer. Writes also all {@link Persistance.Reference} that were left
* pending during the serialization.
*
* @param obj the object to write down
* @return location of the table {@code int size and then int[size]}
*/
private int writeObjectAndReferences(Object obj) throws IOException {
pendingReferences.put(obj, 0);
var objAt = writeObject(obj);

var refsOut = new ByteArrayOutputStream();
var refsData = new DataOutputStream(refsOut);
refsData.writeInt(-1); // space for size of references
refsData.writeInt(objAt); // the main object
var count = 1;
for (; ; ) {
var all = new ArrayList<>(pendingReferences.entrySet());
all.sort(
(e1, e2) -> {
return e1.getValue() - e2.getValue();
});
var round = all.subList(count, all.size());
if (round.isEmpty()) {
break;
}
for (var entry : round) {
count++;
var at = writeObject(entry.getKey());
assert count == entry.getValue() : "Expecting " + count + " got " + entry.getValue();
refsData.writeInt(at);
}
}
refsData.flush();
var arr = refsOut.toByteArray();

putIntToArray(arr, 0, count);

var tableAt = this.position;
this.main.write(arr);
this.position += arr.length;

return tableAt;
}

private static final class ReferenceOutput extends DataOutputStream
implements Persistance.Output {
private final PerGenerator generator;
Expand All @@ -117,6 +182,12 @@ private static final class ReferenceOutput extends DataOutputStream

@Override
public <T> void writeInline(Class<T> clazz, T t) throws IOException {
if (Persistance.Reference.class == clazz) {
Persistance.Reference<?> ref = (Persistance.Reference<?>) t;
var id = this.generator.registerReference(ref);
writeInt(id);
return;
}
var obj = generator.writeReplace.apply(t);
var p = generator.map.forType(clazz);
p.writeInline(obj, this);
Expand Down
Loading

0 comments on commit fe28c23

Please sign in to comment.