-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tftp: negotiate blocksize and timeout #229
Conversation
Caution Review failedThe pull request is closed. WalkthroughThe update modifies the TFTP server and its tests to support option acknowledgment (OACK) and refines transfer logic. It introduces a new opcode and several new internal methods to handle request parsing, mode validation, path resolution, and option negotiation. The server’s structure is adjusted for improved readability and type hint consistency, and a readiness event now signals when the server is prepared to accept requests. In parallel, the tests have been enhanced with a retry mechanism for binding, awaited server signaling, and new tests for options negotiation, data packet retries, and invalid options handling. Changes
Sequence Diagram(s)sequenceDiagram
participant C as Client
participant S as TFTP Server
participant T as TFTP Read Transfer
C->>S: Send read request (with options)
S->>S: Parse request (_parse_request)
S->>S: Validate mode and resolve path (_validate_mode, _resolve_and_validate_path)
S->>S: Negotiate options (_negotiate_options)
S->>T: Start transfer (_start_transfer / _initialize_transfer)
T->>C: Send OACK packet / Data packet
loop Transfer Process
C->>T: Send ACK for data block
T->>C: Send next data block (with retries if needed)
end
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (2)
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
e5ccb8a
to
6b73495
Compare
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🔭 Outside diff range comments (2)
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py (2)
Line range hint
138-209
: Refactor_handle_read_request
to reduce complexityThe
_handle_read_request
function has a cyclomatic complexity of 14, exceeding the maximum allowed complexity of 10. Refactoring will improve maintainability and readability.Suggestions:
- Extract option parsing into a separate method: Create a helper method to handle the parsing of options from the RRQ packet.
- Separate file validation logic: Move file path resolution and access checks into another method.
- Simplify error handling: Consolidate error responses where possible to reduce branching.
Line range hint
288-340
: Refactorstart
method to reduce complexityThe
start
method has a cyclomatic complexity of 11, exceeding the maximum allowed complexity of 10. Refactoring can enhance readability and maintainability.Suggestions:
- Extract the main transfer loop into a separate method: This will simplify
start
and isolate the loop logic.- Handle OACK confirmation separately: Create a dedicated method to manage OACK packet sending and acknowledgment.
- Simplify conditionals: Reorganize conditionals to reduce nesting and complexity.
🧰 Tools
🪛 Ruff (0.8.2)
382-382: Line too long (134 > 120)
(E501)
🪛 GitHub Actions: Lint
[error] 382-382: Line too long (134 characters, maximum allowed: 120)
🧹 Nitpick comments (4)
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py (4)
150-158
: Enhance exception handling in options parsingCurrently, only
IndexError
is caught during options parsing. To prevent potential crashes due to decoding errors or other unexpected issues, consider catchingException
to handle all exceptions gracefully.Suggested change:
while i < len(parts) - 1: try: opt_name = parts[i].decode('utf-8').lower() opt_value = parts[i + 1].decode('utf-8') options[opt_name] = opt_value i += 2 - except IndexError: + except (IndexError, UnicodeDecodeError, Exception): break
182-193
: Default to server's block size when invalidblksize
is requestedWhen an invalid
blksize
is provided, defaulting toself.server.block_size
ensures consistency with the server's configuration rather than using a hardcoded value of 512.Suggested change:
else: - negotiated_options['blksize'] = 512 + negotiated_options['blksize'] = self.server.block_size except ValueError: - negotiated_options['blksize'] = 512 + negotiated_options['blksize'] = self.server.block_size else: negotiated_options['blksize'] = self.server.block_size
194-201
: Ensure default timeout is set when invalidtimeout
is providedIf the
timeout
option is invalid or out of range, explicitly settingnegotiated_options['timeout']
toself.server.timeout
ensures that the server's default timeout is used consistently.Suggested change:
except ValueError: + negotiated_options['timeout'] = self.server.timeout else: + # Ensure timeout within valid range + if not (1 <= negotiated_options['timeout'] <= 255): + negotiated_options['timeout'] = self.server.timeout else: negotiated_options['timeout'] = self.server.timeout
382-382
: Line exceeds maximum allowed lengthLine 382 is 134 characters long, exceeding the maximum allowed length of 120 characters. Consider splitting the line to comply with style guidelines.
Suggested fix:
self.logger.debug( f"Sent {'OACK' if is_oack else 'DATA'} block {expected_block}, " f"waiting for ACK (Attempt {attempt})" )🧰 Tools
🪛 Ruff (0.8.2)
382-382: Line too long (134 > 120)
(E501)
🪛 GitHub Actions: Lint
[error] 382-382: Line too long (134 characters, maximum allowed: 120)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lock
is excluded by!**/*.lock
📒 Files selected for processing (1)
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py
(6 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py
382-382: Line too long (134 > 120)
(E501)
🪛 GitHub Actions: Lint
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py
[error] 138-138: Function '_handle_read_request' is too complex (complexity score: 14, maximum allowed: 10)
[error] 288-288: Function 'start' is too complex (complexity score: 11, maximum allowed: 10)
[error] 382-382: Line too long (134 characters, maximum allowed: 120)
🔇 Additional comments (2)
contrib/drivers/tftp/src/jumpstarter_driver_tftp/server.py (2)
17-17
: Addition ofOACK
opcode extends TFTP protocol supportIncluding
OACK = 6
in theOpcode
enum correctly enables option acknowledgment functionality as per the TFTP extension specifications.
399-411
: Handle duplicate ACKs and out-of-sequence ACKs appropriatelyIn the
handle_ack
method, ensure that duplicate and out-of-sequence ACKs are managed according to the TFTP protocol to prevent unnecessary retransmissions or termination of the transfer.Consider reviewing the handling of:
- Duplicate ACKs: Verify that resending the current packet on duplicate ACKs is appropriate.
- Out-of-sequence ACKs: Ensure that unexpected ACKs do not disrupt the transfer state.
72a7899
to
dcb4d59
Compare
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (3)
36-37
: Consider validating constructor arguments.
The default values forblock_size
,timeout
, andretries
are appropriate, but adding checks (e.g., ensuringblock_size >= 512
) can safeguard against invalid inputs.
165-174
: _send_oack method effectively builds and sends OACK packets.
Consider logging a warning ifself.transport
is unexpectedlyNone
.
475-508
: _send_with_retries attempts multiple times, waits for ACK.
Properly cancels on repeated timeouts. One minor suggestion: consider a short exponential backoff.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(7 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(2 hunks)
🔇 Additional comments (72)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (37)
17-17
: New OACK opcode confirmed.
This addition aligns with the TFTP RFC for option negotiation.
44-44
: Forward reference for type hint is good.
This usage is consistent with Python’s recommended approach to annotating classes not yet defined.
47-47
: Consistent approach to optional protocol attribute.
No issues found here.
50-55
: Property logic is straightforward.
ReturningNone
when transport is unavailable is sensible.
62-62
: Creating datagram endpoint with TftpServerProtocol.
This approach is standard for asyncio-based TFTP servers.
93-93
: Registering transfers with a set is logical.
No concerns.
97-97
: Unregistering transfers from the set is logical.
No issues with discarding.
131-131
: Opcode parsing is safe.
The surrounding try/except handles invalid bytes gracefully.
147-159
: RRQ handler flow looks good.
_parse_request
,_validate_mode
, and_resolve_and_validate_path
are called in a clear sequence.- Logging is descriptive.
177-179
: Error packet structure follows TFTP standard.
No issues with implementation.
185-195
: _parse_request correctly extracts filename, mode, and options.
Error handling for malformed packets is appropriate.
196-208
: _parse_options thoroughly decodes TFTP options.
Loop and break conditions handle partial or invalid data.
209-215
: _validate_mode ensures only netascii or octet.
Further extension for additional modes could be added in future.
216-231
: _resolve_and_validate_path prevents directory traversal.
- Good checks for existence and subpath.
232-252
: _negotiate_block_size covers valid TFTP block range.
Logging warnings for out-of-range values is helpful.
253-273
: _negotiate_timeout handles integer timeouts in [1..255].
Defaults are clearly used if invalid.
274-292
: _negotiate_options merges requested and fallback values.
Returning a dict of negotiated settings is clean.
293-304
: _start_transfer instantiates the TftpReadTransfer.
No issues observed.
320-321
: Base TftpTransfer constructor arguments are standard.
No concerns.
329-329
: protocol stored as optional reference.
Consistent with the rest of the design.
347-356
: TftpReadTransfer constructor includes negotiated_options.
Inheritance and super() usage is correct.
360-361
: oack_confirmed tracks successful OACK handshake.
Good approach for differentiating first ACK.
367-368
: _initialize_transfer awaits datagram endpoint creation.
No issues found.
371-373
: No negotiated options => set oack_confirmed immediately.
Clear logic for going straight to data transfer.
375-375
: _perform_transfer invoked after initialization.
Logical flow for read transfer.
381-391
: _initialize_transfer tries to send OACK only if non-default options are negotiated.
Correctly handles fallback scenario.
392-404
: Upon unsuccessful OACK acknowledgment, we log an error and return False.
Clean error handling.
405-416
: _perform_transfer reads the file in blocks.
Properly checks for shutdown event.
417-452
: _handle_data_block deals with empty files, normal blocks, and final block.
Block number wrap-around is correct for the 16-bit specification.
453-465
: _create_oack_packet constructs OACK with all negotiated options.
Straightforward approach.
466-474
: _send_packet logs basic information.
Differentiates DATA vs. OACK.
530-530
: connection_made with TftpTransferProtocol sets reference to transport.
No issues found.
540-540
: Warning for malformed packet is consistent.
Good short-circuit on incomplete data.
544-544
: Safely parsing opcode in new scope.
No issues.
546-547
: Sending ILLEGAL_OPERATION error on unknown opcode.
Correct TFTP response.
550-556
: ACK vs. unexpected opcode logic.
Correctly calls_send_error
for unrecognized calls.
567-569
: _send_error follows TFTP spec with final zero byte.
No concerns.packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (29)
16-20
: TftpServer instantiation with dynamic port (0).
Allows the OS to pick an available port. Looks good.
21-22
: server_task creation.
We create the server start task to run in parallel. No issues.
23-30
: Retry loop for server.address.
Ensures the server is bound before proceeding. Good approach.
32-32
: Yield includes the port number in addition to the server and temp_dir.
Useful for external test logic.
35-35
: Wait for server_task upon fixture teardown.
Ensures a graceful shutdown.
37-43
: Cleanup of leftover tasks.
Prevents test contamination from background coroutines.
48-49
: create_test_client sets up a DatagramProtocol for testing.
No issues.
53-53
: @pytest.mark.asyncio usage.
Consistent with the new pytest config.
56-56
: Retrieving server from fixture.
Straightforward usage.
67-67
: @pytest.mark.asyncio for read request test.
No issues found.
70-70
: Using server_port from fixture.
Keeps the code consistent.
79-81
: RRQ packet includes 'test.txt', 'octet'.
Proper TFTP read request.
94-94
: Test for reading non-existent file.
Clear docstring.
97-97
: Retrieving server details for nonexistent file test.
Same pattern as above.
105-109
: RRQ packet with nonexistent.txt.
Expects an error from the server.
121-121
: Write request test ensures read-only mode.
No issues.
124-124
: Server fixture usage repeated.
Matches the established pattern.
130-134
: WRQ packet for 'test.txt' in octet mode.
Expect server to reject.
146-146
: Invalid packet handling test.
Checks that partial data triggers error.
148-148
: Server fixture usage repeated.
No issues.
154-154
: Sending only 2 bytes simulates a malformed TFTP packet.
Should produce ILLEGAL_OPERATION.
164-164
: Path traversal test checks if file requests outside root are blocked.
Matches TFTP security requirements.
167-167
: Accessing server reference for path traversal test.
Nothing suspicious.
175-179
: RRQ with ../../../ pattern.
Ensures no directory escaping.
191-193
: Testing option negotiation.
Docstring states we check blksize and timeout.
194-196
: Starting server within test function for concurrency.
Consistent approach with other tests.
198-299
: Sends RRQ with 'blksize=1024' and 'timeout=3'.
Expects server to respond with an OACK or fallback.
300-305
: Closing transport and shutting down server.
Clean teardown.
222-223
: (Note) The lines 222-223 are part of the test_retry_mechanism test. They are covered in the big block 198-299, so no separate new comment needed here.packages/jumpstarter-driver-tftp/pyproject.toml (6)
6-6
: Consolidated authors line.
No functional change. Looks fine.
12-12
: Trailing comma after aiofiles dependency.
Purely stylistic, no issues.
21-21
: Added trailing comma to dev dependency group.
No functional changes.
27-27
: Minor spacing in raw-options property.
Likely a formatting choice.
33-38
: Activated pytest configuration options.
log_cli
,log_cli_level
,testpaths
,asyncio_mode
are now set.
42-42
: build-backend remains hatchling.build.
No concerns.
fae37fd
to
145e10b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (2)
185-194
: Add more specific error handling in _parse_request.The current implementation raises a generic ValueError. Consider adding specific error handling for:
- UTF-8 decoding errors
- Malformed request formats
def _parse_request(self, data: bytes) -> Tuple[str, str, dict]: parts = data[2:].split(b'\x00') if len(parts) < 2: - raise ValueError("Invalid RRQ format") + raise ValueError("RRQ must contain at least filename and mode") + + try: + filename = parts[0].decode('utf-8') + mode = parts[1].decode('utf-8').lower() + except UnicodeDecodeError as e: + raise ValueError(f"Invalid UTF-8 encoding in request: {e}") - filename = parts[0].decode('utf-8') - mode = parts[1].decode('utf-8').lower() options = self._parse_options(parts[2:]) return filename, mode, options
232-251
: Define constants for block size limits.Consider extracting the magic numbers into named constants for better maintainability and documentation.
+# RFC 2348 - TFTP Blocksize Option +MIN_BLOCK_SIZE = 512 +MAX_BLOCK_SIZE = 65464 + def _negotiate_block_size(self, requested_blksize: Optional[str]) -> int: if requested_blksize is None: return self.server.block_size try: blksize = int(requested_blksize) - if 512 <= blksize <= 65464: + if MIN_BLOCK_SIZE <= blksize <= MAX_BLOCK_SIZE: return blksize else: self.logger.warning( - f"Requested block size {blksize} out of range (512-65464), " + f"Requested block size {blksize} out of range ({MIN_BLOCK_SIZE}-{MAX_BLOCK_SIZE}), " f"using default: {self.server.block_size}" )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)
🔇 Additional comments (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (6)
17-17
: LGTM! Added OACK opcode following RFC 2347.The addition of the OACK opcode (6) correctly implements the Option Acknowledgment mechanism as specified in RFC 2347.
50-55
: LGTM! Well-implemented address property.The property safely handles the case when transport is None and provides useful socket information for diagnostics.
380-402
: LGTM! Well-structured transfer initialization.The implementation correctly:
- Handles OACK confirmation
- Validates negotiated options
- Provides clear logging
415-451
: LGTM! Robust data block handling implementation.The implementation correctly handles:
- Empty files
- Block number wrapping (16-bit)
- Final block detection
- Error conditions
566-568
: LGTM! Consistent error handling implementation.The error packet creation follows the TFTP specification and maintains consistency across the codebase.
502-508
: Consider potential race condition in OACK confirmation.The
oack_confirmed
flag is checked and set without synchronization. While this might not be an issue in the current single-threaded context, it's worth adding a note for future maintainers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (1)
Line range hint
503-557
: Consider adding rate limiting.To prevent potential DoS attacks through rapid retransmission requests, consider adding rate limiting for duplicate ACKs.
def handle_ack(self, block_num: int): self.logger.debug(f"Received ACK for block {block_num} from {self.client_addr}") + self.last_duplicate_ack_time = getattr(self, 'last_duplicate_ack_time', 0) + current_time = time.time() # special handling for OACK acknowledgment if not self.oack_confirmed and self.negotiated_options and block_num == 0: self.oack_confirmed = True self.last_ack = block_num self.ack_received.set() return if block_num == self.block_num: self.last_ack = block_num self.ack_received.set() elif block_num == self.block_num - 1: + if current_time - self.last_duplicate_ack_time < 1.0: # 1 second cooldown + self.logger.warning("Rate limiting duplicate ACK retransmission") + return + self.last_duplicate_ack_time = current_time self.logger.warning(f"Duplicate ACK for block {block_num} received, resending block {self.block_num}") self.transport.sendto(self.current_packet) else: self.logger.warning(f"Out of sequence ACK: expected {self.block_num}, got {block_num}")
🧹 Nitpick comments (2)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (1)
185-214
: Consider adding edge cases to options negotiation test.While the test covers basic options negotiation, consider adding test cases for:
- Maximum block size (65464)
- Minimum block size (512)
- Maximum timeout (255)
- Minimum timeout (1)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (1)
234-291
: Consider adding validation for option names.While the option value validation is thorough, consider adding validation for option names to prevent potential security issues with malformed option names.
def _negotiate_options(self, options: dict) -> Tuple[dict, int, float]: + valid_options = {'blksize', 'timeout'} + unknown_options = set(options.keys()) - valid_options + if unknown_options: + self.logger.warning(f"Ignoring unknown options: {unknown_options}") + options = {k: v for k, v in options.items() if k in valid_options} + negotiated = {} blksize = self.server.block_size timeout = self.server.timeout
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(5 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
🔇 Additional comments (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py (1)
12-12
: LGTM!The addition of the
export
import and decorators correctly exposes the necessary methods for TFTP functionality.packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (3)
16-43
: LGTM! Robust server fixture implementation.The server fixture implementation correctly handles:
- Dynamic port allocation with port 0
- Server readiness verification
- Proper cleanup of resources
215-269
: LGTM! Comprehensive retry mechanism test.The test effectively verifies:
- Packet retransmission
- Block number consistency
- Timeout handling
271-297
: LGTM! Thorough invalid options handling test.The test properly validates the server's response to invalid option values.
packages/jumpstarter-driver-tftp/pyproject.toml (1)
37-40
: LGTM! Improved test configuration.The changes enable proper logging and asyncio configuration for testing.
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (1)
17-17
: LGTM! Proper OACK implementation.The OACK implementation follows RFC 2347 correctly:
- Proper opcode value (6)
- Correct packet format for options
Also applies to: 167-176
except Exception as e: | ||
self.logger.error(f"Error during read transfer: {e}") | ||
finally: | ||
await self.cleanup() | ||
|
||
async def _send_with_retries(self, packet: bytes) -> bool: | ||
async def _initialize_transfer(self) -> bool: | ||
loop = asyncio.get_running_loop() | ||
|
||
self.transport, self.protocol = await loop.create_datagram_endpoint( | ||
lambda: TftpTransferProtocol(self), | ||
local_addr=('0.0.0.0', 0), | ||
remote_addr=self.client_addr | ||
) | ||
local_addr = self.transport.get_extra_info('sockname') | ||
self.logger.debug(f"Transfer bound to local {local_addr}") | ||
|
||
# Only send OACK if we have non-default options to negotiate | ||
if self.negotiated_options and ( | ||
self.negotiated_options['blksize'] != 512 or | ||
self.negotiated_options['timeout'] != self.server.timeout | ||
): | ||
oack_packet = self._create_oack_packet() | ||
if not await self._send_with_retries(oack_packet, is_oack=True): | ||
self.logger.error("Failed to get acknowledgment for OACK") | ||
return False | ||
|
||
self.block_num = 1 | ||
return True | ||
|
||
async def _perform_transfer(self): | ||
async with aiofiles.open(self.filepath, 'rb') as f: | ||
while True: | ||
if self.server.shutdown_event.is_set(): | ||
self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") | ||
break | ||
|
||
data = await f.read(self.block_size) | ||
if not await self._handle_data_block(data): | ||
break | ||
|
||
async def _handle_data_block(self, data: bytes) -> bool: | ||
""" | ||
Handle sending a block of data to the client. | ||
Returns False if transfer should stop, True if it should continue. | ||
""" | ||
if not data and self.block_num == 1: | ||
# Empty file case | ||
packet = self._create_data_packet(b'') | ||
await self._send_with_retries(packet) | ||
return False | ||
elif data: | ||
packet = self._create_data_packet(data) | ||
success = await self._send_with_retries(packet) | ||
if not success: | ||
self.logger.error(f"Failed to send block {self.block_num} to {self.client_addr}") | ||
return False | ||
|
||
self.logger.debug(f"Block {self.block_num} sent successfully") | ||
self.block_num += 1 | ||
|
||
# wrap block number around if it exceeds 16 bits | ||
self.block_num %= 65536 | ||
|
||
if len(data) < self.block_size: | ||
self.logger.info(f"Final block {self.block_num - 1} sent") | ||
return False | ||
return True | ||
else: | ||
# EOF reached | ||
packet = self._create_data_packet(b'') | ||
success = await self._send_with_retries(packet) | ||
if not success: | ||
self.logger.error(f"Failed to send final block {self.block_num}") | ||
else: | ||
self.logger.info(f"Transfer complete, final block {self.block_num}") | ||
return False | ||
|
||
def _create_oack_packet(self) -> bytes: | ||
packet = Opcode.OACK.to_bytes(2, 'big') | ||
for opt_name, opt_value in self.negotiated_options.items(): | ||
packet += f"{opt_name}\0{str(opt_value)}\0".encode('utf-8') | ||
return packet | ||
|
||
def _create_data_packet(self, data: bytes) -> bytes: | ||
return ( | ||
Opcode.DATA.to_bytes(2, 'big') + | ||
self.block_num.to_bytes(2, 'big') + | ||
data | ||
) | ||
|
||
def _send_packet(self, packet: bytes): | ||
self.transport.sendto(packet) | ||
if packet[0:2] == Opcode.DATA.to_bytes(2, 'big'): | ||
block = int.from_bytes(packet[2:4], 'big') | ||
data_length = len(packet) - 4 | ||
self.logger.debug(f"Sent DATA block {block} ({data_length} bytes) to {self.client_addr}") | ||
elif packet[0:2] == Opcode.OACK.to_bytes(2, 'big'): | ||
self.logger.debug(f"Sent OACK to {self.client_addr}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider adding transfer size limit.
To prevent potential DoS attacks, consider adding a maximum transfer size limit.
def __init__(self, server: TftpServer, filepath: pathlib.Path, client_addr: Tuple[str, int],
block_size: int, timeout: float, retries: int, negotiated_options: Optional[dict] = None):
super().__init__(
server=server,
filepath=filepath,
client_addr=client_addr,
block_size=block_size,
timeout=timeout,
retries=retries
)
+ self.max_transfer_size = 1024 * 1024 * 1024 # 1GB limit
self.block_num = 0
self.ack_received = asyncio.Event()
self.last_ack = 0
self.oack_confirmed = False
self.negotiated_options = negotiated_options
self.current_packet: Optional[bytes] = None
+ self.total_bytes_sent = 0
async def _handle_data_block(self, data: bytes) -> bool:
+ self.total_bytes_sent += len(data)
+ if self.total_bytes_sent > self.max_transfer_size:
+ self.logger.error(f"Transfer size limit exceeded: {self.total_bytes_sent} bytes")
+ return False
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def __init__(self, server: TftpServer, filepath: pathlib.Path, client_addr: Tuple[str, int], | |
block_size: int, timeout: float, retries: int, negotiated_options: Optional[dict] = None): | |
super().__init__( | |
server=server, | |
filepath=filepath, | |
client_addr=client_addr, | |
block_size=block_size, | |
timeout=timeout, | |
retries=retries | |
) | |
self.block_num = 0 | |
self.ack_received = asyncio.Event() | |
self.last_ack = 0 | |
self.oack_confirmed = False | |
self.negotiated_options = negotiated_options | |
self.current_packet: Optional[bytes] = None | |
async def start(self): | |
self.logger.info(f"Starting read transfer of '{self.filepath.name}' to {self.client_addr}") | |
loop = asyncio.get_running_loop() | |
self.transport, self.protocol = await loop.create_datagram_endpoint( | |
lambda: TftpTransferProtocol(self), local_addr=("0.0.0.0", 0), remote_addr=self.client_addr | |
) | |
local_addr = self.transport.get_extra_info("sockname") | |
self.logger.debug(f"Transfer bound to local {local_addr}") | |
if not await self._initialize_transfer(): | |
return | |
try: | |
async with aiofiles.open(self.filepath, "rb") as f: | |
while True: | |
if self.server.shutdown_event.is_set(): | |
self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") | |
break | |
data = await f.read(self.block_size) | |
if data: | |
packet = Opcode.DATA.to_bytes(2, "big") + self.block_num.to_bytes(2, "big") + data | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error(f"Failed to send block {self.block_num} to {self.client_addr}") | |
break | |
self.logger.debug(f"Block {self.block_num} sent successfully") | |
self.block_num += 1 | |
# If the data read is less than block_size, this is the last packet | |
if len(data) < self.block_size: | |
self.logger.info(f"Final block {self.block_num - 1} reached for {self.client_addr}") | |
break | |
else: | |
# If no data is returned, it means the file size is an exact multiple of block_size | |
# Send an extra empty DATA packet to signal end of transfer | |
packet = Opcode.DATA.to_bytes(2, "big") + self.block_num.to_bytes(2, "big") + b"" | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error( | |
f"Failed to send final empty block {self.block_num} to {self.client_addr}" | |
) | |
break | |
self.logger.info(f"Transfer complete to {self.client_addr}, final block {self.block_num}") | |
break | |
# if no options were negotiated, we can start sending data immediately | |
if not self.negotiated_options: | |
self.oack_confirmed = True | |
await self._perform_transfer() | |
except Exception as e: | |
self.logger.error(f"Error during read transfer: {e}") | |
finally: | |
await self.cleanup() | |
async def _send_with_retries(self, packet: bytes) -> bool: | |
async def _initialize_transfer(self) -> bool: | |
loop = asyncio.get_running_loop() | |
self.transport, self.protocol = await loop.create_datagram_endpoint( | |
lambda: TftpTransferProtocol(self), | |
local_addr=('0.0.0.0', 0), | |
remote_addr=self.client_addr | |
) | |
local_addr = self.transport.get_extra_info('sockname') | |
self.logger.debug(f"Transfer bound to local {local_addr}") | |
# Only send OACK if we have non-default options to negotiate | |
if self.negotiated_options and ( | |
self.negotiated_options['blksize'] != 512 or | |
self.negotiated_options['timeout'] != self.server.timeout | |
): | |
oack_packet = self._create_oack_packet() | |
if not await self._send_with_retries(oack_packet, is_oack=True): | |
self.logger.error("Failed to get acknowledgment for OACK") | |
return False | |
self.block_num = 1 | |
return True | |
async def _perform_transfer(self): | |
async with aiofiles.open(self.filepath, 'rb') as f: | |
while True: | |
if self.server.shutdown_event.is_set(): | |
self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") | |
break | |
data = await f.read(self.block_size) | |
if not await self._handle_data_block(data): | |
break | |
async def _handle_data_block(self, data: bytes) -> bool: | |
""" | |
Handle sending a block of data to the client. | |
Returns False if transfer should stop, True if it should continue. | |
""" | |
if not data and self.block_num == 1: | |
# Empty file case | |
packet = self._create_data_packet(b'') | |
await self._send_with_retries(packet) | |
return False | |
elif data: | |
packet = self._create_data_packet(data) | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error(f"Failed to send block {self.block_num} to {self.client_addr}") | |
return False | |
self.logger.debug(f"Block {self.block_num} sent successfully") | |
self.block_num += 1 | |
# wrap block number around if it exceeds 16 bits | |
self.block_num %= 65536 | |
if len(data) < self.block_size: | |
self.logger.info(f"Final block {self.block_num - 1} sent") | |
return False | |
return True | |
else: | |
# EOF reached | |
packet = self._create_data_packet(b'') | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error(f"Failed to send final block {self.block_num}") | |
else: | |
self.logger.info(f"Transfer complete, final block {self.block_num}") | |
return False | |
def _create_oack_packet(self) -> bytes: | |
packet = Opcode.OACK.to_bytes(2, 'big') | |
for opt_name, opt_value in self.negotiated_options.items(): | |
packet += f"{opt_name}\0{str(opt_value)}\0".encode('utf-8') | |
return packet | |
def _create_data_packet(self, data: bytes) -> bytes: | |
return ( | |
Opcode.DATA.to_bytes(2, 'big') + | |
self.block_num.to_bytes(2, 'big') + | |
data | |
) | |
def _send_packet(self, packet: bytes): | |
self.transport.sendto(packet) | |
if packet[0:2] == Opcode.DATA.to_bytes(2, 'big'): | |
block = int.from_bytes(packet[2:4], 'big') | |
data_length = len(packet) - 4 | |
self.logger.debug(f"Sent DATA block {block} ({data_length} bytes) to {self.client_addr}") | |
elif packet[0:2] == Opcode.OACK.to_bytes(2, 'big'): | |
self.logger.debug(f"Sent OACK to {self.client_addr}") | |
def __init__(self, server: TftpServer, filepath: pathlib.Path, client_addr: Tuple[str, int], | |
block_size: int, timeout: float, retries: int, negotiated_options: Optional[dict] = None): | |
super().__init__( | |
server=server, | |
filepath=filepath, | |
client_addr=client_addr, | |
block_size=block_size, | |
timeout=timeout, | |
retries=retries | |
) | |
self.max_transfer_size = 1024 * 1024 * 1024 # 1GB limit | |
self.block_num = 0 | |
self.ack_received = asyncio.Event() | |
self.last_ack = 0 | |
self.oack_confirmed = False | |
self.negotiated_options = negotiated_options | |
self.current_packet: Optional[bytes] = None | |
self.total_bytes_sent = 0 | |
async def start(self): | |
self.logger.info(f"Starting read transfer of '{self.filepath.name}' to {self.client_addr}") | |
if not await self._initialize_transfer(): | |
return | |
try: | |
# if no options were negotiated, we can start sending data immediately | |
if not self.negotiated_options: | |
self.oack_confirmed = True | |
await self._perform_transfer() | |
except Exception as e: | |
self.logger.error(f"Error during read transfer: {e}") | |
finally: | |
await self.cleanup() | |
async def _initialize_transfer(self) -> bool: | |
loop = asyncio.get_running_loop() | |
self.transport, self.protocol = await loop.create_datagram_endpoint( | |
lambda: TftpTransferProtocol(self), | |
local_addr=('0.0.0.0', 0), | |
remote_addr=self.client_addr | |
) | |
local_addr = self.transport.get_extra_info('sockname') | |
self.logger.debug(f"Transfer bound to local {local_addr}") | |
# Only send OACK if we have non-default options to negotiate | |
if self.negotiated_options and ( | |
self.negotiated_options['blksize'] != 512 or | |
self.negotiated_options['timeout'] != self.server.timeout | |
): | |
oack_packet = self._create_oack_packet() | |
if not await self._send_with_retries(oack_packet, is_oack=True): | |
self.logger.error("Failed to get acknowledgment for OACK") | |
return False | |
self.block_num = 1 | |
return True | |
async def _perform_transfer(self): | |
async with aiofiles.open(self.filepath, 'rb') as f: | |
while True: | |
if self.server.shutdown_event.is_set(): | |
self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") | |
break | |
data = await f.read(self.block_size) | |
if not await self._handle_data_block(data): | |
break | |
async def _handle_data_block(self, data: bytes) -> bool: | |
""" | |
Handle sending a block of data to the client. | |
Returns False if transfer should stop, True if it should continue. | |
""" | |
self.total_bytes_sent += len(data) | |
if self.total_bytes_sent > self.max_transfer_size: | |
self.logger.error(f"Transfer size limit exceeded: {self.total_bytes_sent} bytes") | |
return False | |
if not data and self.block_num == 1: | |
# Empty file case | |
packet = self._create_data_packet(b'') | |
await self._send_with_retries(packet) | |
return False | |
elif data: | |
packet = self._create_data_packet(data) | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error(f"Failed to send block {self.block_num} to {self.client_addr}") | |
return False | |
self.logger.debug(f"Block {self.block_num} sent successfully") | |
self.block_num += 1 | |
# wrap block number around if it exceeds 16 bits | |
self.block_num %= 65536 | |
if len(data) < self.block_size: | |
self.logger.info(f"Final block {self.block_num - 1} sent") | |
return False | |
return True | |
else: | |
# EOF reached | |
packet = self._create_data_packet(b'') | |
success = await self._send_with_retries(packet) | |
if not success: | |
self.logger.error(f"Failed to send final block {self.block_num}") | |
else: | |
self.logger.info(f"Transfer complete, final block {self.block_num}") | |
return False | |
def _create_oack_packet(self) -> bytes: | |
packet = Opcode.OACK.to_bytes(2, 'big') | |
for opt_name, opt_value in self.negotiated_options.items(): | |
packet += f"{opt_name}\0{str(opt_value)}\0".encode('utf-8') | |
return packet | |
def _create_data_packet(self, data: bytes) -> bytes: | |
return ( | |
Opcode.DATA.to_bytes(2, 'big') + | |
self.block_num.to_bytes(2, 'big') + | |
data | |
) | |
def _send_packet(self, packet: bytes): | |
self.transport.sendto(packet) | |
if packet[0:2] == Opcode.DATA.to_bytes(2, 'big'): | |
block = int.from_bytes(packet[2:4], 'big') | |
data_length = len(packet) - 4 | |
self.logger.debug(f"Sent DATA block {block} ({data_length} bytes) to {self.client_addr}") | |
elif packet[0:2] == Opcode.OACK.to_bytes(2, 'big'): | |
self.logger.debug(f"Sent OACK to {self.client_addr}") |
e10a930
to
9b70ce0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
♻️ Duplicate comments (1)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (1)
234-291
: 🛠️ Refactor suggestionConsider adding transfer size limit.
To prevent potential DoS attacks, consider adding a maximum transfer size limit.
🧹 Nitpick comments (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (1)
23-26
: Consider optimizing the server binding retry mechanism.The current implementation could take up to 1 second (10 retries * 0.1s sleep). Consider reducing the sleep duration or the number of retries for faster test execution.
- for _ in range(10): - if server.address is not None: - break - await asyncio.sleep(0.1) + for _ in range(5): # Reduce retries + if server.address is not None: + break + await asyncio.sleep(0.05) # Reduce sleep durationpackages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (2)
476-498
: Consider implementing exponential backoff.The retry mechanism could benefit from an exponential backoff strategy to handle network congestion better.
async def _send_with_retries(self, packet: bytes, is_oack: bool = False) -> bool: self.current_packet = packet expected_block = 0 if is_oack else self.block_num + base_timeout = self.timeout for attempt in range(1, self.retries + 1): try: self._send_packet(packet) self.logger.debug( f"Sent {'OACK' if is_oack else 'DATA'} block {expected_block}, " f"waiting for ACK (Attempt {attempt})" ) self.ack_received.clear() - await asyncio.wait_for(self.ack_received.wait(), timeout=self.timeout) + current_timeout = base_timeout * (2 ** (attempt - 1)) # Exponential backoff + await asyncio.wait_for(self.ack_received.wait(), timeout=current_timeout) if self.last_ack == expected_block: self.logger.debug(f"ACK received for block {expected_block}") return True else: self.logger.warning(f"Received wrong ACK: expected {expected_block}, got {self.last_ack}") except asyncio.TimeoutError: - self.logger.warning(f"Timeout waiting for ACK of block {expected_block} (Attempt {attempt})") + self.logger.warning( + f"Timeout ({current_timeout}s) waiting for ACK of block {expected_block} " + f"(Attempt {attempt}/{self.retries})" + )
545-557
: Add rate limiting for error responses.Consider implementing rate limiting for error responses to prevent potential abuse.
+from collections import deque +from time import time + class TftpTransferProtocol(asyncio.DatagramProtocol): def __init__(self, transfer: TftpReadTransfer): self.transfer = transfer self.logger = logging.getLogger(self.__class__.__name__) + self.error_timestamps = deque(maxlen=100) # Track last 100 errors + self.error_window = 60 # 60 seconds window + self.error_limit = 10 # Max 10 errors per window + + def _check_error_rate(self) -> bool: + now = time() + self.error_timestamps = deque( + (ts for ts in self.error_timestamps if now - ts < self.error_window), + maxlen=self.error_timestamps.maxlen + ) + return len(self.error_timestamps) < self.error_limit def _send_error(self, addr: Tuple[str, int], error_code: TftpErrorCode, message: str): + if not self._check_error_rate(): + self.logger.warning(f"Error rate limit exceeded, suppressing error to {addr}") + return + self.error_timestamps.append(time()) error_packet = ( Opcode.ERROR.to_bytes(2, 'big') + error_code.to_bytes(2, 'big') + message.encode('utf-8') + b'\x00' )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(5 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
- packages/jumpstarter-driver-tftp/pyproject.toml
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
🔇 Additional comments (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (3)
17-17
: LGTM! OACK opcode follows RFC 2347.The OACK opcode value of 6 is correctly implemented according to RFC 2347 (TFTP Option Extension).
36-56
: LGTM! Well-structured server initialization and property.The changes improve server initialization with:
- Proper type hints
- Safe address property implementation
- Synchronization using ready_event
417-453
: 🛠️ Refactor suggestionAdd file size validation during transfer.
Consider validating file size changes during transfer to prevent potential security issues.
async def _perform_transfer(self): + initial_size = self.filepath.stat().st_size async with aiofiles.open(self.filepath, 'rb') as f: while True: if self.server.shutdown_event.is_set(): self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") break + current_size = self.filepath.stat().st_size + if current_size != initial_size: + self.logger.error(f"File size changed during transfer: {initial_size} -> {current_size}") + break + data = await f.read(self.block_size) if not await self._handle_data_block(data): breakLikely invalid or redundant comment.
@pytest.mark.asyncio | ||
async def test_server_startup_and_shutdown(tftp_server): | ||
"""Test that server starts up and shuts down cleanly.""" | ||
server, _ = tftp_server | ||
server, temp_dir, server_port = tftp_server | ||
|
||
server_task = asyncio.create_task(server.start()) | ||
await asyncio.sleep(0.1) | ||
await server.ready_event.wait() | ||
|
||
await server.shutdown() | ||
|
||
# Wait for server task to complete | ||
await server_task | ||
|
||
assert True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance test assertions.
The current assert True
doesn't verify any specific behavior. Consider adding assertions to verify the server's state after startup and shutdown.
await server.ready_event.wait()
+ assert server.transport is not None, "Server transport should be initialized"
+ assert server.address is not None, "Server should be bound to an address"
await server.shutdown()
await server_task
- assert True
+ assert server.transport is None, "Server transport should be cleaned up"
+ assert not server.active_transfers, "All transfers should be cleaned up"
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
@pytest.mark.asyncio | |
async def test_server_startup_and_shutdown(tftp_server): | |
"""Test that server starts up and shuts down cleanly.""" | |
server, _ = tftp_server | |
server, temp_dir, server_port = tftp_server | |
server_task = asyncio.create_task(server.start()) | |
await asyncio.sleep(0.1) | |
await server.ready_event.wait() | |
await server.shutdown() | |
# Wait for server task to complete | |
await server_task | |
assert True | |
@pytest.mark.asyncio | |
async def test_server_startup_and_shutdown(tftp_server): | |
"""Test that server starts up and shuts down cleanly.""" | |
server, temp_dir, server_port = tftp_server | |
server_task = asyncio.create_task(server.start()) | |
await server.ready_event.wait() | |
assert server.transport is not None, "Server transport should be initialized" | |
assert server.address is not None, "Server should be bound to an address" | |
await server.shutdown() | |
await server_task | |
assert server.transport is None, "Server transport should be cleaned up" | |
assert not server.active_transfers, "All transfers should be cleaned up" |
@pytest.mark.asyncio | ||
async def test_invalid_options_handling(tftp_server): | ||
server, temp_dir, server_port = tftp_server | ||
server_task = asyncio.create_task(server.start()) | ||
await server.ready_event.wait() | ||
|
||
try: | ||
transport, _ = await create_test_client(server.port) | ||
|
||
rrq_packet = ( | ||
Opcode.RRQ.to_bytes(2, 'big') + | ||
b'test.txt\x00' + | ||
b'octet\x00' + | ||
b'blksize\x00' + | ||
b'invalid\x00' + | ||
b'timeout\x00' + | ||
b'999999\x00' | ||
) | ||
|
||
transport.sendto(rrq_packet) | ||
|
||
assert server.transport is not None | ||
|
||
finally: | ||
transport.close() | ||
await server.shutdown() | ||
await server_task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add assertions to verify invalid options handling.
The test sends invalid options but doesn't verify how the server handles them. Add assertions to verify either:
- Error packets for invalid values, or
- Fallback to default values in OACK
transport.sendto(rrq_packet)
+ await asyncio.sleep(0.1) # Wait for server response
+
+ # Verify server response
+ assert len(protocol.received_packets) > 0, "Should receive server response"
+ response = protocol.received_packets[0]
+
+ if response.startswith(Opcode.OACK.to_bytes(2, 'big')):
+ # Verify fallback to default values
+ options_data = response[2:].split(b'\x00')
+ negotiated_options = {}
+ for i in range(0, len(options_data) - 1, 2):
+ opt_name = options_data[i].decode('utf-8')
+ opt_value = options_data[i + 1].decode('utf-8')
+ negotiated_options[opt_name] = opt_value
+
+ assert int(negotiated_options['blksize']) == server.block_size, "Should fallback to default block size"
+ assert float(negotiated_options['timeout']) == server.timeout, "Should fallback to default timeout"
+ else:
+ # Verify error response
+ assert response.startswith(Opcode.ERROR.to_bytes(2, 'big')), "Should receive ERROR packet"
+ error_code = int.from_bytes(response[2:4], 'big')
+ assert error_code == TftpErrorCode.ILLEGAL_OPERATION.value, "Should indicate illegal operation"
assert server.transport is not None
Committable suggestion skipped: line range outside the PR's diff.
@pytest.mark.asyncio | ||
async def test_retry_mechanism(tftp_server): | ||
server, _, server_port = tftp_server | ||
|
||
# make the test faster | ||
server.timeout = 1 | ||
|
||
transport = None | ||
|
||
class TestProtocol(asyncio.DatagramProtocol): | ||
def __init__(self): | ||
self.received_packets = [] | ||
self.transport = None | ||
|
||
def connection_made(self, transport): | ||
self.transport = transport | ||
|
||
def datagram_received(self, data, addr): | ||
self.received_packets.append(data) | ||
|
||
try: | ||
loop = asyncio.get_running_loop() | ||
transport, protocol = await loop.create_datagram_endpoint( | ||
lambda: TestProtocol(), | ||
local_addr=('127.0.0.1', 0) | ||
) | ||
|
||
assert transport is not None, "Failed to create transport" | ||
|
||
rrq_packet = ( | ||
Opcode.RRQ.to_bytes(2, 'big') + | ||
b'test.txt\x00' + | ||
b'octet\x00' | ||
) | ||
|
||
transport.sendto(rrq_packet, ('127.0.0.1', server_port)) | ||
|
||
await asyncio.sleep(server.timeout * 2) | ||
|
||
data_packets = [p for p in protocol.received_packets | ||
if p[0:2] == Opcode.DATA.to_bytes(2, 'big')] | ||
|
||
assert len(data_packets) > 1, "Server should have retried sending DATA packet" | ||
|
||
block_numbers = {int.from_bytes(p[2:4], 'big') for p in data_packets} | ||
assert len(block_numbers) == 1, "All retried packets should be for the same block" | ||
assert 1 in block_numbers, "First block number should be 1" | ||
|
||
except Exception as e: | ||
pytest.fail(f"Test failed with error: {str(e)}") | ||
|
||
finally: | ||
if transport is not None: | ||
transport.close() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Make the retry test more deterministic.
The test relies on timing which could make it flaky. Consider making it more robust by:
- Verifying the exact number of retries
- Using a mock to control timing
server.timeout = 1
+ expected_retries = server.retries
transport = None
class TestProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.received_packets = []
self.transport = None
+ self.packet_times = []
def datagram_received(self, data, addr):
self.received_packets.append(data)
+ self.packet_times.append(asyncio.get_event_loop().time())
# ... rest of the test ...
data_packets = [p for p in protocol.received_packets
if p[0:2] == Opcode.DATA.to_bytes(2, 'big')]
- assert len(data_packets) > 1, "Server should have retried sending DATA packet"
+ assert len(data_packets) == expected_retries, f"Server should retry exactly {expected_retries} times"
+
+ # Verify retry intervals
+ packet_intervals = [t2 - t1 for t1, t2 in zip(protocol.packet_times, protocol.packet_times[1:])]
+ assert all(interval >= server.timeout for interval in packet_intervals), "Retries should respect timeout interval"
Committable suggestion skipped: line range outside the PR's diff.
@pytest.mark.asyncio | ||
async def test_options_negotiation(tftp_server): | ||
"""Test that options (blksize, timeout) are properly negotiated.""" | ||
server, temp_dir, server_port = tftp_server | ||
server_task = asyncio.create_task(server.start()) | ||
await server.ready_event.wait() | ||
|
||
@pytest.fixture | ||
def anyio_backend(): | ||
return "asyncio" | ||
try: | ||
transport, _ = await create_test_client(server.port) | ||
|
||
# RRQ with options | ||
rrq_packet = ( | ||
Opcode.RRQ.to_bytes(2, 'big') + | ||
b'test.txt\x00' + | ||
b'octet\x00' + | ||
b'blksize\x00' + | ||
b'1024\x00' + | ||
b'timeout\x00' + | ||
b'3\x00' | ||
) | ||
|
||
transport.sendto(rrq_packet) | ||
|
||
assert server.transport is not None | ||
|
||
finally: | ||
transport.close() | ||
await server.shutdown() | ||
await server_task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add assertions to verify options negotiation.
The test sends a request with options but doesn't verify the server's response. Add assertions to verify the OACK packet and negotiated values.
transport.sendto(rrq_packet)
+ await asyncio.sleep(0.1) # Wait for server response
+
+ # Verify OACK packet
+ assert len(protocol.received_packets) > 0, "Should receive OACK packet"
+ oack_packet = protocol.received_packets[0]
+ assert oack_packet.startswith(Opcode.OACK.to_bytes(2, 'big')), "First packet should be OACK"
+
+ # Parse and verify negotiated options
+ options_data = oack_packet[2:].split(b'\x00')
+ negotiated_options = {}
+ for i in range(0, len(options_data) - 1, 2):
+ opt_name = options_data[i].decode('utf-8')
+ opt_value = options_data[i + 1].decode('utf-8')
+ negotiated_options[opt_name] = opt_value
+
+ assert negotiated_options['blksize'] == '1024', "Block size should be negotiated to 1024"
+ assert negotiated_options['timeout'] == '3', "Timeout should be negotiated to 3"
assert server.transport is not None
Committable suggestion skipped: line range outside the PR's diff.
def __init__(self, server: TftpServer, filepath: pathlib.Path, client_addr: Tuple[str, int], | ||
block_size: int, timeout: float, retries: int, negotiated_options: Optional[dict] = None): | ||
super().__init__( | ||
server=server, | ||
filepath=filepath, | ||
client_addr=client_addr, | ||
block_size=block_size, | ||
timeout=timeout, | ||
retries=retries | ||
) | ||
self.block_num = 0 | ||
self.ack_received = asyncio.Event() | ||
self.last_ack = 0 | ||
self.oack_confirmed = False | ||
self.negotiated_options = negotiated_options | ||
self.current_packet: Optional[bytes] = None | ||
|
||
async def start(self): | ||
self.logger.info(f"Starting read transfer of '{self.filepath.name}' to {self.client_addr}") | ||
loop = asyncio.get_running_loop() | ||
|
||
self.transport, self.protocol = await loop.create_datagram_endpoint( | ||
lambda: TftpTransferProtocol(self), local_addr=("0.0.0.0", 0), remote_addr=self.client_addr | ||
) | ||
local_addr = self.transport.get_extra_info("sockname") | ||
self.logger.debug(f"Transfer bound to local {local_addr}") | ||
if not await self._initialize_transfer(): | ||
return | ||
|
||
try: | ||
async with aiofiles.open(self.filepath, "rb") as f: | ||
while True: | ||
if self.server.shutdown_event.is_set(): | ||
self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") | ||
break | ||
data = await f.read(self.block_size) | ||
if data: | ||
packet = Opcode.DATA.to_bytes(2, "big") + self.block_num.to_bytes(2, "big") + data | ||
success = await self._send_with_retries(packet) | ||
if not success: | ||
self.logger.error(f"Failed to send block {self.block_num} to {self.client_addr}") | ||
break | ||
self.logger.debug(f"Block {self.block_num} sent successfully") | ||
self.block_num += 1 | ||
|
||
# If the data read is less than block_size, this is the last packet | ||
if len(data) < self.block_size: | ||
self.logger.info(f"Final block {self.block_num - 1} reached for {self.client_addr}") | ||
break | ||
else: | ||
# If no data is returned, it means the file size is an exact multiple of block_size | ||
# Send an extra empty DATA packet to signal end of transfer | ||
packet = Opcode.DATA.to_bytes(2, "big") + self.block_num.to_bytes(2, "big") + b"" | ||
success = await self._send_with_retries(packet) | ||
if not success: | ||
self.logger.error( | ||
f"Failed to send final empty block {self.block_num} to {self.client_addr}" | ||
) | ||
break | ||
self.logger.info(f"Transfer complete to {self.client_addr}, final block {self.block_num}") | ||
break | ||
# if no options were negotiated, we can start sending data immediately | ||
if not self.negotiated_options: | ||
self.oack_confirmed = True | ||
|
||
await self._perform_transfer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add overall transfer timeout.
Consider adding a timeout for the entire transfer operation to prevent hanging transfers.
def __init__(self, server: TftpServer, filepath: pathlib.Path, client_addr: Tuple[str, int],
block_size: int, timeout: float, retries: int, negotiated_options: Optional[dict] = None):
super().__init__(
server=server,
filepath=filepath,
client_addr=client_addr,
block_size=block_size,
timeout=timeout,
retries=retries
)
+ self.transfer_timeout = max(30.0, timeout * 10) # Set reasonable transfer timeout
self.block_num = 0
self.ack_received = asyncio.Event()
self.last_ack = 0
self.oack_confirmed = False
self.negotiated_options = negotiated_options
self.current_packet: Optional[bytes] = None
async def start(self):
self.logger.info(f"Starting read transfer of '{self.filepath.name}' to {self.client_addr}")
if not await self._initialize_transfer():
return
try:
+ async with asyncio.timeout(self.transfer_timeout):
# if no options were negotiated, we can start sending data immediately
if not self.negotiated_options:
self.oack_confirmed = True
await self._perform_transfer()
+ except asyncio.TimeoutError:
+ self.logger.error(f"Transfer timed out after {self.transfer_timeout} seconds")
except Exception as e:
self.logger.error(f"Error during read transfer: {e}")
finally:
await self.cleanup()
Committable suggestion skipped: line range outside the PR's diff.
9b70ce0
to
581dd4c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (2)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (2)
Line range hint
67-92
: Add assertions to verify file transfer.The test should verify that the file content is correctly transferred. Consider:
- Capturing the received data packets
- Verifying the content matches the original file
try: - transport, _ = await create_test_client(server.port) + transport, protocol = await create_test_client(server.port) + + class TestProtocol(asyncio.DatagramProtocol): + def __init__(self): + self.received_data = bytearray() + + def datagram_received(self, data, addr): + if data[0:2] == Opcode.DATA.to_bytes(2, 'big'): + self.received_data.extend(data[4:]) + + protocol = TestProtocol() + transport.set_protocol(protocol) rrq_packet = ( Opcode.RRQ.to_bytes(2, 'big') + b'test.txt\x00' + # filename b'octet\x00' # mode ) transport.sendto(rrq_packet) await server.ready_event.wait() + await asyncio.sleep(0.1) # Wait for transfer to complete - assert server.transport is not None + assert protocol.received_data.decode() == "Hello, TFTP!", "File content mismatch"
Line range hint
67-92
: Add assertions to verify the received data.The test sends a read request but doesn't verify that the correct data is received from the server.
transport.sendto(rrq_packet) - await server.ready_event.wait() + await asyncio.sleep(0.1) # Wait for server response + + # Verify received data + data_received = [] + for _ in range(10): # Reasonable timeout + if hasattr(protocol, 'datagram_received'): + data = await asyncio.wait_for(protocol.datagram_received(), timeout=0.1) + data_received.append(data) + if len(data) < server.block_size: # Last block + break + + assert data_received, "No data received from server" + received_content = b''.join(data_received) + assert received_content == b"Hello, TFTP!", "Incorrect file content received" assert server.transport is not None
♻️ Duplicate comments (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (4)
53-65
: 🛠️ Refactor suggestionEnhance test assertions.
The current
assert True
doesn't verify any specific behavior. Consider adding assertions to verify the server's state after startup and shutdown.
185-214
: 🛠️ Refactor suggestionAdd assertions to verify options negotiation.
The test sends a request with options but doesn't verify the server's response. Add assertions to verify the OACK packet and negotiated values.
215-269
: 🛠️ Refactor suggestionMake the retry test more deterministic.
The test relies on timing which could make it flaky. Consider making it more robust by:
- Verifying the exact number of retries
- Using a mock to control timing
271-297
: 🛠️ Refactor suggestionAdd assertions to verify invalid options handling.
The test sends invalid options but doesn't verify how the server handles them. Add assertions to verify either:
- Error packets for invalid values, or
- Fallback to default values in OACK
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (2)
348-363
: 🛠️ Refactor suggestionConsider adding transfer size limit.
To prevent potential DoS attacks, consider adding a maximum transfer size limit.
382-404
: 🛠️ Refactor suggestionAdd overall transfer timeout.
Consider adding a timeout for the entire transfer operation to prevent hanging transfers.
🧹 Nitpick comments (5)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (2)
23-30
: LGTM! Consider adding exponential backoff.The retry mechanism for server binding is well-implemented. However, consider adding exponential backoff to the sleep duration to handle potential port contention more gracefully.
+ base_sleep = 0.1 for _ in range(10): if server.address is not None: break - await asyncio.sleep(0.1) + await asyncio.sleep(base_sleep) + base_sleep *= 2 # Exponential backoff else: await server.shutdown() server_task.cancel()
23-30
: LGTM! Consider adding a configurable retry delay.The retry mechanism for server binding is well-implemented. However, the fixed sleep duration of 0.1s could be made configurable for environments where longer delays might be needed.
+ BIND_RETRY_DELAY = 0.1 # seconds + for _ in range(10): if server.address is not None: break - await asyncio.sleep(0.1) + await asyncio.sleep(BIND_RETRY_DELAY)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (3)
234-274
: Consider caching negotiated values.The
_negotiate_block_size
and_negotiate_timeout
methods perform similar validations that could be cached to avoid repeated calculations for the same values.+ @functools.lru_cache(maxsize=128) def _negotiate_block_size(self, requested_blksize: Optional[str]) -> int: # ... existing implementation ... + @functools.lru_cache(maxsize=128) def _negotiate_timeout(self, requested_timeout: Optional[str]) -> float: # ... existing implementation ...
406-452
: Consider adding progress tracking.The
_perform_transfer
method could benefit from progress tracking to:
- Monitor transfer speed
- Detect stalled transfers
- Log progress for large files
async def _perform_transfer(self): + start_time = time.monotonic() + total_bytes = 0 async with aiofiles.open(self.filepath, 'rb') as f: while True: if self.server.shutdown_event.is_set(): self.logger.info(f"Server shutdown detected, stopping transfer to {self.client_addr}") break data = await f.read(self.block_size) + total_bytes += len(data) + elapsed = time.monotonic() - start_time + speed = total_bytes / elapsed if elapsed > 0 else 0 + self.logger.info(f"Transfer progress: {total_bytes} bytes, {speed:.2f} bytes/sec") if not await self._handle_data_block(data): break
541-548
: Enhance error messages with more details.Consider providing more specific error messages to help with debugging:
- self.logger.warning(f"Received malformed packet from {addr}") + self.logger.warning(f"Received malformed packet from {addr}: length {len(data)} < 4 bytes") - self.logger.error(f"Unknown opcode from {addr}") + self.logger.error(f"Unknown opcode {data[0:2].hex()} from {addr}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
(1 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(5 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/client.py
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver_test.py
- packages/jumpstarter-driver-tftp/pyproject.toml
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/driver.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
🔇 Additional comments (15)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (5)
48-49
: LGTM!The test client creation is well-implemented using asyncio's datagram endpoint.
53-65
: Enhance test assertions.The current
assert True
doesn't verify any specific behavior. Consider adding assertions to verify the server's state after startup and shutdown.
185-213
: Add assertions to verify options negotiation.The test sends a request with options but doesn't verify the server's response. Add assertions to verify the OACK packet and negotiated values.
215-269
: Make the retry test more deterministic.The test relies on timing which could make it flaky. Consider making it more robust by verifying the exact number of retries and using a mock to control timing.
271-297
: Add assertions to verify invalid options handling.The test sends invalid options but doesn't verify how the server handles them. Add assertions to verify either error packets for invalid values or fallback to default values in OACK.
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (10)
17-17
: LGTM!The
OACK
opcode is correctly defined according to RFC 2347.
49-49
: LGTM! Good addition of ready_event.The
ready_event
is a good addition for synchronizing test cases with server readiness.
51-56
: LGTM! Well-implemented address property.The
address
property correctly handles the case when transport is not available.
17-17
: LGTM! OACK opcode added correctly.The OACK opcode is added with the correct value (6) according to RFC 2347.
49-49
: LGTM! Ready event added correctly.The
ready_event
is properly initialized and used to signal when the server is ready to accept requests.
51-56
: LGTM! Address property implementation is correct.The
address
property correctly retrieves the server's bound address and port when the transport is available.
234-274
: LGTM! Option negotiation is well-implemented.The block size and timeout validation is thorough and follows RFC recommendations:
- Block size is limited to 512-65464 bytes
- Timeout is limited to 1-255 seconds
- Invalid values fall back to defaults with appropriate logging
348-357
: Consider adding transfer size limit and overall timeout.To prevent potential DoS attacks and hanging transfers:
- Add a maximum transfer size limit
- Add an overall transfer timeout
382-404
: LGTM! Transfer initialization is well-structured.The initialization process correctly:
- Creates a new endpoint for the transfer
- Handles OACK when necessary
- Sets up the initial block number
437-439
: LGTM! Block number wrapping handled correctly.The code properly handles block number wrapping at 16 bits (65536) as required by the TFTP protocol.
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
Outdated
Show resolved
Hide resolved
49f1a4b
to
96700ae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🔭 Outside diff range comments (1)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (1)
Line range hint
67-92
: Add assertions to verify data transfer.The test should verify:
- The actual data received from the server matches the test file content.
- The transfer completion status.
Apply this diff to improve the test:
transport.sendto(rrq_packet) await server.ready_event.wait() + # Wait for data transfer + await asyncio.sleep(0.1) + + # Verify received data + assert len(protocol.received_packets) > 0, "Should receive data packets" + data = b''.join(p[4:] for p in protocol.received_packets + if p.startswith(Opcode.DATA.to_bytes(2, 'big'))) + assert data == b"Hello, TFTP!", "Received data should match test file content" + assert server.transport is not None
♻️ Duplicate comments (5)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (3)
185-214
:⚠️ Potential issueAdd assertions to verify option negotiation.
The test sends options but doesn't verify the server's response. Add assertions to validate the OACK packet and negotiated values.
215-269
:⚠️ Potential issueMake the retry test more deterministic.
The test relies on timing which could make it flaky. Consider making it more robust by verifying the exact number of retries and using a mock to control timing.
271-297
:⚠️ Potential issueAdd assertions to verify invalid options handling.
The test sends invalid options but doesn't verify how the server handles them.
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (2)
417-453
: 🛠️ Refactor suggestionConsider adding transfer size limit.
To prevent potential DoS attacks, consider adding a maximum transfer size limit.
476-499
: 🛠️ Refactor suggestionAdd overall transfer timeout.
Consider adding a timeout for the entire transfer operation to prevent hanging transfers.
🧹 Nitpick comments (5)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (2)
23-30
: Consider using constants for retry configuration.The retry logic is good, but the magic numbers should be constants for better maintainability.
+MAX_BIND_ATTEMPTS = 10 +BIND_RETRY_DELAY = 0.1 # seconds - for _ in range(10): + for _ in range(MAX_BIND_ATTEMPTS): if server.address is not None: break - await asyncio.sleep(0.1) + await asyncio.sleep(BIND_RETRY_DELAY)
23-30
: Consider increasing the retry timeout and adding jitter.The retry logic is a good addition, but consider:
- Increasing the total timeout (currently 1s) for slower systems.
- Adding jitter to prevent thundering herd in test environments.
+ import random + max_attempts = 10 + base_delay = 0.2 - for _ in range(10): + for attempt in range(max_attempts): if server.address is not None: break - await asyncio.sleep(0.1) + # Add jitter to prevent multiple tests from retrying simultaneously + delay = base_delay * (1 + random.random()) + await asyncio.sleep(delay) else: await server.shutdown() server_task.cancel() - raise RuntimeError("Failed to bind TFTP server to a port.") + raise RuntimeError(f"Failed to bind TFTP server to a port after {max_attempts} attempts.")packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (3)
234-291
: Consider extracting common option validation logic.The option negotiation code has duplicated validation patterns. Consider creating a generic option validator.
+def _validate_option( + self, name: str, value: str, min_val: int, max_val: int, + default: Union[int, float] +) -> Union[int, float]: + try: + val = int(value) + if min_val <= val <= max_val: + return float(val) if isinstance(default, float) else val + self.logger.warning( + f"{name} value {val} out of range ({min_val}-{max_val}), " + f"using default: {default}" + ) + except ValueError: + self.logger.warning( + f"Invalid {name} value '{value}', using default: {default}" + ) + return default def _negotiate_block_size(self, requested_blksize: Optional[str]) -> int: if requested_blksize is None: return self.server.block_size - try: - blksize = int(requested_blksize) - if 512 <= blksize <= 65464: - return blksize - else: - self.logger.warning( - f"Requested block size {blksize} out of range (512-65464), " - f"using default: {self.server.block_size}" - ) - return self.server.block_size - except ValueError: - self.logger.warning( - f"Invalid block size value '{requested_blksize}', " - f"using default: {self.server.block_size}" - ) - return self.server.block_size + return int(self._validate_option( + 'block size', requested_blksize, 512, 65464, self.server.block_size + )) def _negotiate_timeout(self, requested_timeout: Optional[str]) -> float: if requested_timeout is None: return self.server.timeout - try: - timeout = int(requested_timeout) - if 1 <= timeout <= 255: - return float(timeout) - else: - self.logger.warning( - f"Timeout value {timeout} out of range (1-255), " - f"using default: {self.server.timeout}" - ) - return self.server.timeout - except ValueError: - self.logger.warning( - f"Invalid timeout value '{requested_timeout}', " - f"using default: {self.server.timeout}" - ) - return self.server.timeout + return self._validate_option( + 'timeout', requested_timeout, 1, 255, self.server.timeout + )
382-404
: Define constants for TFTP protocol values.The default block size (512) should be defined as a constant.
+DEFAULT_BLOCK_SIZE = 512 # As per RFC 1350 # Only send OACK if we have non-default options to negotiate if self.negotiated_options and ( - self.negotiated_options['blksize'] != 512 or + self.negotiated_options['blksize'] != DEFAULT_BLOCK_SIZE or self.negotiated_options['timeout'] != self.server.timeout ):
541-557
: Consider adding rate limiting for error responses.To prevent potential DoS attacks, consider adding rate limiting for error responses.
class TftpTransferProtocol(asyncio.DatagramProtocol): def __init__(self, transfer: TftpReadTransfer): self.transfer = transfer self.logger = logging.getLogger(self.__class__.__name__) + self.error_count = 0 + self.last_error_time = 0 + self.MAX_ERRORS_PER_SECOND = 10 def _send_error(self, addr: Tuple[str, int], error_code: TftpErrorCode, message: str): + current_time = time.time() + if current_time - self.last_error_time >= 1: + self.error_count = 0 + self.last_error_time = current_time + + self.error_count += 1 + if self.error_count > self.MAX_ERRORS_PER_SECOND: + self.logger.warning(f"Error response rate limit exceeded for {addr}") + return + error_packet = ( Opcode.ERROR.to_bytes(2, 'big') + error_code.to_bytes(2, 'big') + message.encode('utf-8') + b'\x00' )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(5 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/jumpstarter-driver-tftp/pyproject.toml
👮 Files not reviewed due to content moderation or server errors (2)
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
- packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
🔇 Additional comments (28)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (11)
16-43
: LGTM! Robust server initialization with proper cleanup.The retry logic for server binding and thorough cleanup of tasks are good improvements.
53-65
: Enhance test assertions.The test should verify the server's state after startup and shutdown.
185-214
: Add assertions to verify options negotiation.The test sends a request with options but doesn't verify the server's response.
215-269
: Make the retry test more deterministic.The test relies on timing which could make it flaky.
271-297
: Add assertions to verify invalid options handling.The test sends invalid options but doesn't verify how the server handles them.
16-20
: LGTM! Good use of dynamic port allocation.Using port 0 allows the OS to assign an available port, which is ideal for testing to avoid port conflicts.
37-43
: LGTM! Thorough cleanup implementation.The cleanup code properly handles task cancellation and ensures no lingering tasks remain after tests.
53-65
: Enhance test assertions.The current
assert True
doesn't verify any specific behavior. Consider adding assertions to verify the server's state after startup and shutdown.
185-214
: Add assertions to verify options negotiation.The test sends a request with options but doesn't verify the server's response. Add assertions to verify the OACK packet and negotiated values.
215-269
: Make the retry test more deterministic.The test relies on timing which could make it flaky. Consider making it more robust by:
- Verifying the exact number of retries
- Using a mock to control timing
271-297
: Add assertions to verify invalid options handling.The test sends invalid options but doesn't verify how the server handles them. Add assertions to verify either:
- Error packets for invalid values, or
- Fallback to default values in OACK
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (17)
17-17
: LGTM! OACK opcode follows RFC 2347.The OACK opcode value (6) is correctly defined according to RFC 2347 for TFTP Option Extension.
36-56
: LGTM! Good synchronization and address handling.The
ready_event
for synchronization andaddress
property with proper null checks are well-implemented.
234-292
: LGTM! Thorough option negotiation following RFCs.The implementation:
- Follows RFC 2348 for block size (512-65464)
- Follows RFC 2349 for timeout (1-255)
- Handles invalid values gracefully with proper fallbacks
382-404
: Consider adding transfer size limit.To prevent potential DoS attacks, consider adding a maximum transfer size limit.
406-453
: Add overall transfer timeout.Consider adding a timeout for the entire transfer operation to prevent hanging transfers.
454-475
: LGTM! Packet creation follows TFTP RFC.The packet creation methods correctly handle:
- OACK packet format (RFC 2347)
- DATA packet format (RFC 1350)
- Byte order (big-endian)
476-499
: LGTM! Robust retry mechanism.The retry implementation includes:
- Proper timeout handling
- Clear logging for debugging
- Correct ACK verification
503-517
: LGTM! Comprehensive ACK handling.The implementation correctly handles:
- OACK acknowledgment (RFC 2347)
- Duplicate ACKs
- Out-of-sequence ACKs
17-17
: LGTM! Correct OACK opcode value.The OACK opcode value 6 is correctly defined according to RFC 2347.
454-475
: LGTM! Well-structured packet creation with good logging.The packet creation methods are well-implemented with appropriate logging of packet details.
17-17
: LGTM! OACK opcode value matches RFC 2347.The OACK opcode value of 6 is correctly defined according to RFC 2347.
44-56
: LGTM! Good use of ready_event and safe address property.The ready_event provides a reliable way to wait for server readiness, and the address property safely handles the case when transport is None.
234-291
: Consider adding transfer size limit.To prevent potential DoS attacks, consider adding a maximum transfer size limit.
348-475
: Add overall transfer timeout.Consider adding a timeout for the entire transfer operation to prevent hanging transfers.
187-196
: Consider adding input validation for filename.The
_parse_request
method should validate the filename for:
- Maximum length
- Allowed characters
- Potential null byte injection
437-439
: LGTM! Correct block number wrapping per RFC 1350.The block number wrapping at 65536 using the modulo operator is correctly implemented according to RFC 1350.
504-510
: LGTM! Robust OACK confirmation handling.The special handling for OACK acknowledgment with block number 0 is correctly implemented, with proper state management using the oack_confirmed flag.
96700ae
to
01d82a1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (2)
23-30
: Consider using exponential backoff for retries.The current retry mechanism uses a fixed sleep duration of 0.1 seconds between attempts. Consider implementing exponential backoff to handle transient issues more gracefully.
Apply this diff to implement exponential backoff:
- for _ in range(10): - if server.address is not None: - break - await asyncio.sleep(0.1) + max_attempts = 10 + base_delay = 0.1 + for attempt in range(max_attempts): + if server.address is not None: + break + delay = base_delay * (2 ** attempt) # exponential backoff + await asyncio.sleep(delay)
37-43
: Consider adding timeout to task cancellation.The current cleanup code might hang if a task doesn't respond to cancellation. Consider adding a timeout to ensure cleanup completes in a timely manner.
Apply this diff to add timeout to task cancellation:
for task in asyncio.all_tasks(): if not task.done() and task != asyncio.current_task(): task.cancel() try: - await task + await asyncio.wait_for(task, timeout=5.0) # 5 seconds timeout except asyncio.CancelledError: pass + except asyncio.TimeoutError: + self.logger.warning(f"Task {task.get_name()} did not respond to cancellation")packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (1)
167-176
: Consider adding option validation.The
_send_oack
method should validate that the options dictionary is not empty before sending the OACK packet.Apply this diff to add validation:
def _send_oack(self, addr: Tuple[str, int], options: dict): """Send Option Acknowledgment (OACK) packet.""" + if not options: + self.logger.warning("Attempted to send OACK with no options") + return + oack_data = Opcode.OACK.to_bytes(2, 'big') for opt_name, opt_value in options.items(): oack_data += f"{opt_name}\0{str(opt_value)}\0".encode('utf-8')
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py
(10 hunks)packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py
(5 hunks)packages/jumpstarter-driver-tftp/pyproject.toml
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/jumpstarter-driver-tftp/pyproject.toml
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest-matrix (3.13)
- GitHub Check: pytest-matrix (3.12)
- GitHub Check: pytest-matrix (3.11)
🔇 Additional comments (9)
packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server_test.py (5)
16-20
: LGTM!The server initialization is correct, using port 0 for dynamic port allocation and a temporary directory for testing.
53-65
: Enhance test assertions.The current
assert True
doesn't verify any specific behavior.Apply this diff to add meaningful assertions:
await server.ready_event.wait() + assert server.transport is not None, "Server transport should be initialized" + assert server.address is not None, "Server should be bound to an address" await server.shutdown() await server_task - assert True + assert server.transport is None, "Server transport should be cleaned up" + assert not server.active_transfers, "All transfers should be cleaned up"
185-214
: Add assertions to verify options negotiation.The test sends a request with options but doesn't verify the server's response.
Apply this diff to add assertions:
transport.sendto(rrq_packet) + await asyncio.sleep(0.1) # Wait for server response + + # Verify OACK packet + assert len(protocol.received_packets) > 0, "Should receive OACK packet" + oack_packet = protocol.received_packets[0] + assert oack_packet.startswith(Opcode.OACK.to_bytes(2, 'big')), "First packet should be OACK" + + # Parse and verify negotiated options + options_data = oack_packet[2:].split(b'\x00') + negotiated_options = {} + for i in range(0, len(options_data) - 1, 2): + opt_name = options_data[i].decode('utf-8') + opt_value = options_data[i + 1].decode('utf-8') + negotiated_options[opt_name] = opt_value + + assert negotiated_options['blksize'] == '1024', "Block size should be negotiated to 1024" + assert negotiated_options['timeout'] == '3', "Timeout should be negotiated to 3"
215-269
: Make the retry test more deterministic.The test relies on timing which could make it flaky.
Apply this diff to make it more robust:
server.timeout = 1 + expected_retries = server.retries transport = None class TestProtocol(asyncio.DatagramProtocol): def __init__(self): self.received_packets = [] self.transport = None + self.packet_times = [] def datagram_received(self, data, addr): self.received_packets.append(data) + self.packet_times.append(asyncio.get_event_loop().time()) # ... rest of the test ... data_packets = [p for p in protocol.received_packets if p[0:2] == Opcode.DATA.to_bytes(2, 'big')] - assert len(data_packets) > 1, "Server should have retried sending DATA packet" + assert len(data_packets) == expected_retries, f"Server should retry exactly {expected_retries} times" + + # Verify retry intervals + packet_intervals = [t2 - t1 for t1, t2 in zip(protocol.packet_times, protocol.packet_times[1:])] + assert all(interval >= server.timeout for interval in packet_intervals), "Retries should respect timeout interval"
271-297
: Add assertions to verify invalid options handling.The test sends invalid options but doesn't verify how the server handles them.
Apply this diff to add assertions:
transport.sendto(rrq_packet) + await asyncio.sleep(0.1) # Wait for server response + + # Verify server response + assert len(protocol.received_packets) > 0, "Should receive server response" + response = protocol.received_packets[0] + + if response.startswith(Opcode.OACK.to_bytes(2, 'big')): + # Verify fallback to default values + options_data = response[2:].split(b'\x00') + negotiated_options = {} + for i in range(0, len(options_data) - 1, 2): + opt_name = options_data[i].decode('utf-8') + opt_value = options_data[i + 1].decode('utf-8') + negotiated_options[opt_name] = opt_value + + assert int(negotiated_options['blksize']) == server.block_size, "Should fallback to default block size" + assert float(negotiated_options['timeout']) == server.timeout, "Should fallback to default timeout" + else: + # Verify error response + assert response.startswith(Opcode.ERROR.to_bytes(2, 'big')), "Should receive ERROR packet" + error_code = int.from_bytes(response[2:4], 'big') + assert error_code == TftpErrorCode.ILLEGAL_OPERATION.value, "Should indicate illegal operation"packages/jumpstarter-driver-tftp/jumpstarter_driver_tftp/server.py (4)
17-17
: LGTM!The OACK opcode value of 6 is correctly defined according to RFC 2347.
49-56
: LGTM!The
ready_event
andaddress
property are well-implemented. The type hints are consistent with PEP 8.
389-412
: Consider adding transfer size limit.To prevent potential DoS attacks, consider adding a maximum transfer size limit.
Apply this diff to add size limit:
async def _initialize_transfer(self) -> bool: + self.max_transfer_size = 1024 * 1024 * 1024 # 1GB limit + self.total_bytes_sent = 0 loop = asyncio.get_running_loop() self.transport, self.protocol = await loop.create_datagram_endpoint( lambda: TftpTransferProtocol(self), local_addr=('0.0.0.0', 0), remote_addr=self.client_addr )
548-564
: LGTM!The error handling improvements are well-implemented with descriptive error messages and proper logging.
Signed-off-by: Benny Zlotnik <[email protected]> fix negotiated options Signed-off-by: Benny Zlotnik <[email protected]> tftp: update tests Signed-off-by: Benny Zlotnik <[email protected]> tftp: coderabbit suggestion Signed-off-by: Benny Zlotnik <[email protected]>
b0541f7
to
b8cba63
Compare
Resolved merge conflict. |
Summary by CodeRabbit
New Features
Refactor
Tests