Asynchronous Processing #129
Replies: 3 comments
-
One of the possible ways to set up reliable communication between processes and servers could be sockets. The socket dependency could be treated as a suggested one, required only when async processing is used, which would keep the whole async concept optional.

Single Server Implementation

Here it would be ideal to keep the complexity of launching this implementation to an absolute minimum, so the developer only decides how many processes to spawn and lets the ETL do the rest. With this approach, worker processes would die after processing is finished, making space for new workers. Pool size and capacity would be calculated from the number of running worker processes against the maximum number of workers (a rough sketch of this idea is included at the end of this comment).

Multiple Servers Implementation

In this case it might be a bit more tricky, since we are not dealing with one server but with a collection of servers. The first thing that came to my mind is the concept of a cluster, where multiple nodes can be registered as … So in this case, orchestration would look like this: …

Common Parts

In both cases …
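A minimal sketch of the Single Server pool-capacity idea above, assuming workers are plain OS processes tracked by pid (the class and method names are made up for illustration, not an actual flow-php API):

```php
<?php

// Hypothetical sketch - names are made up, this is not the flow-php API.
// Capacity is simply the maximum number of workers minus the ones still running.
final class WorkerPool
{
    /** @var array<int, bool> pid => still running */
    private array $running = [];

    public function __construct(private readonly int $maxWorkers)
    {
    }

    public function capacity(): int
    {
        return $this->maxWorkers - \count($this->running);
    }

    public function register(int $pid): void
    {
        $this->running[$pid] = true;
    }

    public function release(int $pid): void
    {
        // A worker dies after finishing its batch, making space for a new one.
        unset($this->running[$pid]);
    }
}
```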
-
Proof of Concept is available as flow-php/etl-async
-
Done! In order to make this happen, pretty much the entire internal structure of the library was redesigned and new features were added along the way. Finally, flow-php provides an asynchronous processing abstraction out of the box, backed by two adapters.

🎉
-
Currently, the biggest limitation of this ETL is that there is absolutely no support for parallel/async processing.
So even if the process is launched on a 12-core machine with a massive amount of RAM, processing a 10GB CSV file would not be much faster than on a smaller machine, because everything runs in a single process and a single thread.
This discussion is here to investigate whether async/parallel processing is even possible.
Below are a few things that we need to consider/implement/decide before we even start implementing any async processing.
Things to think about:
Serialization
flow-php loads a batch of `Rows` at once and later those `Rows` are passed through all registered Transformers and Loaders, which are added to the pipeline under the common interface `Pipeline/Element`. In order to even think about async processing, we would need to be able to synchronize both: `Rows` and `Pipeline/Element`s.

So instead of processing rows in the main process, the ETL could launch a subprocess and pass serialized `Rows` and `Pipeline/Element`s to it.
Serialization of `Rows` seems to be pretty straightforward: we need to serialize each element of `Rows`.
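A rough sketch of what that could look like, assuming rows are plain value objects holding named entries (the classes below are illustrative only, not the actual flow-php `Row`/`Rows` implementation):

```php
<?php

// Hypothetical sketch - these classes only illustrate the idea,
// they are not the actual flow-php Row/Rows implementation.
final class Row
{
    /** @param array<string, mixed> $entries */
    public function __construct(public readonly array $entries)
    {
    }
}

final class Rows
{
    /** @var Row[] */
    private readonly array $rows;

    public function __construct(Row ...$rows)
    {
        $this->rows = $rows;
    }

    public function serialize(): string
    {
        // Rows are plain value objects, so a straight JSON encode
        // of each row's entries is enough.
        return \json_encode(
            \array_map(static fn (Row $row): array => $row->entries, $this->rows),
            \JSON_THROW_ON_ERROR
        );
    }

    public static function deserialize(string $payload): self
    {
        $entries = \json_decode($payload, true, 512, \JSON_THROW_ON_ERROR);

        return new self(...\array_map(static fn (array $e): Row => new Row($e), $entries));
    }
}
```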
In this case, since those are only Value objects, there should not be any surprises.
`Pipeline/Element` is a bit more tricky, so let's look at it through an example. With a single process, the results of transformations should be loaded into a single file, `~/file.csv`. If we choose, for example, a JSON serialization strategy, then the serialized loader could look like this:
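Purely as an illustration of the idea (this is not an actual flow-php serialization format, just one possible shape):

```json
{
    "loader": "csv_file",
    "parameters": {
        "path": "~/file.csv",
        "with_header": true
    }
}
```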
And here we get to the first issue: how do we load results from multiple processes into a single file? One of the solutions would be to work around the problem by loading results into a folder instead, as sketched below. It is not the most elegant solution, but it is acceptable for the proof of concept. The loader would need to be aware of the runtime, so it would know whether it is running in a single-process or a multi-process ETL and pick its saving strategy accordingly (or always follow the multi-process convention).
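A sketch of that workaround, assuming every process (the main one or a worker) gets a unique identifier and always writes its own file inside the target directory (hypothetical names again, not the flow-php Loader interface):

```php
<?php

// Hypothetical sketch - not the actual flow-php Loader interface.
// Every process writes to its own file inside the target directory,
// so concurrent writes never collide.
final class CsvDirectoryLoader
{
    public function __construct(
        private readonly string $directory, // e.g. "/tmp/etl-output"
        private readonly string $processId  // e.g. "main" or "worker_3"
    ) {
    }

    /** @param array<int, array<string, mixed>> $rows */
    public function load(array $rows): void
    {
        $path = \rtrim($this->directory, '/') . '/' . $this->processId . '.csv';

        $handle = \fopen($path, 'ab');

        foreach ($rows as $row) {
            \fputcsv($handle, \array_values($row));
        }

        \fclose($handle);
    }
}
```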
Communication
In order to run the ETL process, we need to execute the `ETL::run()` method, which would be a really nice entry point to the whole multiprocessing.
In the proof-of-concept async processing, we can start with subprocesses, but we can try to design it in a way that would allow spawning a processing cluster across multiple machines (yeah, probably overkill, and there are better tools for that, but let's at least keep an open mind).
So going back to the `run()` method: in this step the ETL could try to initialize processes, connect to workers, etc. We can call the main process the coordinator and all subprocesses workers, which should not suggest any particular implementation. In a single-process configuration, the coordinator and the worker would be the same thing, without any need to initialize communication: it would just take the `Pipeline/Element`s and use them to process `Rows`.

In multiprocess communication, the coordinator would first need to get the worker pool size and pass serialized `Rows` with all `Pipeline/Element`s to each available worker.
Once again, this is a bit limiting, because each worker will get a single `Rows` element and all registered `Pipeline/Element`s, which won't let us optimize anything by grouping Transformers, but for the proof of concept it should be more than enough.

Assuming that our Source comes with 10 rows, this is how the coordinator would allocate them across 4 workers:
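For illustration, a simple one-row-per-worker hand-out would start like this:

| Worker   | First row assigned |
| -------- | ------------------ |
| worker 1 | row 1              |
| worker 2 | row 2              |
| worker 3 | row 3              |
| worker 4 | row 4              |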
Now it would need to wait for any worker to finish processing in order to pass row 5 to it.
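Ignoring the transport for a moment, that allocation loop could be sketched roughly like this; the two callables stand in for whatever communication mechanism ends up being chosen (subprocess pipes, sockets, ...), and nothing here is real flow-php code:

```php
<?php

/**
 * Hypothetical coordinator loop - not flow-php code.
 *
 * @param string[] $serializedBatches  serialized Rows, one batch per hand-out
 * @param string   $serializedElements all serialized Pipeline/Elements
 * @param int[]    $workerIds          available workers, e.g. [1, 2, 3, 4]
 * @param callable $sendToWorker       fn (int $workerId, string $rows, string $elements): void
 * @param callable $waitForAnyWorker   fn (): int - blocks until some worker finishes, returns its id
 */
function coordinate(
    array $serializedBatches,
    string $serializedElements,
    array $workerIds,
    callable $sendToWorker,
    callable $waitForAnyWorker
): void {
    $busy = 0;

    // Initial hand-out: one batch per available worker.
    foreach ($workerIds as $workerId) {
        if (null === ($batch = \array_shift($serializedBatches))) {
            break;
        }
        $sendToWorker($workerId, $batch, $serializedElements);
        $busy++;
    }

    // Keep feeding whichever worker finishes first until everything is processed.
    while ($busy > 0) {
        $freeWorkerId = $waitForAnyWorker();
        $busy--;

        if (null !== ($batch = \array_shift($serializedBatches))) {
            $sendToWorker($freeWorkerId, $batch, $serializedElements);
            $busy++;
        }
    }
}
```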
This is where we get to the problematic part: what communication protocol should we use?
Using subprocesses, the coordinator can spawn them, pass them the serialized `Rows` and `Pipeline/Element`s, and wait for each worker to report its results (or kill it after a timeout). But what about cross-server communication? In that case, the coordinator would need to connect to a worker, which would be a long-running process that could be used to process chunks of data.

So maybe we should think about workers as long-running processes that would listen for a connection (sockets maybe?) and that would be able to tell the coordinator if they are already busy, maybe send back some heartbeats?
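A bare-bones sketch of such a long-running worker using plain PHP sockets (nothing flow-php specific here; busy signalling and heartbeats would need a proper event loop on top of this):

```php
<?php

// Hypothetical long-running worker - listens on a TCP socket, reads one
// newline-terminated payload (serialized Rows + Pipeline/Elements) per
// connection, "processes" it and reports back to the coordinator.
$server = \stream_socket_server('tcp://127.0.0.1:6001', $errno, $errstr);

if (false === $server) {
    exit("Could not start worker: {$errstr} ({$errno})\n");
}

while ($connection = \stream_socket_accept($server, -1)) {
    $payload = \fgets($connection);

    if (false !== $payload) {
        // Here the worker would deserialize the payload and run the
        // transformations/loaders; this sketch only acknowledges it.
        \fwrite($connection, "DONE\n");
    }

    \fclose($connection);
}
```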
There are 2 scenarios: a Single Server and Many Servers. In both cases, we can launch multiple worker processes and let the ETL know how to communicate with them: on a Single Server it could be localhost on a given port, on Many Servers through an IP address?
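A sketch of how the ETL could be told where its workers live in each scenario; the exact format is made up, only the idea matters:

```php
<?php

// Hypothetical configuration - not an actual flow-php format.

// Single Server: workers are local processes listening on different ports.
$singleServerWorkers = [
    'tcp://127.0.0.1:6001',
    'tcp://127.0.0.1:6002',
    'tcp://127.0.0.1:6003',
    'tcp://127.0.0.1:6004',
];

// Many Servers: workers are long-running processes on other machines.
$manyServersWorkers = [
    'tcp://10.0.0.11:6001',
    'tcp://10.0.0.12:6001',
    'tcp://10.0.0.13:6001',
];
```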
There are still more questions than answers here, but if anyone is interested in discussing this topic we can jump on a call or continue the conversation here. There is a very good chance it won't get implemented at all, or only in a very limited version (just subprocesses, for example). There is also absolutely no timeline for when (if at all) it will get implemented.

I will also try to use this thread as a scratchpad and note any thoughts I might have around this topic, feel free to do the same.