
Performance with TableSaw #75

Merged
merged 7 commits into from
May 1, 2024

Conversation

marecabo
Collaborator

Hey there :) Steffen pointed out some code to me and I looked into it performance-wise.

Accessing a Tablesaw column by its String name is rather expensive, we learned. This is because Tablesaw compares column names using equalsIgnoreCase to identify the desired column on every lookup.

The critical path is:

@steffenaxer In this case, I moved the column lookups out of the loop, which should help a bit for this part.

I also changed some type declarations from the implementation (HashMap) to the interface (Map) and reduced the number of map lookups in MoneyLog, which should also help performance slightly.
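To illustrate why hoisting the lookup matters, here is a minimal sketch (all class, column, and variable names are illustrative, not from the Gelato codebase) that imitates Tablesaw's by-name column resolution with a linear equalsIgnoreCase scan, and compares resolving the column per row against resolving it once outside the loop:

```java
import java.util.List;

public class HoistLookupDemo {
    // A stand-in for a Tablesaw column: a name plus its values.
    record NamedColumn(String name, double[] values) {}

    // Imitates by-name resolution: a linear scan with a case-insensitive
    // string comparison on every call, which is what makes repeated
    // lookups inside a row loop expensive.
    static NamedColumn column(List<NamedColumn> table, String name) {
        for (NamedColumn c : table) {
            if (c.name().equalsIgnoreCase(name)) return c;
        }
        throw new IllegalArgumentException("no column: " + name);
    }

    public static void main(String[] args) {
        List<NamedColumn> legs = List.of(
                new NamedColumn("distance", new double[]{1.0, 2.0, 3.0}),
                new NamedColumn("duration", new double[]{10.0, 20.0, 30.0}));

        // Slow pattern: resolve the column by name once per row.
        double slowSum = 0;
        for (int row = 0; row < 3; row++) {
            slowSum += column(legs, "distance").values()[row];
        }

        // Fast pattern: resolve once, outside the loop.
        NamedColumn distance = column(legs, "distance");
        double fastSum = 0;
        for (int row = 0; row < 3; row++) {
            fastSum += distance.values()[row];
        }
        System.out.println(slowSum + " " + fastSum); // 6.0 6.0
    }
}
```

Both loops compute the same sum; the second one just performs the string-matching scan once instead of once per row.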

@steffenaxer
Contributor

Why can't we merge this PR ❓

@steffenaxer steffenaxer requested review from mfitz and KasiaKoz April 24, 2024 18:35
@mfitz
Contributor

mfitz commented Apr 24, 2024

Why can't we merge this PR ❓

@steffenaxer I ran a version of Gelato from this branch against our Paris East model soon after the PR was created, and it failed. I will document the exact failure here when I try again.

I then ran this branch against a different, larger existing model, and it failed there, too, albeit for a different reason.

I am very keen to merge this branch, but before we do that, I need to understand fully why it failed with these two models and what can be done to fix those errors.

But don't worry, I haven't forgotten about it 👍

@mfitz
Contributor

mfitz commented Apr 29, 2024

The Error

When I run this version of Gelato against my (admittedly old) Paris East outputs:

2024-04-29T12:55:34,432  INFO TablesawKpiCalculator:907 Adding costs to legs table. Legs table has 2673030 rows, personModeScores table has 3751146 rows, moneyLog has 0 entries
2024-04-29T12:55:50,746  INFO MemoryObserver:42 used RAM: 7028 MB  free: 3931 MB  total: 10960 MB
2024-04-29T12:55:55,832  INFO TablesawKpiCalculator:933 Adding contribution from person money events
2024-04-29T12:55:56,518  INFO TablesawKpiCalculator:947 Iterating over the money log
 java.util.ConcurrentModificationException: java.util.ConcurrentModificationException
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:562)
	at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:591)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:689)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
	at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.addCostToLegs(TablesawKpiCalculator.java:953)
	at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.readLegs(TablesawKpiCalculator.java:119)
	at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.<init>(TablesawKpiCalculator.java:75)
	at com.arup.cml.abm.kpi.matsim.run.MatsimKpiGenerator.run(MatsimKpiGenerator.java:90)
	at picocli.CommandLine.executeUserObject(CommandLine.java:2026)
	at picocli.CommandLine.access$1500(CommandLine.java:148)
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
	at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
	at picocli.CommandLine.execute(CommandLine.java:2170)
	at com.arup.cml.abm.kpi.matsim.run.MatsimKpiGenerator.main(MatsimKpiGenerator.java:44)
Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1221)
	at com.arup.cml.abm.kpi.data.MoneyLog.getPersonLog(MoneyLog.java:29)
	at com.arup.cml.abm.kpi.data.MoneyLog.getMoneyLogData(MoneyLog.java:19)
	at com.arup.cml.abm.kpi.tablesaw.TablesawKpiCalculator.lambda$addCostToLegs$9(TablesawKpiCalculator.java:958)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
	at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Explanation of the Error

Under the covers, parallel streaming over the legs table via legs.stream().parallel().forEach(row -> { launches multiple threads, each of which is accessing the MoneyLog. The MoneyLog now contains this method, which is called from inside the legs iteration loop:

public double getMoneyLogData(String personID, double departureTime, double arrivalTime) {
    return getPersonLog(personID).entrySet().stream()
            .filter(e -> e.getKey() > departureTime && e.getKey() <= arrivalTime).mapToDouble(Entry::getValue).sum();
}

The getPersonLog method called from getMoneyLogData looks like this:

private Map<Double, Double> getPersonLog(String personID) {
    return moneyLogData.computeIfAbsent(personID, k -> new HashMap<>());
}

So, we have multiple threads calling getMoneyLogData concurrently. getMoneyLogData streams (iterates) over the entries of the underlying hash map in order to filter them, but it also calls getPersonLog, which can modify that same hash map. Given enough leg rows, at some point the threads will interleave such that the map is being iterated in one thread while another thread attempts to modify it. HashMap is not thread-safe; its fail-fast modification check detects this and throws java.util.ConcurrentModificationException.
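The fail-fast mechanism behind the exception can be shown deterministically even without threads (the names below are illustrative, not Gelato code): HashMap tracks a modification count, and any structural change made while an iterator is live, including an insert from computeIfAbsent, trips the same check seen in the stack trace.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class FailFastDemo {
    public static void main(String[] args) {
        Map<String, Map<Double, Double>> moneyLogData = new HashMap<>();
        moneyLogData.put("person1", new HashMap<>());
        moneyLogData.put("person2", new HashMap<>());
        try {
            for (String ignored : moneyLogData.keySet()) {
                // Structural modification during iteration: inserting a new
                // key bumps the map's modCount under the live iterator, so
                // the next iteration step fails fast.
                moneyLogData.computeIfAbsent("person3", k -> new HashMap<>());
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("caught ConcurrentModificationException");
        }
    }
}
```

In the parallel-stream case the modification comes from another thread rather than the loop body, and detection is only best-effort, which is why the crash depends on how the threads happen to interleave.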

Fixing the Error

When I change this line:

legs.stream().parallel().forEach(row -> {

To move the streaming from multiple threads running in parallel to streaming in serial in a single thread:

legs.stream().forEach(row -> {

A couple of things happen:

1 - The concurrent modification error disappears
2 - The MatsimKpiGeneratorIntegrationTest.testAppWithDrt test fails when asserting the value of one of the KPIs

Expected :3042939934L
Actual   :2105941526L

This is quite surprising. The answer should be the same whether we're iterating over the legs in the old way, the new way in parallel, or the new way in series. I haven't investigated this at all.
3 - (Probably) Performance slows relative to the parallel version, although it is still faster than the version without the other changes in this branch. I don't have timings for all three scenarios because I cannot avoid the concurrent modification exception for the parallel streaming version with the model I'm using, but on main, Gelato processed the model outputs in 25m 21s, while this branch with serial streaming over the legs table took 21m 26s on the same model. A decent improvement, but likely not as dramatic as we would see if we were streaming over the legs table in parallel.

Questions

  • Why is there no concurrent modification error from your models @marecabo and @steffenaxer? Do your models have relatively small numbers of legs? I was under the impression they are large models, so that seems unlikely. Could it be a difference in thread behaviour between Windows and macOS? Or do your models all have person money events? In the model I'm using, there are no person money events, which probably increases the number of writes to the underlying hashmap (via return moneyLogData.computeIfAbsent(personID, k -> new HashMap<>());), making the likelihood of concurrent modification much higher.
  • What kind of performance boost did you find from this branch with your own models?
  • Why does the DRT integration test produce different KPI values when I stream over the legs table in series rather than parallel? Does the same thing happen on your machines @marecabo @steffenaxer?

Possible quick fix

Keeping the parallel streaming but swapping MoneyLog's underlying HashMap for a ConcurrentHashMap, or perhaps a synchronized map via Collections.synchronizedMap, might fix the concurrency issues without resorting to serial streaming. However, Collections.synchronizedMap in particular might introduce so much locking overhead that the benefits of parallelism disappear and we are effectively running in serial anyway.
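A sketch of the ConcurrentHashMap variant (the two methods are taken from the MoneyLog snippets quoted above; everything else, including the driver in main, is illustrative): ConcurrentHashMap's computeIfAbsent is atomic and its iterators are weakly consistent, so concurrent reads and inserts no longer throw ConcurrentModificationException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class MoneyLogSketch {
    // ConcurrentHashMap instead of HashMap: atomic computeIfAbsent and
    // weakly consistent iteration make concurrent access safe.
    private final Map<String, Map<Double, Double>> moneyLogData = new ConcurrentHashMap<>();

    private Map<Double, Double> getPersonLog(String personID) {
        return moneyLogData.computeIfAbsent(personID, k -> new ConcurrentHashMap<>());
    }

    public double getMoneyLogData(String personID, double departureTime, double arrivalTime) {
        return getPersonLog(personID).entrySet().stream()
                .filter(e -> e.getKey() > departureTime && e.getKey() <= arrivalTime)
                .mapToDouble(Map.Entry::getValue).sum();
    }

    public static void main(String[] args) {
        MoneyLogSketch log = new MoneyLogSketch();
        log.getPersonLog("p1").put(10.0, 2.5);
        log.getPersonLog("p1").put(20.0, 1.5);
        // Parallel lookups for mostly-unseen persons insert empty sub-maps
        // concurrently with reads; with a plain HashMap this pattern raced.
        double total = IntStream.range(0, 10_000).parallel()
                .mapToDouble(i -> log.getMoneyLogData("p" + (i % 100), 0.0, 30.0))
                .sum();
        System.out.println(total); // 100 matching calls x 4.0 -> 400.0
    }
}
```

Whether the per-bucket locking in ConcurrentHashMap preserves enough of the parallel speed-up would still need to be measured against the serial version.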

Contributor

@mfitz mfitz left a comment


I like the general idea, but we cannot merge this until we've addressed the points in this comment.

It would also be good to see some before-and-after performance metrics for various test models.

@marecabo
Collaborator Author

marecabo commented Apr 30, 2024

@mfitz Thank you for your investigation and great comment. I think we did two things at once here, which should have been two PRs:

  1. Optimizing some Tablesaw calls
  2. Parallelizing the operations.

Each of these should reproduce the same results on its own.

I would suggest removing the parallel() first and investigating why the results differ. Then we can consider parallelizing in a later step. Could you share the output folder where these different results occur?

@marecabo marecabo marked this pull request as draft April 30, 2024 11:42
@marecabo
Collaborator Author

@steffenaxer I reverted your parallel() commit
@mfitz Could you check again, whether you get the expected results?

@mfitz
Contributor

mfitz commented Apr 30, 2024

Could you share your output folder, where this different results occur?

Are you asking about the different results from the DRT integration test? You can see that by changing legs.stream().parallel().forEach(row -> { to legs.stream().forEach(row -> { and running MatsimKpiGeneratorIntegrationTest. That fails consistently on my machine. If you set a breakpoint in testAppWithDrt just before the assertions, you will be able to inspect the KPI files in the local temp directory created by the test.

I didn't actually compare the results of my Paris East model using main versus this-branch-but-without-parallel-streaming to check for different KPI values (I only compared running times), but I will do that at some point. I think there's a reasonable chance that some of the KPI values differ. Do you want the model data for that? It's 61 million events, around 1.5 gigs for all of the input files, and everything except the MATSim config file is compressed.

@marecabo marecabo force-pushed the potential-performance branch from bb1263a to 377ef8c Compare April 30, 2024 11:58
@marecabo
Collaborator Author

You can see that by changing legs.stream().parallel().forEach(row -> { to legs.stream().forEach(row -> { and running MatsimKpiGeneratorIntegrationTest.

Perfect, then we can use that test to check it. I'll update this branch with the latest master first, though.

@marecabo marecabo marked this pull request as ready for review April 30, 2024 12:37
@marecabo
Collaborator Author

@mfitz I merged the latest main and all my tests pass locally. However, I cannot rerun the checks. Could you, perhaps?

@mfitz
Contributor

mfitz commented Apr 30, 2024

@mfitz I merged the latest main and all my tests pass locally. However, I cannot rerun the checks. Could you, perhaps?

We have just hit a monthly Arup-wide limit of paid GitHub Actions minutes. It will reset tomorrow, so I doubt our admins will add more credits today, but it means we can't run any CI builds in GitHub until tomorrow 😞

I'll grab the latest changes and do some local investigation, though.

Contributor

@mfitz mfitz left a comment


After this most recent set of changes, Gelato took 22m 58s on my Paris East model. This represents a performance improvement, albeit a fairly small one, so I'm going to approve this for merging.

Note that most of the changes in the diffs are formatting changes - we should try to avoid that in future, perhaps by adding an auto-formatting commit hook.

@mfitz mfitz merged commit b96ed09 into main May 1, 2024
1 check passed
@mfitz mfitz deleted the potential-performance branch May 1, 2024 12:58