Skip to content

Commit e7e4252

Browse files
authored
JUnit extensions for integration tests (apache#9986)
Adds JUnit 5 extension for running the same test with different types of clusters. See core/src/test/java/kafka/test/junit/README.md for details
1 parent 1bfce16 commit e7e4252

25 files changed

+1690
-111
lines changed

build.gradle

+4-1
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,10 @@ project(':core') {
10131013
}
10141014
test {
10151015
java {
1016-
srcDirs = ["src/generated/java", "src/test/java"]
1016+
srcDirs = []
1017+
}
1018+
scala {
1019+
srcDirs = ["src/test/java", "src/test/scala"]
10171020
}
10181021
}
10191022
}

checkstyle/import-control-core.xml

+15
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,19 @@
5858
<allow pkg="org.apache.kafka.clients" />
5959
</subpackage>
6060

61+
<subpackage name="test">
62+
<allow pkg="kafka.test.annotation"/>
63+
<allow pkg="kafka.test.junit"/>
64+
<allow pkg="kafka.network"/>
65+
<allow pkg="kafka.api"/>
66+
<allow pkg="kafka.server"/>
67+
<allow pkg="org.apache.kafka.clients.admin"/>
68+
<allow pkg="integration.kafka.server" class="IntegrationTestHelper"/>
69+
<subpackage name="annotation">
70+
<allow pkg="kafka.test"/>
71+
</subpackage>
72+
<subpackage name="junit">
73+
<allow pkg="kafka.test"/>
74+
</subpackage>
75+
</subpackage>
6176
</import-control>

checkstyle/suppressions.xml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<!-- core -->
2424
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
2525
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
26+
<suppress checks="NPathComplexity" files="ClusterTestExtensions.java"/>
2627

