Skip to content

Commit

Permalink
Merge pull request #120 from marklogic/feature/file-partitions-count
Browse files Browse the repository at this point in the history
MLE-14304 Can now specify partitions for reading some files
  • Loading branch information
rjrudin authored May 29, 2024
2 parents 7eb5739 + 6332191 commit 6460061
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ interface ReadXmlFilesOptions extends ReadFilesOptions<ReadXmlFilesOptions> {
ReadXmlFilesOptions uriNamespace(String uriNamespace);

ReadXmlFilesOptions compressionType(CompressionType compressionType);

ReadXmlFilesOptions partitions(Integer partitions);
}

AggregateXmlFilesImporter readFiles(Consumer<ReadXmlFilesOptions> consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public interface ArchiveFilesImporter extends Executor<ArchiveFilesImporter> {

/**
 * Fluent options for reading archive files, in addition to whatever
 * {@code ReadFilesOptions} already declares.
 */
interface ReadArchiveFilesOptions extends ReadFilesOptions<ReadArchiveFilesOptions> {
/**
 * Sets the metadata categories to read from the archive.
 *
 * @param categories one or more category names
 * @return the options object, so calls can be chained
 */
ReadArchiveFilesOptions categories(String... categories);
/**
 * Specifies the number of partitions used for reading files.
 *
 * @param partitions the partition count; declared as {@code Integer}, so it may be left unset
 * @return the options object, so calls can be chained
 */
ReadArchiveFilesOptions partitions(Integer partitions);
}

ArchiveFilesImporter readFiles(Consumer<ReadArchiveFilesOptions> consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ enum DocumentType {

/**
 * Fluent options for reading generic files, in addition to whatever
 * {@code ReadFilesOptions} already declares.
 */
interface ReadGenericFilesOptions extends ReadFilesOptions<ReadGenericFilesOptions> {
/**
 * Sets the type of compression used by the files being read.
 *
 * @param compressionType the compression type
 * @return the options object, so calls can be chained
 */
ReadGenericFilesOptions compressionType(CompressionType compressionType);
/**
 * Specifies the number of partitions used for reading files.
 *
 * @param partitions the partition count; declared as {@code Integer}, so it may be left unset
 * @return the options object, so calls can be chained
 */
ReadGenericFilesOptions partitions(Integer partitions);
}

interface WriteGenericDocumentsOptions extends WriteDocumentsOptions<WriteGenericDocumentsOptions> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public interface MlcpArchiveFilesImporter extends Executor<MlcpArchiveFilesImpor

/**
 * Fluent options for reading MLCP archive files, in addition to whatever
 * {@code ReadFilesOptions} already declares.
 */
interface ReadMlcpArchiveFilesOptions extends ReadFilesOptions<ReadMlcpArchiveFilesOptions> {
/**
 * Sets the metadata categories to read from the archive.
 *
 * @param categories one or more category names
 * @return the options object, so calls can be chained
 */
ReadMlcpArchiveFilesOptions categories(String... categories);
/**
 * Specifies the number of partitions used for reading files.
 *
 * @param partitions the partition count; declared as {@code Integer}, so it may be left unset
 * @return the options object, so calls can be chained
 */
ReadMlcpArchiveFilesOptions partitions(Integer partitions);
}

MlcpArchiveFilesImporter readFiles(Consumer<ReadMlcpArchiveFilesOptions> consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public interface RdfFilesImporter extends Executor<RdfFilesImporter> {

/**
 * Fluent options for reading RDF files, in addition to whatever
 * {@code ReadFilesOptions} already declares.
 */
interface ReadRdfFilesOptions extends ReadFilesOptions<ReadRdfFilesOptions> {
/**
 * Sets the type of compression used when importing compressed files.
 *
 * @param compressionType the compression type
 * @return the options object, so calls can be chained
 */
ReadRdfFilesOptions compressionType(CompressionType compressionType);
/**
 * Specifies the number of partitions used for reading files.
 *
 * @param partitions the partition count; declared as {@code Integer}, so it may be left unset
 * @return the options object, so calls can be chained
 */
ReadRdfFilesOptions partitions(Integer partitions);
}

interface WriteTriplesDocumentsOptions extends WriteDocumentsOptions<WriteTriplesDocumentsOptions> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ public static class ReadXmlFilesParams extends ReadFilesParams<ReadXmlFilesOptio
)
private CompressionType compressionType;

@Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.")
private Integer partitions;

@Override
public Map<String, String> makeOptions() {
return OptionsUtil.addOptions(
super.makeOptions(),
Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null,
Options.READ_AGGREGATES_XML_ELEMENT, element,
Options.READ_AGGREGATES_XML_NAMESPACE, namespace,
Options.READ_AGGREGATES_XML_URI_ELEMENT, uriElement,
Expand Down Expand Up @@ -107,6 +111,12 @@ public ReadXmlFilesOptions compressionType(CompressionType compressionType) {
this.compressionType = compressionType;
return this;
}

/**
 * Stores the number of partitions to use for reading files; this is the
 * programmatic counterpart of the {@code --partitions} command-line parameter.
 *
 * @param partitions the partition count to store in the {@code partitions} field
 * @return this params object, so calls can be chained
 */
@Override
public ReadXmlFilesOptions partitions(Integer partitions) {
this.partitions = partitions;
return this;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ public static class ReadArchiveFilesParams extends ReadFilesParams<ReadArchiveFi
"Valid choices are: collections, permissions, quality, properties, and metadatavalues.")
private String categories;

@Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.")
private Integer partitions;

/**
 * Builds the read options for archive files by passing the superclass options
 * to {@code OptionsUtil.addOptions} together with the file type {@code "archive"},
 * the configured categories, and the partition count.
 * The partition count is passed as a string, or as {@code null} when
 * {@code partitions} has not been set.
 *
 * @return the map produced by {@code OptionsUtil.addOptions}
 */
@Override
public Map<String, String> makeOptions() {
return OptionsUtil.addOptions(super.makeOptions(),
Options.READ_FILES_TYPE, "archive",
Options.READ_ARCHIVES_CATEGORIES, categories
Options.READ_ARCHIVES_CATEGORIES, categories,
Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null
);
}

Expand All @@ -58,6 +62,12 @@ public ReadArchiveFilesOptions categories(String... categories) {
this.categories = Stream.of(categories).collect(Collectors.joining(","));
return this;
}

/**
 * Stores the number of partitions to use for reading files; this is the
 * programmatic counterpart of the {@code --partitions} command-line parameter.
 *
 * @param partitions the partition count to store in the {@code partitions} field
 * @return this params object, so calls can be chained
 */
@Override
public ReadArchiveFilesOptions partitions(Integer partitions) {
this.partitions = partitions;
return this;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,22 @@ public ReadGenericFilesOptions compressionType(CompressionType compressionType)
return this;
}

@Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.")
private Integer partitions;

/**
 * Builds the read options for generic files: the superclass options plus the
 * partition count and the compression type. Each value is passed to
 * {@code OptionsUtil.addOptions} as a string, or as {@code null} when the
 * corresponding field has not been set.
 *
 * @return the map produced by {@code OptionsUtil.addOptions}
 */
@Override
public Map<String, String> makeOptions() {
    // Resolve the inherited options first, matching the original evaluation order.
    final Map<String, String> inheritedOptions = super.makeOptions();
    final String partitionCount = partitions == null ? null : partitions.toString();
    final String compressionName = compressionType == null ? null : compressionType.name();
    return OptionsUtil.addOptions(
        inheritedOptions,
        Options.READ_NUM_PARTITIONS, partitionCount,
        Options.READ_FILES_COMPRESSION, compressionName
    );
}

/**
 * Stores the number of partitions to use for reading files; this is the
 * programmatic counterpart of the {@code --partitions} command-line parameter.
 *
 * @param partitions the partition count to store in the {@code partitions} field
 * @return this params object, so calls can be chained
 */
@Override
public ReadGenericFilesOptions partitions(Integer partitions) {
this.partitions = partitions;
return this;
}
}

public static class WriteGenericDocumentsParams extends WriteDocumentParams<WriteGenericDocumentsOptions> implements WriteGenericDocumentsOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@ public static class ReadMlcpArchiveFilesParams extends ReadFilesParams<ReadMlcpA
"Valid choices are: collections, permissions, quality, properties, and metadatavalues.")
private String categories;

@Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.")
private Integer partitions;

/**
 * Builds the read options for MLCP archive files by passing the superclass
 * options to {@code OptionsUtil.addOptions} together with the file type
 * {@code "mlcp_archive"}, the configured categories, and the partition count.
 * The partition count is passed as a string, or as {@code null} when
 * {@code partitions} has not been set.
 *
 * @return the map produced by {@code OptionsUtil.addOptions}
 */
@Override
public Map<String, String> makeOptions() {
return OptionsUtil.addOptions(super.makeOptions(),
Options.READ_FILES_TYPE, "mlcp_archive",
Options.READ_ARCHIVES_CATEGORIES, categories
Options.READ_ARCHIVES_CATEGORIES, categories,
Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null
);
}

Expand All @@ -57,6 +61,12 @@ public ReadMlcpArchiveFilesOptions categories(String... categories) {
this.categories = Stream.of(categories).collect(Collectors.joining(","));
return this;
}

/**
 * Stores the number of partitions to use for reading files; this is the
 * programmatic counterpart of the {@code --partitions} command-line parameter.
 *
 * @param partitions the partition count to store in the {@code partitions} field
 * @return this params object, so calls can be chained
 */
@Override
public ReadMlcpArchiveFilesOptions partitions(Integer partitions) {
this.partitions = partitions;
return this;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ public static class ReadRdfFilesParams extends ReadFilesParams<ReadRdfFilesOptio
@Parameter(names = "--compression", description = "When importing compressed files, specify the type of compression used.")
private CompressionType compressionType;

@Parameter(names = "--partitions", description = "Specifies the number of partitions used for reading files.")
private Integer partitions;

/**
 * Builds the read options for RDF files by passing the superclass options to
 * {@code OptionsUtil.addOptions} together with the file type {@code "rdf"},
 * the compression type, and the partition count.
 * The compression type and partition count are passed as strings, or as
 * {@code null} when the corresponding field has not been set.
 *
 * @return the map produced by {@code OptionsUtil.addOptions}
 */
@Override
public Map<String, String> makeOptions() {
return OptionsUtil.addOptions(super.makeOptions(),
Options.READ_FILES_TYPE, "rdf",
Options.READ_FILES_COMPRESSION, compressionType != null ? compressionType.name() : null
Options.READ_FILES_COMPRESSION, compressionType != null ? compressionType.name() : null,
Options.READ_NUM_PARTITIONS, partitions != null ? partitions.toString() : null
);
}

Expand All @@ -55,6 +59,12 @@ public ReadRdfFilesOptions compressionType(CompressionType compressionType) {
this.compressionType = compressionType;
return this;
}

/**
 * Stores the number of partitions to use for reading files; this is the
 * programmatic counterpart of the {@code --partitions} command-line parameter.
 *
 * @param partitions the partition count to store in the {@code partitions} field
 * @return this params object, so calls can be chained
 */
@Override
public ReadRdfFilesOptions partitions(Integer partitions) {
this.partitions = partitions;
return this;
}
}

public static class WriteTriplesDocumentsParams extends WriteDocumentParams<WriteTriplesDocumentsOptions> implements WriteTriplesDocumentsOptions {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.marklogic.newtool.impl.importdata;

import com.marklogic.newtool.impl.AbstractOptionsTest;
import com.marklogic.spark.Options;
import org.junit.jupiter.api.Test;

/**
 * Checks that command-line arguments of {@code import_aggregate_xml_files}
 * end up in the read options built by the command.
 */
class ImportAggregateXmlFilesOptionsTest extends AbstractOptionsTest {

    /**
     * {@code --partitions} and {@code --element} must be reflected in the read options.
     */
    @Test
    void numPartitions() {
        final ImportAggregateXmlCommand importCommand = (ImportAggregateXmlCommand) getCommand(
            "import_aggregate_xml_files",
            "--path", "src/test/resources/xml-file",
            "--preview", "10",
            "--element", "anything",
            "--partitions", "3"
        );

        final java.util.Map<String, String> readOptions = importCommand.getReadParams().makeOptions();

        assertOptions(
            readOptions,
            Options.READ_AGGREGATES_XML_ELEMENT, "anything",
            Options.READ_NUM_PARTITIONS, "3"
        );
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.marklogic.newtool.impl.importdata;

import com.marklogic.newtool.impl.AbstractOptionsTest;
import com.marklogic.spark.Options;
import org.junit.jupiter.api.Test;

/**
 * Checks that command-line arguments of {@code import_archive_files}
 * end up in the read options built by the command.
 */
class ImportArchiveFilesOptionsTest extends AbstractOptionsTest {

    /**
     * {@code --partitions} must be reflected in the read options.
     */
    @Test
    void numPartitions() {
        final ImportArchiveFilesCommand importCommand = (ImportArchiveFilesCommand) getCommand(
            "import_archive_files",
            "--path", "src/test/resources/archive-files",
            "--preview", "10",
            "--partitions", "18"
        );

        final java.util.Map<String, String> readOptions = importCommand.getReadParams().makeOptions();

        assertOptions(readOptions, Options.READ_NUM_PARTITIONS, "18");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ void test() {
"--username", "someuser",
"--password", "someword",
"--path", "src/test/resources/mixed-files/hello*",
"--partitions", "6",
"--documentType", "XML",
"--abortOnWriteFailure",
"--batchSize", "50",
Expand All @@ -39,6 +40,10 @@ void test() {
Options.CLIENT_PASSWORD, "someword"
);

assertOptions(command.getReadParams().makeOptions(),
Options.READ_NUM_PARTITIONS, "6"
);

assertOptions(command.getWriteParams().makeOptions(),
Options.WRITE_ABORT_ON_FAILURE, "true",
Options.WRITE_ARCHIVE_PATH_FOR_FAILED_DOCUMENTS, "/my/failures",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.marklogic.newtool.impl.importdata;

import com.marklogic.newtool.impl.AbstractOptionsTest;
import com.marklogic.spark.Options;
import org.junit.jupiter.api.Test;

/**
 * Checks that command-line arguments of {@code import_mlcp_archive_files}
 * end up in the read options built by the command.
 */
class ImportMlcpArchiveFilesOptionsTest extends AbstractOptionsTest {

    /**
     * {@code --partitions} must be reflected in the read options.
     */
    @Test
    void numPartitions() {
        final ImportMlcpArchiveFilesCommand importCommand = (ImportMlcpArchiveFilesCommand) getCommand(
            "import_mlcp_archive_files",
            "--path", "src/test/resources/archive-files",
            "--preview", "10",
            "--partitions", "7"
        );

        final java.util.Map<String, String> readOptions = importCommand.getReadParams().makeOptions();

        assertOptions(readOptions, Options.READ_NUM_PARTITIONS, "7");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.marklogic.newtool.impl.importdata;

import com.marklogic.newtool.impl.AbstractOptionsTest;
import com.marklogic.spark.Options;
import org.junit.jupiter.api.Test;

/**
 * Checks that command-line arguments of {@code import_rdf_files}
 * end up in the read options built by the command.
 */
class ImportRdfFilesOptionsTest extends AbstractOptionsTest {

    /**
     * {@code --partitions} must be reflected in the read options.
     */
    @Test
    void numPartitions() {
        final ImportRdfFilesCommand importCommand = (ImportRdfFilesCommand) getCommand(
            "import_rdf_files",
            "--path", "src/test/resources/rdf",
            "--preview", "10",
            "--partitions", "4"
        );

        final java.util.Map<String, String> readOptions = importCommand.getReadParams().makeOptions();

        assertOptions(readOptions, Options.READ_NUM_PARTITIONS, "4");
    }
}

0 comments on commit 6460061

Please sign in to comment.