Skip to content

Commit

Permalink
[all] Make getValueSchemaId APIs match the canonical schemas (#1018)
Browse files Browse the repository at this point in the history
Previously, "getValueSchemaId" APIs in "SchemaReader", "SchemaData" and "ReadOnlySchemaRepository" match the schemas exactly (with the exception of "HelixReadWriteSchemaRepository"). These APIs are generally used to get a schema id to write along with the data. In such cases, we only care about canonical schemas.

The exception is where we want to register a schema with a doc change. In our codebase, this schema check is not handled via "getValueSchemaId" APIs and happens in the "VeniceHelixAdmin#addValueSchema" code paths which handle this case explicitly.

This commit makes "getValueSchemaId" use canonical schema match in all places.
  • Loading branch information
nisargthakkar authored Jun 7, 2024
1 parent 7798479 commit e68caf6
Show file tree
Hide file tree
Showing 28 changed files with 293 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.SchemaRepoBackedSchemaReader;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
Expand Down Expand Up @@ -96,9 +96,9 @@ private static class ReusableObjects {
private static final int REUSABLE_MAP_CAPACITY = 100;
private static final float REUSABLE_MAP_LOAD_FACTOR = 0.75f;
// LRU cache for storing schema->record map for object reuse of value and result record
final LinkedHashMap<Schema, GenericRecord> reuseValueRecordMap =
new LinkedHashMap<Schema, GenericRecord>(REUSABLE_MAP_CAPACITY, REUSABLE_MAP_LOAD_FACTOR, true) {
protected boolean removeEldestEntry(Map.Entry<Schema, GenericRecord> eldest) {
final LinkedHashMap<Integer, GenericRecord> reuseValueRecordMap =
new LinkedHashMap<Integer, GenericRecord>(REUSABLE_MAP_CAPACITY, REUSABLE_MAP_LOAD_FACTOR, true) {
protected boolean removeEldestEntry(Map.Entry<Integer, GenericRecord> eldest) {
return size() > REUSABLE_MAP_CAPACITY;
}
};
Expand Down Expand Up @@ -178,6 +178,11 @@ protected AvroGenericDaVinciClient(
preValidation.run();
}

@Override
public SchemaReader getSchemaReader() {
return new SchemaRepoBackedSchemaReader(getBackend().getSchemaRepository(), getStoreName());
}

@Override
public String getStoreName() {
return clientConfig.getStoreName();
Expand All @@ -189,6 +194,7 @@ public Schema getKeySchema() {
return getBackend().getSchemaRepository().getKeySchema(getStoreName()).getSchema();
}

@Deprecated
@Override
public Schema getLatestValueSchema() {
throwIfNotReady();
Expand Down Expand Up @@ -445,7 +451,7 @@ public boolean isProjectionFieldValidationEnabled() {
public ComputeRequestBuilder<K> compute(
Optional<ClientStats> stats,
AvroGenericReadComputeStoreClient computeStoreClient) throws VeniceClientException {
return new AvroComputeRequestBuilderV4<K>(computeStoreClient, getLatestValueSchema()).setStats(stats)
return new AvroComputeRequestBuilderV4<K>(computeStoreClient, getSchemaReader()).setStats(stats)
.setValidateProjectionFields(isProjectionFieldValidationEnabled());
}

Expand All @@ -465,26 +471,6 @@ private Schema getComputeResultSchema(ComputeRequestWrapper computeRequestWrappe
return computeResultSchema;
}

static int getValueSchemaIdForComputeRequest(
String storeName,
Schema computeValueSchema,
ReadOnlySchemaRepository repo) {
SchemaEntry latestSchemaEntry = repo.getSupersetOrLatestValueSchema(storeName);
if (computeValueSchema == latestSchemaEntry.getSchema()) {
/**
* For most of the scenario, the compute request will execute this efficient path since the request is trying to
* use the latest value schema all the time.
*/
return latestSchemaEntry.getId();
} else {
/**
* This slow path shouldn't be executed frequently, and it is a little inefficient because of the schema parsing logic internally.
* Refactoring SchemaRepository is a slightly bigger effort.
*/
return repo.getValueSchemaId(storeName, computeValueSchema.toString());
}
}

@Override
public void compute(
ComputeRequestWrapper computeRequestWrapper,
Expand All @@ -508,10 +494,9 @@ public void compute(

ReusableObjects reusableObjects = REUSABLE_OBJECTS.get();
Schema valueSchema = computeRequestWrapper.getValueSchema();
int valueSchemaId =
getValueSchemaIdForComputeRequest(getStoreName(), valueSchema, daVinciBackend.get().getSchemaRepository());
int valueSchemaId = computeRequestWrapper.getValueSchemaID();
GenericRecord reuseValueRecord =
reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchema, k -> new GenericData.Record(valueSchema));
reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchemaId, k -> new GenericData.Record(valueSchema));

Map<String, Object> globalContext = new HashMap<>();
Schema computeResultSchema = getComputeResultSchema(computeRequestWrapper);
Expand Down Expand Up @@ -576,8 +561,9 @@ public void computeWithKeyPrefixFilter(

ReusableObjects reusableObjects = REUSABLE_OBJECTS.get();
Schema valueSchema = computeRequestWrapper.getValueSchema();
int valueSchemaId = computeRequestWrapper.getValueSchemaID();
GenericRecord reuseValueRecord =
reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchema, k -> new GenericData.Record(valueSchema));
reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchemaId, k -> new GenericData.Record(valueSchema));

Map<String, Object> globalContext = new HashMap<>();
Schema computeResultSchema = getComputeResultSchema(computeRequestWrapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.venice.client.store.ComputeRequestBuilder;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.schema.SchemaReader;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -91,11 +92,17 @@ public Schema getKeySchema() {
return delegate.getKeySchema();
}

@Deprecated
@Override
public Schema getLatestValueSchema() {
return delegate.getLatestValueSchema();
}

@Override
public SchemaReader getSchemaReader() {
return delegate.getSchemaReader();
}

@Override
public boolean isProjectionFieldValidationEnabled() {
return delegate.isProjectionFieldValidationEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.SystemStore;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
Expand Down Expand Up @@ -123,7 +124,8 @@ protected SchemaEntry getValueSchemaInternally(String storeName, int id) {
@Override
public int getValueSchemaId(String storeName, String valueSchemaStr) {
if (VeniceSystemStoreType.getSystemStoreType(storeName) == VeniceSystemStoreType.META_STORE) {
return metaStoreSchemaReader.getValueSchemaId(Schema.parse(valueSchemaStr));
return metaStoreSchemaReader
.getValueSchemaId(AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(valueSchemaStr));
} else {
return super.getValueSchemaId(storeName, valueSchemaStr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.VersionBackend;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.ReferenceCounted;
Expand All @@ -30,32 +26,6 @@


public class AvroGenericDaVinciClientTest {
@Test
public void testGetValueSchemaIdForComputeRequest() {
String storeName = "test_store";
ReadOnlySchemaRepository repo = mock(ReadOnlySchemaRepository.class);

Schema computeValueSchema = mock(Schema.class);
String computeValueSchemaString = "compute_value_schema";
doReturn(computeValueSchemaString).when(computeValueSchema).toString();
int valueSchemaId = 1;
doReturn(new SchemaEntry(valueSchemaId, computeValueSchema)).when(repo).getSupersetOrLatestValueSchema(storeName);
Assert.assertEquals(
AvroGenericDaVinciClient.getValueSchemaIdForComputeRequest(storeName, computeValueSchema, repo),
1);

// mismatch scenario
Schema latestValueSchema = mock(Schema.class);
int latestValueSchemaId = 2;
doReturn(new SchemaEntry(latestValueSchemaId, latestValueSchema)).when(repo)
.getSupersetOrLatestValueSchema(storeName);
doReturn(valueSchemaId).when(repo).getValueSchemaId(storeName, computeValueSchemaString);
Assert.assertEquals(
AvroGenericDaVinciClient.getValueSchemaIdForComputeRequest(storeName, computeValueSchema, repo),
1);
verify(repo).getValueSchemaId(storeName, computeValueSchemaString);
}

@Test
public void testPropertyBuilderWithRecordTransformer() {
String schema = "{\n" + " \"type\": \"string\"\n" + "}\n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.fastclient.factory.ClientFactory;
import com.linkedin.venice.schema.SchemaReader;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -80,6 +81,11 @@ public ClientConfig getClientConfig() {
return clientConfig;
}

@Override
public SchemaReader getSchemaReader() {
return delegate.getSchemaReader();
}

@Override
protected CompletableFuture<V> get(GetRequestContext requestContext, K key) throws VeniceClientException {
return delegate.get(requestContext, key);
Expand Down Expand Up @@ -124,6 +130,7 @@ public Schema getKeySchema() {
return delegate.getKeySchema();
}

@Deprecated
@Override
public Schema getLatestValueSchema() {
return delegate.getLatestValueSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
Expand Down Expand Up @@ -618,9 +619,7 @@ protected void compute(
headers.put(
HttpConstants.VENICE_API_VERSION,
Integer.toString(ReadAvroProtocolDefinition.COMPUTE_REQUEST_V3.getProtocolVersion()));
headers.put(
VENICE_COMPUTE_VALUE_SCHEMA_ID,
Integer.toString(metadata.getValueSchemaId(computeRequest.getValueSchema())));
headers.put(VENICE_COMPUTE_VALUE_SCHEMA_ID, Integer.toString(computeRequest.getValueSchemaID()));

RecordDeserializer<GenericRecord> computeResultRecordDeserializer =
getComputeResultRecordDeserializer(resultSchema);
Expand Down Expand Up @@ -869,8 +868,14 @@ public Schema getKeySchema() {
return metadata.getKeySchema();
}

@Deprecated
@Override
public Schema getLatestValueSchema() {
return metadata.getLatestValueSchema();
}

@Override
public SchemaReader getSchemaReader() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private InternalAvroStoreClient prepareDispatchingClient(
ClientConfig clientConfig) {
StoreMetadata mockMetadata = mock(StoreMetadata.class);
doReturn(STORE_NAME).when(mockMetadata).getStoreName();
doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getLatestValueSchema();
doReturn(1).when(mockMetadata).getLatestValueSchemaId();
doReturn(STORE_VALUE_SCHEMA).when(mockMetadata).getValueSchema(1);
return new DispatchingAvroGenericStoreClient(mockMetadata, clientConfig) {
private int requestCnt = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.venice.controllerapi.MultiSchemaIdResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
Expand Down Expand Up @@ -66,7 +67,8 @@ public class RouterBackedSchemaReader implements SchemaReader {
private final Optional<Schema> readerSchema;
private volatile Schema keySchema;
private final Map<Integer, SchemaEntry> valueSchemaEntryMap = new VeniceConcurrentHashMap<>();
private final Cache<Schema, Integer> valueSchemaMapR = Caffeine.newBuilder().maximumSize(1000).build();
private final Cache<Schema, Integer> valueSchemaToCanonicalSchemaId = Caffeine.newBuilder().maximumSize(1000).build();
private final Cache<Schema, Integer> canonicalValueSchemaMapR = Caffeine.newBuilder().maximumSize(1000).build();
private final Map<Integer, DerivedSchemaEntry> valueSchemaIdToUpdateSchemaEntryMap = new VeniceConcurrentHashMap<>();
private final AtomicReference<SchemaEntry> latestValueSchemaEntry = new AtomicReference<>();
private final AtomicInteger supersetSchemaIdAtomic = new AtomicInteger(SchemaData.INVALID_VALUE_SCHEMA_ID);
Expand Down Expand Up @@ -211,16 +213,24 @@ public Integer getLatestValueSchemaId() throws VeniceClientException {

@Override
public int getValueSchemaId(Schema schema) {
if (valueSchemaMapR.getIfPresent(schema) == null) {
// Optimization to not compute the canonical form if we have previously seen the value schema.
Integer cachedValueSchemaId = valueSchemaToCanonicalSchemaId.getIfPresent(schema);
if (cachedValueSchemaId != null && isValidSchemaId(cachedValueSchemaId)) {
return cachedValueSchemaId;
}

String canonicalSchemaStr = AvroCompatibilityHelper.toParsingForm(schema);
Schema canonicalSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(canonicalSchemaStr);
Integer valueSchemaId = canonicalValueSchemaMapR.getIfPresent(canonicalSchema);
if (valueSchemaId == null || !isValidSchemaId(valueSchemaId)) {
// Perform one time refresh of all schemas.
updateAllValueSchemas(false);
}
Integer valueSchemaId = valueSchemaMapR.get(schema, s -> NOT_EXIST_UPDATE_SCHEMA_ENTRY.getValueSchemaID());

valueSchemaId = canonicalValueSchemaMapR.getIfPresent(canonicalSchema);
if (valueSchemaId != null && isValidSchemaId(valueSchemaId)) {
return valueSchemaId;
}
valueSchemaMapR.put(schema, NOT_EXIST_UPDATE_SCHEMA_ENTRY.getValueSchemaID());
cacheValueAndCanonicalSchemas(schema, canonicalSchema, NOT_EXIST_UPDATE_SCHEMA_ENTRY.getValueSchemaID());
throw new VeniceClientException("Could not find schema: " + schema + ". for store " + storeName);
}

Expand Down Expand Up @@ -316,7 +326,7 @@ private void updateAllValueSchemas(boolean forceRefresh) {
// Fall back to fetch all value schema.
for (SchemaEntry valueSchemaEntry: fetchAllValueSchemaEntriesFromRouter()) {
valueSchemaEntryMap.put(valueSchemaEntry.getId(), valueSchemaEntry);
valueSchemaMapR.put(valueSchemaEntry.getSchema(), valueSchemaEntry.getId());
cacheValueAndCanonicalSchemas(valueSchemaEntry.getSchema(), valueSchemaEntry.getId());
}
return;
}
Expand Down Expand Up @@ -456,7 +466,7 @@ private SchemaEntry maybeFetchValueSchemaEntryById(int valueSchemaId, boolean fo
SchemaEntry oldEntry = valueSchemaEntryMap.get(valueSchemaId);
// Do not need to refresh if the schema id is already present in the schema repo.
if (oldEntry != null && isValidSchemaEntry(oldEntry)) {
valueSchemaMapR.put(oldEntry.getSchema(), valueSchemaId);
cacheValueAndCanonicalSchemas(oldEntry.getSchema(), valueSchemaId);
return oldEntry;
}
SchemaEntry entry = fetchValueSchemaEntryFromRouter(valueSchemaId);
Expand All @@ -465,7 +475,7 @@ private SchemaEntry maybeFetchValueSchemaEntryById(int valueSchemaId, boolean fo
return NOT_EXIST_VALUE_SCHEMA_ENTRY;
} else {
valueSchemaEntryMap.put(valueSchemaId, entry);
valueSchemaMapR.put(entry.getSchema(), valueSchemaId);
cacheValueAndCanonicalSchemas(entry.getSchema(), valueSchemaId);
return entry;
}
} else {
Expand All @@ -477,7 +487,7 @@ private SchemaEntry maybeFetchValueSchemaEntryById(int valueSchemaId, boolean fo
// Every time when we fetch a new value schema to cache during non-force-refresh logic, we should try to mark
// the flag as true.
shouldRefreshLatestValueSchemaEntry.compareAndSet(false, true);
valueSchemaMapR.put(entry.getSchema(), valueSchemaId);
cacheValueAndCanonicalSchemas(entry.getSchema(), valueSchemaId);
return entry;
});
}
Expand Down Expand Up @@ -726,4 +736,22 @@ private Schema preemptiveSchemaVerification(Schema writerSchema, String writerSc
}
return alternativeWriterSchema;
}

private void cacheValueAndCanonicalSchemas(Schema valueSchema, int valueSchemaId) {
String canonicalSchemaStr = AvroCompatibilityHelper.toParsingForm(valueSchema);
Schema canonicalSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(canonicalSchemaStr);

cacheValueAndCanonicalSchemas(valueSchema, canonicalSchema, valueSchemaId);
}

private void cacheValueAndCanonicalSchemas(Schema valueSchema, Schema canonicalSchema, int valueSchemaId) {
valueSchemaToCanonicalSchemaId.put(valueSchema, valueSchemaId);

Integer cachedCanonicalSchemaId = canonicalValueSchemaMapR.getIfPresent(canonicalSchema);
// Cache schemas if they're previously unseen or have a higher schema ID than the current cached one. This will
// ensure that the later schemas are preferred over old schemas
if (cachedCanonicalSchemaId == null || cachedCanonicalSchemaId < valueSchemaId) {
canonicalValueSchemaMapR.put(canonicalSchema, valueSchemaId);
}
}
}
Loading

0 comments on commit e68caf6

Please sign in to comment.