Skip to content

Commit 37c51aa

Browse files
authored
Merge pull request #200 from marklogic/feature/total-thread-count
Supporting --thread-count and --thread-count-per-partition now
2 parents caf9157 + f729dea commit 37c51aa

File tree

10 files changed

+36
-35
lines changed

10 files changed

+36
-35
lines changed

docs/import/import-files/aggregate-xml.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ of the `person` element in the document:
4646
./bin/flux import-aggregate-xml-files \
4747
--path /data/people.xml \
4848
--connection-string user:password@localhost:8000 \
49-
--element employee --namespace org:example
49+
--element person \
50+
--namespace org:example
5051
```
5152

5253
## Controlling document URIs
@@ -60,8 +61,8 @@ based on the value of each `id` element in the `org:example` namespace:
6061
./bin/flux import-aggregate-xml-files \
6162
--path /data/people.xml \
6263
--connection-string user:password@localhost:8000 \
63-
--element employee --namespace org:example \
64-
--uri-element id --namespace org:example
64+
--element person --namespace org:example \
65+
--uri-element id --uri-namespace org:example
6566
```
6667

6768
You may still wish to use options like `--uri-prefix` and `--uri-suffix` to make the URI more self-describing. For

docs/import/tuning-performance.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@ Batch size is configured via the `--batch-size` option, which defaults to a valu
1313
your documents, you may find improved performance by raising this value significantly for smaller documents, such as 500
1414
or even 1000.
1515

16-
For the number of threads used to send requests to MarkLogic, two factors come into play. The product of the
17-
number of partitions and the value of the `--thread-count` option determines how many total threads will be used to send
18-
requests. For example, if the import command uses 4 partitions to read data and `--thread-count` is set to 4 (its
19-
default value), 16 total threads will send requests to MarkLogic.
16+
The number of threads used to send requests to MarkLogic can be configured in one of two ways.
17+
The value of the `--thread-count` option determines how many total threads will be used across all partitions to send
18+
requests. You can alternatively configure a number of threads per partition using `--thread-count-per-partition`, which takes precedence over `--thread-count` when both are specified.
2019

2120
The number of partitions is determined by how data is read and differs across the various import commands.
2221
Flux will log the number of partitions for each import command as shown below:
@@ -35,7 +34,7 @@ by placing a load balancer in front of MarkLogic or by configuring direct connec
3534

3635
The rule of thumb can thus be expressed as:
3736

38-
Number of partitions * Value of --thread-count <= Number of hosts * number of app server threads
37+
Value of --thread-count <= Number of hosts * number of app server threads
3938

4039
### Direct connections to each host
4140

