It seems that there is an issue with how Flink handles broadcast DataSets.
Problem
Let's assume we have a Flink cluster with N = 20 nodes and T = 2 tasks per node, hence DOP = N * T = 20 * 2 = 40. If a job reads inputSize = 5 MB of data into a single DataSet and subsequently broadcasts this DataSet to the mappers (running at max DOP), the data is broadcast to every mapper in isolation. This means broadcastSize = DOP * inputSize = 40 * 5 MB = 200 MB needs to be transferred over the network.
In our case the problem becomes obvious when running the LinRegDS.dml script on flink_hybrid. The second Flink job involves MapmmFLInstruction, which broadcasts the smaller matrix to all the mappers. For a DOP of 250 this results in about 10 GB of broadcast data.
Solution
Since all tasks on a node run in the same JVM, it would be better to broadcast to the taskmanagers only, which then pass a simple reference to each task they are responsible for. For the example above this reduces the size of the broadcast to broadcastSize = N * inputSize = 20 * 5 MB = 100 MB.
For the LinReg.dml use-case this fix will reduce the size of the broadcast by a factor of 16. Hence it will only need to broadcast 10 GB / 16 = 0.625 GB of data.
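The arithmetic behind both schemes can be sketched in a few lines. This is only a back-of-the-envelope model, not Flink code: the function name broadcast_volume_mb and the variable names are made up for illustration, and the model assumes every receiver gets one full copy of the DataSet.

```python
def broadcast_volume_mb(receivers: int, input_size_mb: float) -> float:
    """Total data sent over the network if each receiver gets a full copy."""
    return receivers * input_size_mb


nodes = 20           # N
tasks_per_node = 2   # T
input_size_mb = 5.0  # inputSize

dop = nodes * tasks_per_node                           # DOP = N * T
per_task = broadcast_volume_mb(dop, input_size_mb)     # current behaviour: one copy per task
per_node = broadcast_volume_mb(nodes, input_size_mb)   # proposed: one copy per taskmanager
reduction = per_task / per_node                        # equals T in this model

print(per_task, per_node, reduction)
```

In this model the saving is always the number of tasks per node, so a reduction by a factor of 16 would correspond to 16 tasks per node. That cluster layout is an inference from the numbers, not something stated above.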
Workaround
For now this can be mitigated by setting the DOP very low for jobs that include a broadcast, at the cost of less parallelism for those jobs.
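To get a feeling for how far the DOP would have to drop, here is a rough sketch that reuses the numbers from the example (5 MB input, 20 nodes). The helper name per_task_broadcast_mb is hypothetical, and the model only counts one full copy per task, nothing else.

```python
def per_task_broadcast_mb(dop: int, input_size_mb: float) -> float:
    """Network volume under the current behaviour: one full copy per task."""
    return dop * input_size_mb


input_size_mb = 5.0
nodes = 20

# Broadcast volume for a few candidate DOP settings.
volumes = {dop: per_task_broadcast_mb(dop, input_size_mb) for dop in (40, 20, 10)}

for dop, volume in volumes.items():
    print(f"DOP={dop}: {volume} MB")
```

With DOP equal to the number of nodes the volume matches the per-taskmanager broadcast (100 MB), assuming the scheduler places exactly one task on each node. Going lower saves more network traffic but leaves nodes idle.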
Follow Up
I will investigate a bit more to see whether this is a known issue in Flink and whether there are already ways to work around the problem, and possibly open a PR with Flink.