Skip to content

Commit 7baf1dc

Browse files
Merge pull request #9709 from soerenreichardt/cypher-aggregation-regression-fix
Use adaptive batch size for CypherAggregation progress logging
2 parents 8903c5d + dd87c2d commit 7baf1dc

File tree

6 files changed

+83
-79
lines changed

6 files changed

+83
-79
lines changed

cypher-aggregation/src/main/java/org/neo4j/gds/projection/GraphAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.neo4j.gds.core.loading.construction.NodeLabelTokens;
4040
import org.neo4j.gds.core.loading.construction.PropertyValues;
4141
import org.neo4j.gds.core.utils.ProgressTimer;
42+
import org.neo4j.gds.core.utils.progress.BatchingTaskProgressTracker;
4243
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
4344
import org.neo4j.gds.core.utils.progress.TaskStore;
4445
import org.neo4j.gds.core.utils.progress.tasks.TaskProgressTracker;
@@ -204,14 +205,15 @@ private GraphImporter createGraphImporter(
204205
var idMapBuilder = idMapBuilder(config.readConcurrency());
205206

206207
var taskVolume = queryEstimator.estimateRows(query);
207-
var progressTracker = new TaskProgressTracker(
208+
var internalProgressTracker = new TaskProgressTracker(
208209
GraphImporter.graphImporterTask(taskVolume),
209210
log,
210211
config.readConcurrency(),
211212
config.jobId(),
212213
TaskRegistryFactory.local(username, taskStore),
213214
EmptyUserLogRegistryFactory.INSTANCE
214215
);
216+
var progressTracker = BatchingTaskProgressTracker.create(internalProgressTracker, taskVolume, config.readConcurrency());
215217

216218
return new GraphImporter(
217219
config,

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/BatchingProgressLogger.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.commons.lang3.mutable.MutableLong;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
2424
import org.neo4j.gds.core.utils.progress.tasks.Task;
25+
import org.neo4j.gds.logging.Log;
2526
import org.neo4j.gds.mem.BitUtil;
2627
import org.neo4j.gds.utils.CloseableThreadLocal;
27-
import org.neo4j.gds.logging.Log;
2828

2929
import java.util.Objects;
3030
import java.util.concurrent.atomic.LongAdder;
@@ -49,7 +49,7 @@ private static long calculateBatchSize(Task task, Concurrency concurrency) {
4949
return calculateBatchSize(Math.max(1L, task.getProgress().volume()), concurrency);
5050
}
5151

52-
private static long calculateBatchSize(long taskVolume, Concurrency concurrency) {
52+
static long calculateBatchSize(long taskVolume, Concurrency concurrency) {
5353
// target 100 logs per full run (every 1 percent)
5454
var batchSize = taskVolume / 100;
5555
// split batchSize into thread-local chunks
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.core.utils.progress;
21+
22+
import org.neo4j.gds.core.concurrency.Concurrency;
23+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
24+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTrackerAdapter;
25+
import org.neo4j.gds.core.utils.progress.tasks.Task;
26+
27+
public final class BatchingTaskProgressTracker {
28+
29+
private BatchingTaskProgressTracker() {}
30+
31+
public static ProgressTracker create(ProgressTracker delegate, long volume, Concurrency concurrency) {
32+
return volume == Task.UNKNOWN_VOLUME
33+
? new WithoutLogging(delegate)
34+
: new WithLogging(delegate, volume, concurrency);
35+
}
36+
37+
static class WithLogging extends ProgressTrackerAdapter {
38+
39+
private final long batchSize;
40+
private long rowCounter;
41+
42+
WithLogging(ProgressTracker delegate, long volume, Concurrency concurrency) {
43+
super(delegate);
44+
this.batchSize = BatchingProgressLogger.calculateBatchSize(volume, concurrency);
45+
this.rowCounter = 0;
46+
}
47+
48+
@Override
49+
public void logProgress() {
50+
if (++rowCounter == batchSize) {
51+
super.logProgress(batchSize);
52+
rowCounter = 0;
53+
}
54+
}
55+
}
56+
57+
static class WithoutLogging extends ProgressTrackerAdapter {
58+
59+
WithoutLogging(ProgressTracker delegate) {
60+
super(delegate);
61+
}
62+
63+
@Override
64+
public void logProgress() {
65+
}
66+
67+
@Override
68+
public void logProgress(long value) {
69+
}
70+
71+
@Override
72+
public void logProgress(long value, String messageTemplate) {
73+
}
74+
}
75+
}

triplet-graph-builder/src/main/java/org/neo4j/gds/projection/BatchingTaskProgressTracker.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

triplet-graph-builder/src/main/java/org/neo4j/gds/projection/GraphImporter.java

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363

6464
public final class GraphImporter {
6565

66-
private static final int PROGRESS_TRACKING_THRESHOLD = 10_000;
6766
public static final int NO_TARGET_NODE = -1;
6867

6968
private final GraphProjectConfig config;
@@ -74,7 +73,7 @@ public final class GraphImporter {
7473
private final WriteMode writeMode;
7574
private final String query;
7675

77-
private final BatchingTaskProgressTracker progressTracker;
76+
private final ProgressTracker progressTracker;
7877

7978
private final Map<RelationshipType, RelationshipsBuilder> relImporters;
8079
private final ImmutableMutableGraphSchema.Builder graphSchemaBuilder;
@@ -95,36 +94,14 @@ public GraphImporter(
9594
WriteMode writeMode,
9695
String query,
9796
ProgressTracker progressTracker
98-
) {
99-
this(
100-
config,
101-
undirectedRelationshipTypes,
102-
inverseIndexedRelationshipTypes,
103-
idMapBuilder,
104-
writeMode,
105-
query,
106-
progressTracker,
107-
PROGRESS_TRACKING_THRESHOLD
108-
);
109-
}
110-
111-
GraphImporter(
112-
GraphProjectConfig config,
113-
List<String> undirectedRelationshipTypes,
114-
List<String> inverseIndexedRelationshipTypes,
115-
LazyIdMapBuilder idMapBuilder,
116-
WriteMode writeMode,
117-
String query,
118-
ProgressTracker progressTracker,
119-
int progressTrackingThreshold
12097
) {
12198
this.config = config;
12299
this.undirectedRelationshipTypes = undirectedRelationshipTypes;
123100
this.inverseIndexedRelationshipTypes = inverseIndexedRelationshipTypes;
124101
this.idMapBuilder = idMapBuilder;
125102
this.writeMode = writeMode;
126103
this.query = query;
127-
this.progressTracker = new BatchingTaskProgressTracker(progressTracker, progressTrackingThreshold);
104+
this.progressTracker = progressTracker;
128105
this.relImporters = new ConcurrentHashMap<>();
129106
this.graphSchemaBuilder = MutableGraphSchema.builder();
130107

@@ -188,7 +165,6 @@ public AggregationResult result(
188165
ProgressTimer timer,
189166
boolean hasSeenArbitraryId
190167
) {
191-
progressTracker.logRemainingProgress();
192168
progressTracker.endSubTask("Update aggregation");
193169
progressTracker.beginSubTask("Build graph store");
194170
progressTracker.beginSubTask("Nodes");

triplet-graph-builder/src/test/java/org/neo4j/gds/projection/GraphImporterTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,7 @@ void shouldRegisterTaskAndLogProgress() {
413413
.build(),
414414
Capabilities.WriteMode.REMOTE,
415415
"",
416-
progressTracker,
417-
1
416+
progressTracker
418417
);
419418

420419
for (int i = 0; i < 2; i++) {

0 commit comments

Comments
 (0)