flux-cli/src/main/java/com/marklogic/flux/api/WriteDocumentsOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface WriteDocumentsOptions<T extends WriteDocumentsOptions> {
2323

2424
T threadCount(int threadCount);
2525

26-
T totalThreadCount(int totalThreadCount);
26+
T threadCountPerPartition(int threadCountPerPartition);
2727

2828
T transform(String transform);
2929

flux-cli/src/main/java/com/marklogic/flux/impl/copy/CopyCommand.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ public static class CopyWriteDocumentsParams implements WriteDocumentsOptions<Co
9898

9999
@CommandLine.Option(
100100
names = "--output-thread-count",
101-
description = "The number of threads used by each partition worker when writing batches of documents to MarkLogic."
101+
description = "The total number of threads used across all partitions when writing batches of documents to MarkLogic."
102102
)
103103
private int threadCount = 4;
104104

105105
@CommandLine.Option(
106-
names = "--output-total-thread-count",
107-
description = "The total number of threads used across all partitions when writing batches of documents to MarkLogic."
106+
names = "--output-thread-count-per-partition",
107+
description = "The number of threads used by each partition worker when writing batches of documents to MarkLogic. " +
108+
"Takes precedence over the '--output-thread-count' option."
108109
)
109-
private int totalThreadCount;
110+
private int threadCountPerPartition;
110111

111112
@CommandLine.Option(
112113
names = "--output-transform",
@@ -163,7 +164,7 @@ protected Map<String, String> makeOptions() {
163164
Options.WRITE_PERMISSIONS, permissions,
164165
Options.WRITE_TEMPORAL_COLLECTION, temporalCollection,
165166
Options.WRITE_THREAD_COUNT, OptionsUtil.intOption(threadCount),
166-
Options.WRITE_TOTAL_THREAD_COUNT, OptionsUtil.intOption(totalThreadCount),
167+
Options.WRITE_THREAD_COUNT_PER_PARTITION, OptionsUtil.intOption(threadCountPerPartition),
167168
Options.WRITE_TRANSFORM_NAME, transform,
168169
Options.WRITE_TRANSFORM_PARAMS, transformParams,
169170
Options.WRITE_TRANSFORM_PARAMS_DELIMITER, transformParamsDelimiter,
@@ -229,8 +230,8 @@ public CopyWriteDocumentsParams threadCount(int threadCount) {
229230
}
230231

231232
@Override
232-
public CopyWriteDocumentsParams totalThreadCount(int totalThreadCount) {
233-
this.totalThreadCount = totalThreadCount;
233+
public CopyWriteDocumentsParams threadCountPerPartition(int threadCountPerPartition) {
234+
this.threadCountPerPartition = threadCountPerPartition;
234235
return this;
235236
}
236237

flux-cli/src/main/java/com/marklogic/flux/impl/importdata/WriteDocumentParams.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ public class WriteDocumentParams<T extends WriteDocumentsOptions> implements Wri
6464

6565
@CommandLine.Option(
6666
names = "--thread-count",
67-
description = "The number of threads used by each partition worker when writing batches of documents to MarkLogic."
67+
description = "The total number of threads used across all partitions when writing batches of documents to MarkLogic."
6868
)
6969
private int threadCount = 4;
7070

7171
@CommandLine.Option(
72-
names = "--total-thread-count",
73-
description = "The total number of threads used across all partitions when writing batches of documents to MarkLogic. " +
72+
names = "--thread-count-per-partition",
73+
description = "The number of threads used by each partition worker when writing batches of documents to MarkLogic. " +
7474
"Takes precedence over the '--thread-count' option."
7575
)
76-
private int totalThreadCount;
76+
private int threadCountPerPartition;
7777

7878
@CommandLine.Option(
7979
names = "--transform",
@@ -130,7 +130,7 @@ public Map<String, String> makeOptions() {
130130
Options.WRITE_PERMISSIONS, permissions,
131131
Options.WRITE_TEMPORAL_COLLECTION, temporalCollection,
132132
Options.WRITE_THREAD_COUNT, OptionsUtil.intOption(threadCount),
133-
Options.WRITE_TOTAL_THREAD_COUNT, OptionsUtil.intOption(totalThreadCount),
133+
Options.WRITE_THREAD_COUNT_PER_PARTITION, OptionsUtil.intOption(threadCountPerPartition),
134134
Options.WRITE_TRANSFORM_NAME, transform,
135135
Options.WRITE_TRANSFORM_PARAMS, transformParams,
136136
Options.WRITE_TRANSFORM_PARAMS_DELIMITER, transformParamsDelimiter,
@@ -201,8 +201,8 @@ public T threadCount(int threadCount) {
201201
}
202202

203203
@Override
204-
public T totalThreadCount(int totalThreadCount) {
205-
this.totalThreadCount = totalThreadCount;
204+
public T threadCountPerPartition(int threadCountPerPartition) {
205+
this.threadCountPerPartition = threadCountPerPartition;
206206
return (T) this;
207207
}
208208

flux-cli/src/main/resources/marklogic-spark-messages_en.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ spark.marklogic.write.graph=--graph
1414
spark.marklogic.write.graphOverride=--graph-override
1515
spark.marklogic.write.jsonRootName=--json-root-name
1616
spark.marklogic.write.threadCount=--thread-count
17-
spark.marklogic.write.totalThreadCount=--total-thread-count
17+
spark.marklogic.write.threadCountPerPartition=--thread-count-per-partition
1818
spark.marklogic.write.transformParams=--transform-params
1919
spark.marklogic.write.uriTemplate=--uri-template
2020
spark.marklogic.write.xmlRootName=--xml-root-name

flux-cli/src/test/java/com/marklogic/flux/impl/ErrorMessagesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ void verifyEachKeyIsOverridden() {
3030
assertEquals("--graph-override", bundle.getString(Options.WRITE_GRAPH_OVERRIDE));
3131
assertEquals("--json-root-name", bundle.getString(Options.WRITE_JSON_ROOT_NAME));
3232
assertEquals("--thread-count", bundle.getString(Options.WRITE_THREAD_COUNT));
33-
assertEquals("--total-thread-count", bundle.getString(Options.WRITE_TOTAL_THREAD_COUNT));
33+
assertEquals("--thread-count-per-partition", bundle.getString(Options.WRITE_THREAD_COUNT_PER_PARTITION));
3434
assertEquals("--transform-params", bundle.getString(Options.WRITE_TRANSFORM_PARAMS));
3535
assertEquals("--uri-template", bundle.getString(Options.WRITE_URI_TEMPLATE));
3636
assertEquals("--xml-root-name", bundle.getString(Options.WRITE_XML_ROOT_NAME));

flux-cli/src/test/java/com/marklogic/flux/impl/importdata/ImportJdbcTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void allCustomers() {
8282
"--collections", "customer",
8383
"--repartition", "2",
8484
// Just verifying that these work without causing any errors.
85-
"--total-thread-count", "16",
85+
"--thread-count-per-partition", "8",
8686
"--batch-size", "10"
8787
);
8888

flux-cli/src/test/java/com/marklogic/flux/impl/importdata/ImportParquetFilesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void defaultSettingsSingleFile() {
2121
"--connection-string", makeConnectionString(),
2222
"--permissions", DEFAULT_PERMISSIONS,
2323
"--collections", "parquet-test",
24-
"--total-thread-count", "1",
24+
"--thread-count", "1",
2525
"--uri-template", "/parquet/{model}.json",
2626

2727
// Including these for manual verification of progress logging.

mlcp-testing/build.gradle

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ task mlcpImportRdf(type: Exec) {
8181
"-output_permissions", "rest-reader,read,rest-writer,update"
8282
}
8383

84-
task ntImportRdf(type: Exec) {
84+
task fluxImportRdf(type: Exec) {
8585
description = "Intended to run against a local Caddy load balancer."
86-
workingDir = "../nt/"
86+
workingDir = "../flux/"
8787
commandLine "./bin/flux", "import-rdf-files",
8888
"--connection-string", "${mlUsername}:${mlPassword}@${lbHost}:${lbPort}",
8989
"--path", rdfFile,
@@ -93,9 +93,9 @@ task ntImportRdf(type: Exec) {
9393
"--batch-size", "100"
9494
}
9595

96-
task ntDirectImportRdf(type: Exec) {
96+
task fluxDirectImportRdf(type: Exec) {
9797
description = "For testing without a load balancer."
98-
workingDir = "../nt/"
98+
workingDir = "../flux/"
9999
commandLine "./bin/flux", "import-rdf-files",
100100
"--connection-string", "${mlUsername}:${mlPassword}@${mlHost}:${mlRestPort}",
101101
"--path", rdfFile,
@@ -105,8 +105,8 @@ task ntDirectImportRdf(type: Exec) {
105105
"--batch-size", "100"
106106
}
107107

108-
task ntCopy(type: Exec) {
109-
workingDir = "../nt/"
108+
task fluxCopy(type: Exec) {
109+
workingDir = "../flux/"
110110
commandLine "./bin/flux", "copy",
111111
"--connection-string", "${mlUsername}:${mlPassword}@${mlHost}:${mlRestPort}",
112112
"--collections", "address_small",
@@ -154,11 +154,11 @@ task mlcpExportArchive(type: Exec) {
154154
"-collection_filter", "author"
155155
}
156156

157-
task ntTwoWaySSL(type: Exec) {
157+
task fluxTwoWaySSL(type: Exec) {
158158
description = "For manual testing with the Java Client's java-unittest app server when its TwoWaySSLTest has been " +
159159
"paused in a debugger. You can copy the keyStore/trustStore files generated by that test to this directory and " +
160160
"then run this task to ensure that two-way SSL works."
161-
workingDir = "../nt/"
161+
workingDir = "../flux/"
162162
commandLine "./bin/flux", "copy",
163163
"--connection-string", "rest-writer:x@localhost:8012",
164164
"--collections", "zipcode",

0 commit comments

Comments
 (0)