Skip to content

Commit 9f3dcef

Browse files
committed
Merge branch 'trunk' into KAFKA-16907
2 parents 25c6477 + 6596151 commit 9f3dcef

File tree

84 files changed

+4123
-1182
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+4123
-1182
lines changed

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ public enum ApiKeys {
149149
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
150150
.collect(Collectors.toMap(key -> (int) key.id, Function.identity()));
151151

152+
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
153+
// version `0` has to be included in the api versions response (see KAFKA-18659). In order to achieve that,
154+
// we adjust `toApiVersion` to return `0` for the min version of `produce` in the broker listener.
155+
public static final short PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0;
156+
152157
/** the permanent and immutable id of an API - this can't change ever */
153158
public final short id;
154159

@@ -264,8 +269,30 @@ public boolean hasValidVersion() {
264269
return oldestVersion() <= latestVersion();
265270
}
266271

272+
/**
273+
* To workaround a critical bug in librdkafka, the api versions response is inconsistent with the actual versions
274+
* supported by `produce` - this method handles that. It should be called in the context of the api response protocol
275+
* handling.
276+
*
277+
* It should not be used by code generating protocol documentation - we keep that consistent with the actual versions
278+
* supported by `produce`.
279+
*
280+
* See `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details.
281+
*/
282+
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersionForApiResponse(boolean enableUnstableLastVersion,
283+
ApiMessageType.ListenerType listenerType) {
284+
return toApiVersion(enableUnstableLastVersion, Optional.of(listenerType));
285+
}
286+
267287
public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion) {
268-
short oldestVersion = oldestVersion();
288+
return toApiVersion(enableUnstableLastVersion, Optional.empty());
289+
}
290+
291+
private Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion,
292+
Optional<ApiMessageType.ListenerType> listenerType) {
293+
// see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why we do this
294+
short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ?
295+
PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION : oldestVersion();
269296
short latestVersion = latestVersion(enableUnstableLastVersion);
270297

271298
// API is entirely disabled if latestStableVersion is smaller than oldestVersion.
@@ -299,7 +326,7 @@ static String toHtml() {
299326
b.append("<th>Key</th>\n");
300327
b.append("</tr>");
301328
clientApis().stream()
302-
.filter(apiKey -> apiKey.toApiVersion(false).isPresent())
329+
.filter(apiKey -> apiKey.toApiVersion(false, Optional.empty()).isPresent())
303330
.forEach(apiKey -> {
304331
b.append("<tr>\n");
305332
b.append("<td>");
@@ -341,10 +368,7 @@ public static EnumSet<ApiKeys> controllerApis() {
341368
}
342369

343370
public static EnumSet<ApiKeys> clientApis() {
344-
List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
345-
.filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.BROKER))
346-
.collect(Collectors.toList());
347-
return EnumSet.copyOf(apis);
371+
return brokerApis();
348372
}
349373

350374
public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {

clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,26 +208,26 @@ public static String toHtml() {
208208
// Responses
209209
b.append("<b>Responses:</b><br>\n");
210210
Schema[] responses = key.messageType.responseSchemas();
211-
for (int i = 0; i < responses.length; i++) {
212-
Schema schema = responses[i];
211+
for (int version = key.oldestVersion(); version < key.latestVersion(); version++) {
212+
Schema schema = responses[version];
213+
if (schema == null)
214+
throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version);
213215
// Schema
214-
if (schema != null) {
215-
b.append("<div>");
216-
// Version header
217-
b.append("<pre>");
218-
b.append(key.name);
219-
b.append(" Response (Version: ");
220-
b.append(i);
221-
b.append(") => ");
222-
schemaToBnfHtml(responses[i], b, 2);
223-
b.append("</pre>");
224-
225-
b.append("<p><b>Response header version:</b> ");
226-
b.append(key.responseHeaderVersion((short) i));
227-
b.append("</p>\n");
228-
229-
schemaToFieldTableHtml(responses[i], b);
230-
}
216+
b.append("<div>");
217+
// Version header
218+
b.append("<pre>");
219+
b.append(key.name);
220+
b.append(" Response (Version: ");
221+
b.append(version);
222+
b.append(") => ");
223+
schemaToBnfHtml(responses[version], b, 2);
224+
b.append("</pre>");
225+
226+
b.append("<p><b>Response header version:</b> ");
227+
b.append(key.responseHeaderVersion((short) version));
228+
b.append("</p>\n");
229+
230+
schemaToFieldTableHtml(responses[version], b);
231231
b.append("</div>\n");
232232
}
233233
}

clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,18 +204,19 @@ public static ApiVersionCollection filterApis(
204204
// Skip telemetry APIs if client telemetry is disabled.
205205
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled)
206206
continue;
207-
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
207+
apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType).ifPresent(apiKeys::add);
208208
}
209209
return apiKeys;
210210
}
211211

