From 6332191806dcc9a510b6c8e16bd92b5968b52501 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Tue, 28 May 2024 15:55:57 -0400 Subject: [PATCH] MLE-14304 Can now specify partitions for reading some files The import commands that use Spark data sources have other mechanisms for determining partitions, so `--partitions` is only added to the file-based import commands changed here. The tests simply verify that when `--partitions` is passed in, the value is correctly added to the list of reader options. --- .../api/AggregateXmlFilesImporter.java | 2 ++ .../newtool/api/ArchiveFilesImporter.java | 1 + .../newtool/api/GenericFilesImporter.java | 1 + .../newtool/api/MlcpArchiveFilesImporter.java | 1 + .../newtool/api/RdfFilesImporter.java | 1 + .../importdata/ImportAggregateXmlCommand.java | 10 ++++++++ .../importdata/ImportArchiveFilesCommand.java | 12 +++++++++- .../impl/importdata/ImportFilesCommand.java | 10 ++++++++ .../ImportMlcpArchiveFilesCommand.java | 12 +++++++++- .../importdata/ImportRdfFilesCommand.java | 12 +++++++++- .../ImportAggregateXmlFilesOptionsTest.java | 24 +++++++++++++++++++ .../ImportArchiveFilesOptionsTest.java | 22 +++++++++++++++++ .../importdata/ImportFilesOptionsTest.java | 5 ++++ .../ImportMlcpArchiveFilesOptionsTest.java | 22 +++++++++++++++++ .../importdata/ImportRdfFilesOptionsTest.java | 22 +++++++++++++++++ 15 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlFilesOptionsTest.java create mode 100644 new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesOptionsTest.java create mode 100644 new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesOptionsTest.java create mode 100644 new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesOptionsTest.java diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/api/AggregateXmlFilesImporter.java b/new-tool-cli/src/main/java/com/marklogic/newtool/api/AggregateXmlFilesImporter.java index 
42121591..bd6c570e 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/api/AggregateXmlFilesImporter.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/api/AggregateXmlFilesImporter.java @@ -14,6 +14,8 @@ interface ReadXmlFilesOptions extends ReadFilesOptions { ReadXmlFilesOptions uriNamespace(String uriNamespace); ReadXmlFilesOptions compressionType(CompressionType compressionType); + + ReadXmlFilesOptions partitions(Integer partitions); } AggregateXmlFilesImporter readFiles(Consumer consumer); diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/api/ArchiveFilesImporter.java b/new-tool-cli/src/main/java/com/marklogic/newtool/api/ArchiveFilesImporter.java index 9e82bab2..ddca08d7 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/api/ArchiveFilesImporter.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/api/ArchiveFilesImporter.java @@ -6,6 +6,7 @@ public interface ArchiveFilesImporter extends Executor { interface ReadArchiveFilesOptions extends ReadFilesOptions { ReadArchiveFilesOptions categories(String... 
categories); + ReadArchiveFilesOptions partitions(Integer partitions); } ArchiveFilesImporter readFiles(Consumer consumer); diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/api/GenericFilesImporter.java b/new-tool-cli/src/main/java/com/marklogic/newtool/api/GenericFilesImporter.java index 285cd6b1..cc8f4790 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/api/GenericFilesImporter.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/api/GenericFilesImporter.java @@ -10,6 +10,7 @@ enum DocumentType { interface ReadGenericFilesOptions extends ReadFilesOptions { ReadGenericFilesOptions compressionType(CompressionType compressionType); + ReadGenericFilesOptions partitions(Integer partitions); } interface WriteGenericDocumentsOptions extends WriteDocumentsOptions { diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/api/MlcpArchiveFilesImporter.java b/new-tool-cli/src/main/java/com/marklogic/newtool/api/MlcpArchiveFilesImporter.java index 73ff345d..046a9e47 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/api/MlcpArchiveFilesImporter.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/api/MlcpArchiveFilesImporter.java @@ -6,6 +6,7 @@ public interface MlcpArchiveFilesImporter extends Executor { ReadMlcpArchiveFilesOptions categories(String... 
categories); + ReadMlcpArchiveFilesOptions partitions(Integer partitions); } MlcpArchiveFilesImporter readFiles(Consumer consumer); diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/api/RdfFilesImporter.java b/new-tool-cli/src/main/java/com/marklogic/newtool/api/RdfFilesImporter.java index d20b3f58..1295b4c9 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/api/RdfFilesImporter.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/api/RdfFilesImporter.java @@ -6,6 +6,7 @@ public interface RdfFilesImporter extends Executor { interface ReadRdfFilesOptions extends ReadFilesOptions { ReadRdfFilesOptions compressionType(CompressionType compressionType); + ReadRdfFilesOptions partitions(Integer partitions); } interface WriteTriplesDocumentsOptions extends WriteDocumentsOptions { diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlCommand.java b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlCommand.java index ecbd002c..0a1a4fe4 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlCommand.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlCommand.java @@ -66,10 +66,14 @@ public static class ReadXmlFilesParams extends ReadFilesParams makeOptions() { return OptionsUtil.addOptions( super.makeOptions(), + Options.READ_NUM_PARTITIONS, partitions != null ? 
partitions.toString() : null, Options.READ_AGGREGATES_XML_ELEMENT, element, Options.READ_AGGREGATES_XML_NAMESPACE, namespace, Options.READ_AGGREGATES_XML_URI_ELEMENT, uriElement, @@ -107,6 +111,12 @@ public ReadXmlFilesOptions compressionType(CompressionType compressionType) { this.compressionType = compressionType; return this; } + + @Override + public ReadXmlFilesOptions partitions(Integer partitions) { + this.partitions = partitions; + return this; + } } @Override diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesCommand.java b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesCommand.java index 9e915f6b..a238cdfa 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesCommand.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesCommand.java @@ -45,11 +45,15 @@ public static class ReadArchiveFilesParams extends ReadFilesParams makeOptions() { return OptionsUtil.addOptions(super.makeOptions(), Options.READ_FILES_TYPE, "archive", - Options.READ_ARCHIVES_CATEGORIES, categories + Options.READ_ARCHIVES_CATEGORIES, categories, + Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null ); } @@ -58,6 +62,12 @@ public ReadArchiveFilesOptions categories(String... 
categories) { this.categories = Stream.of(categories).collect(Collectors.joining(",")); return this; } + + @Override + public ReadArchiveFilesOptions partitions(Integer partitions) { + this.partitions = partitions; + return this; + } } @Override diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportFilesCommand.java b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportFilesCommand.java index bd032cc1..0139fc82 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportFilesCommand.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportFilesCommand.java @@ -64,12 +64,22 @@ public ReadGenericFilesOptions compressionType(CompressionType compressionType) return this; } + @Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.") + private Integer partitions; + @Override public Map makeOptions() { return OptionsUtil.addOptions(super.makeOptions(), + Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null, Options.READ_FILES_COMPRESSION, compressionType != null ? 
compressionType.name() : null ); } + + @Override + public ReadGenericFilesOptions partitions(Integer partitions) { + this.partitions = partitions; + return this; + } } public static class WriteGenericDocumentsParams extends WriteDocumentParams implements WriteGenericDocumentsOptions { diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesCommand.java b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesCommand.java index b6d35728..de657732 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesCommand.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesCommand.java @@ -44,11 +44,15 @@ public static class ReadMlcpArchiveFilesParams extends ReadFilesParams makeOptions() { return OptionsUtil.addOptions(super.makeOptions(), Options.READ_FILES_TYPE, "mlcp_archive", - Options.READ_ARCHIVES_CATEGORIES, categories + Options.READ_ARCHIVES_CATEGORIES, categories, + Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null ); } @@ -57,6 +61,12 @@ public ReadMlcpArchiveFilesOptions categories(String... 
categories) { this.categories = Stream.of(categories).collect(Collectors.joining(",")); return this; } + + @Override + public ReadMlcpArchiveFilesOptions partitions(Integer partitions) { + this.partitions = partitions; + return this; + } } @Override diff --git a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesCommand.java b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesCommand.java index 9b812605..216c6906 100644 --- a/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesCommand.java +++ b/new-tool-cli/src/main/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesCommand.java @@ -42,11 +42,15 @@ public static class ReadRdfFilesParams extends ReadFilesParams makeOptions() { return OptionsUtil.addOptions(super.makeOptions(), Options.READ_FILES_TYPE, "rdf", - Options.READ_FILES_COMPRESSION, compressionType != null ? compressionType.name() : null + Options.READ_FILES_COMPRESSION, compressionType != null ? compressionType.name() : null, + Options.READ_NUM_PARTITIONS, partitions != null ? 
partitions.toString() : null ); } @@ -55,6 +59,12 @@ public ReadRdfFilesOptions compressionType(CompressionType compressionType) { this.compressionType = compressionType; return this; } + + @Override + public ReadRdfFilesOptions partitions(Integer partitions) { + this.partitions = partitions; + return this; + } } public static class WriteTriplesDocumentsParams extends WriteDocumentParams implements WriteTriplesDocumentsOptions { diff --git a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlFilesOptionsTest.java b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlFilesOptionsTest.java new file mode 100644 index 00000000..0eb120de --- /dev/null +++ b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportAggregateXmlFilesOptionsTest.java @@ -0,0 +1,24 @@ +package com.marklogic.newtool.impl.importdata; + +import com.marklogic.newtool.impl.AbstractOptionsTest; +import com.marklogic.spark.Options; +import org.junit.jupiter.api.Test; + +class ImportAggregateXmlFilesOptionsTest extends AbstractOptionsTest { + + @Test + void numPartitions() { + ImportAggregateXmlCommand command = (ImportAggregateXmlCommand) getCommand( + "import_aggregate_xml_files", + "--path", "src/test/resources/xml-file", + "--preview", "10", + "--element", "anything", + "--partitions", "3" + ); + + assertOptions(command.getReadParams().makeOptions(), + Options.READ_AGGREGATES_XML_ELEMENT, "anything", + Options.READ_NUM_PARTITIONS, "3" + ); + } +} diff --git a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesOptionsTest.java b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesOptionsTest.java new file mode 100644 index 00000000..0f293cc9 --- /dev/null +++ b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportArchiveFilesOptionsTest.java @@ -0,0 +1,22 @@ +package com.marklogic.newtool.impl.importdata; + +import 
com.marklogic.newtool.impl.AbstractOptionsTest; +import com.marklogic.spark.Options; +import org.junit.jupiter.api.Test; + +class ImportArchiveFilesOptionsTest extends AbstractOptionsTest { + + @Test + void numPartitions() { + ImportArchiveFilesCommand command = (ImportArchiveFilesCommand) getCommand( + "import_archive_files", + "--path", "src/test/resources/archive-files", + "--preview", "10", + "--partitions", "18" + ); + + assertOptions(command.getReadParams().makeOptions(), + Options.READ_NUM_PARTITIONS, "18" + ); + } +} diff --git a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportFilesOptionsTest.java b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportFilesOptionsTest.java index ec51de1e..62f46781 100644 --- a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportFilesOptionsTest.java +++ b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportFilesOptionsTest.java @@ -15,6 +15,7 @@ void test() { "--username", "someuser", "--password", "someword", "--path", "src/test/resources/mixed-files/hello*", + "--partitions", "6", "--documentType", "XML", "--abortOnWriteFailure", "--batchSize", "50", @@ -39,6 +40,10 @@ void test() { Options.CLIENT_PASSWORD, "someword" ); + assertOptions(command.getReadParams().makeOptions(), + Options.READ_NUM_PARTITIONS, "6" + ); + assertOptions(command.getWriteParams().makeOptions(), Options.WRITE_ABORT_ON_FAILURE, "true", Options.WRITE_ARCHIVE_PATH_FOR_FAILED_DOCUMENTS, "/my/failures", diff --git a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesOptionsTest.java b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesOptionsTest.java new file mode 100644 index 00000000..9378a928 --- /dev/null +++ b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportMlcpArchiveFilesOptionsTest.java @@ -0,0 +1,22 @@ +package com.marklogic.newtool.impl.importdata; + +import 
com.marklogic.newtool.impl.AbstractOptionsTest; +import com.marklogic.spark.Options; +import org.junit.jupiter.api.Test; + +class ImportMlcpArchiveFilesOptionsTest extends AbstractOptionsTest { + + @Test + void numPartitions() { + ImportMlcpArchiveFilesCommand command = (ImportMlcpArchiveFilesCommand) getCommand( + "import_mlcp_archive_files", + "--path", "src/test/resources/archive-files", + "--preview", "10", + "--partitions", "7" + ); + + assertOptions(command.getReadParams().makeOptions(), + Options.READ_NUM_PARTITIONS, "7" + ); + } +} diff --git a/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesOptionsTest.java b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesOptionsTest.java new file mode 100644 index 00000000..d59672f7 --- /dev/null +++ b/new-tool-cli/src/test/java/com/marklogic/newtool/impl/importdata/ImportRdfFilesOptionsTest.java @@ -0,0 +1,22 @@ +package com.marklogic.newtool.impl.importdata; + +import com.marklogic.newtool.impl.AbstractOptionsTest; +import com.marklogic.spark.Options; +import org.junit.jupiter.api.Test; + +class ImportRdfFilesOptionsTest extends AbstractOptionsTest { + + @Test + void numPartitions() { + ImportRdfFilesCommand command = (ImportRdfFilesCommand) getCommand( + "import_rdf_files", + "--path", "src/test/resources/rdf", + "--preview", "10", + "--partitions", "4" + ); + + assertOptions(command.getReadParams().makeOptions(), + Options.READ_NUM_PARTITIONS, "4" + ); + } +}