diff --git a/automation/pom.xml b/automation/pom.xml index 9afb0ae22f..eaff8c2a65 100644 --- a/automation/pom.xml +++ b/automation/pom.xml @@ -32,7 +32,6 @@ - @@ -173,6 +172,7 @@ jsystemCore 6.1.05 + javax.comm comm @@ -184,12 +184,12 @@ - org.jsystemtest.systemobjects cli 6.1.05 + javax.comm comm @@ -201,7 +201,6 @@ - org.mockito mockito-core diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/expected/query01.ans new file mode 100755 index 0000000000..c0fb0a454b --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/expected/query01.ans @@ -0,0 +1,16 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using active-segment policy +SELECT * FROM fd_2_active_segment_hdfs_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 +(8 rows) + diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/sql/query01.sql new file mode 100755 index 0000000000..ad053720a5 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-2-as-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using active-segment policy +SELECT * FROM fd_2_active_segment_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/expected/query01.ans similarity index 55% rename from automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/expected/query01.ans rename to automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/expected/query01.ans index 69ec3de7ec..a43cd54c67 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/expected/query01.ans +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/expected/query01.ans @@ -1,13 +1,13 @@ -- start_ignore -- end_ignore -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_seg_limit_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM fd_4_active_segment_hdfs_ext_table ORDER BY 1; id | descr ----+------- 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 -(5 rows) - + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 +(6 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/sql/query01.sql similarity index 55% rename from automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/sql/query01.sql rename to automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/sql/query01.sql index a99f9ab1b5..c571dc40b4 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-seg-limit/sql/query01.sql +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-4-as-policy/sql/query01.sql @@ -1,2 +1,2 @@ -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_seg_limit_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM 
fd_4_active_segment_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/expected/query01.ans new file mode 100755 index 0000000000..0d0448922d --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/expected/query01.ans @@ -0,0 +1,19 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments HDFS profile when fragment count is equal to an even number of segments. +SELECT * FROM fd_improved_round_robin_even_hdfs_ext_table ORDER BY 1; + id | descr +----+-------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 + 9 | text9 + 10 | text10 + 11 | text11 + 12 | text12 +(12 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/sql/query01.sql new file mode 100755 index 0000000000..edaa0c048e --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-even-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments HDFS profile when fragment count is equal to an even number of segments. +SELECT * FROM fd_improved_round_robin_even_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/expected/query01.ans new file mode 100755 index 0000000000..3fa7034450 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/expected/query01.ans @@ -0,0 +1,10 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using improved-round-robin policy when fragment count is less +-- than segment count. +SELECT * FROM fd_improved_round_robin_less_hdfs_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 +(2 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/sql/query01.sql new file mode 100755 index 0000000000..612d83306c --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-less-policy/sql/query01.sql @@ -0,0 +1,3 @@ +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using improved-round-robin policy when fragment count is less +-- than segment count. +SELECT * FROM fd_improved_round_robin_less_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/expected/query01.ans new file mode 100755 index 0000000000..b11bad6fdc --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/expected/query01.ans @@ -0,0 +1,16 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using improved-round-robin policy when fragment count is more +-- than segment count but not equal to an even number of segments. 
+SELECT * FROM fd_improved_round_robin_more_hdfs_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 +(8 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/sql/query01.sql new file mode 100755 index 0000000000..f6baf16655 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-irr-more-policy/sql/query01.sql @@ -0,0 +1,3 @@ +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using improved-round-robin policy when fragment count is more +-- than segment count but not equal to an even number of segments. +SELECT * FROM fd_improved_round_robin_more_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/expected/query01.ans index 5e0e98e438..44d811188a 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/expected/query01.ans +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/expected/query01.ans @@ -1,13 +1,14 @@ -- start_ignore -- end_ignore -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_limit_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM fd_hdfs_limit_ext_table ORDER BY 1; id | descr ----+------- 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 -(5 rows) + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 +(6 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/sql/query01.sql index 17709ee1a4..9d3fb398e2 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/sql/query01.sql +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-limit/sql/query01.sql @@ -1,2 +1,2 @@ -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_limit_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM fd_hdfs_limit_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/expected/query01.ans new file mode 100755 index 0000000000..0e48058ac4 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/expected/query01.ans @@ -0,0 +1,15 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using random policy +SELECT * FROM fd_random_hdfs_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 +(8 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/sql/query01.sql new file mode 100755 index 0000000000..e71f0ca844 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs-random-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments with HDFS profile using random policy +SELECT * FROM 
fd_random_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/hdfs/expected/query01.ans index 731525fa7a..359a7d3fe5 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs/expected/query01.ans +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs/expected/query01.ans @@ -1,13 +1,14 @@ -- start_ignore -- end_ignore -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM fd_hdfs_ext_table ORDER BY 1; id | descr ----+------- 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 - 1 | text1 -(5 rows) + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 +(6 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/hdfs/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/hdfs/sql/query01.sql index 0bdb836746..ac75d6973b 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/hdfs/sql/query01.sql +++ b/automation/sqlrepo/arenadata/fragment-distribution/hdfs/sql/query01.sql @@ -1,2 +1,2 @@ -- @description query01 for PXF test to check fragments distribution across segments with HDFS profile -SELECT * FROM fragment_distribution_hdfs_ext_table ORDER BY 1 LIMIT 5; +SELECT * FROM fd_hdfs_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/expected/query01.ans new file mode 100755 index 0000000000..432bdeb2e8 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/expected/query01.ans @@ -0,0 +1,28 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using active-segment policy +SELECT * FROM fd_active_segment_jdbc_limit_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 + 9 | text9 + 10 | text10 + 11 | text11 + 12 | text12 + 13 | text13 + 14 | text14 + 15 | text15 + 16 | text16 + 17 | text17 + 18 | text18 + 19 | text19 + 20 | text20 +(20 rows) + diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/sql/query01.sql new file mode 100755 index 0000000000..4d0a2acd8e --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-as-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using active-segment policy +SELECT * FROM fd_active_segment_jdbc_limit_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/expected/query01.ans new file mode 100755 index 0000000000..261803c9a9 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/expected/query01.ans @@ -0,0 +1,8 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using active-segment policy +-- but without parameter ACTIVE_SEGMENT_COUNT +SELECT * FROM fd_failed_active_segment_jdbc_limit_ext_table ORDER BY 1; +ERROR: PXF server 
error : The parameter ACTIVE_SEGMENT_COUNT is not define while the fragment distribution policy is active-segment. Add the parameter ACTIVE_SEGMENT_COUNT to the external table DDL +HINT: Check the PXF logs located in the '/usr/local/greenplum-db-devel/pxf/logs' directory on host 'localhost' or 'set client_min_messages=LOG' for additional details. +CONTEXT: External table fd_failed_active_segment_jdbc_limit_ext_table diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/sql/query01.sql new file mode 100755 index 0000000000..0be8b8e491 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-failed-as-policy/sql/query01.sql @@ -0,0 +1,3 @@ +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using active-segment policy +-- but without parameter ACTIVE_SEGMENT_COUNT +SELECT * FROM fd_failed_active_segment_jdbc_limit_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/expected/query01.ans new file mode 100755 index 0000000000..0d40945031 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/expected/query01.ans @@ -0,0 +1,27 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using improved-round-robin policy +SELECT * FROM fd_improved_round_robin_jdbc_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 + 9 | text9 + 10 | text10 + 11 | text11 + 12 | text12 + 13 | text13 + 14 | text14 + 15 | text15 + 16 | text16 + 17 | text17 + 18 | text18 + 19 | text19 + 20 | text20 +(20 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/sql/query01.sql new file mode 100755 index 0000000000..40188ef7a6 --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-irr-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using improved-round-robin policy +SELECT * FROM fd_improved_round_robin_jdbc_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/expected/query01.ans index c962bd7c4d..a44f8f0223 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/expected/query01.ans +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/expected/query01.ans @@ -1,7 +1,7 @@ -- start_ignore -- end_ignore -- @description query01 for PXF test to check fragments distribution across segments with JDBC profile -SELECT * FROM fragment_distribution_jdbc_limit_ext_table ORDER BY 1; +SELECT * FROM fd_jdbc_limit_ext_table ORDER BY 1; id | descr ----+------- 1 | text1 diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/sql/query01.sql index 17d77e0984..6b8d54f4aa 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/sql/query01.sql +++ 
b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-limit/sql/query01.sql @@ -1,2 +1,2 @@ -- @description query01 for PXF test to check fragments distribution across segments with JDBC profile -SELECT * FROM fragment_distribution_jdbc_limit_ext_table ORDER BY 1; +SELECT * FROM fd_jdbc_limit_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/expected/query01.ans new file mode 100755 index 0000000000..e8a7ac2b9e --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/expected/query01.ans @@ -0,0 +1,27 @@ +-- start_ignore +-- end_ignore +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using random policy +SELECT * FROM fd_random_jdbc_ext_table ORDER BY 1; + id | descr +----+------- + 1 | text1 + 2 | text2 + 3 | text3 + 4 | text4 + 5 | text5 + 6 | text6 + 7 | text7 + 8 | text8 + 9 | text9 + 10 | text10 + 11 | text11 + 12 | text12 + 13 | text13 + 14 | text14 + 15 | text15 + 16 | text16 + 17 | text17 + 18 | text18 + 19 | text19 + 20 | text20 +(20 rows) diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/sql/query01.sql new file mode 100755 index 0000000000..f34aff2ede --- /dev/null +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc-random-policy/sql/query01.sql @@ -0,0 +1,2 @@ +-- @description query01 for PXF test to check fragments distribution across segments with JDBC profile using random policy +SELECT * FROM fd_random_jdbc_ext_table ORDER BY 1; diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc/expected/query01.ans b/automation/sqlrepo/arenadata/fragment-distribution/jdbc/expected/query01.ans index 44dcb3c3e6..c7395cbc44 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/jdbc/expected/query01.ans +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc/expected/query01.ans @@ -1,7 +1,7 @@ -- start_ignore -- end_ignore -- @description query01 for PXF test to check fragments distribution across segments with JDBC profile -SELECT * FROM fragment_distribution_jdbc_ext_table ORDER BY 1; +SELECT * FROM fd_jdbc_ext_table ORDER BY 1; id | descr ----+------- 1 | text1 diff --git a/automation/sqlrepo/arenadata/fragment-distribution/jdbc/sql/query01.sql b/automation/sqlrepo/arenadata/fragment-distribution/jdbc/sql/query01.sql index df04baf696..1f440d462b 100755 --- a/automation/sqlrepo/arenadata/fragment-distribution/jdbc/sql/query01.sql +++ b/automation/sqlrepo/arenadata/fragment-distribution/jdbc/sql/query01.sql @@ -1,2 +1,2 @@ -- @description query01 for PXF test to check fragments distribution across segments with JDBC profile -SELECT * FROM fragment_distribution_jdbc_ext_table ORDER BY 1; +SELECT * FROM fd_jdbc_ext_table ORDER BY 1; diff --git a/automation/src/test/java/org/greenplum/pxf/automation/arenadata/FragmentDistributionTest.java b/automation/src/test/java/org/greenplum/pxf/automation/arenadata/FragmentDistributionTest.java index a9c6c4a1b6..847ac24151 100644 --- a/automation/src/test/java/org/greenplum/pxf/automation/arenadata/FragmentDistributionTest.java +++ b/automation/src/test/java/org/greenplum/pxf/automation/arenadata/FragmentDistributionTest.java @@ -21,12 +21,21 @@ import static org.junit.Assert.assertEquals; public class FragmentDistributionTest extends BaseFeature { - private 
static final String PG_SOURCE_TABLE_NAME = "pg_fragment_distribution_source_table"; - private static final String JDBC_EXT_TABLE_NAME = "fragment_distribution_jdbc_ext_table"; - private static final String JDBC_LIMIT_EXT_TABLE_NAME = "fragment_distribution_jdbc_limit_ext_table"; - private static final String HDFS_EXT_TABLE_NAME = "fragment_distribution_hdfs_ext_table"; - private static final String HDFS_LIMIT_EXT_TABLE_NAME = "fragment_distribution_hdfs_limit_ext_table"; - private static final String HDFS_SEG_LIMIT_EXT_TABLE_NAME = "fragment_distribution_hdfs_seg_limit_ext_table"; + private static final String PG_SOURCE_TABLE_NAME = "pg_fd_source_table"; + private static final String JDBC_EXT_TABLE_NAME = "fd_jdbc_ext_table"; + private static final String JDBC_LIMIT_EXT_TABLE_NAME = "fd_jdbc_limit_ext_table"; + private static final String JDBC_ACTIVE_SEGMENT_EXT_TABLE_NAME = "fd_active_segment_jdbc_limit_ext_table"; + private static final String JDBC_ACTIVE_SEGMENT_FAILED_EXT_TABLE_NAME = "fd_failed_active_segment_jdbc_limit_ext_table"; + private static final String JDBC_RANDOM_EXT_TABLE_NAME = "fd_random_jdbc_ext_table"; + private static final String JDBC_IMPROVED_ROUND_ROBIN_EXT_TABLE_NAME = "fd_improved_round_robin_jdbc_ext_table"; + private static final String HDFS_EXT_TABLE_NAME = "fd_hdfs_ext_table"; + private static final String HDFS_LIMIT_EXT_TABLE_NAME = "fd_hdfs_limit_ext_table"; + private static final String HDFS_4_ACTIVE_SEGMENT_EXT_TABLE_NAME = "fd_4_active_segment_hdfs_ext_table"; + private static final String HDFS_2_ACTIVE_SEGMENT_EXT_TABLE_NAME = "fd_2_active_segment_hdfs_ext_table"; + private static final String HDFS_RANDOM_EXT_TABLE_NAME = "fd_random_hdfs_ext_table"; + private static final String HDFS_IRR_MORE_EXT_TABLE_NAME = "fd_improved_round_robin_more_hdfs_ext_table"; + private static final String HDFS_IRR_EVEN_EXT_TABLE_NAME = "fd_improved_round_robin_even_hdfs_ext_table"; + private static final String HDFS_IRR_LESS_EXT_TABLE_NAME = "fd_improved_round_robin_less_hdfs_ext_table"; private static final String[] SOURCE_TABLE_FIELDS = new String[]{ "id int", "descr text"}; @@ -34,15 +43,16 @@ public class FragmentDistributionTest extends BaseFeature { private static final String PXF_TEMP_LOG_FILE = PXF_TEMP_LOG_PATH + "/pxf-service.log"; private static final String DELIMITER = ";"; private static final String CAT_COMMAND = "cat " + PXF_TEMP_LOG_FILE; - private static final String PXF_LOG_JDBC_GREP_COMMAND = CAT_COMMAND + " | grep -E 'Returning [1-7]/7 fragment' | wc -l"; - private static final String PXF_LOG_JDBC_GREP_COMMAND_LIMIT = CAT_COMMAND + " | grep 'Returning 0/7 fragment' | wc -l"; - private static final String PXF_LOG_HDFS_GREP_COMMAND = CAT_COMMAND + " | grep 'Returning 1/6 fragment' | wc -l"; - private static final String PXF_LOG_HDFS_COMMAND_LIMIT = CAT_COMMAND + " | grep 'Returning 0/6 fragment' | wc -l"; - private static final String PXF_LOG_HDFS_SEG_COMMAND_LIMIT = CAT_COMMAND + " | grep -E 'Returning [1-3]/6 fragment' | wc -l"; + private static final String PXF_LOG_GET_FRAGMENTS_COUNT = CAT_COMMAND + " | grep -oP 'Returning ([1-9]+/\\d+) fragment' | awk -F ' ' '{print $2}' | awk -F '/' '{sum+=$1;} END{print sum;}'"; + private static final String PXF_LOG_GET_IDLE_SEGMENTS = CAT_COMMAND + " | grep -oP 'Returning (0/\\d+) fragment' | wc -l"; + private static final String PXF_LOG_GET_ACTIVE_SEGMENTS = CAT_COMMAND + " | grep -oP 'Returning ([1-9]+/\\d+) fragment' | wc -l"; + private static final String PXF_LOG_CHECK_POLICY_TEMPLATE = CAT_COMMAND + "| grep 
\"The '%s' fragment distribution policy will be used\" | wc -l"; private List pxfNodes; private String pxfLogFile; - private String hdfsPath; - private Table dataTable; + private String hdfsPathWith2Files; + private String hdfsPathWith6Files; + private String hdfsPathWith8Files; + private String hdfsPathWith12Files; private Table postgresSourceTable; private String restartCommand; @@ -55,7 +65,10 @@ protected void beforeClass() throws Exception { pxfNodes = ((MultiNodeCluster) cluster).getNode(SegmentNode.class, PhdCluster.EnumClusterServices.pxf); } pxfLogFile = pxfHome + "/" + PXF_LOG_RELATIVE_PATH; - hdfsPath = hdfs.getWorkingDirectory() + "/parquet_fragment_distribution/"; + hdfsPathWith2Files = hdfs.getWorkingDirectory() + "/text_fragment_distribution_2/"; + hdfsPathWith6Files = hdfs.getWorkingDirectory() + "/text_fragment_distribution_6/"; + hdfsPathWith8Files = hdfs.getWorkingDirectory() + "/text_fragment_distribution_8/"; + hdfsPathWith12Files = hdfs.getWorkingDirectory() + "/text_fragment_distribution_12/"; prepareData(); changeLogLevel("debug"); cluster.runCommand("mkdir -p " + PXF_TEMP_LOG_PATH); @@ -71,12 +84,24 @@ public void afterClass() throws Exception { protected void prepareData() throws Exception { preparePgSourceTable(); - prepareHdfsFiles(); + prepareHdfsFiles(2, hdfsPathWith2Files); + prepareHdfsFiles(6, hdfsPathWith6Files); + prepareHdfsFiles(8, hdfsPathWith8Files); + prepareHdfsFiles(12, hdfsPathWith12Files); createGpdbReadableJdbcTable(); createGpdbReadableJdbcTableWithLimit(); + createGpdbReadableJdbcTableWithActiveSegmentDistribution(); + createGpdbReadableJdbcTableWithFailedActiveSegmentDistribution(); + createGpdbReadableJdbcTableWithRandomDistribution(); + createGpdbReadableJdbcTableWithImprovedRoundRobinDistribution(); createGpdbReadableHdfsTable(); createGpdbReadableHdfsTableWithLimit(); - createGpdbReadableHdfsTableWithSegLimit(); + createGpdbReadableHdfsTableWith4ActiveSegmentDistribution(); + createGpdbReadableHdfsTableWith2ActiveSegmentDistribution(); + createGpdbReadableHdfsTableWithRandomDistribution(); + createGpdbReadableHdfsTableWithImprovedRoundRobinDistribution(HDFS_IRR_MORE_EXT_TABLE_NAME, hdfsPathWith8Files); + createGpdbReadableHdfsTableWithImprovedRoundRobinDistribution(HDFS_IRR_EVEN_EXT_TABLE_NAME, hdfsPathWith12Files); + createGpdbReadableHdfsTableWithImprovedRoundRobinDistribution(HDFS_IRR_LESS_EXT_TABLE_NAME, hdfsPathWith2Files); } private void preparePgSourceTable() throws Exception { @@ -90,17 +115,20 @@ private void preparePgSourceTable() throws Exception { rows[i][0] = String.valueOf(i + 1); rows[i][1] = "text" + (i + 1); } - dataTable = new Table("dataTable", SOURCE_TABLE_FIELDS); + Table dataTable = new Table("dataTable", SOURCE_TABLE_FIELDS); dataTable.addRows(rows); gpdb.insertData(dataTable, postgresSourceTable); } - private void prepareHdfsFiles() { + private void prepareHdfsFiles(int filesCount, String path) { // Create 6 files to receive 6 fragments - IntStream.rangeClosed(0, 5) + IntStream.rangeClosed(1, filesCount) .forEach(i -> { + String[][] rows = {{String.valueOf(i), "text" + i}}; + Table hdfsDataTable = new Table("hdfsDataTable", SOURCE_TABLE_FIELDS); + hdfsDataTable.addRows(rows); try { - hdfs.writeTableToFile(hdfsPath + "file-" + i, dataTable, DELIMITER); + hdfs.writeTableToFile(path + "file-" + i, hdfsDataTable, DELIMITER); } catch (Exception e) { throw new RuntimeException(e); } @@ -108,38 +136,58 @@ private void prepareHdfsFiles() { } private void createGpdbReadableJdbcTable() throws Exception { - ExternalTable 
gpdbReadablePgTable = TableFactory.getPxfJdbcReadablePartitionedTable( - JDBC_EXT_TABLE_NAME, - SOURCE_TABLE_FIELDS, - postgresSourceTable.getName(), - 0, - "1:20", - "5", - EnumPartitionType.INT, - "default", - null); + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_EXT_TABLE_NAME, null); gpdb.createTableAndVerify(gpdbReadablePgTable); } private void createGpdbReadableJdbcTableWithLimit() throws Exception { - ExternalTable gpdbReadablePgTable = TableFactory.getPxfJdbcReadablePartitionedTable( - JDBC_LIMIT_EXT_TABLE_NAME, + String customProperties = "ACTIVE_SEGMENT_COUNT=2"; + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_LIMIT_EXT_TABLE_NAME, customProperties); + gpdb.createTableAndVerify(gpdbReadablePgTable); + } + + private void createGpdbReadableJdbcTableWithActiveSegmentDistribution() throws Exception { + String customProperties = "FRAGMENT_DISTRIBUTION_POLICY=active-segment&ACTIVE_SEGMENT_COUNT=2"; + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_ACTIVE_SEGMENT_EXT_TABLE_NAME, customProperties); + gpdb.createTableAndVerify(gpdbReadablePgTable); + } + + private void createGpdbReadableJdbcTableWithFailedActiveSegmentDistribution() throws Exception { + String customProperties = "FRAGMENT_DISTRIBUTION_POLICY=active-segment"; + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_ACTIVE_SEGMENT_FAILED_EXT_TABLE_NAME, customProperties); + gpdb.createTableAndVerify(gpdbReadablePgTable); + } + + private void createGpdbReadableJdbcTableWithRandomDistribution() throws Exception { + String customProperties = "FRAGMENT_DISTRIBUTION_POLICY=random"; + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_RANDOM_EXT_TABLE_NAME, customProperties); + gpdb.createTableAndVerify(gpdbReadablePgTable); + } + + private void createGpdbReadableJdbcTableWithImprovedRoundRobinDistribution() throws Exception { + String customProperties = "FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin"; + ExternalTable gpdbReadablePgTable = createJdbcExtPartitionTable(JDBC_IMPROVED_ROUND_ROBIN_EXT_TABLE_NAME, customProperties); + gpdb.createTableAndVerify(gpdbReadablePgTable); + } + + private ExternalTable createJdbcExtPartitionTable(String tableName, String customParameters) { + return TableFactory.getPxfJdbcReadablePartitionedTable( + tableName, SOURCE_TABLE_FIELDS, postgresSourceTable.getName(), 0, "1:20", - "5", + "4", EnumPartitionType.INT, "default", - "ACTIVE_SEGMENT_COUNT=2"); - gpdb.createTableAndVerify(gpdbReadablePgTable); + customParameters); } private void createGpdbReadableHdfsTable() throws Exception { ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( HDFS_EXT_TABLE_NAME, SOURCE_TABLE_FIELDS, - hdfsPath, + hdfsPathWith6Files, DELIMITER); gpdb.createTableAndVerify(gpdbReadableHdfsTable); } @@ -148,89 +196,396 @@ private void createGpdbReadableHdfsTableWithLimit() throws Exception { ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( HDFS_LIMIT_EXT_TABLE_NAME, SOURCE_TABLE_FIELDS, - hdfsPath, + hdfsPathWith6Files, DELIMITER); gpdbReadableHdfsTable.setUserParameters(new String[]{"ACTIVE_SEGMENT_COUNT=2"}); gpdb.createTableAndVerify(gpdbReadableHdfsTable); } - private void createGpdbReadableHdfsTableWithSegLimit() throws Exception { + private void createGpdbReadableHdfsTableWith4ActiveSegmentDistribution() throws Exception { ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( - HDFS_SEG_LIMIT_EXT_TABLE_NAME, + 
HDFS_4_ACTIVE_SEGMENT_EXT_TABLE_NAME, SOURCE_TABLE_FIELDS, - hdfsPath, + hdfsPathWith6Files, DELIMITER); - gpdbReadableHdfsTable.setUserParameters(new String[]{"ACTIVE_SEGMENT_COUNT=4"}); + gpdbReadableHdfsTable.setUserParameters(new String[]{"FRAGMENT_DISTRIBUTION_POLICY=active-segment", "ACTIVE_SEGMENT_COUNT=4"}); gpdb.createTableAndVerify(gpdbReadableHdfsTable); } - @Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments. Run on all segments.") - public void testFragmentDistributionForJdbc() throws Exception { + private void createGpdbReadableHdfsTableWith2ActiveSegmentDistribution() throws Exception { + ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( + HDFS_2_ACTIVE_SEGMENT_EXT_TABLE_NAME, + SOURCE_TABLE_FIELDS, + hdfsPathWith8Files, + DELIMITER); + gpdbReadableHdfsTable.setUserParameters(new String[]{"FRAGMENT_DISTRIBUTION_POLICY=active-segment", "ACTIVE_SEGMENT_COUNT=2"}); + gpdb.createTableAndVerify(gpdbReadableHdfsTable); + } + + private void createGpdbReadableHdfsTableWithRandomDistribution() throws Exception { + ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( + HDFS_RANDOM_EXT_TABLE_NAME, + SOURCE_TABLE_FIELDS, + hdfsPathWith8Files, + DELIMITER); + gpdbReadableHdfsTable.setUserParameters(new String[]{"FRAGMENT_DISTRIBUTION_POLICY=random"}); + gpdb.createTableAndVerify(gpdbReadableHdfsTable); + } + + private void createGpdbReadableHdfsTableWithImprovedRoundRobinDistribution(String tableName, String sourcePath) throws Exception { + ReadableExternalTable gpdbReadableHdfsTable = TableFactory.getPxfReadableCSVTable( + tableName, + SOURCE_TABLE_FIELDS, + sourcePath, + DELIMITER); + gpdbReadableHdfsTable.setUserParameters(new String[]{"FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin"}); + gpdb.createTableAndVerify(gpdbReadableHdfsTable); + } + + /** + * Check default fragments distribution policy with JDBC profile. The default policy is 'round-robin'. + * Parameters: none. + * We don't know exactly how many fragments each logical segment will get, but the total fragment count has to be 8. + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host); + * checkIdleSegments has to be 0 as each segment will get at least 1 fragment; + * fragmentsCount has to be 8 as a sum across all segments.
+ */ +@Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments with default policy (round-robin)") +public void fragmentDistributionForJdbcWithDefaultPolicyTest() throws Exception { cleanLogs(); runSqlTest("arenadata/fragment-distribution/jdbc"); + int fragmentsCount = 0; for (Node pxfNode : pxfNodes) { cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); copyLogs(getMethodName(), pxfNode.getHost()); - String result = getCmdResult(cluster, PXF_LOG_JDBC_GREP_COMMAND); - assertEquals("3", result); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("0", checkIdleSegments); + fragmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT)); cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); } + assertEquals(8, fragmentsCount); } + /** + * Check active-segment distribution policy with JDBC profile (backward compatibility test as we don't use the + * FRAGMENT_DISTRIBUTION_POLICY parameter here, but if the parameter ACTIVE_SEGMENT_COUNT is present, we apply the active-segment policy). + * Parameters: ACTIVE_SEGMENT_COUNT=2 + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 2 for each segment host as we have only 1 active segment per node and the other 2 segments will not get fragments; + * fragmentsCount has to be 4 for each segment host as we have 8 fragments total, + * and they will be split between 2 segment hosts evenly. + */ @Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments. Run on limit segments") - public void testFragmentDistributionForJdbcWithLimit() throws Exception { + public void fragmentDistributionForJdbcWithLimitTest() throws Exception { cleanLogs(); runSqlTest("arenadata/fragment-distribution/jdbc-limit"); for (Node pxfNode : pxfNodes) { cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); copyLogs(getMethodName(), pxfNode.getHost()); - String result = getCmdResult(cluster, PXF_LOG_JDBC_GREP_COMMAND_LIMIT); - assertEquals("2", result); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "active-segment")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("2", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("4", fragmentsCount); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + } + + /** + * Check active-segment distribution policy with JDBC profile. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=active-segment, ACTIVE_SEGMENT_COUNT=2 + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 2 for each segment host as we have only 1 active segment per node and the other 2 segments will not get fragments; + * fragmentsCount has to be 4 for each segment host as we have 8 fragments total, + * and they will be split between 2 segment hosts evenly.
+ */ +@Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments with active-segment distribution policy") + public void fragmentDistributionForJdbcWithActiveSegmentPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/jdbc-as-policy"); + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "active-segment")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("2", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("4", fragmentsCount); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + } + + /** + * Check active-segment distribution policy with JDBC profile without the ACTIVE_SEGMENT_COUNT parameter. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=active-segment, ACTIVE_SEGMENT_COUNT is not present. + * The query will fail. + */ + @Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments with failed active-segment distribution policy") + public void fragmentDistributionForJdbcWithFailedActiveSegmentPolicyTest() throws Exception { + runSqlTest("arenadata/fragment-distribution/jdbc-failed-as-policy"); + } + + /** + * Check random fragments distribution policy with JDBC profile. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=random + * We don't know exactly how many fragments each logical segment will get, but the total fragment count has to be 8. + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * fragmentsCount has to be 8 as a sum across all segments. + */ + @Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments with random distribution policy") + public void fragmentDistributionForJdbcWithRandomPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/jdbc-random-policy"); + int fragmentsCount = 0; + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "random")); + assertEquals("3", checkPolicyResult); + fragmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT)); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + assertEquals(8, fragmentsCount); + } + + /** + * Check improved-round-robin fragments distribution policy with JDBC profile. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 0 as each segment will get at least 1 fragment; + * fragmentsCount has to be 4 for each segment host as we have 8 fragments total.
The policy will + * distribute 6 fragments between all segments and the remaining 2 fragments will be distributed between segment hosts evenly + */ + @Test(groups = {"arenadata"}, description = "Check JDBC fragments distribution across segments with improved-round-robin distribution policy") + public void fragmentDistributionForJdbcWithImprovedRoundRobinPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/jdbc-irr-policy"); + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "improved-round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("0", checkIdleSegments); + String fragmentCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("4", fragmentCount); cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); } } - @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments. Run on all segments.") - public void testFragmentDistributionForHdfs() throws Exception { + /** + * Check default fragments distribution policy with HDFS profile. The default policy is 'round-robin'. + * Parameters: none. + * We don't know exactly how many fragments each logical segment will get, but the total fragment count has to be 6. + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 0 as each segment will get at least 1 fragment; + * fragmentsCount has to be 6 as a sum across all segments. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments. Run on all segments") + public void fragmentDistributionForHdfsTest() throws Exception { cleanLogs(); runSqlTest("arenadata/fragment-distribution/hdfs"); + int fragmentsCount = 0; for (Node pxfNode : pxfNodes) { cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); copyLogs(getMethodName(), pxfNode.getHost()); - String result = getCmdResult(cluster, PXF_LOG_HDFS_GREP_COMMAND); - assertEquals("3", result); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("0", checkIdleSegments); + fragmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT)); cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); } + assertEquals(6, fragmentsCount); } - @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments. Run on limit segments.") - public void testFragmentDistributionForHdfsWithLimit() throws Exception { + /** + * Check active-segment distribution policy with HDFS profile (backward compatibility test as we don't use the + * FRAGMENT_DISTRIBUTION_POLICY parameter here, but if the parameter ACTIVE_SEGMENT_COUNT is present, we apply the active-segment policy).
+ * Parameters: ACTIVE_SEGMENT_COUNT=2 + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 2 for each segment host as we have only 1 active segment per node and the other 2 segments will not get fragments; + * fragmentsCount has to be 3 for each segment host as we have 6 fragments total, + * and they will be split between 2 segment hosts evenly. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments. Run on 2 segments") + public void fragmentDistributionForHdfsWithLimitTest() throws Exception { cleanLogs(); runSqlTest("arenadata/fragment-distribution/hdfs-limit"); for (Node pxfNode : pxfNodes) { cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); copyLogs(getMethodName(), pxfNode.getHost()); - String result = getCmdResult(cluster, PXF_LOG_HDFS_COMMAND_LIMIT); - assertEquals("2", result); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "active-segment")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("2", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("3", fragmentsCount); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + } + + /** + * Check active-segment distribution policy with HDFS profile. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=active-segment, ACTIVE_SEGMENT_COUNT=4 + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * idleSegmentsCount has to be 2 for the whole cluster as we have 4 active segments and the other 2 segments will not get fragments; + * activeSegmentsCount has to be 4; + * fragmentsCount has to be 6 across all segments. We try to split them evenly between segment hosts but there is no guarantee. + * That is why we check the sum across all segment hosts. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with 4 active-segment distribution policy") + public void fragmentDistributionForHdfsWith4ActiveSegmentPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/hdfs-4-as-policy"); + int fragmentsCount = 0; + int activeSegmentsCount = 0; + int idleSegmentsCount = 0; + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "active-segment")); + assertEquals("3", checkPolicyResult); + fragmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT)); + activeSegmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_ACTIVE_SEGMENTS)); + idleSegmentsCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS)); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + assertEquals(4, activeSegmentsCount); + assertEquals(2, idleSegmentsCount); + assertEquals(6, fragmentsCount); + } + + /** + * Check active-segment distribution policy with HDFS profile.
+ * Parameters: FRAGMENT_DISTRIBUTION_POLICY=active-segment, ACTIVE_SEGMENT_COUNT=2 + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 2 for each segment host as we have only 1 active segment per node and the other 2 segments will not get fragments; + * fragmentsCount has to be 4 for each segment host as we have 8 fragments total, + * and they will be split between 2 segment hosts evenly. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with 2 active-segment distribution policy") + public void fragmentDistributionForHdfsWith2ActiveSegmentPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/hdfs-2-as-policy"); + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "active-segment")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("2", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("4", fragmentsCount); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + } + + /** + * Check random fragments distribution policy with HDFS profile. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=random + * We don't know exactly how many fragments each logical segment will get, but the total fragment count has to be 8. + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * fragmentsCount has to be 8 as a sum across all segments. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with random distribution policy") + public void fragmentDistributionForHdfsWithRandomPolicyTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/hdfs-random-policy"); + int fragmentCount = 0; + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "random")); + assertEquals("3", checkPolicyResult); + fragmentCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT)); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + assertEquals(8, fragmentCount); + } + + /** + * Check improved-round-robin fragments distribution policy with HDFS profile when fragment count is more + * than segment count but not equal to an even number of segments. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 0 as each segment will get at least 1 fragment; + * fragmentsCount has to be 4 for each segment host as we have 8 fragments total.
The policy will + * distribute 6 fragments between all segments and the remaining 2 fragments will be distributed between segment hosts evenly + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with improved-round-robin " + + "distribution policy when fragment count is more than segment count but not equal to an even number of segments") + public void fragmentDistributionForHdfsWithIrrPolicyWithMoreFragmentsTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/hdfs-irr-more-policy"); + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "improved-round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("0", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("4", fragmentsCount); + cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); + } + } + + /** + * Check improved-round-robin fragments distribution policy with HDFS profile when fragment count is equal to an even number of segments. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 0 as each segment will get at least 1 fragment; + * fragmentsCount has to be 6 for each segment host as we have 12 fragments total. The policy will + * distribute 2 fragments to each logical segment. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with improved-round-robin " + + "distribution policy when fragment count is equal to an even number of segments") + public void fragmentDistributionForHdfsWithIrrPolicyWithEvenFragmentsTest() throws Exception { + cleanLogs(); + runSqlTest("arenadata/fragment-distribution/hdfs-irr-even-policy"); + for (Node pxfNode : pxfNodes) { + cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); + copyLogs(getMethodName(), pxfNode.getHost()); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "improved-round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("0", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("6", fragmentsCount); cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); } } - @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments. Run on 4 segments.") - public void testFragmentDistributionForHdfsWithSegLimit() throws Exception { + /** + * Check improved-round-robin fragments distribution policy with HDFS profile when fragment count is less + * than segment count. + * Parameters: FRAGMENT_DISTRIBUTION_POLICY=improved-round-robin + * checkPolicyResult has to have 3 lines per node (one per logical segment on the segment host) + * checkIdleSegments has to be 2 as we have only 2 fragments and they should be on different segment hosts; + * fragmentsCount has to be 1 for each segment host as we have 2 fragments total.
The policy will + * split these 2 fragments between 2 segment hosts evenly. + */ + @Test(groups = {"arenadata"}, description = "Check HDFS fragments distribution across segments with improved-round-robin " + + "distribution policy when fragment count is less than segment count") + public void fragmentDistributionForHdfsWithIrrPolicyWithLessFragmentsTest() throws Exception { cleanLogs(); - runSqlTest("arenadata/fragment-distribution/hdfs-seg-limit"); - int activeSegmentCount = 0; - int idleSegmentCount = 0; + runSqlTest("arenadata/fragment-distribution/hdfs-irr-less-policy"); for (Node pxfNode : pxfNodes) { cluster.copyFromRemoteMachine(pxfNode.getUserName(), pxfNode.getPassword(), pxfNode.getHost(), pxfLogFile, PXF_TEMP_LOG_PATH); copyLogs(getMethodName(), pxfNode.getHost()); - activeSegmentCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_HDFS_SEG_COMMAND_LIMIT)); - idleSegmentCount += Integer.parseInt(getCmdResult(cluster, PXF_LOG_HDFS_COMMAND_LIMIT)); + String checkPolicyResult = getCmdResult(cluster, String.format(PXF_LOG_CHECK_POLICY_TEMPLATE, "improved-round-robin")); + assertEquals("3", checkPolicyResult); + String checkIdleSegments = getCmdResult(cluster, PXF_LOG_GET_IDLE_SEGMENTS); + assertEquals("2", checkIdleSegments); + String fragmentsCount = getCmdResult(cluster, PXF_LOG_GET_FRAGMENTS_COUNT); + assertEquals("1", fragmentsCount); cluster.deleteFileFromNodes(PXF_TEMP_LOG_FILE, false); } - assertEquals(4, activeSegmentCount); - assertEquals(2, idleSegmentCount); } private void changeLogLevel(String level) throws Exception { @@ -245,7 +600,7 @@ private void copyLogs(String methodName, String host) throws Exception { cluster.runCommand("cp " + PXF_TEMP_LOG_FILE + " " + PXF_TEMP_LOG_FILE + "-" + methodName + "-" + host); } - private String getMethodName() throws Exception { + private String getMethodName() { return Thread.currentThread() .getStackTrace()[2] .getMethodName(); diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/FragmenterService.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/FragmenterService.java index 3d7b731146..76762c8a8d 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/FragmenterService.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/FragmenterService.java @@ -20,24 +20,21 @@ */ import com.google.common.util.concurrent.UncheckedExecutionException; -import org.greenplum.pxf.api.error.PxfRuntimeException; +import lombok.extern.slf4j.Slf4j; import org.greenplum.pxf.api.model.Fragment; import org.greenplum.pxf.api.model.Fragmenter; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.api.utilities.FragmenterCacheFactory; +import org.greenplum.pxf.service.fragment.FragmentStrategyProvider; import org.greenplum.pxf.service.utilities.AnalyzeUtils; import org.greenplum.pxf.service.utilities.BasePluginFactory; import org.greenplum.pxf.service.utilities.GSSFailureHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; /** @@ -46,80 +43,51 @@ * request the list of fragments will populate it, while the rest of the * segments will wait until the list of fragments is populated. 
*/ +@Slf4j @Component public class FragmenterService { - - private static final Logger LOG = LoggerFactory.getLogger(FragmenterService.class); - public static final String ACTIVE_SEGMENT_COUNT_OPTION = "ACTIVE_SEGMENT_COUNT"; private final BasePluginFactory pluginFactory; private final FragmenterCacheFactory fragmenterCacheFactory; private final GSSFailureHandler failureHandler; + private final FragmentStrategyProvider strategyProvider; public FragmenterService(FragmenterCacheFactory fragmenterCacheFactory, BasePluginFactory pluginFactory, - GSSFailureHandler failureHandler) { + GSSFailureHandler failureHandler, + FragmentStrategyProvider strategyProvider) { this.fragmenterCacheFactory = fragmenterCacheFactory; this.pluginFactory = pluginFactory; this.failureHandler = failureHandler; + this.strategyProvider = strategyProvider; } public List getFragmentsForSegment(RequestContext context) throws IOException { - - LOG.trace("Received FRAGMENTER call"); - - // Get active segment count of logical segments which will take part in the request - int activeSegmentCount = getActiveSegmentCount(context); - + log.trace("Received FRAGMENTER call"); Instant startTime = Instant.now(); final String path = context.getDataSource(); - if (LOG.isDebugEnabled()) { - LOG.debug("fragmentCache size={}, stats={}", + if (log.isDebugEnabled()) { + log.debug("fragmentCache size={}, stats={}", fragmenterCacheFactory.getCache().size(), fragmenterCacheFactory.getCache().stats()); + log.debug("FRAGMENTER started for path \"{}\"", path); } - LOG.debug("FRAGMENTER started for path \"{}\"", path); - List fragments = getFragmentsFromCache(context, startTime); - List filteredFragments = filterFragments(fragments, - context.getSegmentId(), - activeSegmentCount, - context.getTotalSegments(), - context.getGpSessionId(), - context.getGpCommandCount()); + List filteredFragments = strategyProvider.getStrategy(context) + .filterFragments(fragments, context); - if (LOG.isDebugEnabled()) { + if (log.isDebugEnabled()) { int numberOfFragments = filteredFragments.size(); long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis(); - - LOG.debug("Returning {}/{} fragment{} for path {} in {} ms [profile={}, predicate {}available]", + log.debug("Returning {}/{} fragment{} for path {} in {} ms [profile={}, predicate {}available]", numberOfFragments, fragments.size(), numberOfFragments == 1 ? "" : "s", context.getDataSource(), elapsedMillis, context.getProfile(), context.hasFilter() ? "" : "un"); } - return filteredFragments; } - private int getActiveSegmentCount(RequestContext context) { - try { - int activeSegmentCount = Optional.ofNullable(context.getOptions().get(ACTIVE_SEGMENT_COUNT_OPTION)) - .map(Integer::valueOf) - .orElse(context.getTotalSegments()); - if (activeSegmentCount < 1 || activeSegmentCount > context.getTotalSegments()) { - String errorMessage = String.format("The parameter '%s' has the value %d. The value of this parameter " + - "cannot be less than 1 or cannot be greater than the total amount of segments [%d segment(s)]", - ACTIVE_SEGMENT_COUNT_OPTION, activeSegmentCount, context.getTotalSegments()); - throw new PxfRuntimeException(errorMessage); - } - return activeSegmentCount; - } catch (Exception e) { - throw new PxfRuntimeException(String.format("Failed to get active segment count: %s. Check the value of the parameter '%s'", - e.getMessage(), ACTIVE_SEGMENT_COUNT_OPTION), e); - } - } - /** * Returns the list of fragments from the fragmenter cache. 
If the cache is * empty, it populates the cache with the list of fragments. When @@ -137,7 +105,7 @@ private List getFragmentsFromCache(RequestContext context, Instant sta try { return fragmenterCacheFactory.getCache() .get(fragmenterCacheKey, () -> { - LOG.debug("Caching fragments from segmentId={} with key={}", + log.debug("Caching fragments from segmentId={} with key={}", context.getSegmentId(), fragmenterCacheKey); List fragmentList = failureHandler.execute(context.getConfiguration(), @@ -150,7 +118,7 @@ private List getFragmentsFromCache(RequestContext context, Instant sta int numberOfFragments = fragmentList.size(); long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis(); String fragmenterClassName = context.getFragmenter(); - LOG.info("Returning {} fragment{} in {} ms [user={}, table={}.{}, resource={}, fragmenter={}, profile={}, predicate {}available]", + log.info("Returning {} fragment{} in {} ms [user={}, table={}.{}, resource={}, fragmenter={}, profile={}, predicate {}available]", numberOfFragments, numberOfFragments == 1 ? "" : "s", elapsedMillis, @@ -161,100 +129,15 @@ private List getFragmentsFromCache(RequestContext context, Instant sta fragmenterClassName.substring(fragmenterClassName.lastIndexOf(".") + 1), context.getProfile(), context.hasFilter() ? "" : "un"); - return fragmentList; }); } catch (UncheckedExecutionException | ExecutionException e) { - // Unwrap the error Exception exception = e.getCause() != null ? (Exception) e.getCause() : e; - if (exception instanceof IOException) + if (exception instanceof IOException) { throw (IOException) exception; - throw new IOException(exception); - } - } - - /** - * Filters the {@code fragments} for the given segment. To determine which - * segment S should process an element at a given index i, use a randomized - * MOD function - *

- * <pre> - * S = MOD(I + MOD(gp_session_id, N) + gp_command_count, N) - * </pre>

- * which ensures more fair work distribution for small lists of just a few - * elements across N segments global session ID and command count are used - * as a randomizer, as it is different for every query, while being the - * same across all segments for a given query. - * - * @param fragments the list of fragments - * @param segmentId the identifier for the segment processing the request - * @param totalSegments the total number of segments - * @param gpSessionId the Greenplum session ID - * @param gpCommandCount the command number for this Greenplum Session ID - * @return the filtered list of fragments for the given segment - */ - private List filterFragments(List fragments, - int segmentId, - int activeSegmentCount, - int totalSegments, - int gpSessionId, - int gpCommandCount) { - /* - We use a mod function inside the loop to distribute fragments across all N segments for processing - in a round-robin fashion, where each segment will be allocated to work on every N-th fragment. - - We use an artificially shifted index so that queries with low fragment count are not always executed by - the low-numbered segments. This helps to spread out the workload across the cluster to different PXF JVMs. - Using gpCommandCount will ensure that consecutive queries for a single transaction are also shifted. - */ - int shiftedIndex = gpSessionId % totalSegments + gpCommandCount; // index of fragment #0 to use for mod function - - if (totalSegments == activeSegmentCount) { - List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / totalSegments)); - for (Fragment fragment : fragments) { - if (segmentId == (shiftedIndex % totalSegments)) { - filteredFragments.add(fragment); - } - shiftedIndex++; } - return filteredFragments; - } else { - int index = 0; - List activeSegmentList = getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); - List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / activeSegmentCount)); - for (Fragment fragment : fragments) { - if (segmentId == activeSegmentList.get(index % activeSegmentList.size())) { - filteredFragments.add(fragment); - } - index++; - } - return filteredFragments; - } - } - - private static List getActiveSegmentList(int shiftedIndex, int activeSegmentCount, int totalSegments) { - // We will try to distribute fragments evenly between segments hosts, - // but we don't know exactly how many logical segments per host. - // Also, we don't know how many segment hosts in the cluster. 
- List activeSegmentList = new ArrayList<>(); - while (activeSegmentCount > 0) { - int step = (int) Math.ceil((float) totalSegments / activeSegmentCount); - // How many segments might be evenly distributed with the step - int currentCount = totalSegments / step; - for (int i = 0; i < currentCount; i++) { - int id = shiftedIndex % totalSegments; - // We need to do shiftedIndex++ if the list has already contained the segment id - if (activeSegmentList.contains(id)) { - shiftedIndex++; - id = shiftedIndex % totalSegments; - } - activeSegmentList.add(id); - shiftedIndex += step; - } - // Get the rest of the segments and distribute them again if activeSegmentCount > 0 - activeSegmentCount = activeSegmentCount - currentCount; + throw new IOException(exception); } - LOG.debug("The fragments will be distributed between segments with segment id: {}", activeSegmentList); - return activeSegmentList; } /** @@ -298,7 +181,6 @@ private void updateFragmentIndex(List fragments) { int index = 0; String sourceName = null; for (Fragment fragment : fragments) { - String currentSourceName = fragment.getSourceName(); if (!currentSourceName.equals(sourceName)) { index = 0; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategy.java new file mode 100644 index 0000000000..24e8712b90 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategy.java @@ -0,0 +1,70 @@ +package org.greenplum.pxf.service.fragment; + +import lombok.extern.slf4j.Slf4j; +import org.greenplum.pxf.api.error.PxfRuntimeException; +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +@Slf4j +@Component +public class ActiveSegmentFragmentStrategy implements FragmentStrategy { + private static final FragmentDistributionPolicy DISTRIBUTION_POLICY = FragmentDistributionPolicy.ACTIVE_SEGMENT; + private static final String ACTIVE_SEGMENT_COUNT_OPTION = FragmentStrategyProvider.ACTIVE_SEGMENT_COUNT_OPTION; + + @Override + public List filterFragments(Collection fragments, RequestContext context) { + int shiftedIndex = context.getGpSessionId() % context.getTotalSegments() + context.getGpCommandCount(); + // We don't need all active segments if the fragment count is less than the active segment count + int activeSegmentCount = Math.min(getActiveSegmentCount(context), fragments.size()); + + int index = 0; + List activeSegmentList = FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, context.getTotalSegments()); + log.debug("The fragments will be distributed between segments with segment id: {}", activeSegmentList); + // The current segment cannot get any fragment if its id is not in the list + if (!activeSegmentList.contains(context.getSegmentId())) { + return Collections.emptyList(); + } + List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / activeSegmentCount)); + for (Fragment fragment : fragments) { + if (context.getSegmentId() == activeSegmentList.get(index % activeSegmentList.size())) { + filteredFragments.add(fragment); + } + index++; + } + return filteredFragments; + } + + @Override + public FragmentDistributionPolicy getDistributionPolicy() {
+ return DISTRIBUTION_POLICY; + } + + private int getActiveSegmentCount(RequestContext context) { + try { + int activeSegmentCount = Optional.ofNullable(context.getOptions().get(ACTIVE_SEGMENT_COUNT_OPTION)) + .map(Integer::valueOf) + .orElseThrow(() -> new PxfRuntimeException( + "The parameter " + ACTIVE_SEGMENT_COUNT_OPTION + " is not defined while the fragment " + + "distribution policy is " + DISTRIBUTION_POLICY.getName() + "." + + " Add the parameter " + ACTIVE_SEGMENT_COUNT_OPTION + " to the external table DDL") + ); + if (activeSegmentCount < 1 || activeSegmentCount > context.getTotalSegments()) { + String errorMessage = String.format("The parameter '%s' has the value %d. The value of this parameter " + + "cannot be less than 1 or cannot be greater than the total amount of segments [%d segment(s)]", + ACTIVE_SEGMENT_COUNT_OPTION, activeSegmentCount, context.getTotalSegments()); + throw new PxfRuntimeException(errorMessage); + } + return activeSegmentCount; + } catch (Exception e) { + throw new PxfRuntimeException(String.format("Failed to get active segment count: %s. Check the value of the parameter '%s'", + e.getMessage(), ACTIVE_SEGMENT_COUNT_OPTION), e); + } + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentDistributionPolicy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentDistributionPolicy.java new file mode 100644 index 0000000000..49ee7e926b --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentDistributionPolicy.java @@ -0,0 +1,29 @@ +package org.greenplum.pxf.service.fragment; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +@Getter +public enum FragmentDistributionPolicy { + ROUND_ROBIN("round-robin"), + ACTIVE_SEGMENT("active-segment"), + IMPROVED_ROUND_ROBIN("improved-round-robin"), + RANDOM("random"); + + private final String name; + public static final Map POLICY_NAME_MAP = Arrays.stream(FragmentDistributionPolicy.values()) + .collect(Collectors.toMap(FragmentDistributionPolicy::getName, policy -> policy)); + + FragmentDistributionPolicy(String name) { + this.name = name; + } + + public static FragmentDistributionPolicy fromName(String name) { + return Optional.ofNullable(POLICY_NAME_MAP.get(name.toLowerCase())) + .orElseThrow(() -> new IllegalArgumentException("Cannot find corresponding fragment distribution policy with name " + name)); + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategy.java new file mode 100644 index 0000000000..2fd05dd700 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategy.java @@ -0,0 +1,14 @@ +package org.greenplum.pxf.service.fragment; + +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; + +import java.util.Collection; +import java.util.List; + +public interface FragmentStrategy { + + List filterFragments(Collection fragments, RequestContext context); + + FragmentDistributionPolicy getDistributionPolicy(); +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategyProvider.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategyProvider.java new file mode 100644 index 0000000000..ed128fcd9f --- /dev/null +++
b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentStrategyProvider.java @@ -0,0 +1,40 @@ +package org.greenplum.pxf.service.fragment; + +import lombok.extern.slf4j.Slf4j; +import org.greenplum.pxf.api.error.PxfRuntimeException; +import org.greenplum.pxf.api.model.RequestContext; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Slf4j +@Component +public class FragmentStrategyProvider { + public static final String FRAGMENT_DISTRIBUTION_POLICY_OPTION = "FRAGMENT_DISTRIBUTION_POLICY"; + public static final String ACTIVE_SEGMENT_COUNT_OPTION = "ACTIVE_SEGMENT_COUNT"; + private final Map strategyMap; + + public FragmentStrategyProvider(Collection strategies) { + this.strategyMap = new HashMap<>(); + strategies.forEach(strategy -> strategyMap.put(strategy.getDistributionPolicy(), strategy)); + } + + public FragmentStrategy getStrategy(RequestContext context) { + FragmentDistributionPolicy policy = getPolicyFromContext(context); + FragmentStrategy strategy = Optional.ofNullable(strategyMap.get(policy)) + .orElseThrow(() -> new PxfRuntimeException("The strategy for fragment distribution policy '" + policy.getName() + "' was not found")); + log.debug("The '{}' fragment distribution policy will be used", policy.getName()); + return strategy; + } + + private FragmentDistributionPolicy getPolicyFromContext(RequestContext context) { + return Optional.ofNullable(context.getOption(ACTIVE_SEGMENT_COUNT_OPTION)) + .map(str -> FragmentDistributionPolicy.ACTIVE_SEGMENT) // for backward compatibility + .orElseGet(() -> Optional.ofNullable(context.getOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION)) + .map(FragmentDistributionPolicy::fromName) + .orElse(FragmentDistributionPolicy.ROUND_ROBIN)); + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentUtils.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentUtils.java new file mode 100644 index 0000000000..657452a977 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/FragmentUtils.java @@ -0,0 +1,31 @@ +package org.greenplum.pxf.service.fragment; + +import java.util.ArrayList; +import java.util.List; + +public class FragmentUtils { + public static List getActiveSegmentList(int shiftedIndex, int activeSegmentCount, int totalSegments) { + // We will try to distribute fragments evenly between segment hosts, + // but we don't know exactly how many logical segments there are per host. + // Also, we don't know how many segment hosts are in the cluster.
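+ // Illustrative walk-through (values mirror FragmentUtilsTest): shiftedIndex=1, activeSegmentCount=3, totalSegments=10.
+ // Pass 1: step = ceil(10/3) = 4, currentCount = 10/4 = 2 -> pick ids 1 and 5, shiftedIndex advances to 9.
+ // Pass 2: 1 segment left, step = ceil(10/1) = 10, currentCount = 1 -> pick id 9.
+ // Result: [1, 5, 9] - ids spread across the segment id range, so they tend to land on different hosts.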
+ List activeSegmentList = new ArrayList<>(); + while (activeSegmentCount > 0) { + int step = (int) Math.ceil((float) totalSegments / activeSegmentCount); + // How many segments might be evenly distributed with the step + int currentCount = totalSegments / step; + for (int i = 0; i < currentCount; i++) { + int id = shiftedIndex % totalSegments; + // We need to do shiftedIndex++ if the list already contains the segment id + if (activeSegmentList.contains(id)) { + shiftedIndex++; + id = shiftedIndex % totalSegments; + } + activeSegmentList.add(id); + shiftedIndex += step; + } + // Get the rest of the segments and distribute them again if activeSegmentCount > 0 + activeSegmentCount = activeSegmentCount - currentCount; + } + return activeSegmentList; + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategy.java new file mode 100644 index 0000000000..86d749e1b4 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategy.java @@ -0,0 +1,83 @@ +package org.greenplum.pxf.service.fragment; + +import lombok.extern.slf4j.Slf4j; +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +@Slf4j +@Component +public class ImprovedRoundRobinFragmentStrategy implements FragmentStrategy { + private final static FragmentDistributionPolicy DISTRIBUTION_POLICY = FragmentDistributionPolicy.IMPROVED_ROUND_ROBIN; + + @Override + public List filterFragments(Collection fragments, RequestContext context) { + int totalSegments = context.getTotalSegments(); + List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / totalSegments)); + int shiftedIndex = context.getGpSessionId() % totalSegments + context.getGpCommandCount(); + + int fullIterationCount = fragments.size() / totalSegments; + int restFragments = fragments.size() % totalSegments; + // The fragment count is less than the total segment count. We will try to distribute the fragments between segments evenly + if (fullIterationCount == 0 && restFragments != 0) { + List activeSegmentList = FragmentUtils.getActiveSegmentList(shiftedIndex, restFragments, totalSegments); + log.debug("{} fragment(s) will be distributed between segments with segment id: {}", restFragments, activeSegmentList); + // The current segment cannot get any fragment if its id is not in the list + if (!activeSegmentList.contains(context.getSegmentId())) { + return Collections.emptyList(); + } + int index = 0; + for (Fragment fragment : fragments) { + if (context.getSegmentId() == activeSegmentList.get(index % activeSegmentList.size())) { + filteredFragments.add(fragment); + } + index++; + } + return filteredFragments; + // The fragment count is greater than the total segment count.
+ // We will distribute them between all segments, and the remaining fragments will be distributed between segments evenly + } else if (fullIterationCount != 0 && restFragments != 0) { + int index = 0; + int fragmentCount = 0; + int fullDistributionFragmentCount = fullIterationCount * totalSegments; + List activeSegmentList = FragmentUtils.getActiveSegmentList(shiftedIndex, restFragments, totalSegments); + log.debug("{} fragments will be distributed between all segments {} time(s) and the remaining {} fragment(s) will be " + + "distributed between segments with segment id: {}", fragments.size(), fullIterationCount, restFragments, activeSegmentList); + for (Fragment fragment : fragments) { + if (fragmentCount >= fullDistributionFragmentCount) { + if (context.getSegmentId() == activeSegmentList.get(index % activeSegmentList.size())) { + filteredFragments.add(fragment); + } + index++; + } else { + if (context.getSegmentId() == (shiftedIndex % context.getTotalSegments())) { + filteredFragments.add(fragment); + } + shiftedIndex++; + fragmentCount++; + } + } + return filteredFragments; + // The fragment count is an exact multiple of the total segment count + } else { + log.debug("{} fragments will be distributed between {} segments evenly", fragments.size(), totalSegments); + for (Fragment fragment : fragments) { + if (context.getSegmentId() == (shiftedIndex % context.getTotalSegments())) { + filteredFragments.add(fragment); + } + shiftedIndex++; + } + return filteredFragments; + } + } + + @Override + public FragmentDistributionPolicy getDistributionPolicy() { + return DISTRIBUTION_POLICY; + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategy.java new file mode 100644 index 0000000000..5a4dfd17bd --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategy.java @@ -0,0 +1,34 @@ +package org.greenplum.pxf.service.fragment; + +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +@Component +public class RandomFragmentStrategy implements FragmentStrategy { + private final static FragmentDistributionPolicy DISTRIBUTION_POLICY = FragmentDistributionPolicy.RANDOM; + + @Override + public List filterFragments(Collection fragments, RequestContext context) { + int totalSegments = context.getTotalSegments(); + int shiftedIndex = context.getGpSessionId() % totalSegments + context.getGpCommandCount(); + Random random = new Random(shiftedIndex); + List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / context.getTotalSegments())); + for (Fragment fragment : fragments) { + if (context.getSegmentId() == random.nextInt(totalSegments)) { + filteredFragments.add(fragment); + } + } + return filteredFragments; + } + + @Override + public FragmentDistributionPolicy getDistributionPolicy() { + return DISTRIBUTION_POLICY; + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategy.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategy.java new file mode 100644 index 0000000000..721d548525 --- /dev/null +++
b/server/pxf-service/src/main/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategy.java @@ -0,0 +1,32 @@ +package org.greenplum.pxf.service.fragment; + +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +@Component +public class RoundRobinFragmentStrategy implements FragmentStrategy { + private final static FragmentDistributionPolicy DISTRIBUTION_POLICY = FragmentDistributionPolicy.ROUND_ROBIN; + + @Override + public List filterFragments(Collection fragments, RequestContext context) { + int shiftedIndex = context.getGpSessionId() % context.getTotalSegments() + context.getGpCommandCount(); + List filteredFragments = new ArrayList<>((int) Math.ceil((double) fragments.size() / context.getTotalSegments())); + for (Fragment fragment : fragments) { + if (context.getSegmentId() == (shiftedIndex % context.getTotalSegments())) { + filteredFragments.add(fragment); + } + shiftedIndex++; + } + return filteredFragments; + } + + @Override + public FragmentDistributionPolicy getDistributionPolicy() { + return DISTRIBUTION_POLICY; + } +} diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/FragmenterServiceTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/FragmenterServiceTest.java index 01dfd96be9..0a0550c4d0 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/FragmenterServiceTest.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/FragmenterServiceTest.java @@ -4,12 +4,11 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.hadoop.conf.Configuration; -import org.greenplum.pxf.api.error.PxfRuntimeException; -import org.greenplum.pxf.api.examples.DemoFragmentMetadata; import org.greenplum.pxf.api.model.Fragment; import org.greenplum.pxf.api.model.Fragmenter; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.api.utilities.FragmenterCacheFactory; +import org.greenplum.pxf.service.fragment.*; import org.greenplum.pxf.service.utilities.BasePluginFactory; import org.greenplum.pxf.service.utilities.GSSFailureHandler; import org.junit.jupiter.api.BeforeEach; @@ -20,42 +19,38 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) class FragmenterServiceTest { + @Mock + private BasePluginFactory mockPluginFactory; + @Mock + private Fragmenter fragmenter1; + 
@Mock + private Fragmenter fragmenter2; + @Mock + private Fragmenter fragmenter3; + @Mock + private FragmenterCacheFactory fragmenterCacheFactory; + @Mock + private FragmentStrategyProvider strategyProvider; + @Mock + private FragmentStrategy strategy; - @Mock private BasePluginFactory mockPluginFactory; - @Mock private Fragmenter fragmenter1; - @Mock private Fragmenter fragmenter2; - @Mock private Fragmenter fragmenter3; private Cache> fragmentCache; private FakeTicker fakeTicker; private FragmenterService fragmenterService; private Configuration configuration; - private RequestContext context1; private RequestContext context2; - private RequestContext context3; @BeforeEach public void setup() { @@ -80,18 +75,6 @@ public void setup() { context2.setDataSource("path.A"); context2.setConfiguration(configuration); - context3 = new RequestContext(); - context3.setTransactionId("XID-XYZ-654321"); - context3.setFragmenter("org.greenplum.pxf.api.model.Fragmenter3"); - context3.setSegmentId(0); - context3.setGpCommandCount(1); - context3.setGpSessionId(1); - context3.setTotalSegments(1); - context3.setDataSource("path.A"); - context3.setConfiguration(configuration); - - FragmenterCacheFactory fragmenterCacheFactory = mock(FragmenterCacheFactory.class); - fakeTicker = new FakeTicker(); fragmentCache = CacheBuilder.newBuilder() .expireAfterAccess(10, TimeUnit.SECONDS) @@ -99,132 +82,18 @@ public void setup() { .build(); lenient().when(fragmenterCacheFactory.getCache()).thenReturn(fragmentCache); + lenient().when(strategyProvider.getStrategy(any(RequestContext.class))).thenReturn(strategy); + lenient().when(strategy.filterFragments(any(), any())).thenReturn(Collections.emptyList()); // use a real handler to ensure pass-through calls on default configuration fragmenterService = new FragmenterService(fragmenterCacheFactory, - mockPluginFactory, new GSSFailureHandler()); - } - - @Test - public void testFragmenterCallWithOneActiveSegmentCount() throws Throwable { - - List fragmentList = Arrays.asList( - new Fragment("foo.bar", new DemoFragmentMetadata()), - new Fragment("bar.foo", new DemoFragmentMetadata()), - new Fragment("foobar", new DemoFragmentMetadata()), - new Fragment("barfoo", new DemoFragmentMetadata()) - ); - - context1.setGpSessionId(0); - context1.setGpCommandCount(0); - context1.setSegmentId(0); - context1.setTotalSegments(2); - context1.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "1"); - - context2.setGpSessionId(0); - context2.setGpCommandCount(0); - context2.setSegmentId(1); - context2.setTotalSegments(2); - context2.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "1"); - - lenient().when(mockPluginFactory.getPlugin(context1, context1.getFragmenter())).thenReturn(fragmenter1); - lenient().when(fragmenter1.getFragments()).thenReturn(fragmentList); - lenient().when(mockPluginFactory.getPlugin(context2, context2.getFragmenter())).thenReturn(fragmenter2); - lenient().when(fragmenter2.getFragments()).thenReturn(fragmentList); - - List response1 = fragmenterService.getFragmentsForSegment(context1); - List response2 = fragmenterService.getFragmentsForSegment(context2); - - verify(fragmenter1, times(1)).getFragments(); - - assertEquals(4, response1.size()); - assertEquals("foo.bar", response1.get(0).getSourceName()); - assertEquals("bar.foo", response1.get(1).getSourceName()); - assertEquals("foobar", response1.get(2).getSourceName()); - assertEquals("barfoo", response1.get(3).getSourceName()); - - assertEquals(0, response2.size()); - } - - @Test - public void 
testFragmenterCallWith2ActiveSegmentCountAnd3TotalSegments() throws Throwable { - - List fragmentList = Arrays.asList( - new Fragment("foo.bar", new DemoFragmentMetadata()), - new Fragment("bar.foo", new DemoFragmentMetadata()), - new Fragment("foobar", new DemoFragmentMetadata()), - new Fragment("barfoo", new DemoFragmentMetadata()) - ); - - context1.setGpSessionId(0); - context1.setGpCommandCount(0); - context1.setSegmentId(0); - context1.setTotalSegments(3); - context1.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "2"); - - context2.setGpSessionId(0); - context2.setGpCommandCount(0); - context2.setSegmentId(1); - context2.setTotalSegments(3); - context2.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "2"); - - context3.setGpSessionId(0); - context3.setGpCommandCount(0); - context3.setSegmentId(2); - context3.setTotalSegments(3); - context3.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "2"); - - lenient().when(mockPluginFactory.getPlugin(context1, context1.getFragmenter())).thenReturn(fragmenter1); - lenient().when(fragmenter1.getFragments()).thenReturn(fragmentList); - lenient().when(mockPluginFactory.getPlugin(context2, context2.getFragmenter())).thenReturn(fragmenter2); - lenient().when(fragmenter2.getFragments()).thenReturn(fragmentList); - lenient().when(mockPluginFactory.getPlugin(context3, context3.getFragmenter())).thenReturn(fragmenter3); - lenient().when(fragmenter3.getFragments()).thenReturn(fragmentList); - - List response1 = fragmenterService.getFragmentsForSegment(context1); - List response2 = fragmenterService.getFragmentsForSegment(context2); - List response3 = fragmenterService.getFragmentsForSegment(context3); - - verify(fragmenter1, times(1)).getFragments(); - - assertEquals(2, response1.size()); - assertEquals("foo.bar", response1.get(0).getSourceName()); - assertEquals("foobar", response1.get(1).getSourceName()); - - assertEquals(2, response3.size()); - assertEquals("bar.foo", response3.get(0).getSourceName()); - assertEquals("barfoo", response3.get(1).getSourceName()); - - assertEquals(0, response2.size()); - } - - @Test - public void testFragmenterCallWithWrongActiveSegmentCount() { - context1.setTransactionId("0"); - context1.setSegmentId(0); - context1.setTotalSegments(1); - context1.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "WRONG"); - - Exception e = assertThrows(PxfRuntimeException.class, () -> fragmenterService.getFragmentsForSegment(context1)); - assertEquals("Failed to get active segment count: For input string: \"WRONG\". Check the value of the parameter 'ACTIVE_SEGMENT_COUNT'", e.getMessage()); - } - - @Test - public void testFragmenterCallWithLessThanOneActiveSegmentCount() { - context1.setTransactionId("0"); - context1.setSegmentId(0); - context1.setTotalSegments(1); - context1.addOption(FragmenterService.ACTIVE_SEGMENT_COUNT_OPTION, "0"); - - Exception e = assertThrows(PxfRuntimeException.class, () -> fragmenterService.getFragmentsForSegment(context1)); - assertTrue(e.getMessage().contains("The parameter 'ACTIVE_SEGMENT_COUNT' has the value 0. 
The value of this " + - "parameter cannot be less than 1 or cannot be greater than the total amount of segments [1 segment(s)]")); + mockPluginFactory, new GSSFailureHandler(), strategyProvider); } @Test public void getFragmentsResponseFromEmptyCache() throws Throwable { when(mockPluginFactory.getPlugin(context1, context1.getFragmenter())).thenReturn(fragmenter1); - + when(fragmenter1.getFragments()).thenReturn(Collections.emptyList()); fragmenterService.getFragmentsForSegment(context1); verify(fragmenter1, times(1)).getFragments(); } @@ -296,41 +165,6 @@ public void testFragmenterCallIsNotCachedForDifferentFilters() throws Throwable testContextsAreNotCached(context1, context2); } - @Test - public void testFragmenterCallForTwoSegments() throws Throwable { - - List fragmentList = Arrays.asList( - new Fragment("foo.bar", new DemoFragmentMetadata()), - new Fragment("bar.foo", new DemoFragmentMetadata()), - new Fragment("foobar", new DemoFragmentMetadata()), - new Fragment("barfoo", new DemoFragmentMetadata()) - ); - - context1.setTransactionId("XID-XYZ-123456"); - context1.setSegmentId(0); - context1.setTotalSegments(2); - - context2.setTransactionId("XID-XYZ-123456"); - context2.setSegmentId(1); - context2.setTotalSegments(2); - - when(mockPluginFactory.getPlugin(context1, context1.getFragmenter())).thenReturn(fragmenter1); - when(fragmenter1.getFragments()).thenReturn(fragmentList); - - List response1 = fragmenterService.getFragmentsForSegment(context1); - List response2 = fragmenterService.getFragmentsForSegment(context2); - - verify(fragmenter1, times(1)).getFragments(); - - assertEquals(2, response1.size()); - assertEquals("foo.bar", response1.get(0).getSourceName()); - assertEquals("foobar", response1.get(1).getSourceName()); - - assertEquals(2, response2.size()); - assertEquals("bar.foo", response2.get(0).getSourceName()); - assertEquals("barfoo", response2.get(1).getSourceName()); - } - @Test public void getSameFragmenterCallTwiceUsesCache() throws Throwable { List fragmentList = new ArrayList<>(); @@ -386,13 +220,12 @@ public void testMultiThreadedAccessToFragments() throws Throwable { for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { - try { fragmenterService.getFragmentsForSegment(context1); - finishedCount.incrementAndGet(); - } catch (Throwable e) { - e.printStackTrace(); + } catch (IOException e) { + throw new RuntimeException(e); } + finishedCount.incrementAndGet(); }); threads[i].start(); } @@ -421,9 +254,7 @@ public void testMultiThreadedAccessToFragments() throws Throwable { assertEquals(0, fragmentCache.size()); } - private void testContextsAreNotCached(RequestContext context1, RequestContext context2) - throws Throwable { - + private void testContextsAreNotCached(RequestContext context1, RequestContext context2) throws Throwable { List fragmentList1 = new ArrayList<>(); List fragmentList2 = new ArrayList<>(); @@ -538,7 +369,6 @@ public void testBeginIterationGSSFailureRetriedTwice() throws Throwable { @Test public void testBeginIterationGSSFailureAfterMaxRetries() throws Throwable { - List fragmentList = new ArrayList<>(); configuration.set("hadoop.security.authentication", "kerberos"); configuration.set("pxf.sasl.connection.retries", "2"); @@ -562,42 +392,4 @@ public void testBeginIterationGSSFailureAfterMaxRetries() throws Throwable { inOrder.verifyNoMoreInteractions(); verifyNoMoreInteractions(mockPluginFactory); } - - // ----- TESTS for performance of list traversal ----- - @Test - public void testListTraversalPerformance() throws Throwable { - 
- // This test makes sure we iterate properly (using an iterator, not the index-based for loop) over a LinkedList - // that is returned by a fragmenter when building a list of fragment for a segment. - // Tested on MacBookPro, the timings are as follows: - // 10M fragments - from 15 mins to 1.3 secs - // 1M fragments - from 8.2 secs to 1.3 secs - // so we will run the large dataset that would've taken 15 minutes and make sure it computes within 10 seconds - // allowing 8x margin for test slowness when running on slower machines on in the cloud under a heavy workload - - Fragment fragment = new Fragment("foo.bar", new DemoFragmentMetadata()); - List fragmentList = new LinkedList<>(); - for (int i=0; i<10000000; i++) { - fragmentList.add(fragment); // add the same fragment, save on memory, we only care about testing timings - } - - context1.setTransactionId("XID-XYZ-123456"); - context1.setSegmentId(0); - context1.setTotalSegments(100); - - when(mockPluginFactory.getPlugin(context1, context1.getFragmenter())).thenReturn(fragmenter1); - when(fragmenter1.getFragments()).thenReturn(fragmentList); - - long start = System.currentTimeMillis(); - List response = fragmenterService.getFragmentsForSegment(context1); - long end = System.currentTimeMillis(); - - verify(fragmenter1, times(1)).getFragments(); - - assertTrue(response instanceof ArrayList); - assertEquals(100000, response.size()); - assertEquals("foo.bar", response.get(0).getSourceName()); - assertTrue(end-start < 10000L); // should be less than 10 secs (8x margin), not minutes - - } } \ No newline at end of file diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategyTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategyTest.java new file mode 100644 index 0000000000..42c875b53b --- /dev/null +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ActiveSegmentFragmentStrategyTest.java @@ -0,0 +1,229 @@ +package org.greenplum.pxf.service.fragment; + +import org.apache.hadoop.conf.Configuration; +import org.greenplum.pxf.api.error.PxfRuntimeException; +import org.greenplum.pxf.api.examples.DemoFragmentMetadata; +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static org.greenplum.pxf.service.fragment.FragmentStrategyProvider.ACTIVE_SEGMENT_COUNT_OPTION; +import static org.junit.jupiter.api.Assertions.*; + +class ActiveSegmentFragmentStrategyTest { + private FragmentStrategy strategy; + private RequestContext context1; + private RequestContext context2; + private RequestContext context3; + private List fragmentList; + + @BeforeEach + void setUp() { + Configuration configuration = new Configuration(); + context1 = new RequestContext(); + context1.setTransactionId("XID-XYZ-123456"); + context1.setFragmenter("org.greenplum.pxf.api.model.Fragmenter1"); + context1.setSegmentId(0); + context1.setGpCommandCount(1); + context1.setGpSessionId(1); + context1.setTotalSegments(1); + context1.setDataSource("path.A"); + context1.setConfiguration(configuration); + + context2 = new RequestContext(); + context2.setTransactionId("XID-XYZ-654321"); + context2.setFragmenter("org.greenplum.pxf.api.model.Fragmenter2"); + context2.setSegmentId(0); + context2.setGpCommandCount(1); + 
context2.setGpSessionId(1); + context2.setTotalSegments(1); + context2.setDataSource("path.A"); + context2.setConfiguration(configuration); + + context3 = new RequestContext(); + context3.setTransactionId("XID-XYZ-654321"); + context3.setFragmenter("org.greenplum.pxf.api.model.Fragmenter3"); + context3.setSegmentId(0); + context3.setGpCommandCount(1); + context3.setGpSessionId(1); + context3.setTotalSegments(1); + context3.setDataSource("path.A"); + context3.setConfiguration(configuration); + + fragmentList = Arrays.asList( + new Fragment("foo.bar", new DemoFragmentMetadata()), + new Fragment("bar.foo", new DemoFragmentMetadata()), + new Fragment("foobar", new DemoFragmentMetadata()), + new Fragment("barfoo", new DemoFragmentMetadata()) + ); + + strategy = new ActiveSegmentFragmentStrategy(); + } + + @Test + public void testFragmenterCallWithOneActiveSegmentCount() { + context1.setGpSessionId(0); + context1.setGpCommandCount(0); + context1.setSegmentId(0); + context1.setTotalSegments(2); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "1"); + + context2.setGpSessionId(0); + context2.setGpCommandCount(0); + context2.setSegmentId(1); + context2.setTotalSegments(2); + context2.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "1"); + + List response1 = strategy.filterFragments(fragmentList, context1); + List response2 = strategy.filterFragments(fragmentList, context2); + + assertEquals(4, response1.size()); + assertEquals("foo.bar", response1.get(0).getSourceName()); + assertEquals("bar.foo", response1.get(1).getSourceName()); + assertEquals("foobar", response1.get(2).getSourceName()); + assertEquals("barfoo", response1.get(3).getSourceName()); + + assertEquals(0, response2.size()); + } + + @Test + public void testFragmenterCallWith2ActiveSegmentCountAnd3TotalSegments() { + context1.setGpSessionId(0); + context1.setGpCommandCount(0); + context1.setSegmentId(0); + context1.setTotalSegments(3); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "2"); + + context2.setGpSessionId(0); + context2.setGpCommandCount(0); + context2.setSegmentId(1); + context2.setTotalSegments(3); + context2.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "2"); + + context3.setGpSessionId(0); + context3.setGpCommandCount(0); + context3.setSegmentId(2); + context3.setTotalSegments(3); + context3.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "2"); + + List response1 = strategy.filterFragments(fragmentList, context1); + List response2 = strategy.filterFragments(fragmentList, context2); + List response3 = strategy.filterFragments(fragmentList, context3); + + assertEquals(2, response1.size()); + assertEquals("foo.bar", response1.get(0).getSourceName()); + assertEquals("foobar", response1.get(1).getSourceName()); + + assertEquals(2, response3.size()); + assertEquals("bar.foo", response3.get(0).getSourceName()); + assertEquals("barfoo", response3.get(1).getSourceName()); + + assertEquals(0, response2.size()); + } + + @Test + public void testFragmenterCallWithFragmentSizeLessThanTotalSegments() { + fragmentList = Arrays.asList( + new Fragment("foo.bar", new DemoFragmentMetadata()), + new Fragment("bar.foo", new DemoFragmentMetadata()) + ); + + context1.setGpSessionId(0); + context1.setGpCommandCount(0); + context1.setSegmentId(0); + context1.setTotalSegments(3); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "3"); + + context2.setGpSessionId(0); + context2.setGpCommandCount(0); + context2.setSegmentId(1); + context2.setTotalSegments(3); + context2.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "3"); + + context3.setGpSessionId(0); + 
context3.setGpCommandCount(0); + context3.setSegmentId(2); + context3.setTotalSegments(3); + context3.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "3"); + + List response1 = strategy.filterFragments(fragmentList, context1); + List response2 = strategy.filterFragments(fragmentList, context2); + List response3 = strategy.filterFragments(fragmentList, context3); + + assertEquals(1, response1.size()); + assertEquals("foo.bar", response1.get(0).getSourceName()); + + assertEquals(1, response3.size()); + assertEquals("bar.foo", response3.get(0).getSourceName()); + + assertEquals(0, response2.size()); + } + + @Test + public void testFragmenterCallWithWrongActiveSegmentCount() { + context1.setTransactionId("0"); + context1.setSegmentId(0); + context1.setTotalSegments(1); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "WRONG"); + + Exception e = assertThrows(PxfRuntimeException.class, () -> strategy.filterFragments(fragmentList, context1)); + assertEquals("Failed to get active segment count: For input string: \"WRONG\". Check the value of the parameter 'ACTIVE_SEGMENT_COUNT'", e.getMessage()); + } + + @Test + public void testFragmenterCallWithLessThanOneActiveSegmentCount() { + context1.setTransactionId("0"); + context1.setSegmentId(0); + context1.setTotalSegments(1); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "0"); + + Exception e = assertThrows(PxfRuntimeException.class, () -> strategy.filterFragments(fragmentList, context1)); + assertTrue(e.getMessage().contains("The parameter 'ACTIVE_SEGMENT_COUNT' has the value 0. The value of this " + + "parameter cannot be less than 1 or cannot be greater than the total amount of segments [1 segment(s)]")); + } + + // ----- TESTS for performance of list traversal ----- + @Test + public void testListTraversalPerformance() { + + // This test makes sure we iterate properly (using an iterator, not the index-based for loop) over a LinkedList + // that is returned by a fragmenter when building a list of fragments for a segment.
+ // Tested on MacBookPro, the timings are as follows: + // 10M fragments - from 15 mins to 1.3 secs + // 1M fragments - from 8.2 secs to 1.3 secs + // so we will run the large dataset that would've taken 15 minutes and make sure it computes within 10 seconds + // allowing 8x margin for test slowness when running on slower machines or in the cloud under a heavy workload + + Fragment fragment = new Fragment("mobile", new DemoFragmentMetadata()); + List fragmentList = new LinkedList<>(); + for (int i = 0; i < 10000000; i++) { + fragmentList.add(fragment); // add the same fragment, save on memory, we only care about testing timings + } + + context1.setSegmentId(10); + context1.setTotalSegments(500); + context1.setGpSessionId(500); + context1.setGpCommandCount(0); + context1.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "50"); + + long start = System.currentTimeMillis(); + List fragments = strategy.filterFragments(fragmentList, context1); + long end = System.currentTimeMillis(); + + assertInstanceOf(ArrayList.class, fragments); + assertEquals(200000, fragments.size()); + assertEquals("mobile", fragments.get(0).getSourceName()); + assertTrue(end - start < 10000L); // should be less than 10 secs (8x margin), not minutes + } + + @Test + void getDistributionPolicy() { + assertEquals(FragmentDistributionPolicy.ACTIVE_SEGMENT, strategy.getDistributionPolicy()); + } +} \ No newline at end of file diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentStrategyProviderTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentStrategyProviderTest.java new file mode 100644 index 0000000000..297333d1f4 --- /dev/null +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentStrategyProviderTest.java @@ -0,0 +1,107 @@ +package org.greenplum.pxf.service.fragment; + +import org.apache.hadoop.conf.Configuration; +import org.greenplum.pxf.api.model.RequestContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Arrays; +import java.util.List; + +import static org.greenplum.pxf.service.fragment.FragmentStrategyProvider.ACTIVE_SEGMENT_COUNT_OPTION; +import static org.greenplum.pxf.service.fragment.FragmentStrategyProvider.FRAGMENT_DISTRIBUTION_POLICY_OPTION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class FragmentStrategyProviderTest { + @Mock + private RoundRobinFragmentStrategy roundRobinFragmentStrategy; + @Mock + private ActiveSegmentFragmentStrategy activeSegmentFragmentStrategy; + @Mock + private ImprovedRoundRobinFragmentStrategy improvedRoundRobinFragmentStrategy; + @Mock + private RandomFragmentStrategy randomFragmentStrategy; + + private RequestContext context; + private FragmentStrategyProvider strategyProvider; + + @BeforeEach + void setUp() { + Configuration configuration = new Configuration(); + context = new RequestContext(); + context.setTransactionId("XID-XYZ-123456"); + context.setFragmenter("org.greenplum.pxf.api.model.Fragmenter1"); + context.setSegmentId(0); + context.setGpCommandCount(0); + context.setGpSessionId(2); + context.setTotalSegments(2); + context.setDataSource("path.A"); +
context.setConfiguration(configuration); + + List strategies = Arrays.asList( + roundRobinFragmentStrategy, + improvedRoundRobinFragmentStrategy, + activeSegmentFragmentStrategy, + randomFragmentStrategy); + when(roundRobinFragmentStrategy.getDistributionPolicy()).thenReturn(FragmentDistributionPolicy.ROUND_ROBIN); + when(activeSegmentFragmentStrategy.getDistributionPolicy()).thenReturn(FragmentDistributionPolicy.ACTIVE_SEGMENT); + when(improvedRoundRobinFragmentStrategy.getDistributionPolicy()).thenReturn(FragmentDistributionPolicy.IMPROVED_ROUND_ROBIN); + when(randomFragmentStrategy.getDistributionPolicy()).thenReturn(FragmentDistributionPolicy.RANDOM); + + strategyProvider = new FragmentStrategyProvider(strategies); + } + + @Test + public void defaultFragmentDistributionPolicyOptionTest() { + assertInstanceOf(RoundRobinFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + @Test + public void roundRobinFragmentDistributionPolicyOptionTest() { + context.addOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION, "round-robin"); + assertInstanceOf(RoundRobinFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + @Test + public void activeSegmentFragmentDistributionPolicyOptionTest() { + context.addOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION, "active-segment"); + assertInstanceOf(ActiveSegmentFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + /** + * We return the active-segment strategy if the parameter FRAGMENT_DISTRIBUTION_POLICY was not set, + * but the parameter ACTIVE_SEGMENT_COUNT is present. We need it for backward compatibility + */ + @Test + public void activeSegmentFragmentDistributionPolicyOptionForBackwardCompatibilityTest() { + context.addOption(ACTIVE_SEGMENT_COUNT_OPTION, "1"); + assertInstanceOf(ActiveSegmentFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + @Test + public void improvedRoundRobinFragmentDistributionPolicyOptionTest() { + context.addOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION, "improved-round-robin"); + assertInstanceOf(ImprovedRoundRobinFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + @Test + public void randomFragmentDistributionPolicyOptionTest() { + context.addOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION, "random"); + assertInstanceOf(RandomFragmentStrategy.class, strategyProvider.getStrategy(context)); + } + + @Test + public void strategyPolicyIsNotFound() { + context.addOption(FRAGMENT_DISTRIBUTION_POLICY_OPTION, "fake-policy"); + Exception exception = assertThrows(IllegalArgumentException.class, () -> strategyProvider.getStrategy(context)); + String errorMessage = exception.getMessage(); + assertEquals("Cannot find corresponding fragment distribution policy with name fake-policy", errorMessage); + } +} diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentUtilsTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentUtilsTest.java new file mode 100644 index 0000000000..90d78080b3 --- /dev/null +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/FragmentUtilsTest.java @@ -0,0 +1,60 @@ +package org.greenplum.pxf.service.fragment; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class FragmentUtilsTest { + + @Test + void getActiveSegmentListWithShiftIndex0() { + int shiftedIndex = 0; + int activeSegmentCount = 2; + int totalSegments = 10; + List expected = List.of(0, 5); + List actual =
FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); + assertEquals(expected, actual); + } + + @Test + void getActiveSegmentListWithShiftIndex1() { + int shiftedIndex = 1; + int activeSegmentCount = 3; + int totalSegments = 10; + List expected = List.of(1, 5, 9); + List actual = FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); + assertEquals(expected, actual); + } + + @Test + void getActiveSegmentListWithShiftBigIndex() { + int shiftedIndex = 3; + int activeSegmentCount = 3; + int totalSegments = 10; + List expected = List.of(3, 7, 1); + List actual = FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); + assertEquals(expected, actual); + } + + @Test + void getActiveSegmentListWithNotRepeatingSegId() { + int shiftedIndex = 0; + int activeSegmentCount = 5; + int totalSegments = 6; + List expected = List.of(0, 2, 4, 1, 5); + List actual = FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); + assertEquals(expected, actual); + } + + @Test + void getActiveSegmentListWhenActiveSegmentCountIsTheSameAsTotalSegments() { + int shiftedIndex = 3; + int activeSegmentCount = 6; + int totalSegments = 6; + List expected = List.of(3, 4, 5, 0, 1, 2); + List actual = FragmentUtils.getActiveSegmentList(shiftedIndex, activeSegmentCount, totalSegments); + assertEquals(expected, actual); + } +} \ No newline at end of file diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategyTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategyTest.java new file mode 100644 index 0000000000..357a46e399 --- /dev/null +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/ImprovedRoundRobinFragmentStrategyTest.java @@ -0,0 +1,207 @@ +package org.greenplum.pxf.service.fragment; + +import org.apache.hadoop.conf.Configuration; +import org.greenplum.pxf.api.examples.DemoFragmentMetadata; +import org.greenplum.pxf.api.model.Fragment; +import org.greenplum.pxf.api.model.RequestContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class ImprovedRoundRobinFragmentStrategyTest { + private FragmentStrategy strategy; + private RequestContext context1; + private RequestContext context2; + private RequestContext context3; + + @BeforeEach + void setUp() { + Configuration configuration = new Configuration(); + context1 = new RequestContext(); + context1.setTransactionId("XID-XYZ-123456"); + context1.setFragmenter("org.greenplum.pxf.api.model.Fragmenter1"); + context1.setGpCommandCount(0); + context1.setGpSessionId(6); + context1.setDataSource("path.A"); + context1.setConfiguration(configuration); + + context2 = new RequestContext(); + context2.setTransactionId("XID-XYZ-654321"); + context2.setFragmenter("org.greenplum.pxf.api.model.Fragmenter2"); + context2.setGpCommandCount(0); + context2.setGpSessionId(6); + context2.setDataSource("path.A"); + context2.setConfiguration(configuration); + + context3 = new RequestContext(); + context3.setTransactionId("XID-XYZ-654321"); + context3.setFragmenter("org.greenplum.pxf.api.model.Fragmenter3"); + context3.setGpCommandCount(0); + context3.setGpSessionId(6); + context3.setDataSource("path.A"); + context3.setConfiguration(configuration); + + 
+        strategy = new ImprovedRoundRobinFragmentStrategy();
+    }
+
+    /**
+     * The fragment count is less than the total segment count. We try to spread the fragments
+     * across the segments evenly. The difference from 'round-robin' is that the 'round-robin'
+     * algorithm would put these fragments on segments that are close to each other and might
+     * therefore be on the same host.
+     */
+    @Test
+    void filterFragmentsWhenFragmentSizeIsLessThanTotalSegments() {
+        List<Fragment> fragmentList = Arrays.asList(
+                new Fragment("foo.bar", new DemoFragmentMetadata()),
+                new Fragment("bar.foo", new DemoFragmentMetadata())
+        );
+
+        context1.setTotalSegments(3);
+        context1.setSegmentId(0);
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        assertEquals(1, fragments1.size());
+        assertEquals("foo.bar", fragments1.get(0).getSourceName());
+
+        context2.setTotalSegments(3);
+        context2.setSegmentId(1);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        assertEquals(0, fragments2.size());
+
+        context3.setTotalSegments(3);
+        context3.setSegmentId(2);
+        List<Fragment> fragments3 = strategy.filterFragments(fragmentList, context3);
+        assertEquals(1, fragments3.size());
+        assertEquals("bar.foo", fragments3.get(0).getSourceName());
+    }
+
+    /**
+     * The fragment count is an exact multiple of the total segment count (6 fragments, 3 segments).
+     */
+    @Test
+    void filterFragmentsWhenFragmentSizeIsEvenNumberOfTotalSegments() {
+        List<Fragment> fragmentList = Arrays.asList(
+                new Fragment("tv", new DemoFragmentMetadata()),
+                new Fragment("console", new DemoFragmentMetadata()),
+                new Fragment("watch", new DemoFragmentMetadata()),
+                new Fragment("computer", new DemoFragmentMetadata()),
+                new Fragment("laptop", new DemoFragmentMetadata()),
+                new Fragment("smartphone", new DemoFragmentMetadata())
+        );
+
+        context1.setTotalSegments(3);
+        context1.setSegmentId(0);
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        assertEquals(2, fragments1.size());
+        assertEquals("tv", fragments1.get(0).getSourceName());
+        assertEquals("computer", fragments1.get(1).getSourceName());
+
+        context2.setTotalSegments(3);
+        context2.setSegmentId(1);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        assertEquals(2, fragments2.size());
+        assertEquals("console", fragments2.get(0).getSourceName());
+        assertEquals("laptop", fragments2.get(1).getSourceName());
+
+        context3.setTotalSegments(3);
+        context3.setSegmentId(2);
+        List<Fragment> fragments3 = strategy.filterFragments(fragmentList, context3);
+        assertEquals(2, fragments3.size());
+        assertEquals("watch", fragments3.get(0).getSourceName());
+        assertEquals("smartphone", fragments3.get(1).getSourceName());
+    }
+
+    /**
+     * The fragment count (8) is greater than the total segment count (3).
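+     * (8 = 2 full rounds of 3, plus a remainder of 2.)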
+     * We distribute 6 fragments across 3 segments (2 fragments per segment),
+     * and the remaining 2 fragments are distributed between segments 0 and 2.
+     */
+    @Test
+    void filterFragmentsWhenFragmentSizeIsGreaterThanTotalSegments() {
+        List<Fragment> fragmentList = Arrays.asList(
+                new Fragment("tv", new DemoFragmentMetadata()),
+                new Fragment("console", new DemoFragmentMetadata()),
+                new Fragment("watch", new DemoFragmentMetadata()),
+                new Fragment("computer", new DemoFragmentMetadata()),
+                new Fragment("laptop", new DemoFragmentMetadata()),
+                new Fragment("smartphone", new DemoFragmentMetadata()),
+                new Fragment("pipelac", new DemoFragmentMetadata()),
+                new Fragment("car", new DemoFragmentMetadata())
+        );
+
+        context1.setTotalSegments(3);
+        context1.setSegmentId(0);
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        assertEquals(3, fragments1.size());
+        assertEquals("tv", fragments1.get(0).getSourceName());
+        assertEquals("computer", fragments1.get(1).getSourceName());
+        assertEquals("pipelac", fragments1.get(2).getSourceName());
+
+        context2.setTotalSegments(3);
+        context2.setSegmentId(1);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        assertEquals(2, fragments2.size());
+        assertEquals("console", fragments2.get(0).getSourceName());
+        assertEquals("laptop", fragments2.get(1).getSourceName());
+
+        context3.setTotalSegments(3);
+        context3.setSegmentId(2);
+        List<Fragment> fragments3 = strategy.filterFragments(fragmentList, context3);
+        assertEquals(3, fragments3.size());
+        assertEquals("watch", fragments3.get(0).getSourceName());
+        assertEquals("smartphone", fragments3.get(1).getSourceName());
+        assertEquals("car", fragments3.get(2).getSourceName());
+    }
+
+    // ----- TESTS for performance of list traversal -----
+    @Test
+    public void testListTraversalPerformance() {
+
+        // This test makes sure we iterate properly (using an iterator, not an index-based for loop)
+        // over the LinkedList that a fragmenter returns when building the list of fragments for a segment.
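+        // (get(i) on a LinkedList is O(n), so an indexed loop over n fragments costs O(n^2);
+        // a single iterator pass costs O(n), hence the timings below.)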
+        // Tested on a MacBook Pro, the timings are as follows:
+        // 10M fragments - from 15 mins to 1.3 secs
+        // 1M fragments - from 8.2 secs to 1.3 secs
+        // So we run the large dataset that would have taken 15 minutes and make sure both runs
+        // complete within 20 seconds, allowing an 8x margin for test slowness when running on
+        // slower machines or in the cloud under a heavy workload.
+
+        Fragment fragment = new Fragment("mobile", new DemoFragmentMetadata());
+        List<Fragment> fragmentList = new LinkedList<>();
+        for (int i = 0; i < 10000005; i++) {
+            fragmentList.add(fragment); // add the same fragment to save memory; we only care about timings
+        }
+
+        context1.setSegmentId(0);
+        context1.setTotalSegments(500);
+        context1.setGpSessionId(500);
+
+        context2.setSegmentId(100);
+        context2.setTotalSegments(500);
+        context2.setGpSessionId(500);
+
+        long start = System.currentTimeMillis();
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        long end = System.currentTimeMillis();
+
+        // 10,000,005 fragments over 500 segments: 20,000 full rounds plus a remainder of 5;
+        // the remainder is spread far apart (segments 0, 100, 200, 300, 400), so both
+        // segments under test get 20,001 fragments
+        assertInstanceOf(ArrayList.class, fragments1);
+        assertEquals(20001, fragments1.size());
+        assertEquals("mobile", fragments1.get(0).getSourceName());
+
+        assertInstanceOf(ArrayList.class, fragments2);
+        assertEquals(20001, fragments2.size());
+        assertEquals("mobile", fragments2.get(0).getSourceName());
+
+        assertTrue(end - start < 20000L); // should be less than 20 secs (8x margin), not minutes
+    }
+
+    @Test
+    void getDistributionPolicy() {
+        assertEquals(FragmentDistributionPolicy.IMPROVED_ROUND_ROBIN, strategy.getDistributionPolicy());
+    }
+}
\ No newline at end of file
diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategyTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategyTest.java
new file mode 100644
index 0000000000..e1b638a791
--- /dev/null
+++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RandomFragmentStrategyTest.java
@@ -0,0 +1,131 @@
+package org.greenplum.pxf.service.fragment;
+
+import org.apache.hadoop.conf.Configuration;
+import org.greenplum.pxf.api.examples.DemoFragmentMetadata;
+import org.greenplum.pxf.api.model.Fragment;
+import org.greenplum.pxf.api.model.RequestContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class RandomFragmentStrategyTest {
+    private FragmentStrategy strategy;
+    private RequestContext context1;
+    private RequestContext context2;
+    private RequestContext context3;
+    private List<Fragment> fragmentList;
+
+    @BeforeEach
+    void setUp() {
+        Configuration configuration = new Configuration();
+        context1 = new RequestContext();
+        context1.setTransactionId("XID-XYZ-654321");
+        context1.setFragmenter("org.greenplum.pxf.api.model.Fragmenter1");
+        context1.setSegmentId(0);
+        context1.setGpCommandCount(10);
+        context1.setGpSessionId(3);
+        context1.setTotalSegments(3);
+        context1.setDataSource("path.A");
+        context1.setConfiguration(configuration);
+
+        context2 = new RequestContext();
+        context2.setTransactionId("XID-XYZ-654321");
+        context2.setFragmenter("org.greenplum.pxf.api.model.Fragmenter2");
+        context2.setSegmentId(1);
+        context2.setGpCommandCount(10);
+        context2.setGpSessionId(3);
+        context2.setTotalSegments(3);
+        context2.setDataSource("path.A");
+        context2.setConfiguration(configuration);
+
+        context3 = new RequestContext();
+        context3.setTransactionId("XID-XYZ-654321");
+        context3.setFragmenter("org.greenplum.pxf.api.model.Fragmenter3");
+        context3.setSegmentId(2);
+        context3.setGpCommandCount(10);
+        context3.setGpSessionId(3);
+        context3.setTotalSegments(3);
+        context3.setDataSource("path.A");
+        context3.setConfiguration(configuration);
+
+        fragmentList = Arrays.asList(
+                new Fragment("tv", new DemoFragmentMetadata()),
+                new Fragment("console", new DemoFragmentMetadata()),
+                new Fragment("watch", new DemoFragmentMetadata()),
+                new Fragment("computer", new DemoFragmentMetadata()),
+                new Fragment("laptop", new DemoFragmentMetadata()),
+                new Fragment("smartphone", new DemoFragmentMetadata()),
+                new Fragment("pipelac", new DemoFragmentMetadata()),
+                new Fragment("car", new DemoFragmentMetadata())
+        );
+
+        strategy = new RandomFragmentStrategy();
+    }
+
+    @Test
+    void filterFragmentsWithShiftIndex10() {
+        // Every segment seeds its generator identically, as new Random(shiftedIndex), and draws
+        // a random segment id for each fragment. The generator must produce the same sequence
+        // as long as shiftedIndex stays the same, so all segments agree on the same partition.
+        // shiftedIndex = context.getGpSessionId() % context.getTotalSegments() + context.getGpCommandCount();
+        context1.setGpCommandCount(10);
+        context2.setGpCommandCount(10);
+        context3.setGpCommandCount(10);
+        // run the same filtering six times: the partition must be identical on every run
+        IntStream.rangeClosed(0, 5).forEach(i -> {
+            List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+            List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+            List<Fragment> fragments3 = strategy.filterFragments(fragmentList, context3);
+            // segment id = 0
+            assertEquals(4, fragments1.size());
+            assertEquals("tv", fragments1.get(0).getSourceName());
+            assertEquals("console", fragments1.get(1).getSourceName());
+            assertEquals("watch", fragments1.get(2).getSourceName());
+            assertEquals("computer", fragments1.get(3).getSourceName());
+            // segment id = 1
+            assertEquals(4, fragments2.size());
+            assertEquals("laptop", fragments2.get(0).getSourceName());
+            assertEquals("smartphone", fragments2.get(1).getSourceName());
+            assertEquals("pipelac", fragments2.get(2).getSourceName());
+            assertEquals("car", fragments2.get(3).getSourceName());
+            // segment id = 2
+            assertEquals(0, fragments3.size());
+        });
+    }
+
+    @Test
+    void filterFragmentsWithShiftIndex15() {
+        // Same idea as above: Random is seeded with the same shiftedIndex on every segment and
+        // must produce the same sequence as long as shiftedIndex stays the same.
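+        // Here shiftedIndex = 3 % 3 + 15 = 15 (versus 10 in the previous test), so a different
+        // seed yields a different, but still deterministic, partition of the same fragments.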
+        context1.setGpCommandCount(15);
+        context2.setGpCommandCount(15);
+        context3.setGpCommandCount(15);
+        IntStream.rangeClosed(0, 6).forEach(i -> {
+            List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+            List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+            List<Fragment> fragments3 = strategy.filterFragments(fragmentList, context3);
+            // segment id = 0
+            assertEquals(2, fragments1.size());
+            assertEquals("tv", fragments1.get(0).getSourceName());
+            assertEquals("computer", fragments1.get(1).getSourceName());
+            // segment id = 1
+            assertEquals(3, fragments2.size());
+            assertEquals("watch", fragments2.get(0).getSourceName());
+            assertEquals("laptop", fragments2.get(1).getSourceName());
+            assertEquals("car", fragments2.get(2).getSourceName());
+            // segment id = 2
+            assertEquals(3, fragments3.size());
+            assertEquals("console", fragments3.get(0).getSourceName());
+            assertEquals("smartphone", fragments3.get(1).getSourceName());
+            assertEquals("pipelac", fragments3.get(2).getSourceName());
+        });
+    }
+
+    @Test
+    void getDistributionPolicy() {
+        assertEquals(FragmentDistributionPolicy.RANDOM, strategy.getDistributionPolicy());
+    }
+}
\ No newline at end of file
diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategyTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategyTest.java
new file mode 100644
index 0000000000..ed56d96c15
--- /dev/null
+++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/fragment/RoundRobinFragmentStrategyTest.java
@@ -0,0 +1,128 @@
+package org.greenplum.pxf.service.fragment;
+
+import org.apache.hadoop.conf.Configuration;
+import org.greenplum.pxf.api.examples.DemoFragmentMetadata;
+import org.greenplum.pxf.api.model.Fragment;
+import org.greenplum.pxf.api.model.RequestContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class RoundRobinFragmentStrategyTest {
+    private FragmentStrategy strategy;
+    private RequestContext context1;
+    private RequestContext context2;
+    private List<Fragment> fragmentList;
+
+    @BeforeEach
+    public void setup() {
+        Configuration configuration = new Configuration();
+        context1 = new RequestContext();
+        context1.setTransactionId("XID-XYZ-123456");
+        context1.setFragmenter("org.greenplum.pxf.api.model.Fragmenter1");
+        context1.setSegmentId(0);
+        context1.setGpCommandCount(1);
+        context1.setGpSessionId(1);
+        context1.setTotalSegments(1);
+        context1.setDataSource("path.A");
+        context1.setConfiguration(configuration);
+
+        context2 = new RequestContext();
+        context2.setTransactionId("XID-XYZ-654321");
+        context2.setFragmenter("org.greenplum.pxf.api.model.Fragmenter2");
+        context2.setSegmentId(0);
+        context2.setGpCommandCount(1);
+        context2.setGpSessionId(1);
+        context2.setTotalSegments(1);
+        context2.setDataSource("path.A");
+        context2.setConfiguration(configuration);
+
+        fragmentList = Arrays.asList(
+                new Fragment("foo.bar", new DemoFragmentMetadata()),
+                new Fragment("bar.foo", new DemoFragmentMetadata()),
+                new Fragment("foobar", new DemoFragmentMetadata()),
+                new Fragment("barfoo", new DemoFragmentMetadata())
+        );
+
+        strategy = new RoundRobinFragmentStrategy();
+    }
+
+    @Test
+    void filterFragmentsWithShiftIndex2() {
+        // shiftedIndex = gpSessionId % totalSegments + gpCommandCount = 1 % 2 + 1 = 2
+        context1.setTotalSegments(2);
+        context1.setSegmentId(0);
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        assertEquals(2, fragments1.size());
+        assertEquals("foo.bar", fragments1.get(0).getSourceName());
+        assertEquals("foobar", fragments1.get(1).getSourceName());
+
+        context2.setTotalSegments(2);
+        context2.setSegmentId(1);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        assertEquals(2, fragments2.size());
+        assertEquals("bar.foo", fragments2.get(0).getSourceName());
+        assertEquals("barfoo", fragments2.get(1).getSourceName());
+    }
+
+    @Test
+    void filterFragmentsWithShiftIndex3() {
+        // shiftedIndex = 1 % 2 + 2 = 3, so the assignment is shifted by one relative to the test above
+        context1.setTotalSegments(2);
+        context1.setSegmentId(0);
+        context1.setGpCommandCount(2);
+        List<Fragment> fragments1 = strategy.filterFragments(fragmentList, context1);
+        assertEquals(2, fragments1.size());
+        assertEquals("bar.foo", fragments1.get(0).getSourceName());
+        assertEquals("barfoo", fragments1.get(1).getSourceName());
+
+        context2.setTotalSegments(2);
+        context2.setSegmentId(1);
+        context2.setGpCommandCount(2);
+        List<Fragment> fragments2 = strategy.filterFragments(fragmentList, context2);
+        assertEquals(2, fragments2.size());
+        assertEquals("foo.bar", fragments2.get(0).getSourceName());
+        assertEquals("foobar", fragments2.get(1).getSourceName());
+    }
+
+    // ----- TESTS for performance of list traversal -----
+    @Test
+    public void testListTraversalPerformance() {
+
+        // This test makes sure we iterate properly (using an iterator, not an index-based for loop)
+        // over the LinkedList that a fragmenter returns when building the list of fragments for a segment.
+        // Tested on a MacBook Pro, the timings are as follows:
+        // 10M fragments - from 15 mins to 1.3 secs
+        // 1M fragments - from 8.2 secs to 1.3 secs
+        // So we run the large dataset that would have taken 15 minutes and make sure it completes
+        // within 10 seconds, allowing an 8x margin for test slowness when running on slower
+        // machines or in the cloud under a heavy workload.
+
+        Fragment fragment = new Fragment("mobile", new DemoFragmentMetadata());
+        List<Fragment> fragmentList = new LinkedList<>();
+        for (int i = 0; i < 10000000; i++) {
+            fragmentList.add(fragment); // add the same fragment to save memory; we only care about timings
+        }
+
+        context1.setTransactionId("XID-XYZ-123456");
+        context1.setSegmentId(0);
+        context1.setTotalSegments(100);
+
+        long start = System.currentTimeMillis();
+        List<Fragment> fragments = strategy.filterFragments(fragmentList, context1);
+        long end = System.currentTimeMillis();
+
+        assertInstanceOf(ArrayList.class, fragments);
+        assertEquals(100000, fragments.size()); // 10,000,000 fragments / 100 segments
+        assertEquals("mobile", fragments.get(0).getSourceName());
+        assertTrue(end - start < 10000L); // should be less than 10 secs (8x margin), not minutes
+    }
+
+    @Test
+    void getDistributionPolicyTest() {
+        assertEquals(FragmentDistributionPolicy.ROUND_ROBIN, strategy.getDistributionPolicy());
+    }
+}
\ No newline at end of file
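A minimal sketch of the selection rule the round-robin tests above imply; this is an
assumption for illustration, not the production RoundRobinFragmentStrategy class:

    import java.util.ArrayList;
    import java.util.List;

    final class RoundRobinSketch {
        // Keep fragment i on the segment where (i + shiftedIndex) % totalSegments == segmentId,
        // with shiftedIndex = gpSessionId % totalSegments + gpCommandCount (see the test comments).
        static <T> List<T> filter(List<T> fragments, int segmentId, int totalSegments,
                                  int gpSessionId, int gpCommandCount) {
            int shiftedIndex = gpSessionId % totalSegments + gpCommandCount;
            List<T> kept = new ArrayList<>();
            int i = 0;
            for (T fragment : fragments) { // iterate, never call get(i): stays O(n) on a LinkedList
                if ((i + shiftedIndex) % totalSegments == segmentId) {
                    kept.add(fragment);
                }
                i++;
            }
            return kept;
        }
    }

Plugging in the values from filterFragmentsWithShiftIndex2 (gpSessionId = 1, totalSegments = 2,
gpCommandCount = 1) gives shiftedIndex = 2, so segment 0 keeps fragments 0 and 2 ("foo.bar",
"foobar") and segment 1 keeps fragments 1 and 3 ("bar.foo", "barfoo"), matching the assertions.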