212212
public static ApiVersionCollection collectApis(
213+
ApiMessageType.ListenerType listenerType,
213214
Set<ApiKeys> apiKeys,
214215
boolean enableUnstableLastVersion
215216
) {
216217
ApiVersionCollection res = new ApiVersionCollection();
217218
for (ApiKeys apiKey : apiKeys) {
218-
apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add);
219+
apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType).ifPresent(res::add);
219220
}
220221
return res;
221222
}
@@ -238,7 +239,7 @@ public static ApiVersionCollection intersectForwardableApis(
238239
) {
239240
ApiVersionCollection apiKeys = new ApiVersionCollection();
240241
for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
241-
final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion);
242+
final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType);
242243
if (brokerApiVersion.isEmpty()) {
243244
// Broker does not support this API key.
244245
continue;

clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
4141

4242
public class ProduceRequest extends AbstractRequest {
43+
4344
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11;
4445

4546
public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) {
@@ -66,21 +67,10 @@ public Builder(short minVersion,
6667

6768
@Override
6869
public ProduceRequest build(short version) {
69-
return build(version, true);
70-
}
71-
72-
// Visible for testing only
73-
public ProduceRequest buildUnsafe(short version) {
74-
return build(version, false);
75-
}
76-
77-
private ProduceRequest build(short version, boolean validate) {
78-
if (validate) {
79-
// Validate the given records first
80-
data.topicData().forEach(tpd ->
81-
tpd.partitionData().forEach(partitionProduceData ->
82-
ProduceRequest.validateRecords(version, partitionProduceData.records())));
83-
}
70+
// Validate the given records first
71+
data.topicData().forEach(tpd ->
72+
tpd.partitionData().forEach(partitionProduceData ->
73+
ProduceRequest.validateRecords(version, partitionProduceData.records())));
8474
return new ProduceRequest(data, version);
8575
}
8676

@@ -244,4 +234,5 @@ public static ProduceRequest parse(ByteBuffer buffer, short version) {
244234
public static boolean isTransactionV2Requested(short version) {
245235
return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
246236
}
237+
247238
}

clients/src/main/resources/common/message/ProduceRequest.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
"type": "request",
1919
"listeners": ["broker"],
2020
"name": "ProduceRequest",
21-
// Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
21+
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
22+
// these versions have to be included in the api versions response (see KAFKA-18659), but are rejected otherwise.
23+
// See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
2224
//
2325
// Version 1 and 2 are the same as version 0.
2426
//

clients/src/main/resources/common/message/ProduceResponse.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
"apiKey": 0,
1818
"type": "response",
1919
"name": "ProduceResponse",
20-
// Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline.
20+
// Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka,
21+
// these versions have to be included in the api versions response (see KAFKA-18659), but are rejected otherwise.
22+
// See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details.
2123
//
2224
// Version 1 added the throttle time.
2325
// Version 2 added the log append time.

clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,25 @@ public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerT
5858
for (ApiKeys key : ApiKeys.apisForListener(scope)) {
5959
ApiVersion version = defaultResponse.apiVersion(key.id);
6060
assertNotNull(version, "Could not find ApiVersion for API " + key.name);
61-
assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);
62-
assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect max version for Api " + key.name);
61+
if (key == ApiKeys.PRODUCE)
62+
assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, version.minVersion(), "Incorrect min version for Api " + key.name);
63+
else
64+
assertEquals(key.oldestVersion(), version.minVersion(), "Incorrect min version for Api " + key.name);
65+
assertEquals(key.latestVersion(), version.maxVersion(), "Incorrect max version for Api " + key.name);
6366

64-
// Check if versions less than min version are indeed set as null, i.e., deprecated.
67+
// Check if versions less than min version are indeed set as null, i.e., removed.
6568
for (int i = 0; i < version.minVersion(); ++i) {
6669
assertNull(key.messageType.requestSchemas()[i],
6770
"Request version " + i + " for API " + version.apiKey() + " must be null");
6871
assertNull(key.messageType.responseSchemas()[i],
6972
"Response version " + i + " for API " + version.apiKey() + " must be null");
7073
}
7174

75+
// The min version returned in ApiResponse for Produce is not the actual min version, so adjust it
76+
var minVersion = (key == ApiKeys.PRODUCE && scope == ListenerType.BROKER) ?
77+
ApiKeys.PRODUCE.oldestVersion() : version.minVersion();
7278
// Check if versions between min and max versions are non null, i.e., valid.
73-
for (int i = version.minVersion(); i <= version.maxVersion(); ++i) {
79+
for (int i = minVersion; i <= version.maxVersion(); ++i) {
7480
assertNotNull(key.messageType.requestSchemas()[i],
7581
"Request version " + i + " for API " + version.apiKey() + " must not be null");
7682
assertNotNull(key.messageType.responseSchemas()[i],

clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ public void testV6AndBelowCannotUseZStdCompression() {
212212
.setAcks((short) 1)
213213
.setTimeoutMs(1000);
214214
// Can't create ProduceRequest instance with version within [3, 7)
215-
for (short version = 3; version < 7; version++) {
216-
215+
for (short version = ApiKeys.PRODUCE.oldestVersion(); version < 7; version++) {
217216
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData);
218217
assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class);
219218
}
@@ -277,6 +276,22 @@ public void testMixedIdempotentData() {
277276
assertTrue(RequestTestUtils.hasIdempotentRecords(request));
278277
}
279278

279+
@Test
280+
public void testBuilderOldestAndLatestAllowed() {
281+
ProduceRequest.Builder builder = ProduceRequest.builder(new ProduceRequestData()
282+
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList(
283+
new ProduceRequestData.TopicProduceData()
284+
.setName("topic")
285+
.setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
286+
.setIndex(1)
287+
.setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))
288+
).iterator()))
289+
.setAcks((short) -1)
290+
.setTimeoutMs(10));
291+
assertEquals(ApiKeys.PRODUCE.oldestVersion(), builder.oldestAllowedVersion());
292+
assertEquals(ApiKeys.PRODUCE.latestVersion(), builder.latestAllowedVersion());
293+
}
294+
280295
private static <T extends Throwable> void assertThrowsForAllVersions(ProduceRequest.Builder builder,
281296
Class<T> expectedType) {
282297
IntStream.range(builder.oldestAllowedVersion(), builder.latestAllowedVersion() + 1)

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,6 @@
2828
*/
2929
public interface CoordinatorLoader<U> extends AutoCloseable {
3030

31-
/**
32-
* UnknownRecordTypeException is thrown when the Deserializer encounters
33-
* an unknown record type.
34-
*/
35-
class UnknownRecordTypeException extends RuntimeException {
36-
private final short unknownType;
37-
38-
public UnknownRecordTypeException(short unknownType) {
39-
super(String.format("Found an unknown record type %d", unknownType));
40-
this.unknownType = unknownType;
41-
}
42-
43-
public short unknownType() {
44-
return unknownType;
45-
}
46-
}
47-
4831
/**
4932
* Object that is returned as part of the future from load(). Holds the partition load time and the
5033
* end time.

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ public CoordinatorRecord deserialize(
7676

7777
final ApiMessage valueMessage = apiMessageValueFor(recordType);
7878
final short valueVersion = readVersion(valueBuffer, "value");
79+
80+
if (valueVersion < valueMessage.lowestSupportedVersion() || valueVersion > valueMessage.highestSupportedVersion()) {
81+
throw new UnknownRecordVersionException(recordType, valueVersion);
82+
}
83+
7984
readMessage(valueMessage, valueBuffer, valueVersion, "value");
8085

8186
return CoordinatorRecord.record(

0 commit comments

Comments
 (0)