-
Notifications
You must be signed in to change notification settings - Fork 711
[P/D] Performance enhancement of Layerwise connector in TP asymmetric scenarios #5540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces performance enhancements for the Layerwise connector in asymmetric Tensor Parallelism scenarios by refactoring the KV cache transfer logic to batch requests. While the overall direction of refactoring is positive, there are several critical issues in the implementation that need to be addressed. Specifically, the error handling for transfer failures is incorrect and could lead to data corruption. Additionally, there are concerns about resource management and swallowed exceptions related to the new metaserver communication, which could cause the system to hang or fail silently. I have provided detailed comments on these points.
| if ret < 0: | ||
| logger.error( | ||
| f"Mooncake transfer failed for send requests {transfer_meta.req_ids} kv cache to {session_id}" | ||
| ) | ||
| if send_task.layer_idx == (self.total_layers - 1): | ||
| for req_id in transfer_meta.req_ids: | ||
| req_meta = send_task.send_request[req_id] | ||
| if req_meta.chunk_finish: | ||
| self.callback_func( | ||
| req_id, req_meta | ||
| ) # TODO Send a signal indicating transmission failure | ||
| else: | ||
| if send_task.layer_idx == (self.total_layers - 1): | ||
| for req_id in transfer_meta.req_ids: | ||
| req_meta = send_task.send_request[req_id] | ||
| if req_meta.chunk_finish: | ||
| self.callback_func(req_id, req_meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling for batch_transfer_sync_write has been changed to no longer raise an exception on failure. Instead, it logs an error and, for the final layer, proceeds to call self.callback_func. This callback signals to the receiving end that the transfer is complete. This is incorrect behavior in case of a failure, as the receiver will assume a successful transfer and may proceed with incomplete or corrupted KV cache data, leading to incorrect model outputs or crashes.
The previous implementation correctly raised a RuntimeError, which would stop the sending thread and prevent signaling a false success. The TODO comment indicates awareness of this issue, but the current implementation is dangerous. The callback should not be called on failure. The exception should be re-raised to ensure the failure is propagated.
if ret < 0:
logger.error(
f"Mooncake transfer failed for send requests {transfer_meta.req_ids} kv cache to {session_id}"
)
raise RuntimeError(f"Mooncake transfer failed for requests {transfer_meta.req_ids}")
else:
if send_task.layer_idx == (self.total_layers - 1):
for req_id in transfer_meta.req_ids:
req_meta = send_task.send_request[req_id]
if req_meta.chunk_finish:
self.callback_func(req_id, req_meta)| self.metaserver_client = httpx.Client( | ||
| limits=httpx.Limits(max_connections=100000), timeout=None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The httpx.Client is initialized with timeout=None. This means that requests to the metaserver can hang indefinitely if the server is unresponsive, which can cause the scheduler to hang. It is recommended to set a reasonable timeout to prevent this. Additionally, max_connections=100000 is a very high limit and could lead to resource exhaustion on the client or server side. Please consider if such a high limit is necessary and if it can be reduced.
self.metaserver_client = httpx.Client(
limits=httpx.Limits(max_connections=1000), timeout=30.0)| def handle_exception(future): | ||
| if future.exception(): | ||
| logger.error( | ||
| f"Access metaserver fail: {future.exception()}") | ||
|
|
||
| future.add_done_callback(handle_exception) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The future.add_done_callback for the _access_metaserver call only logs the exception if one occurs. The main thread continues execution without being aware of the failure. If accessing the metaserver is critical for the request to be processed correctly (which seems to be the case, as it sets up remote prefill), then swallowing the exception can lead to hard-to-debug failures later on. The failure should be propagated or handled in a way that the request is marked as failed. For example, the callback could add the request to a failure queue to be handled by the scheduler, or the main thread could block on future.result() to handle the exception immediately.
|
👋 Hi! Thank you for contributing to the vLLM Ascend project. The following points will speed up your PR merge:
If CI fails, you can run linting and testing checks locally according Contributing and Testing. |
|
This pull request has conflicts, please resolve those before we can evaluate the pull request. |
Signed-off-by: liziyu <[email protected]>
Signed-off-by: nwpu-zxr <[email protected]>
Signed-off-by: nwpu-zxr <[email protected]>
b72d2a1 to
04038db
Compare
Signed-off-by: wangxiaoteng <[email protected]>
Signed-off-by: wangxiaoteng <[email protected]>
fc5f891 to
4598820
Compare
Signed-off-by: wangxiaoteng <[email protected]>
4598820 to
bb00446
Compare
Signed-off-by: wangxiaoteng <[email protected]>
What this PR does / why we need it?
[P/D] Performance enhancement of Layerwise connector in TP asymmetric scenarios
[RFC]: CDCP Scheduling for Disaggregated Prefilling with KV Cache Layerwise Push Support #4842
Does this PR introduce any user-facing change?
How was this patch tested?