|
| 1 | +/* |
| 2 | + * Copyright 2014-2018 MarkLogic Corporation |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
1 | 16 | package com.marklogic.client.datamovement.functionaltests;
|
2 | 17 |
|
3 | 18 | import static org.junit.Assert.assertEquals;
|
| 19 | +import static org.junit.Assert.assertTrue; |
4 | 20 |
|
5 | 21 | import java.io.BufferedReader;
|
6 | 22 | import java.io.File;
|
|
58 | 74 | import com.marklogic.client.datamovement.JobTicket;
|
59 | 75 | import com.marklogic.client.datamovement.NoResponseListener;
|
60 | 76 | import com.marklogic.client.datamovement.QueryBatch;
|
| 77 | +import com.marklogic.client.datamovement.QueryBatchException; |
| 78 | +import com.marklogic.client.datamovement.QueryBatchListener; |
61 | 79 | import com.marklogic.client.datamovement.QueryBatcher;
|
| 80 | +import com.marklogic.client.datamovement.QueryBatcherListener; |
| 81 | +import com.marklogic.client.datamovement.QueryFailureListener; |
62 | 82 | import com.marklogic.client.datamovement.WriteBatcher;
|
63 | 83 | import com.marklogic.client.document.DocumentPage;
|
64 | 84 | import com.marklogic.client.document.DocumentRecord;
|
@@ -99,7 +119,13 @@ public static void setUpBeforeClass() throws Exception {
|
99 | 119 |
|
100 | 120 | server = getRestAppServerName();
|
101 | 121 | port = getRestAppServerPort();
|
102 |
| - |
| 122 | + |
| 123 | + // Create App Server if needed. |
| 124 | + createRESTServerWithDB(server, port); |
| 125 | + associateRESTServerWithDB(server, dbName); |
| 126 | + if (IsSecurityEnabled()) { |
| 127 | + enableSecurityOnRESTServer(server, dbName); |
| 128 | + } |
103 | 129 | hostNames = getHosts();
|
104 | 130 | // Perform the setup on multiple nodes only.
|
105 | 131 | if (hostNames.length > 1) {
|
@@ -182,12 +208,7 @@ public static void setUpBeforeClass() throws Exception {
|
182 | 208 | props.put("journaling", "strict");
|
183 | 209 | changeProperty(props, "/manage/v2/databases/" + dbName + "/properties");
|
184 | 210 | Thread.currentThread().sleep(500L);
|
185 |
| - // Create App Server if needed. |
186 |
| - createRESTServerWithDB(server, port); |
187 |
| - associateRESTServerWithDB(server, dbName); |
188 |
| - if (IsSecurityEnabled()) { |
189 |
| - enableSecurityOnRESTServer(server, dbName); |
190 |
| - } |
| 211 | + |
191 | 212 | // StringHandle
|
192 | 213 | stringTriple = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><foo>This is so foo</foo>";
|
193 | 214 | stringHandle = new StringHandle(stringTriple);
|
@@ -783,6 +804,116 @@ public void massDeleteConsistentSnapShot() throws Exception {
|
783 | 804 | System.out.println("Count: " + evalClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
|
784 | 805 | assertEquals(0, evalClient.newServerEval().xquery(query1).eval().next().getNumber().intValue());
|
785 | 806 | }
|
| 807 | + |
| 808 | + /* This test is intended to test closing of listeners when job is done. |
| 809 | + * |
| 810 | + * |
| 811 | + */ |
| 812 | + @Test(timeout = 450000) |
| 813 | + public void testListenerCloseables() throws Exception { |
| 814 | + Assume.assumeTrue(hostNames.length > 1); |
| 815 | + |
| 816 | + System.out.println(Thread.currentThread().getStackTrace()[1].getMethodName()); |
| 817 | + AtomicInteger success = new AtomicInteger(0); |
| 818 | + // There two variables are to track close method on Listeners. |
| 819 | + AtomicBoolean testCloseOnBatchListenerUriReady = new AtomicBoolean(false); |
| 820 | + AtomicBoolean testCloseOnFailureListenerQueryFailure = new AtomicBoolean(false); |
| 821 | + |
| 822 | + // This variable tracks the OnJobCompleteion status |
| 823 | + AtomicBoolean getOnePrimaryDBClient = new AtomicBoolean(false); |
| 824 | + |
| 825 | + // Track primary database client on all listeners and job completion |
| 826 | + StringBuilder sb_strBatchListenerUriReady = new StringBuilder(); |
| 827 | + StringBuilder sb_strJobCompletionListener = new StringBuilder(); |
| 828 | + |
| 829 | + |
| 830 | + class TestCloseOnBatchListenerUriReady implements QueryBatchListener, AutoCloseable { |
| 831 | + |
| 832 | + @Override |
| 833 | + public void close() throws Exception { |
| 834 | + System.out.println("Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class"); |
| 835 | + testCloseOnBatchListenerUriReady.set(true); |
| 836 | + } |
| 837 | + |
| 838 | + @Override |
| 839 | + public void processEvent(QueryBatch batch) { |
| 840 | + System.out.println("processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class"); |
| 841 | + // Verify the Primary DatabaseClient instance |
| 842 | + if (!getOnePrimaryDBClient.get()) { |
| 843 | + getOnePrimaryDBClient.set(true); |
| 844 | + sb_strBatchListenerUriReady.append(batch.getBatcher().getPrimaryClient().getHost()); |
| 845 | + sb_strBatchListenerUriReady.append("|"); |
| 846 | + sb_strBatchListenerUriReady.append(batch.getBatcher().getPrimaryClient().getPort()); |
| 847 | + } |
| 848 | + } |
| 849 | + } |
| 850 | + |
| 851 | + class TestCloseOnBatchListenerQueryFailure implements QueryFailureListener, AutoCloseable { |
| 852 | + |
| 853 | + @Override |
| 854 | + public void close() throws Exception { |
| 855 | + System.out.println("Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class"); |
| 856 | + testCloseOnFailureListenerQueryFailure.set(true); |
| 857 | + } |
| 858 | + |
| 859 | + @Override |
| 860 | + public void processFailure(QueryBatchException failure) { |
| 861 | + System.out.println("processFailure called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class"); |
| 862 | + } |
| 863 | + } |
| 864 | + |
| 865 | + // Listener to be called when QueryBatcher has completed reading all URIs |
| 866 | + class TestQBJobCompleteionListener implements QueryBatcherListener { |
| 867 | + |
| 868 | + @Override |
| 869 | + public void processEvent(QueryBatcher batcher) { |
| 870 | + System.out.println("processEvent called from testMinNodesWithCloseable in TestQBJobCompleteionListener class"); |
| 871 | + |
| 872 | + // Verify a detail - ticket Id at end of completion |
| 873 | + sb_strJobCompletionListener.append(batcher.getBatchSize()); |
| 874 | + } |
| 875 | + } |
| 876 | + |
| 877 | + try { |
| 878 | + |
| 879 | + TestCloseOnBatchListenerUriReady closeBatchURIs = new TestCloseOnBatchListenerUriReady(); |
| 880 | + TestCloseOnBatchListenerQueryFailure closeQueryFailure = new TestCloseOnBatchListenerQueryFailure(); |
| 881 | + TestQBJobCompleteionListener jobCompleteListener = new TestQBJobCompleteionListener(); |
| 882 | + |
| 883 | + QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform")) |
| 884 | + .withBatchSize(4000).withThreadCount(5); |
| 885 | + |
| 886 | + // Add the new Listeners to the batcher. |
| 887 | + |
| 888 | + batcher.onUrisReady((batch) -> { |
| 889 | + success.addAndGet(batch.getItems().length); |
| 890 | + }).onQueryFailure(queryException -> { |
| 891 | + queryException.printStackTrace(); |
| 892 | + }).onUrisReady(closeBatchURIs) |
| 893 | + .onQueryFailure(closeQueryFailure) |
| 894 | + .onJobCompletion(jobCompleteListener); |
| 895 | + |
| 896 | + ticket = dmManager.startJob(batcher); |
| 897 | + |
| 898 | + batcher.awaitCompletion(); |
| 899 | + dmManager.stopJob(ticket); |
| 900 | + } catch (Exception e) { |
| 901 | + e.printStackTrace(); |
| 902 | + } |
| 903 | + // Verify the DatabaseClient instances. |
| 904 | + System.out.println("Primary database instance is " + sb_strBatchListenerUriReady.toString()); |
| 905 | + |
| 906 | + // Verify the close status |
| 907 | + assertTrue("Close is not called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class", testCloseOnBatchListenerUriReady.get()); |
| 908 | + assertTrue("Close is not called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class", testCloseOnFailureListenerQueryFailure.get()); |
| 909 | + |
| 910 | + // Verify the batch size on job completion |
| 911 | + assertTrue("Job Completion details not equal", |
| 912 | + sb_strJobCompletionListener.toString().equalsIgnoreCase("4000")); |
| 913 | + // Verify the primary database client |
| 914 | + assertTrue("Primary database details not correct", |
| 915 | + sb_strBatchListenerUriReady.toString().contains(String.valueOf(port))); |
| 916 | + } |
786 | 917 |
|
787 | 918 | private void serverStartStop(String server, String command) throws Exception {
|
788 | 919 | System.out.println("Preparing to " + command + " " + server);
|
|
0 commit comments