Commit 3c0d285

[vpj] Fixing a bug with chunked messages in Repush (#2480)
* [vpj] Fixing a bug with chunked message in Repush
* Bumping container version
* Pinning the docker API version for PulsarVeniceIntegration test
* Pinning version of ubuntu
* Creating docker-java.properties
* Adding docker-java.properties
1 parent 2ca16e5 commit 3c0d285

3 files changed (+96 -3 lines)


clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java

Lines changed: 24 additions & 3 deletions
@@ -97,10 +97,13 @@
 import com.linkedin.venice.writer.VeniceWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.logging.log4j.LogManager;
@@ -436,6 +439,8 @@ protected Dataset<Row> applyTTLFilter(Dataset<Row> dataFrame) {
     StructType schema = dataFrame.schema();
     ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
 
+    final LongAccumulator ttlFilteredAcc = accumulatorsForDataWriterJob.repushTtlFilteredRecordCounter;
+
     // Apply filter using mapPartitions for efficiency (one filter instance per partition)
     dataFrame = dataFrame.mapPartitions((MapPartitionsFunction<Row, Row>) iterator -> {
       SparkKafkaInputTTLFilter ttlFilter =
@@ -457,7 +462,7 @@ protected Dataset<Row> applyTTLFilter(Dataset<Row> dataFrame) {
 
       if (shouldRemove) {
         // Increment counter for filtered records
-        accumulatorsForDataWriterJob.repushTtlFilteredRecordCounter.add(1);
+        ttlFilteredAcc.add(1);
       }
 
       return !shouldRemove; // Keep if NOT filtered
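In this hunk and the applyChunkAssembly() hunks below, the change copies the LongAccumulator out of the accumulatorsForDataWriterJob field into a local final variable before the Spark lambda uses it. The commit does not spell out the reason, but a common motivation for this pattern is that a Java lambda which reads an instance field captures the enclosing object, so Spark would have to serialize the whole job object into the task closure; capturing only the accumulator sidesteps that. A minimal, self-contained sketch of the pattern (hypothetical class and names, not Venice code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorCaptureSketch {
  // Like the Spark job object in the diff, assume this class is not serializable.
  private final LongAccumulator filteredCounter;

  AccumulatorCaptureSketch(SparkSession spark) {
    this.filteredCounter = spark.sparkContext().longAccumulator("filtered");
  }

  Dataset<String> dropEmpty(Dataset<String> input) {
    // Copy the field into a local variable: the lambda below then captures only
    // the (serializable) accumulator instead of `this`.
    final LongAccumulator localCounter = filteredCounter;
    return input.mapPartitions((MapPartitionsFunction<String, String>) iterator -> {
      List<String> kept = new ArrayList<>();
      while (iterator.hasNext()) {
        String s = iterator.next();
        if (s.isEmpty()) {
          localCounter.add(1); // writing `filteredCounter.add(1)` here would capture `this`
        } else {
          kept.add(s);
        }
      }
      return kept.iterator();
    }, Encoders.STRING());
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("capture-sketch").getOrCreate();
    AccumulatorCaptureSketch job = new AccumulatorCaptureSketch(spark);
    Dataset<String> data = spark.createDataset(Arrays.asList("a", "", "b"), Encoders.STRING());
    long kept = job.dropEmpty(data).count();
    System.out.println(kept + " records kept, " + job.filteredCounter.value() + " filtered");
    spark.stop();
  }
}
```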
@@ -575,6 +580,8 @@ protected Dataset<Row> applyChunkAssembly(Dataset<Row> dataFrame) {
 
     ExpressionEncoder<Row> encoder = RowEncoder.apply(DEFAULT_SCHEMA_WITH_SCHEMA_ID);
 
+    final LongAccumulator emptyRecordAcc = accumulatorsForDataWriterJob.emptyRecordCounter;
+
     dataFrame = dataFrame
         // Group by key
         .groupByKey((MapFunction<Row, byte[]>) row -> row.getAs(KEY_COLUMN_NAME), Encoders.BINARY())
@@ -602,7 +609,7 @@ protected Dataset<Row> applyChunkAssembly(Dataset<Row> dataFrame) {
 
           if (assembled == null) {
             // Latest record is DELETE, chunks incomplete, or filtered by TTL
-            accumulatorsForDataWriterJob.emptyRecordCounter.add(1);
+            emptyRecordAcc.add(1);
             return Collections.emptyIterator();
           }
 
@@ -842,8 +849,22 @@ void validateDataFrame(Dataset<Row> dataFrameForDataWriterJob) {
 
     validateDataFrameFieldAndTypes(fields, dataSchema, RMD_COLUMN_NAME, DataTypes.BinaryType);
 
+    // For KIF repush, the DataFrame may contain Venice internal columns (defined in SparkConstants)
+    // needed for chunk assembly. These are consumed by applyChunkAssembly() and dropped in runComputeJob().
+    Set<String> allowedInternalColumns = new HashSet<>();
+    PushJobSetting setting = getPushJobSetting();
+    if (setting != null && setting.isSourceKafka) {
+      allowedInternalColumns.addAll(
+          Arrays.asList(
+              SCHEMA_ID_COLUMN_NAME,
+              RMD_VERSION_ID_COLUMN_NAME,
+              OFFSET_COLUMN_NAME,
+              MESSAGE_TYPE_COLUMN_NAME,
+              CHUNKED_KEY_SUFFIX_COLUMN_NAME));
+    }
+
     for (StructField field: fields) {
-      if (field.name().startsWith("_")) {
+      if (field.name().startsWith("_") && !allowedInternalColumns.contains(field.name())) {
         String errorMessage = String
             .format("The provided input must not have fields that start with an underscore. Got: %s", field.name());
         throw new VeniceInvalidInputException(errorMessage);
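The comment in this hunk notes that the whitelisted internal columns are consumed by applyChunkAssembly() and dropped again in runComputeJob(); that dropping step is not part of this diff. As a purely illustrative sketch (hypothetical helper, not the actual runComputeJob() code), removing such bookkeeping columns from a Spark Dataset is a single drop(...) call over the same SparkConstants names:

```java
// Hypothetical helper, not Venice source: drops the KIF bookkeeping columns once
// chunk assembly no longer needs them, leaving key/value/rmd for the writer stage.
// Dataset.drop(...) silently ignores names that are absent from the schema.
import static com.linkedin.venice.spark.SparkConstants.CHUNKED_KEY_SUFFIX_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.MESSAGE_TYPE_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.OFFSET_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.RMD_VERSION_ID_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.SCHEMA_ID_COLUMN_NAME;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class KifInternalColumnDropSketch {
  private KifInternalColumnDropSketch() {
  }

  static Dataset<Row> dropKifInternalColumns(Dataset<Row> assembled) {
    return assembled.drop(
        SCHEMA_ID_COLUMN_NAME,
        RMD_VERSION_ID_COLUMN_NAME,
        OFFSET_COLUMN_NAME,
        MESSAGE_TYPE_COLUMN_NAME,
        CHUNKED_KEY_SUFFIX_COLUMN_NAME);
  }
}
```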

clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJobTest.java

Lines changed: 71 additions & 0 deletions
@@ -2,15 +2,22 @@
 
 import static com.linkedin.venice.ConfigKeys.KAFKA_CONFIG_PREFIX;
 import static com.linkedin.venice.meta.Store.UNLIMITED_STORAGE_QUOTA;
+import static com.linkedin.venice.spark.SparkConstants.CHUNKED_KEY_SUFFIX_COLUMN_NAME;
 import static com.linkedin.venice.spark.SparkConstants.KEY_COLUMN_NAME;
+import static com.linkedin.venice.spark.SparkConstants.MESSAGE_TYPE_COLUMN_NAME;
+import static com.linkedin.venice.spark.SparkConstants.OFFSET_COLUMN_NAME;
 import static com.linkedin.venice.spark.SparkConstants.RMD_COLUMN_NAME;
+import static com.linkedin.venice.spark.SparkConstants.RMD_VERSION_ID_COLUMN_NAME;
+import static com.linkedin.venice.spark.SparkConstants.SCHEMA_ID_COLUMN_NAME;
 import static com.linkedin.venice.spark.SparkConstants.SPARK_APP_NAME_CONFIG;
 import static com.linkedin.venice.spark.SparkConstants.SPARK_DATA_WRITER_CONF_PREFIX;
 import static com.linkedin.venice.spark.SparkConstants.SPARK_SESSION_CONF_PREFIX;
 import static com.linkedin.venice.spark.SparkConstants.VALUE_COLUMN_NAME;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -129,6 +136,70 @@ public void testValidateDataFrameWithValidRmdType() {
     dataWriterSparkJob.validateDataFrame(mockDataset);
   }
 
+  @Test
+  public void testValidateDataFrameWithChunkedKifColumns() {
+    PushJobSetting kafkaSetting = new PushJobSetting();
+    kafkaSetting.isSourceKafka = true;
+
+    AbstractDataWriterSparkJob dataWriterSparkJob = spy(AbstractDataWriterSparkJob.class);
+    when(dataWriterSparkJob.getPushJobSetting()).thenReturn(kafkaSetting);
+
+    // Schema matching chunked KIF repush input: key, value, rmd + internal columns for chunk assembly
+    StructType chunkedKifSchema = new StructType(
+        new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
+            new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField(RMD_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField(SCHEMA_ID_COLUMN_NAME, IntegerType, false, Metadata.empty()),
+            new StructField(RMD_VERSION_ID_COLUMN_NAME, IntegerType, false, Metadata.empty()),
+            new StructField(OFFSET_COLUMN_NAME, LongType, false, Metadata.empty()),
+            new StructField(MESSAGE_TYPE_COLUMN_NAME, IntegerType, false, Metadata.empty()),
+            new StructField(CHUNKED_KEY_SUFFIX_COLUMN_NAME, BinaryType, true, Metadata.empty()) });
+
+    Dataset<Row> mockDataset = mock(Dataset.class);
+    when(mockDataset.schema()).thenReturn(chunkedKifSchema);
+    dataWriterSparkJob.validateDataFrame(mockDataset);
+  }
+
+  @Test(expectedExceptions = VeniceInvalidInputException.class, expectedExceptionsMessageRegExp = ".*must not have fields that start with an underscore.*__schema_id__.*")
+  public void testValidateDataFrameRejectsInternalColumnsForNonKifJob() {
+    PushJobSetting hdfsSetting = new PushJobSetting();
+    hdfsSetting.isSourceKafka = false;
+
+    AbstractDataWriterSparkJob dataWriterSparkJob = spy(AbstractDataWriterSparkJob.class);
+    when(dataWriterSparkJob.getPushJobSetting()).thenReturn(hdfsSetting);
+
+    // Same chunked KIF schema but on a non-KIF job — should be rejected
+    StructType chunkedKifSchema = new StructType(
+        new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
+            new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField(RMD_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField(SCHEMA_ID_COLUMN_NAME, IntegerType, false, Metadata.empty()) });
+
+    Dataset<Row> mockDataset = mock(Dataset.class);
+    when(mockDataset.schema()).thenReturn(chunkedKifSchema);
+    dataWriterSparkJob.validateDataFrame(mockDataset);
+  }
+
+  @Test(expectedExceptions = VeniceInvalidInputException.class, expectedExceptionsMessageRegExp = ".*must not have fields that start with an underscore.*_unknown_internal.*")
+  public void testValidateDataFrameRejectsUnknownUnderscoreColumnsForKifJob() {
+    PushJobSetting kafkaSetting = new PushJobSetting();
+    kafkaSetting.isSourceKafka = true;
+
+    AbstractDataWriterSparkJob dataWriterSparkJob = spy(AbstractDataWriterSparkJob.class);
+    when(dataWriterSparkJob.getPushJobSetting()).thenReturn(kafkaSetting);
+
+    // KIF job but with an unknown underscore column — should still be rejected
+    StructType schemaWithUnknownInternalCol = new StructType(
+        new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()),
+            new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField(RMD_COLUMN_NAME, BinaryType, true, Metadata.empty()),
+            new StructField("_unknown_internal", StringType, true, Metadata.empty()) });
+
+    Dataset<Row> mockDataset = mock(Dataset.class);
+    when(mockDataset.schema()).thenReturn(schemaWithUnknownInternalCol);
+    dataWriterSparkJob.validateDataFrame(mockDataset);
+  }
+
   @Test
   public void testValidateDataFrameSchema() throws IOException {
     File inputDir = TestWriteUtils.getTempDataDirectory();
docker-java.properties

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+api.version=1.44
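Per the commit description, this new docker-java.properties file pins the Docker Engine API version used by the PulsarVeniceIntegration test. The assumption here is that docker-java (the Docker client library used by tools such as Testcontainers) picks the file up from the classpath as one of its default configuration sources, so api.version=1.44 fixes the API version rather than leaving it to negotiation. A small, hypothetical check of the resulting configuration (not part of the commit):

```java
// Hedged sketch: build docker-java's default configuration and print the API
// version it resolved; with this properties file on the classpath, that value
// is expected to reflect api.version=1.44.
import com.github.dockerjava.core.DefaultDockerClientConfig;

public class DockerApiVersionCheck {
  public static void main(String[] args) {
    DefaultDockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
    System.out.println("Configured Docker API version: " + config.getApiVersion());
  }
}
```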
