-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Session
s to send messages
#583
base: main
Are you sure you want to change the base?
Conversation
…oprnet/ct-research into jean/various-code-improvements
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (11)
ct-app/.configs/core_prod_config.yaml (3)
36-37
: Add validation for On/Off flagsThe flags
observeMessageQueue
andobserveRelayedMessages
are marked as "should only be On / Off" in comments, but there's no enforcement. Consider using proper boolean values or adding validation.- observeMessageQueue: On # should only be On / Off - observeRelayedMessages: On # should only be On / Off + observeMessageQueue: true # boolean + observeRelayedMessages: true # boolean
38-38
: Document the observeSessions intervalThe new
observeSessions
flag is set to 60 (presumably seconds) but lacks documentation about its purpose and impact. Please add a comment explaining:
- What this interval controls
- The implications of adjusting this value
- Any minimum/maximum recommended values
103-103
: Remove trailing spacesRemove trailing spaces from lines 103, 113, and 119 to fix yamllint errors.
Also applies to: 113-113, 119-119
🧰 Tools
🪛 yamllint (1.35.1)
[error] 103-103: trailing spaces
(trailing-spaces)
ct-app/core/components/session_to_socket.py (1)
16-16
: Enhance exception chaining for better traceback clarityWhen raising an exception within an
except
block, it's good practice to useraise ... from e
to maintain the exception context. This helps in debugging by showing the original traceback.Apply this diff:
except (socket.error, ValueError) as e: - raise ValueError(f"Error while creating socket: {e}") + raise ValueError(f"Error while creating socket: {e}") from e🧰 Tools
🪛 Ruff (0.8.2)
16-16: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)
ct-app/core/api/protocol.py (1)
29-31
: Enhance exception chaining for better traceback clarityWhen raising an exception within an
except
block, useraise ... from e
to maintain the exception context. This provides a complete traceback of the error.Apply this diff:
@classmethod def fromString(cls, protocol: str): try: return getattr(cls, protocol.upper()) - except AttributeError: + except AttributeError as e: raise ValueError( f"Invalid protocol: {protocol}. Valid values are: {[p.name for p in cls]}" - ) + ) from e🧰 Tools
🪛 Ruff (0.8.2)
29-31: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)
ct-app/.configs/core_staging_config.yaml (2)
114-114
: Remove trailing spacesLine 114 contains trailing spaces, which can cause issues in YAML parsers.
Apply this diff:
- # + #🧰 Tools
🪛 yamllint (1.35.1)
[error] 114-114: trailing spaces
(trailing-spaces)
119-119
: Address the TODO comment forport
There's a TODO comment indicating that the port needs to be specified after consulting with
@Ausias
. Please update theport
value when you have the necessary information.Do you want me to help specify the port or open a new GitHub issue to track this task?
ct-app/core/core.py (2)
111-112
: Consider optimizing peer version lookupThe current implementation performs two linear searches on the same list. Consider using a dictionary for O(1) lookup.
- new_version = visible_peers[visible_peers.index( - peer)].version + peer_dict = {p: p.version for p in visible_peers} + new_version = peer_dict[peer]
451-451
: Remove unnecessary empty lineConsider removing this extra empty line to maintain consistent spacing.
ct-app/core/api/hoprd_api.py (2)
58-63
: Simplify nested context managers.Consider using a single
with
statement with multiple contexts for better readability.- async with aiohttp.ClientSession(headers=self.headers) as s: - async with getattr(s, method.value)( - url=f"{self.host}{self.prefix}{endpoint}", - json={} if data is None else data.as_dict, - headers=headers, - ) as res: + async with ( + aiohttp.ClientSession(headers=self.headers) as s, + getattr(s, method.value)( + url=f"{self.host}{self.prefix}{endpoint}", + json={} if data is None else data.as_dict, + headers=headers, + ) as res + ):🧰 Tools
🪛 Ruff (0.8.2)
58-63: Use a single
with
statement with multiple contexts instead of nestedwith
statements(SIM117)
64-68
: Enhance JSON parsing error handling.The current error handling catches all exceptions. Consider catching specific JSON-related exceptions for better error reporting.
- try: - data = await res.json() - except Exception: - data = await res.text() + try: + data = await res.json() + except aiohttp.ContentTypeError: + data = await res.text() + self.warning(f"Failed to parse JSON response: {data}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
ct-app/.configs/core_prod_config.yaml
(4 hunks)ct-app/.configs/core_staging_config.yaml
(2 hunks)ct-app/.env.example
(1 hunks)ct-app/core/api/hoprd_api.py
(8 hunks)ct-app/core/api/protocol.py
(1 hunks)ct-app/core/api/request_objects.py
(1 hunks)ct-app/core/components/peer.py
(2 hunks)ct-app/core/components/session_to_socket.py
(1 hunks)ct-app/core/components/utils.py
(4 hunks)ct-app/core/core.py
(10 hunks)ct-app/core/node.py
(13 hunks)ct-app/tests_endurance/test_fund_channels.py
(3 hunks)
✅ Files skipped from review due to trivial changes (1)
- ct-app/.env.example
🚧 Files skipped from review as they are similar to previous changes (2)
- ct-app/core/components/utils.py
- ct-app/tests_endurance/test_fund_channels.py
🧰 Additional context used
📓 Learnings (2)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
ct-app/core/api/hoprd_api.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/api/hoprd_api.py:0-0
Timestamp: 2024-11-26T15:39:26.752Z
Learning: In `ct-app/core/api/hoprd_api.py`, the `relayer` parameter in the `post_session` method is expected to be a string, as the application handles only one relayer. The `SessionPathBodyRelayers` class expects a list because the API requires an array of strings.
🪛 Ruff (0.8.2)
ct-app/core/api/protocol.py
29-31: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
ct-app/core/components/session_to_socket.py
16-16: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
ct-app/core/api/hoprd_api.py
58-63: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml
[error] 103-103: trailing spaces
(trailing-spaces)
[error] 113-113: trailing spaces
(trailing-spaces)
[error] 119-119: trailing spaces
(trailing-spaces)
ct-app/.configs/core_staging_config.yaml
[error] 114-114: trailing spaces
(trailing-spaces)
🔇 Additional comments (10)
ct-app/.configs/core_prod_config.yaml (1)
115-118
: Add documentation for sessions configuration
I understand these parameters will be defined later, but let's add documentation placeholders to track what needs to be defined:
- Document the significance of
packetSize: 462
- Explain the impact of
numPackets: 10
- Consider environment-specific port configuration
sessions:
+ # Size of each packet in bytes
+ # TODO: Document why 462 bytes and any min/max constraints
packetSize: 462
+ # Number of packets per session
+ # TODO: Document the impact on performance/reliability
numPackets: 10
+ # Port for session communication
+ # TODO: Use environment-specific port
port: 1234
ct-app/core/api/request_objects.py (1)
43-43
: Use locals()
instead of vars()
in constructors for clarity
In the __init__
methods, using locals()
makes it clear that you are capturing the constructor arguments. Using vars()
can be misleading, as it's often used to get the __dict__
of an object.
Apply this diff:
def __init__(self, amount: str, peer_address: str):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, amount: float):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, full_topology: str, including_closed: str):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, quality: float):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(
self,
capabilities: list,
destination: str,
listen_host: str,
path: str,
target: str,
):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, retransmission: bool, segmentation: bool):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, relayers: list[str]):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, hops: int = 0):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, service: int = 0):
- super().__init__(vars())
+ super().__init__(locals())
def __init__(self, ip: str, port: str):
- super().__init__(vars())
+ super().__init__(locals())
Also applies to: 50-50, 63-63, 70-70, 90-90, 100-100, 115-115, 124-124, 136-136, 143-143
ct-app/.configs/core_staging_config.yaml (1)
38-38
: Verify the value of observeSessions
The observeSessions
parameter is set to 60
, whereas similar parameters are set to On
or Off
. Please confirm if 60
is the intended value.
ct-app/core/core.py (3)
7-13
: LGTM: Import reorganization aligns with HOPRd 2.2 changes
The import changes properly reflect the new dependencies needed for session management and API handling.
447-448
: LGTM: Socket handling implementation for HOPRd 2.2
The implementation correctly adds TCP/UDP socket listeners for the new session management feature.
445-445
: Verify backward compatibility of message queue observation
The change from watch_message_queue
to observe_message_queue
needs verification for backward compatibility with existing integrations.
ct-app/core/components/peer.py (1)
148-152
: LGTM! Good improvement in message relay request handling.
The changes properly utilize the configured packet size and include a helpful comment explaining the doubled delay for loopback sessions.
ct-app/core/api/hoprd_api.py (1)
225-258
: LGTM! Well-structured session management implementation.
The implementation correctly handles the relayer parameter as a string and provides comprehensive session creation functionality with proper parameter validation.
ct-app/core/node.py (2)
18-24
: LGTM! Well-structured metrics implementation.
The metrics are properly defined with clear labels and are consistently used throughout the code for tracking channel operations and message statistics.
422-430
:
Improve session cleanup process.
The session cleanup process should handle errors during socket closure and session deletion.
- to_remove = []
- for peer, s in self.session_management.items():
- if s.session in active_sessions:
- continue
- s.socket.close()
- to_remove.append(peer)
-
- for peer in to_remove:
- await self.api.close_session(self.session_management.pop(peer).session)
+ to_remove = []
+ for peer, s in self.session_management.items():
+ if s.session in active_sessions:
+ continue
+ try:
+ s.socket.close()
+ to_remove.append(peer)
+ except Exception as e:
+ self.error(f"Error closing socket for peer {peer}: {e}")
+ to_remove.append(peer)
+
+ for peer in to_remove:
+ try:
+ session_management = self.session_management.pop(peer)
+ await self.api.close_session(session_management.session)
+ except Exception as e:
+ self.error(f"Error closing session for peer {peer}: {e}")
Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (15)
ct-app/core/components/address.py (3)
2-4
: Fix docstring typoThe class docstring has a typo: "native and native address" should be "hopr and native address".
- Class that represents an address with a native and native address. + Class that represents an address with a hopr and native address.
7-11
: Update parameter documentationThe parameter documentation references "id" but the parameter is named "hopr".
- :param id: The id of the peer. - :param address: The address of the peer. + :param hopr: The peer ID. + :param native: The native address of the peer.
15-22
: Add type hints to methodsConsider adding type hints to the comparison and representation methods for better code maintainability.
- def __eq__(self, other): + def __eq__(self, other: 'Address') -> bool: return self.hopr == other.hopr and self.native == other.native - def __hash__(self): + def __hash__(self) -> int: return hash((self.hopr, self.native)) - def __repr__(self): + def __repr__(self) -> str: return f"Address({self.hopr}, {self.native})"ct-app/core/api/protocol.py (1)
4-8
: Fix class name typoThe class name has a typo: "ProcolTemplate" should be "ProtocolTemplate".
-class ProcolTemplate: +class ProtocolTemplate:ct-app/test/model/test_peer.py (3)
61-62
: Fix line continuation indentationThe line continuation indentation is inconsistent. It should use 4 spaces for consistency with Python style guidelines.
- rand_delay = round(random.random() * - (max_range - min_range) + min_range, 1) + rand_delay = round(random.random() * + (max_range - min_range) + min_range, 1)
Line range hint
11-45
: Add test cases for version comparison edge casesThe version comparison tests should include additional edge cases:
- Invalid version strings
- None values
- Pre-release versions vs. final releases
Consider adding these test cases:
def test_peer_version_edge_cases(): peer = Peer("some_id", "some_address", "1.0.0") # Invalid version with pytest.raises(ValueError): peer.version = "invalid" # None handling peer.version = "1.0.0" with pytest.raises(TypeError): peer.is_old(None) # Pre-release vs final peer.version = "1.0.0-alpha" assert peer.is_old("1.0.0")
Line range hint
46-80
: Consider using fixed seeds for randomized testsThe test uses random values which could make it flaky. Consider:
- Using a fixed seed for reproducibility
- Using predetermined test values instead of random ones
Example approach:
@pytest.mark.parametrize( "exc_time,min_range,max_range", [ (5, 0.1, 0.3), (8, 0.2, 0.4), (10, 0.15, 0.35) ] )ct-app/core/api/response_objects.py (3)
26-60
: LGTM! The ApiResponseObject base class implementation is well-structured.The changes introduce robust nested key retrieval and useful utility methods. The implementation correctly handles None values and type conversions.
Consider these minor optimizations based on static analysis:
- return all(getattr(self, key) is None for key in self.keys.keys()) + return all(getattr(self, key) is None for key in self.keys) - return {key: getattr(self, key) for key in self.keys.keys()} + return {key: getattr(self, key) for key in self.keys} - return all( - getattr(self, key) == getattr(other, key) for key in self.keys.keys() - ) + return all( + getattr(self, key) == getattr(other, key) for key in self.keys + )🧰 Tools
🪛 Ruff (0.8.2)
44-44: Use
key in dict
instead ofkey in dict.keys()
Remove
.keys()
(SIM118)
48-48: Use
key in dict
instead ofkey in dict.keys()
Remove
.keys()
(SIM118)
58-58: Use
key in dict
instead ofkey in dict.keys()
Remove
.keys()
(SIM118)
113-115
: Consider adding documentation about the expected probability range.While the implementation is correct, it would be helpful to document the expected range of the probability value (e.g., 0.0 to 1.0) in the class docstring.
134-140
: Consider adding port number validation.The Session class should validate that the port number is within the valid range (0-65535) in the post_init method.
class Session(ApiResponseObject): keys = { "ip": "ip", "port": "port", "protocol": "protocol", "target": "target", } + + def post_init(self): + if self.port is not None and not (0 <= self.port <= 65535): + raise ValueError(f"Invalid port number: {self.port}")ct-app/.configs/core_staging_config.yaml (1)
116-119
: Document rationale for different packet configurations.The staging environment uses different session packet configuration (size: 462, count: 10) compared to test (size: 472, count: 3). Please document the reasoning behind these differences to ensure maintainability.
ct-app/.configs/core_prod_config.yaml (1)
115-117
: Document the significance of session parametersWhile these parameters will be defined later, it would be helpful to add comments explaining:
- The significance of the 462-byte packet size
- Why 10 packets were chosen
- Any implications these values have on performance or reliability
ct-app/core/node.py (3)
23-23
: Add a descriptive help text for the MESSAGES_STATS metric.The empty description string ("") makes it harder for operators to understand what this metric represents. Consider adding a meaningful description.
-MESSAGES_STATS = Gauge("ct_messages_stats", "", ["type", "sender", "relayer"]) +MESSAGES_STATS = Gauge("ct_messages_stats", "Statistics for message operations", ["type", "sender", "relayer"])
52-52
: Document the session management dictionary.Add a docstring to explain the purpose and structure of the session management dictionary.
+ # Maps peer IDs to their corresponding socket sessions for message handling self.session_management = dict[str, SessionToSocket]()
74-77
: Enhance error message for address retrieval failure.The current implementation silently returns when address retrieval fails. Consider adding more context to help with debugging.
if addresses is None: + self.warning("Failed to retrieve node addresses from API") return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
ct-app/.configs/core_prod_config.yaml
(4 hunks)ct-app/.configs/core_staging_config.yaml
(2 hunks)ct-app/core/api/hoprd_api.py
(8 hunks)ct-app/core/api/protocol.py
(1 hunks)ct-app/core/api/response_objects.py
(4 hunks)ct-app/core/components/address.py
(1 hunks)ct-app/core/components/peer.py
(7 hunks)ct-app/core/core.py
(12 hunks)ct-app/core/node.py
(20 hunks)ct-app/test/conftest.py
(4 hunks)ct-app/test/model/test_peer.py
(3 hunks)ct-app/test/test_config.yaml
(3 hunks)ct-app/test/test_node.py
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- ct-app/test/test_node.py
- ct-app/core/core.py
- ct-app/test/conftest.py
🧰 Additional context used
📓 Learnings (2)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
ct-app/core/api/hoprd_api.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/api/hoprd_api.py:0-0
Timestamp: 2024-11-26T15:39:26.752Z
Learning: In `ct-app/core/api/hoprd_api.py`, the `relayer` parameter in the `post_session` method is expected to be a string, as the application handles only one relayer. The `SessionPathBodyRelayers` class expects a list because the API requires an array of strings.
🪛 Ruff (0.8.2)
ct-app/core/api/hoprd_api.py
60-65: Use a single with
statement with multiple contexts instead of nested with
statements
(SIM117)
ct-app/core/api/response_objects.py
44-44: Use key in dict
instead of key in dict.keys()
Remove .keys()
(SIM118)
48-48: Use key in dict
instead of key in dict.keys()
Remove .keys()
(SIM118)
58-58: Use key in dict
instead of key in dict.keys()
Remove .keys()
(SIM118)
ct-app/core/api/protocol.py
29-31: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml
[error] 103-103: trailing spaces
(trailing-spaces)
[error] 113-113: trailing spaces
(trailing-spaces)
[error] 118-118: trailing spaces
(trailing-spaces)
ct-app/.configs/core_staging_config.yaml
[error] 114-114: trailing spaces
(trailing-spaces)
ct-app/test/test_config.yaml
[error] 113-113: trailing spaces
(trailing-spaces)
[error] 118-118: trailing spaces
(trailing-spaces)
🔇 Additional comments (12)
ct-app/core/api/response_objects.py (1)
106-111
: LGTM! TicketProbability class correctly handles probability values.
The implementation properly converts the probability value to float and follows the established pattern.
ct-app/test/test_config.yaml (2)
100-102
: Document initialSleep configuration and consider reducing mean value.
The implementation shows that initialSleep
is used with asyncio.sleep()
and random.normalvariate()
to add a random delay during peer initialization.
115-118
: Verify session packet configuration values.
The new sessions configuration introduces packet size and count parameters. Please verify:
- Is 472 bytes an optimal packet size for the network conditions?
- Is 3 packets sufficient for reliable message delivery?
🧰 Tools
🪛 yamllint (1.35.1)
[error] 118-118: trailing spaces
(trailing-spaces)
ct-app/.configs/core_prod_config.yaml (2)
36-38
: LGTM: Flag changes align with session management updates
The renaming of message queue and relay flags, along with the addition of session observation with a 60-second interval, aligns well with the HOPRd 2.2 session management updates.
92-92
: LGTM: Improved documentation clarity
The added comment clarifies the relationship between the count parameter and wxHOPR, enhancing maintainability.
ct-app/core/components/peer.py (2)
5-11
: LGTM: Consistent address handling improvements
The changes improve consistency in address handling:
- Better organized imports
- Consistent use of native address in node_address
- Standardized use of hopr address in metrics
Also applies to: 70-70, 82-82
148-152
: Verify the 2x delay multiplier for relay sessions
While the null check and packet size changes look good, please verify that doubling the delay is the correct approach for relay sessions. Consider:
- Is this multiplier documented in the protocol specification?
- Could this cause performance issues in high-latency scenarios?
✅ Verification successful
Let me gather more context about the relay sessions and delay handling.
Let me search for more context about the message handling and session implementation.
Based on the evidence gathered, I can now generate the final response:
The 2x delay multiplier for relay sessions is justified
The code shows that relay sessions involve two distinct hops: first to the relay node and then back through the loopback session, as evidenced by the message tracking in ct-app/core/node.py
. The delay multiplication accounts for this two-hop nature of relay sessions, making it a correct implementation.
- The metrics tracking in
node.py
confirms separate "relayed" and "sent" message states, supporting the two-hop pattern - The delay multiplication is specifically scoped to loopback sessions with relay, not affecting other message paths
- The implementation aligns with the protocol's relay mechanism as shown by the session management code
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other instances of delay multiplication in the codebase
rg -A 2 "delay.*[*].*2"
Length of output: 208
Script:
#!/bin/bash
# Search for relay-related code and documentation
rg -l "relay" --type md
# Search for relay session handling
ast-grep --pattern 'class $_ {
$$$
async def message_relay_request($$$) {
$$$
}
$$$
}'
# Search for delay-related comments and documentation
rg -B 2 -A 2 "delay.*relay"
Length of output: 558
Script:
#!/bin/bash
# Look for the full implementation of the message relay method
ast-grep --pattern 'async def message_relay_request($$$) {
$$$
}'
# Search for session/hop related documentation or comments
rg -B 2 -A 2 "hop.*relay|relay.*hop"
# Look for MessageQueue implementation to understand the flow
ast-grep --pattern 'class MessageQueue {
$$$
}'
Length of output: 1488
ct-app/core/api/hoprd_api.py (2)
66-70
: LGTM: Improved error handling for API responses
The nested try-except block with text fallback improves robustness when handling non-JSON responses.
227-259
: Verify protocol support in session creation
The session creation implementation looks good, but please verify:
- Support for both UDP and TCP protocols
- Handling of retransmission and segmentation capabilities
✅ Verification successful
Let me gather more information about the Protocol enum and its capabilities.
Let me check the protocol implementations (TCPProtocol and UDPProtocol) to verify the capabilities.
Let me check the base ProcolTemplate class to complete the verification.
Protocol support is correctly implemented
The implementation correctly handles both TCP and UDP protocols with appropriate capabilities:
- TCP protocol is configured with retransmission and segmentation enabled
- UDP protocol is configured with both capabilities disabled
- Session creation properly uses these capabilities in the SessionCapabilitiesBody
- Socket creation and data transmission correctly handle the protocol differences
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check protocol handling across the codebase
rg -A 5 "Protocol\.(UDP|TCP)"
Length of output: 2262
Script:
#!/bin/bash
# Check Protocol enum definition and capabilities
rg -A 10 "class Protocol"
Length of output: 678
Script:
#!/bin/bash
# Check TCP and UDP protocol implementations
rg -A 10 "class (TCP|UDP)Protocol"
Length of output: 848
Script:
#!/bin/bash
# Check ProcolTemplate implementation
rg -A 10 "class ProcolTemplate"
Length of output: 607
ct-app/core/node.py (3)
417-425
:
Fix potential race condition in session cleanup.
The current implementation modifies the dictionary while iterating over it, which could lead to a race condition. Consider using a safer approach.
- to_remove = []
- for peer, s in self.session_management.items():
- if s.session in active_sessions:
- continue
- s.socket.close()
- to_remove.append(peer)
-
- for peer in to_remove:
- await self.api.close_session(self.session_management.pop(peer).session)
+ # Create a copy of keys to avoid modification during iteration
+ peers = list(self.session_management.keys())
+ for peer in peers:
+ s = self.session_management.get(peer)
+ if s is None or s.session in active_sessions:
+ continue
+ try:
+ s.socket.close()
+ await self.api.close_session(s.session)
+ self.session_management.pop(peer, None)
+ except Exception as e:
+ self.error(f"Error closing session for peer {peer}: {e}")
Likely invalid or redundant comment.
439-441
: Consider task ordering for proper initialization.
The new observation tasks are added at the end of the list. Consider if these tasks have dependencies that require them to run in a specific order.
✅ Verification successful
Looking at the implementation of these observation methods, I can now generate a final response:
Task ordering appears appropriate for initialization
The observation methods are independent tasks that monitor different aspects of the system:
observe_message_queue
: Processes messages from the queue, checking peer and channel validityobserve_relayed_messages
: Monitors the inbox for messages based on session managementobserve_sessions
: Maintains session cleanup by checking active sessions
These methods don't show direct dependencies on each other's execution order, as they handle separate concerns and don't share state modifications that would require specific sequencing.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for method dependencies in the codebase
ast-grep --pattern 'def observe_$_($$$) {
$$$
self.$_($$$)
$$$
}'
Length of output: 73
Script:
#!/bin/bash
# Let's try a different approach to check for dependencies between these observation methods
# First, let's look at the implementation of these methods
rg "def observe_(message_queue|relayed_messages|sessions)" -A 10
Length of output: 1839
379-385
:
Add error handling for socket operations.
Socket operations can fail and should include proper error handling to prevent crashes and maintain stability.
for relayer, s in self.session_management.items():
- buffer_size: int = (
- self.params.sessions.packetSize * self.params.sessions.numPackets
- )
- messages = s.receive(buffer_size).decode().split("\n")
- MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
- len(messages)
- )
+ try:
+ buffer_size: int = (
+ self.params.sessions.packetSize * self.params.sessions.numPackets
+ )
+ data = s.receive(buffer_size)
+ if not data:
+ self.warning(f"No data received from {relayer}")
+ continue
+
+ messages = data.decode().split("\n")
+ if messages:
+ MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
+ len(messages)
+ )
+ except Exception as e:
+ self.error(f"Error receiving messages from {relayer}: {e}")
+ continue
Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (6)
ct-app/.configs/core_prod_config.yaml (2)
36-38
: LGTM! Consider documenting the observeSessions interval.The flag naming changes improve consistency. The 60-second interval for
observeSessions
aligns with other monitoring intervals likehealthcheck
.Consider adding a comment to document the unit (seconds) and rationale for the 60-second interval for
observeSessions
, similar to other timing configurations.
103-103
: Remove trailing spaces.There are trailing spaces on these lines that should be removed for consistency.
- + -# +# - +Also applies to: 113-113, 118-118
🧰 Tools
🪛 yamllint (1.35.1)
[error] 103-103: trailing spaces
(trailing-spaces)
ct-app/core/components/session_to_socket.py (4)
21-23
: Enhance socket cleanup in__exit__
.The socket cleanup could be more robust by handling potential errors during close and ensuring the connection is also cleaned up.
def __exit__(self, exc_type, exc_val, exc_tb): - self.socket.close() - self.socket = None + try: + if self.socket: + self.socket.close() + finally: + self.socket = None + self.conn = None
32-38
: Remove unnecessary blank line in address property.For consistency with other properties, remove the blank line between the docstring and return statement.
@property def address(self): """ Returns the socket address tuple. """ - return (self.connect_address, self.session.port)
52-69
: Enhance method documentation with detailed docstrings.The send and receive methods would benefit from more detailed documentation including parameters, return types, and possible exceptions.
def send(self, data: bytes) -> int: """ - Sends data to the peer. + Sends data to the peer using UDP. + + Args: + data (bytes): The data to send + + Returns: + int: Number of bytes sent + + Raises: + ValueError: If protocol is not UDP + socket.error: If sending fails """ if self.session.protocol == Protocol.UDP: return self.socket.sendto(data, self.address) else: raise ValueError(f"Invalid protocol: {self.session.protocol}") def receive(self, size: int) -> bytes: """ - Receives data from the peer. + Receives data from the peer using UDP. + + Args: + size (int): Maximum number of bytes to receive + + Returns: + bytes: Received data + + Raises: + ValueError: If protocol is not UDP + socket.error: If receiving fails or times out """ if self.session.protocol == Protocol.UDP: data, _ = self.socket.recvfrom(size) return data else: raise ValueError(f"Invalid protocol: {self.session.protocol}")
1-69
: Implementation aligns well with HOPRd 2.2 session management requirements.The
SessionToSocket
class provides a clean abstraction for managing UDP socket connections within sessions. The context manager pattern ensures proper resource cleanup, and the implementation correctly handles the socket lifecycle.Consider adding retry logic or connection pooling in future iterations to handle network instability and improve performance.
🧰 Tools
🪛 Ruff (0.8.2)
16-16: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ct-app/.configs/core_prod_config.yaml
(4 hunks)ct-app/core/components/session_to_socket.py
(1 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml
[error] 103-103: trailing spaces
(trailing-spaces)
[error] 113-113: trailing spaces
(trailing-spaces)
[error] 118-118: trailing spaces
(trailing-spaces)
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py
16-16: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
🔇 Additional comments (3)
ct-app/.configs/core_prod_config.yaml (2)
115-117
: Document session configuration parameters.
The fixed values for packetSize
and numPackets
need documentation to explain:
- The significance of 462 bytes packet size
- The rationale for 10 packets
- Any HOPRd 2.2 compatibility requirements
Consider adding comments to explain these parameters, for example:
sessions:
packetSize: 462 # Maximum safe packet size for HOPRd 2.2 compatibility
numPackets: 10 # Optimal packet count for reliable message delivery
92-92
: Verify storage count for wxHOPR requirements.
The comment indicates this count is specific to wxHOPR, but the rationale for the value of 100 isn't documented.
✅ Verification successful
Based on the search results, I can now generate a final response as we have enough context about wxHOPR's purpose and usage in the codebase.
Storage count of 100 appears reasonable for wxHOPR token distribution system
The value of 100 for storage count appears appropriate given that:
- The ct-app's primary purpose is to distribute wxHOPR tokens through HOP messages in the Dufour network
- The system handles token balances and allowances for safes
- The application serves as a replacement for staking rewards distribution
Since this is a configuration value for a production system that's already handling token distribution functionality, and there's no evidence of any reported issues with the current count value, we can consider this verification complete.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for wxHOPR-related storage requirements or documentation
rg -i "wxhopr.*storage" --type md
rg -i "storage.*count.*100" --type md
Length of output: 75
Script:
#!/bin/bash
# Let's try broader searches to understand the context of wxHOPR and storage configurations
rg -i "wxhopr" -A 5 -B 5
rg "count:\s*100" -A 5 -B 5
Length of output: 3755
ct-app/core/components/session_to_socket.py (1)
1-8
: LGTM! Clean imports and class definition.
The imports are minimal and appropriate for the functionality. The class name follows proper naming conventions.
def create_socket(self, timeout: int = 1): | ||
if self.session.protocol == Protocol.UDP: | ||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
else: | ||
raise ValueError(f"Invalid protocol: {self.session.protocol}") | ||
|
||
s.settimeout(timeout) | ||
|
||
conn = None | ||
|
||
return s, conn | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor socket creation method for clarity and remove unused variable.
The method initializes an unused conn
variable and could be more explicit about UDP-only support in its documentation.
def create_socket(self, timeout: int = 1):
+ """
+ Creates a UDP socket with the specified timeout.
+
+ Args:
+ timeout (int): Socket timeout in seconds
+
+ Returns:
+ tuple: (socket, None) tuple where socket is the UDP socket
+
+ Raises:
+ ValueError: If protocol is not UDP
+ """
if self.session.protocol == Protocol.UDP:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
raise ValueError(f"Invalid protocol: {self.session.protocol}")
s.settimeout(timeout)
-
- conn = None
-
- return s, conn
+ return s, None
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def create_socket(self, timeout: int = 1): | |
if self.session.protocol == Protocol.UDP: | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
else: | |
raise ValueError(f"Invalid protocol: {self.session.protocol}") | |
s.settimeout(timeout) | |
conn = None | |
return s, conn | |
def create_socket(self, timeout: int = 1): | |
""" | |
Creates a UDP socket with the specified timeout. | |
Args: | |
timeout (int): Socket timeout in seconds | |
Returns: | |
tuple: (socket, None) tuple where socket is the UDP socket | |
Raises: | |
ValueError: If protocol is not UDP | |
""" | |
if self.session.protocol == Protocol.UDP: | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
else: | |
raise ValueError(f"Invalid protocol: {self.session.protocol}") | |
s.settimeout(timeout) | |
return s, None |
try: | ||
self.socket, self.conn = self.create_socket(timeout) | ||
except (socket.error, ValueError) as e: | ||
raise ValueError(f"Error while creating socket: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve exception handling with explicit exception chaining.
Use explicit exception chaining to preserve the original error context.
try:
self.socket, self.conn = self.create_socket(timeout)
except (socket.error, ValueError) as e:
- raise ValueError(f"Error while creating socket: {e}")
+ raise ValueError(f"Error while creating socket: {e}") from e
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
try: | |
self.socket, self.conn = self.create_socket(timeout) | |
except (socket.error, ValueError) as e: | |
raise ValueError(f"Error while creating socket: {e}") | |
try: | |
self.socket, self.conn = self.create_socket(timeout) | |
except (socket.error, ValueError) as e: | |
raise ValueError(f"Error while creating socket: {e}") from e |
🧰 Tools
🪛 Ruff (0.8.2)
16-16: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
helm/values-staging.yaml (1)
12-12
: Consider high availability for session management.Enabling the core service with one replica is necessary for the new session management features in HOPRd 2.2. However, with five nodes in the configuration, consider increasing the replica count to ensure high availability and better load distribution for session management.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
helm/values-staging.yaml
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and push container image
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
ct-app/.configs/core_staging_config.yaml (2)
116-118
: Document the session parameters.The new sessions configuration lacks documentation explaining:
- The significance of the packet size (462 bytes)
- The rationale behind the number of packets (10)
- The impact on network performance and resource usage
Consider adding comments to explain these values:
sessions: + # Maximum size of each packet in bytes packetSize: 462 + # Number of packets per session for optimal performance numPackets: 10
114-114
: Fix trailing spaces.Remove trailing spaces to maintain consistent formatting.
-# +#🧰 Tools
🪛 yamllint (1.35.1)
[error] 114-114: trailing spaces
(trailing-spaces)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
ct-app/.configs/core_staging_config.yaml
(3 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
ct-app/.configs/core_staging_config.yaml
[error] 114-114: trailing spaces
(trailing-spaces)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and push container image
🔇 Additional comments (2)
ct-app/.configs/core_staging_config.yaml (2)
36-38
: LGTM! Verify impact on existing configurations.The renamed observation flags and new session monitoring parameter align well with the HOPRd 2.2 session management changes. The naming is now more consistent.
Let's verify if these configuration changes are consistently applied across environments:
✅ Verification successful
Configuration changes are consistently implemented ✅
The new observation parameters are consistently defined across configurations with proper On/Off values and identical structure. No legacy parameter names were found in the codebase, indicating this is a new feature rather than a rename.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check if the new configuration keys are consistently used across all config files rg -l "watchMessageQueue|relayedMessagesToDB" .configs/Length of output: 107
Script:
#!/bin/bash # Find all YAML config files echo "=== Finding YAML config files ===" fd -e yaml -e yml # Search for old parameter names echo -e "\n=== Searching for old parameter names ===" rg "watchMessageQueue|relayedMessagesToDB" -l # Search for new parameter names echo -e "\n=== Searching for new parameter names ===" rg "observeMessageQueue|observeRelayedMessages" -lLength of output: 1376
Script:
#!/bin/bash # Check test config content echo "=== Content of test_config.yaml ===" cat ct-app/test/test_config.yaml # Search for similar parameters in helm templates echo -e "\n=== Searching for observe* parameters in helm templates ===" rg "observe[A-Za-z]+" helm/ctdapp/templates/Length of output: 3882
52-52
: Verify the significant APR increase.The APR value has been increased by 20x (from 0.000005 to 0.0001). This could have significant economic implications. Please confirm if this increase is intentional and has been properly validated.
Let's check if this change is documented or discussed elsewhere:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
ct-app/core/node.py (4)
18-24
: Consider adding more descriptive help text for metrics.The metrics labels could be more descriptive. For example,
ct_messages_stats
has an empty help text.-MESSAGES_STATS = Gauge("ct_messages_stats", "", ["type", "sender", "relayer"]) +MESSAGES_STATS = Gauge("ct_messages_stats", "Message statistics by type and peers", ["type", "sender", "relayer"])
52-52
: Add type annotation for session management dictionary value.The type annotation for the dictionary value should be more explicit.
- self.session_management = dict[str, SessionToSocket]() + self.session_management: dict[str, SessionToSocket] = {}
438-441
: Consider task ordering for session management.The order of tasks might affect session management. Consider moving
observe_sessions
beforeobserve_message_queue
to ensure sessions are cleaned up before new messages are processed.callbacks = [ self.healthcheck, self.retrieve_peers, self.retrieve_balances, self.retrieve_channels, self.open_channels, self.fund_channels, self.close_old_channels, self.close_incoming_channels, self.close_pending_channels, self.get_total_channel_funds, + self.observe_sessions, self.observe_message_queue, self.observe_relayed_messages, - self.observe_sessions, ]
392-399
: Add early return optimization.The peer and channel checks can be combined for better readability and performance.
- peers = [peer.address.hopr for peer in await self.peers.get()] - if message.relayer not in peers: - return - - channels = [ - channel.destination_peer_id for channel in self.channels.outgoing] - if message.relayer not in channels: + peers = {peer.address.hopr for peer in await self.peers.get()} + channels = {channel.destination_peer_id for channel in self.channels.outgoing} + if message.relayer not in (peers & channels): return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
ct-app/core/node.py
(20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and push container image
🔇 Additional comments (4)
ct-app/core/node.py (4)
6-11
: LGTM! Clean import organization.The imports are well-organized and the new imports for session management are properly placed.
379-385
:⚠️ Potential issueAdd robust error handling for socket operations.
The socket receive operation needs proper error handling to handle connection issues and partial reads.
for relayer, s in self.session_management.items(): buffer_size: int = ( self.params.sessions.packetSize * self.params.sessions.numPackets ) - messages = s.receive(buffer_size).decode().split("\n") - MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( - len(messages) - ) + try: + data = s.receive(buffer_size) + if not data: + self.warning(f"Connection closed by peer {relayer}") + continue + + messages = data.decode().split("\n") + if messages: + MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( + len(messages) + ) + except ConnectionError as e: + self.error(f"Socket error for {relayer}: {e}") + continueLikely invalid or redundant comment.
417-425
:⚠️ Potential issuePrevent race condition in session cleanup.
The session cleanup has potential issues with dictionary modification during iteration.
- to_remove = [] - for peer, s in self.session_management.items(): - if s.session in active_sessions: - continue - s.socket.close() - to_remove.append(peer) - - for peer in to_remove: - await self.api.close_session(self.session_management.pop(peer).session) + peers_to_remove = [] + for peer, s in self.session_management.items(): + if s.session in active_sessions: + continue + try: + s.socket.close() + peers_to_remove.append(peer) + except Exception as e: + self.warning(f"Error closing socket for peer {peer}: {e}") + peers_to_remove.append(peer) + + for peer in peers_to_remove: + try: + session = self.session_management.pop(peer, None) + if session: + await self.api.close_session(session.session) + except Exception as e: + self.error(f"Error closing session for peer {peer}: {e}")Likely invalid or redundant comment.
401-411
:⚠️ Potential issueAdd validation and error handling for session initialization.
The session creation lacks proper validation and error handling.
if message.relayer not in self.session_management: - session = await self.api.post_session( - self.address.hopr, message.relayer - ) - self.session_management[message.relayer] = SessionToSocket( - session, self.url.split(':')[0] - ) - - self.session_management[message.relayer].send(message.bytes) - MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + try: + session = await self.api.post_session( + self.address.hopr, message.relayer + ) + if session is None: + self.error(f"Failed to create session for {message.relayer}") + return + self.session_management[message.relayer] = SessionToSocket( + session, self.url.split(':')[0] + ) + except Exception as e: + self.error(f"Error creating session for {message.relayer}: {e}") + return + + try: + bytes_sent = self.session_management[message.relayer].send(message.bytes) + if bytes_sent == len(message.bytes): + MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + else: + self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes") + except ConnectionError as e: + self.error(f"Failed to send message to {message.relayer}: {e}") + self.session_management.pop(message.relayer, None)Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
ct-app/core/components/session_to_socket.py (1)
39-50
: Remove unused variable and improve documentation.The method initializes an unused
conn
variable and could be more explicit about UDP-only support.def create_socket(self, timeout: int = 1): + """ + Creates a UDP socket with the specified timeout. + + Args: + timeout (int): Socket timeout in seconds + + Returns: + tuple: (socket, None) tuple where socket is the UDP socket + + Raises: + ValueError: If protocol is not UDP + """ if self.session.protocol == Protocol.UDP: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) else: raise ValueError(f"Invalid protocol: {self.session.protocol}") s.settimeout(timeout) - - conn = None - - return s, conn + return s, None
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
ct-app/core/components/session_to_socket.py
(1 hunks)ct-app/core/node.py
(20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py
15-15: Within an except
clause, raise exceptions with raise ... from err
or raise ... from None
to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and push container image
🔇 Additional comments (8)
ct-app/core/components/session_to_socket.py (4)
17-23
: LGTM! Well-implemented context management.The implementation of
__enter__
and__exit__
methods ensures proper resource cleanup.
24-38
: LGTM! Clear and concise property implementations.The properties
port
andaddress
are well-documented and correctly implemented.
51-59
: LGTM! Robust protocol validation in data transmission methods.The
send
andreceive
methods correctly validate the protocol and use appropriate socket operations for UDP.Also applies to: 60-68
12-15
: 🛠️ Refactor suggestionImprove exception handling with explicit exception chaining.
Use explicit exception chaining to preserve the original error context.
try: self.socket, self.conn = self.create_socket(timeout) except (socket.error, ValueError) as e: - raise ValueError(f"Error while creating socket: {e}") + raise ValueError(f"Error while creating socket: {e}") from eLikely invalid or redundant comment.
🧰 Tools
🪛 Ruff (0.8.2)
15-15: Within an
except
clause, raise exceptions withraise ... from err
orraise ... from None
to distinguish them from errors in exception handling(B904)
ct-app/core/node.py (4)
52-52
: LGTM! Clear type annotation for session management.The session management dictionary is well-typed and initialized correctly.
372-385
: 🛠️ Refactor suggestionAdd robust error handling for socket operations.
The message observation lacks error handling for socket operations and message parsing.
async def observe_relayed_messages(self): if self.address is None: return for relayer, s in self.session_management.items(): buffer_size: int = ( self.params.sessions.packetSize * self.params.sessions.numPackets ) - messages = s.receive(buffer_size).decode().split("\n") - MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( - len(messages) - ) + try: + data = s.receive(buffer_size) + if not data: + self.warning(f"Connection closed by peer {relayer}") + continue + + messages = data.decode().split("\n") + if messages: + MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( + len(messages) + ) + except ConnectionError as e: + self.error(f"Socket error for {relayer}: {e}") + continueLikely invalid or redundant comment.
415-428
: 🛠️ Refactor suggestionAdd error handling and prevent race condition.
The session cleanup has potential issues with error handling and dictionary modification during iteration.
@formalin @connectguard async def observe_sessions(self): active_sessions = await self.api.get_sessions(Protocol.UDP) - to_remove = [] + peers_to_remove = [] for peer, s in self.session_management.items(): if s.session in active_sessions: continue - s.socket.close() - to_remove.append(peer) + try: + s.socket.close() + peers_to_remove.append(peer) + except Exception as e: + self.warning(f"Error closing socket for peer {peer}: {e}") + peers_to_remove.append(peer) - for peer in to_remove: - await self.api.close_session(self.session_management.pop(peer).session) + for peer in peers_to_remove: + try: + session = self.session_management.pop(peer, None) + if session: + await self.api.close_session(session.session) + except Exception as e: + self.error(f"Error closing session for peer {peer}: {e}")Likely invalid or redundant comment.
389-414
: 🛠️ Refactor suggestionAdd validation and error handling for session creation.
The session initialization lacks proper validation and error handling.
if message.relayer not in self.session_management: + try: session = await self.api.post_session( self.address.hopr, message.relayer ) if session is None: + self.error(f"Failed to create session for {message.relayer}") return self.session_management[message.relayer] = SessionToSocket( session, self.url.split(':')[0] ) + except Exception as e: + self.error(f"Error creating session for {message.relayer}: {e}") + return - self.session_management[message.relayer].send(message.bytes) - MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + try: + bytes_sent = self.session_management[message.relayer].send(message.bytes) + if bytes_sent == len(message.bytes): + MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + else: + self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes") + except ConnectionError as e: + self.error(f"Failed to send message to {message.relayer}: {e}") + self.session_management.pop(message.relayer, None)Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
helm/values-staging-green.yaml (1)
13-14
: Consider security implications of fixed port range.The fixed port range (10000-11000) might make the service predictable for potential attackers. Consider:
- Randomizing the port range on deployment
- Implementing rate limiting for connection attempts
- Adding network policies to restrict access to these ports only from trusted peers
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
helm/values-staging-green.yaml
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Build and push container image
- name: HOPRD_SESSION_PORT_RANGE | ||
value: 10000:11000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Service configuration missing for session port range
The Kubernetes service configuration in helm/ctdapp/templates/service.yaml
needs to be updated to expose the session port range (10000-11000) for peer-to-peer connections. Currently, only the HTTP port is exposed.
- Add port range configuration to the service template to allow peer connections through ports 10000-11000
🔗 Analysis chain
Verify Kubernetes networking configuration for session ports.
The new session port range (10000-11000) needs to be properly exposed in the Kubernetes service configuration to allow peer connections.
Run this script to check the service configuration:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if the port range is properly exposed in the Kubernetes service
# and if there are any potential port conflicts.
# Look for service configurations
fd -e yaml -e yml | xargs rg -l 'kind:\s*Service' | xargs rg -A 10 'ports:'
# Check for other port range configurations that might conflict
rg -i 'port.*range|port.*from|port.*to'
Length of output: 2020
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
ct-app/core/node.py (1)
72-77
: Add error logging for address retrieval failure.When address retrieval fails, we should log the reason for better debugging.
if addresses is None: + self.error("Failed to retrieve addresses") return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
ct-app/core/node.py
(20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
🔇 Additional comments (7)
ct-app/core/node.py (7)
6-11
: LGTM! Clean import organization.The imports are well-organized and the new imports for Protocol and SessionToSocket are properly placed.
18-24
: LGTM! Well-structured metrics.The metrics are properly defined with clear labels and descriptions.
52-52
: Initialize session_management with proper typing.The session management dictionary is properly typed, which is good for type safety.
442-444
: LGTM! Task list properly updated.The new observation methods have been correctly added to the task list.
379-385
:⚠️ Potential issueAdd robust error handling for socket operations.
The socket receive operation needs proper error handling to handle connection issues and partial reads.
for relayer, s in self.session_management.items(): buffer_size: int = ( self.params.sessions.packetSize * self.params.sessions.numPackets ) - messages = s.receive(buffer_size).decode().split("\n") - MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( - len(messages) - ) + try: + data = s.receive(buffer_size) + if not data: + self.warning(f"Connection closed by peer {relayer}") + continue + + messages = data.decode().split("\n") + if messages: + MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc( + len(messages) + ) + except ConnectionError as e: + self.error(f"Socket error for {relayer}: {e}") + continueLikely invalid or redundant comment.
420-428
:⚠️ Potential issuePrevent race condition in session cleanup.
The current implementation modifies the dictionary while iterating over it and lacks error handling for socket operations.
- to_remove = [] - for peer, s in self.session_management.items(): - if s.session in active_sessions: - continue - s.socket.close() - to_remove.append(peer) - - for peer in to_remove: - await self.api.close_session(self.session_management.pop(peer).session) + peers_to_remove = [] + for peer, s in self.session_management.items(): + if s.session in active_sessions: + continue + try: + s.socket.close() + peers_to_remove.append(peer) + except Exception as e: + self.warning(f"Error closing socket for peer {peer}: {e}") + peers_to_remove.append(peer) + + for peer in peers_to_remove: + try: + session = self.session_management.pop(peer, None) + if session: + await self.api.close_session(session.session) + except Exception as e: + self.error(f"Error closing session for peer {peer}: {e}")Likely invalid or redundant comment.
401-413
:⚠️ Potential issueAdd validation and error handling for session management.
The session initialization and message sending lack proper validation and error handling.
if message.relayer not in self.session_management: try: session = await self.api.post_session( self.address.hopr, message.relayer ) if session is None: + self.error(f"Failed to create session for {message.relayer}") return self.session_management[message.relayer] = SessionToSocket( session, session.ip ) + except Exception as e: + self.error(f"Error creating session for {message.relayer}: {e}") + return - self.session_management[message.relayer].send(message.bytes) - MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + try: + bytes_sent = self.session_management[message.relayer].send(message.bytes) + if bytes_sent == len(message.bytes): + MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc() + else: + self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes") + except ConnectionError as e: + self.error(f"Failed to send message to {message.relayer}: {e}") + self.session_management.pop(message.relayer, None)Likely invalid or redundant comment.
Session
s to send messages
HOPRd 2.2 introduced some changes that are worth taking advantage of.
The main one is the introduction of sessions.
CT now opens a session from each CT node to each eligible peer, and "throws" packet in the socket defined by the port returned by the Session endpoint.
To be able to send data to each peer using its own session/socket-port, a dictionary of
{peer_id:socket}
is managed by each node.When sending messages, if an entry in the dictionary exist for the target peer, it uses this associated socket, otherwise initiate a new session and automatically create a socket on the returned port. Then, messages are sent as bytes through the socket.
A second minor feature is the introcution of the
win_probability
endpoint. Instead of using a predefined value from the config file (set to1
since.. the start of CT), it now retrieves this value from the CT nodes through the API in the same way the ticket price is retrieved.Also, a more automated way to (de)serialize API objects has been added, since the CT is not using HOPRd SDK anymore.
Summary by CodeRabbit
Release Notes
New Features
HTTPMethod
enum for standardized HTTP method representation.SessionToSocket
class for enhanced socket communication management.Address
class encapsulating peer and native address attributes.HoprdAPI
class with new methods for session and channel management.Protocol
class with TCP and UDP protocol types for improved protocol handling.FooRequestObject
andFooResponse
classes for structured API request and response handling.Improvements
ChannelStatus
and other classes for consistency.Core
class.Configuration Changes
Tests
FooRequestObject
andFooResponse
classes to validate API request and response handling.