Skip to content

Commit 43b40d8

Browse files
committed
Close BufferedReader in process_withSingleCrudMode
1 parent bee31d0 commit 43b40d8

1 file changed

Lines changed: 96 additions & 91 deletions

File tree

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java

Lines changed: 96 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -84,67 +84,69 @@ void setUp() {
8484
@Test
8585
void process_withSingleCrudMode_shouldProcessAllDataChunks() {
8686
// Arrange
87-
BufferedReader reader = new BufferedReader(new StringReader("test data"));
88-
when(params.getTransactionMode()).thenReturn(TransactionMode.SINGLE_CRUD);
89-
when(params.getDao()).thenReturn(dao);
90-
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
87+
try (BufferedReader reader = new BufferedReader(new StringReader("test data"))) {
88+
when(params.getTransactionMode()).thenReturn(TransactionMode.SINGLE_CRUD);
89+
when(params.getDao()).thenReturn(dao);
90+
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
9191

92-
TestImportProcessor processor = new TestImportProcessor(params);
93-
processor.addListener(eventListener);
92+
TestImportProcessor processor = new TestImportProcessor(params);
93+
processor.addListener(eventListener);
9494

95-
// Act
96-
processor.process(2, 1, reader);
95+
// Act
96+
processor.process(2, 1, reader);
9797

98-
// Assert
99-
verify(eventListener, times(1)).onAllDataChunksCompleted();
100-
// Verify that data chunks were processed
101-
verify(eventListener, times(1)).onDataChunkCompleted(any(ImportDataChunkStatus.class));
98+
// Assert
99+
verify(eventListener, times(1)).onAllDataChunksCompleted();
100+
// Verify that data chunks were processed
101+
verify(eventListener, times(1)).onDataChunkCompleted(any(ImportDataChunkStatus.class));
102+
}
102103
}
103104

104105
@Test
105-
void process_withConsensusCommitMode_shouldProcessAllDataChunks() throws TransactionException {
106+
void process_withConsensusCommitMode_shouldProcessAllDataChunks() throws TransactionException, IOException {
106107
// Arrange
107-
BufferedReader reader = new BufferedReader(new StringReader("test data"));
108-
when(params.getTransactionMode()).thenReturn(TransactionMode.CONSENSUS_COMMIT);
109-
when(params.getDao()).thenReturn(dao);
110-
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
111-
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
112-
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
113-
when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
114-
115-
TestImportProcessor processor = new TestImportProcessor(params);
116-
processor.addListener(eventListener);
117-
118-
// Act
119-
processor.process(2, 1, reader);
120-
121-
// Assert
122-
verify(eventListener, times(1)).onAllDataChunksCompleted();
123-
// Verify that data chunks were processed
124-
verify(eventListener, times(1)).onDataChunkCompleted(any(ImportDataChunkStatus.class));
108+
try (BufferedReader reader = new BufferedReader(new StringReader("test data"))) {
109+
when(params.getTransactionMode()).thenReturn(TransactionMode.CONSENSUS_COMMIT);
110+
when(params.getDao()).thenReturn(dao);
111+
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
112+
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
113+
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
114+
when(distributedTransactionManager.start()).thenReturn(distributedTransaction);
115+
116+
TestImportProcessor processor = new TestImportProcessor(params);
117+
processor.addListener(eventListener);
118+
119+
// Act
120+
processor.process(2, 1, reader);
121+
122+
// Assert
123+
verify(eventListener, times(1)).onAllDataChunksCompleted();
124+
// Verify that data chunks were processed
125+
verify(eventListener, times(1)).onDataChunkCompleted(any(ImportDataChunkStatus.class));
126+
}
125127
}
126128

127129
@Test
128-
void process_withEmptyData_shouldNotProcessAnyDataChunks() {
130+
void process_withEmptyData_shouldNotProcessAnyDataChunks() throws IOException {
129131
// Arrange
130132
TestImportProcessor processor = new TestImportProcessor(params);
131133
processor.addListener(eventListener);
132134

133-
BufferedReader reader = new BufferedReader(new StringReader(""));
135+
try (BufferedReader reader = new BufferedReader(new StringReader(""))) {
136+
// Act
137+
processor.process(2, 1, reader);
134138

135-
// Act
136-
processor.process(2, 1, reader);
137-
138-
// Assert
139-
verify(eventListener, times(1)).onAllDataChunksCompleted();
140-
// Verify that no data chunks were processed
141-
verify(eventListener, times(0)).onDataChunkCompleted(any());
139+
// Assert
140+
verify(eventListener, times(1)).onAllDataChunksCompleted();
141+
// Verify that no data chunks were processed
142+
verify(eventListener, times(0)).onDataChunkCompleted(any());
143+
}
142144
}
143145

144146
// Thread executor behavior tests
145147

146148
@Test
147-
void process_withMultipleDataChunks_shouldUseThreadPool() {
149+
void process_withMultipleDataChunks_shouldUseThreadPool() throws IOException {
148150
// Arrange
149151
final int maxThreads = 4;
150152
when(importOptions.getMaxThreads()).thenReturn(maxThreads);
@@ -157,7 +159,6 @@ void process_withMultipleDataChunks_shouldUseThreadPool() {
157159
for (int i = 0; i < 20; i++) {
158160
testData.append("test data line ").append(i).append("\n");
159161
}
160-
BufferedReader reader = new BufferedReader(new StringReader(testData.toString()));
161162

162163
// Create a latch to ensure tasks take some time to complete
163164
CountDownLatch latch = new CountDownLatch(1);
@@ -167,38 +168,40 @@ void process_withMultipleDataChunks_shouldUseThreadPool() {
167168
processor.addListener(eventListener);
168169
processor.setProcessingLatch(latch);
169170

170-
// Act
171-
processor.process(2, 1, reader);
171+
try (BufferedReader reader = new BufferedReader(new StringReader(testData.toString()))) {
172+
// Act
173+
processor.process(2, 1, reader);
172174

173-
// Assert
174-
// Verify that multiple threads were used but not more than maxThreads
175-
assertTrue(processor.getMaxConcurrentThreads().get() > 1, "Should use multiple threads");
176-
assertTrue(
177-
processor.getMaxConcurrentThreads().get() <= maxThreads, "Should not exceed max threads");
175+
// Assert
176+
// Verify that multiple threads were used but not more than maxThreads
177+
assertTrue(processor.getMaxConcurrentThreads().get() > 1, "Should use multiple threads");
178+
assertTrue(
179+
processor.getMaxConcurrentThreads().get() <= maxThreads, "Should not exceed max threads");
178180

179-
// Verify that all data chunks were processed
180-
verify(eventListener, times(1)).onAllDataChunksCompleted();
181+
// Verify that all data chunks were processed
182+
verify(eventListener, times(1)).onAllDataChunksCompleted();
183+
}
181184
}
182185

183186
@Test
184-
void process_withInterruption_shouldShutdownGracefully() {
187+
void process_withInterruption_shouldShutdownGracefully() throws IOException {
185188
// Arrange
186-
BufferedReader reader = new BufferedReader(new StringReader("test data\nmore data\n"));
187-
188189
// Create a processor that will be interrupted
189190
TestImportProcessor processor = new TestImportProcessor(params);
190191
processor.addListener(eventListener);
191192
processor.setSimulateInterruption(true);
192193

193-
// Act & Assert
194-
assertThrows(RuntimeException.class, () -> processor.process(2, 1, reader));
194+
try (BufferedReader reader = new BufferedReader(new StringReader("test data\nmore data\n"))) {
195+
// Act & Assert
196+
assertThrows(RuntimeException.class, () -> processor.process(2, 1, reader));
195197

196-
// Verify that onAllDataChunksCompleted was still called (in finally block)
197-
verify(eventListener, times(1)).onAllDataChunksCompleted();
198+
// Verify that onAllDataChunksCompleted was still called (in finally block)
199+
verify(eventListener, times(1)).onAllDataChunksCompleted();
200+
}
198201
}
199202

200203
@Test
201-
void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() {
204+
void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() throws IOException {
202205
// Arrange
203206
final int maxThreads = 2;
204207
when(importOptions.getMaxThreads()).thenReturn(maxThreads);
@@ -211,75 +214,77 @@ void process_withLargeNumberOfTasks_shouldWaitForAllTasksToComplete() {
211214
for (int i = 0; i < 50; i++) {
212215
testData.append("test data line ").append(i).append("\n");
213216
}
214-
BufferedReader reader = new BufferedReader(new StringReader(testData.toString()));
215217

216218
// Create a TestImportProcessor with a small processing delay
217219
TestImportProcessor processor = new TestImportProcessor(params);
218220
processor.addListener(eventListener);
219221
processor.setProcessingDelayMs(10); // 10ms delay per chunk
220222

221-
// Act
222-
processor.process(2, 1, reader);
223+
try (BufferedReader reader = new BufferedReader(new StringReader(testData.toString()))) {
224+
// Act
225+
processor.process(2, 1, reader);
223226

224-
// Assert
225-
// Verify that all tasks were completed
226-
assertTrue(processor.getProcessedChunksCount().get() > 0, "All tasks should be completed");
227-
verify(eventListener, times(1)).onAllDataChunksCompleted();
227+
// Assert
228+
// Verify that all tasks were completed
229+
assertTrue(processor.getProcessedChunksCount().get() > 0, "All tasks should be completed");
230+
verify(eventListener, times(1)).onAllDataChunksCompleted();
231+
}
228232
}
229233

230234
@Test
231-
void process_withShutdown_shouldShutdownExecutorsGracefully() {
235+
void process_withShutdown_shouldShutdownExecutorsGracefully() throws IOException {
232236
// Arrange
233237
when(params.getTransactionMode()).thenReturn(TransactionMode.SINGLE_CRUD);
234238
when(params.getDao()).thenReturn(dao);
235239
when(params.getTableColumnDataTypes()).thenReturn(tableColumnDataTypes);
236240
when(params.getTableMetadataByTableName()).thenReturn(tableMetadataByTableName);
237241

238-
BufferedReader reader =
239-
new BufferedReader(new StringReader("test data\nmore data\neven more data\n"));
240-
241242
// Create a TestImportProcessor with a longer processing delay
242243
TestImportProcessor processor = new TestImportProcessor(params);
243244
processor.addListener(eventListener);
244245
processor.setProcessingDelayMs(50); // 50ms delay per chunk
245246

246-
// Act
247-
processor.process(1, 1, reader);
247+
try (BufferedReader reader =
248+
new BufferedReader(new StringReader("test data\nmore data\neven more data\n"))) {
249+
// Act
250+
processor.process(1, 1, reader);
248251

249-
// Assert
250-
// Verify that all data chunks were processed and executors were shut down gracefully
251-
verify(eventListener, times(1)).onAllDataChunksCompleted();
252-
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
252+
// Assert
253+
// Verify that all data chunks were processed and executors were shut down gracefully
254+
verify(eventListener, times(1)).onAllDataChunksCompleted();
255+
assertEquals(3, processor.getProcessedChunksCount().get(), "All chunks should be processed");
256+
}
253257
}
254258

255259
@Test
256260
void process_withUnexpectedExceptionInTransaction_shouldHandleGracefully()
257-
throws TransactionException {
261+
throws TransactionException, IOException {
258262
// Arrange
259-
BufferedReader reader = new BufferedReader(new StringReader("test data"));
260263
when(params.getTransactionMode()).thenReturn(TransactionMode.CONSENSUS_COMMIT);
261264
when(params.getDistributedTransactionManager()).thenReturn(distributedTransactionManager);
262265
when(distributedTransactionManager.start()).thenThrow(new RuntimeException("Unexpected error"));
263266

264267
TestImportProcessor processor = new TestImportProcessor(params);
265268
processor.addListener(eventListener);
266269

267-
// Act
268-
processor.process(2, 1, reader);
270+
try (BufferedReader reader = new BufferedReader(new StringReader("test data"))) {
271+
// Act
272+
processor.process(2, 1, reader);
269273

270-
// Assert
271-
verify(eventListener, times(1)).onAllDataChunksCompleted();
274+
// Assert
275+
verify(eventListener, times(1)).onAllDataChunksCompleted();
272276

273-
// Capture and verify the transaction batch result
274-
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
275-
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
276-
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());
277+
// Capture and verify the transaction batch result
278+
ArgumentCaptor<ImportTransactionBatchResult> resultCaptor =
279+
ArgumentCaptor.forClass(ImportTransactionBatchResult.class);
280+
verify(eventListener, times(1)).onTransactionBatchCompleted(resultCaptor.capture());
277281

278-
ImportTransactionBatchResult result = resultCaptor.getValue();
279-
assertFalse(result.isSuccess());
280-
assertEquals(0, result.getTransactionBatchId());
281-
assertEquals(1, result.getDataChunkId());
282-
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
282+
ImportTransactionBatchResult result = resultCaptor.getValue();
283+
assertFalse(result.isSuccess());
284+
assertEquals(0, result.getTransactionBatchId());
285+
assertEquals(1, result.getDataChunkId());
286+
assertTrue(result.getErrors().get(0).contains("Unexpected error"));
287+
}
283288
}
284289

285290
/**

0 commit comments

Comments
 (0)