5
5
import static com .linkedin .venice .utils .IntegrationTestPushUtils .makeStoreHybrid ;
6
6
import static com .linkedin .venice .utils .IntegrationTestPushUtils .sendStreamingRecord ;
7
7
import static org .testng .Assert .assertEquals ;
8
+ import static org .testng .Assert .assertTrue ;
8
9
9
10
import com .linkedin .venice .client .store .AvroGenericStoreClient ;
10
11
import com .linkedin .venice .client .store .ClientConfig ;
15
16
import com .linkedin .venice .exceptions .VeniceException ;
16
17
import com .linkedin .venice .integration .utils .ServiceFactory ;
17
18
import com .linkedin .venice .integration .utils .VeniceClusterWrapper ;
18
- import com .linkedin .venice .kafka .TopicManager ;
19
19
import com .linkedin .venice .kafka .TopicManagerRepository ;
20
20
import com .linkedin .venice .meta .Version ;
21
21
import com .linkedin .venice .pubsub .PubSubTopicRepository ;
@@ -42,26 +42,22 @@ public class TestDeleteStoreDeletesRealtimeTopic {
42
42
private VeniceClusterWrapper venice = null ;
43
43
private AvroGenericStoreClient client = null ;
44
44
private ControllerClient controllerClient = null ;
45
- private TopicManager topicManager = null ;
45
+ private TopicManagerRepository topicManagerRepository = null ;
46
46
private String storeName = null ;
47
47
48
- private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository ();
48
+ private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository ();
49
49
50
50
@ BeforeClass
51
51
public void setUp () {
52
52
venice = ServiceFactory .getVeniceCluster ();
53
53
controllerClient =
54
54
ControllerClient .constructClusterControllerClient (venice .getClusterName (), venice .getRandomRouterURL ());
55
-
56
- try (TopicManagerRepository topicManagerRepository = IntegrationTestPushUtils .getTopicManagerRepo (
55
+ topicManagerRepository = IntegrationTestPushUtils .getTopicManagerRepo (
57
56
DEFAULT_KAFKA_OPERATION_TIMEOUT_MS ,
58
57
100 ,
59
58
0l ,
60
59
venice .getPubSubBrokerWrapper (),
61
- pubSubTopicRepository )) {
62
- topicManager = topicManagerRepository .getTopicManager ();
63
- }
64
-
60
+ pubSubTopicRepository );
65
61
storeName = Utils .getUniqueString ("hybrid-store" );
66
62
venice .getNewStore (storeName );
67
63
makeStoreHybrid (venice , storeName , 100L , 5L );
@@ -71,7 +67,7 @@ public void setUp() {
71
67
72
68
@ AfterClass
73
69
public void cleanUp () {
74
- Utils .closeQuietlyWithErrorLogged (topicManager );
70
+ Utils .closeQuietlyWithErrorLogged (topicManagerRepository );
75
71
Utils .closeQuietlyWithErrorLogged (client );
76
72
Utils .closeQuietlyWithErrorLogged (venice );
77
73
Utils .closeQuietlyWithErrorLogged (controllerClient );
@@ -109,7 +105,7 @@ public void deletingHybridStoreDeletesRealtimeTopic() {
109
105
110
106
// verify realtime topic exists
111
107
PubSubTopic rtTopic = pubSubTopicRepository .getTopic (Version .composeRealTimeTopic (storeName ));
112
- Assert . assertTrue (topicManager .containsTopicAndAllPartitionsAreOnline (rtTopic ));
108
+ assertTrue (topicManagerRepository . getTopicManager () .containsTopicAndAllPartitionsAreOnline (rtTopic ));
113
109
114
110
// disable store
115
111
TestUtils .assertCommand (
@@ -130,11 +126,11 @@ public void deletingHybridStoreDeletesRealtimeTopic() {
130
126
// verify realtime topic does not exist
131
127
PubSubTopic realTimeTopicName = pubSubTopicRepository .getTopic (Version .composeRealTimeTopic (storeName ));
132
128
try {
133
- boolean isTruncated = topicManager .isTopicTruncated (realTimeTopicName , 60000 );
134
- Assert . assertTrue (
129
+ boolean isTruncated = topicManagerRepository . getTopicManager () .isTopicTruncated (realTimeTopicName , 60000 );
130
+ assertTrue (
135
131
isTruncated ,
136
132
"Real-time buffer topic should be truncated: " + realTimeTopicName + " but retention is set to: "
137
- + topicManager .getTopicRetention (realTimeTopicName ) + "." );
133
+ + topicManagerRepository . getTopicManager () .getTopicRetention (realTimeTopicName ) + "." );
138
134
LOGGER .info ("Confirmed truncation of real-time topic: {}" , realTimeTopicName );
139
135
} catch (PubSubTopicDoesNotExistException e ) {
140
136
LOGGER
0 commit comments