2728
<!-- Clients -->
2829
<suppress id="dontUseSystemExit"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.test;
19+
20+
import kafka.test.annotation.Type;
21+
import org.apache.kafka.common.security.auth.SecurityProtocol;
22+
23+
import java.io.File;
24+
import java.util.LinkedHashMap;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.Properties;
28+
29+
/**
30+
* Represents a requested configuration of a Kafka cluster for integration testing
31+
*/
32+
public class ClusterConfig {
33+
34+
private final Type type;
35+
private final int brokers;
36+
private final int controllers;
37+
private final String name;
38+
private final boolean autoStart;
39+
40+
private final SecurityProtocol securityProtocol;
41+
private final String listenerName;
42+
private final File trustStoreFile;
43+
44+
private final Properties serverProperties = new Properties();
45+
private final Properties producerProperties = new Properties();
46+
private final Properties consumerProperties = new Properties();
47+
private final Properties adminClientProperties = new Properties();
48+
private final Properties saslServerProperties = new Properties();
49+
private final Properties saslClientProperties = new Properties();
50+
51+
ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
52+
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile) {
53+
this.type = type;
54+
this.brokers = brokers;
55+
this.controllers = controllers;
56+
this.name = name;
57+
this.autoStart = autoStart;
58+
this.securityProtocol = securityProtocol;
59+
this.listenerName = listenerName;
60+
this.trustStoreFile = trustStoreFile;
61+
}
62+
63+
public Type clusterType() {
64+
return type;
65+
}
66+
67+
public int numBrokers() {
68+
return brokers;
69+
}
70+
71+
public int numControllers() {
72+
return controllers;
73+
}
74+
75+
public Optional<String> name() {
76+
return Optional.ofNullable(name);
77+
}
78+
79+
public boolean isAutoStart() {
80+
return autoStart;
81+
}
82+
83+
public Properties serverProperties() {
84+
return serverProperties;
85+
}
86+
87+
public Properties producerProperties() {
88+
return producerProperties;
89+
}
90+
91+
public Properties consumerProperties() {
92+
return consumerProperties;
93+
}
94+
95+
public Properties adminClientProperties() {
96+
return adminClientProperties;
97+
}
98+
99+
public Properties saslServerProperties() {
100+
return saslServerProperties;
101+
}
102+
103+
public Properties saslClientProperties() {
104+
return saslClientProperties;
105+
}
106+
107+
public SecurityProtocol securityProtocol() {
108+
return securityProtocol;
109+
}
110+
111+
public Optional<String> listenerName() {
112+
return Optional.ofNullable(listenerName);
113+
}
114+
115+
public Optional<File> trustStoreFile() {
116+
return Optional.ofNullable(trustStoreFile);
117+
}
118+
119+
public Map<String, String> nameTags() {
120+
Map<String, String> tags = new LinkedHashMap<>(3);
121+
name().ifPresent(name -> tags.put("Name", name));
122+
tags.put("security", securityProtocol.name());
123+
listenerName().ifPresent(listener -> tags.put("listener", listener));
124+
return tags;
125+
}
126+
127+
public ClusterConfig copyOf() {
128+
ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
129+
copy.serverProperties.putAll(serverProperties);
130+
copy.producerProperties.putAll(producerProperties);
131+
copy.consumerProperties.putAll(consumerProperties);
132+
copy.saslServerProperties.putAll(saslServerProperties);
133+
copy.saslClientProperties.putAll(saslClientProperties);
134+
return copy;
135+
}
136+
137+
public static Builder defaultClusterBuilder() {
138+
return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
139+
}
140+
141+
public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
142+
return new Builder(type, brokers, controllers, autoStart, securityProtocol);
143+
}
144+
145+
public static class Builder {
146+
private Type type;
147+
private int brokers;
148+
private int controllers;
149+
private String name;
150+
private boolean autoStart;
151+
private SecurityProtocol securityProtocol;
152+
private String listenerName;
153+
private File trustStoreFile;
154+
155+
Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
156+
this.type = type;
157+
this.brokers = brokers;
158+
this.controllers = controllers;
159+
this.autoStart = autoStart;
160+
this.securityProtocol = securityProtocol;
161+
}
162+
163+
public Builder type(Type type) {
164+
this.type = type;
165+
return this;
166+
}
167+
168+
public Builder brokers(int brokers) {
169+
this.brokers = brokers;
170+
return this;
171+
}
172+
173+
public Builder controllers(int controllers) {
174+
this.controllers = controllers;
175+
return this;
176+
}
177+
178+
public Builder name(String name) {
179+
this.name = name;
180+
return this;
181+
}
182+
183+
public Builder autoStart(boolean autoStart) {
184+
this.autoStart = autoStart;
185+
return this;
186+
}
187+
188+
public Builder securityProtocol(SecurityProtocol securityProtocol) {
189+
this.securityProtocol = securityProtocol;
190+
return this;
191+
}
192+
193+
public Builder listenerName(String listenerName) {
194+
this.listenerName = listenerName;
195+
return this;
196+
}
197+
198+
public Builder trustStoreFile(File trustStoreFile) {
199+
this.trustStoreFile = trustStoreFile;
200+
return this;
201+
}
202+
203+
public ClusterConfig build() {
204+
return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile);
205+
}
206+
}
207+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.test;
19+
20+
import java.util.function.Consumer;
21+
22+
@FunctionalInterface
23+
public interface ClusterGenerator extends Consumer<ClusterConfig> {
24+
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package kafka.test;
19+
20+
import kafka.network.SocketServer;
21+
import kafka.test.annotation.ClusterTest;
22+
import org.apache.kafka.clients.admin.Admin;
23+
import org.apache.kafka.common.network.ListenerName;
24+
25+
import java.util.Collection;
26+
import java.util.Properties;
27+
28+
public interface ClusterInstance {
29+
30+
enum ClusterType {
31+
ZK,
32+
// RAFT
33+
}
34+
35+
/**
36+
* Cluster type. For now, only ZK is supported.
37+
*/
38+
ClusterType clusterType();
39+
40+
/**
41+
* The cluster configuration used to create this cluster. Changing data in this instance through this accessor will
42+
* have no affect on the cluster since it is already provisioned.
43+
*/
44+
ClusterConfig config();
45+
46+
/**
47+
* The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. If
48+
* unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT
49+
*/
50+
ListenerName clientListener();
51+
52+
/**
53+
* The broker connect string which can be used by clients for bootstrapping
54+
*/
55+
String bootstrapServers();
56+
57+
/**
58+
* A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
59+
* acting as the controller (since ZK controllers serve both broker and controller roles).
60+
*/
61+
Collection<SocketServer> brokerSocketServers();
62+
63+
/**
64+
* A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
65+
* currently the active controller. For Raft-based clusters, this will return all controller servers.
66+
*/
67+
Collection<SocketServer> controllerSocketServers();
68+
69+
/**
70+
* Return any one of the broker servers. Throw an error if none are found
71+
*/
72+
SocketServer anyBrokerSocketServer();
73+
74+
/**
75+
* Return any one of the controller servers. Throw an error if none are found
76+
*/
77+
SocketServer anyControllerSocketServer();
78+
79+
/**
80+
* The underlying object which is responsible for setting up and tearing down the cluster.
81+
*/
82+
Object getUnderlying();
83+
84+
default <T> T getUnderlying(Class<T> asClass) {
85+
return asClass.cast(getUnderlying());
86+
}
87+
88+
Admin createAdminClient(Properties configOverrides);
89+
90+
default Admin createAdminClient() {
91+
return createAdminClient(new Properties());
92+
}
93+
94+
void start();
95+
96+
void stop();
97+
}

0 commit comments

Comments
 (0)