Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Sessions to send messages #583

Open
wants to merge 66 commits into
base: main
Choose a base branch
from
Open

Conversation

jeandemeusy
Copy link
Collaborator

@jeandemeusy jeandemeusy commented Nov 19, 2024

HOPRd 2.2 introduced some changes that are worth taking advantage of.

The main one is the introduction of sessions.
CT now opens a session from each CT node to each eligible peer, and "throws" packet in the socket defined by the port returned by the Session endpoint.
To be able to send data to each peer using its own session/socket-port, a dictionary of {peer_id:socket} is managed by each node.
When sending messages, if an entry in the dictionary exist for the target peer, it uses this associated socket, otherwise initiate a new session and automatically create a socket on the returned port. Then, messages are sent as bytes through the socket.

A second minor feature is the introcution of the win_probability endpoint. Instead of using a predefined value from the config file (set to 1 since.. the start of CT), it now retrieves this value from the CT nodes through the API in the same way the ticket price is retrieved.

Also, a more automated way to (de)serialize API objects has been added, since the CT is not using HOPRd SDK anymore.

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced HTTPMethod enum for standardized HTTP method representation.
    • Added SessionToSocket class for enhanced socket communication management.
    • New Address class encapsulating peer and native address attributes.
    • Enhanced HoprdAPI class with new methods for session and channel management.
    • Added Protocol class with TCP and UDP protocol types for improved protocol handling.
    • Added FooRequestObject and FooResponse classes for structured API request and response handling.
  • Improvements

    • Refined error handling and logging in various components.
    • Updated naming conventions for properties in ChannelStatus and other classes for consistency.
    • Streamlined the instantiation and control flow of the Core class.
    • Enhanced API request and response handling through the introduction of structured request and response classes.
  • Configuration Changes

    • Updated configuration files to reflect new parameters for session management and message observation.
  • Tests

    • Introduced new tests for FooRequestObject and FooResponse classes to validate API request and response handling.
    • Updated tests to reflect changes in class structures and naming conventions.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (11)
ct-app/.configs/core_prod_config.yaml (3)

36-37: Add validation for On/Off flags

The flags observeMessageQueue and observeRelayedMessages are marked as "should only be On / Off" in comments, but there's no enforcement. Consider using proper boolean values or adding validation.

-    observeMessageQueue: On # should only be On / Off
-    observeRelayedMessages: On # should only be On / Off
+    observeMessageQueue: true # boolean
+    observeRelayedMessages: true # boolean

38-38: Document the observeSessions interval

The new observeSessions flag is set to 60 (presumably seconds) but lacks documentation about its purpose and impact. Please add a comment explaining:

  • What this interval controls
  • The implications of adjusting this value
  • Any minimum/maximum recommended values

103-103: Remove trailing spaces

Remove trailing spaces from lines 103, 113, and 119 to fix yamllint errors.

Also applies to: 113-113, 119-119

🧰 Tools
🪛 yamllint (1.35.1)

[error] 103-103: trailing spaces

(trailing-spaces)

ct-app/core/components/session_to_socket.py (1)

16-16: Enhance exception chaining for better traceback clarity

When raising an exception within an except block, it's good practice to use raise ... from e to maintain the exception context. This helps in debugging by showing the original traceback.

Apply this diff:

    except (socket.error, ValueError) as e:
-       raise ValueError(f"Error while creating socket: {e}")
+       raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

16-16: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/core/api/protocol.py (1)

29-31: Enhance exception chaining for better traceback clarity

When raising an exception within an except block, use raise ... from e to maintain the exception context. This provides a complete traceback of the error.

Apply this diff:

    @classmethod
    def fromString(cls, protocol: str):
        try:
            return getattr(cls, protocol.upper())
-       except AttributeError:
+       except AttributeError as e:
            raise ValueError(
                f"Invalid protocol: {protocol}. Valid values are: {[p.name for p in cls]}"
-           )
+           ) from e
🧰 Tools
🪛 Ruff (0.8.2)

29-31: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/.configs/core_staging_config.yaml (2)

114-114: Remove trailing spaces

Line 114 contains trailing spaces, which can cause issues in YAML parsers.

Apply this diff:

-   # 
+   #
🧰 Tools
🪛 yamllint (1.35.1)

[error] 114-114: trailing spaces

(trailing-spaces)


119-119: Address the TODO comment for port

There's a TODO comment indicating that the port needs to be specified after consulting with @Ausias. Please update the port value when you have the necessary information.

Do you want me to help specify the port or open a new GitHub issue to track this task?

ct-app/core/core.py (2)

111-112: Consider optimizing peer version lookup

The current implementation performs two linear searches on the same list. Consider using a dictionary for O(1) lookup.

-                    new_version = visible_peers[visible_peers.index(
-                        peer)].version
+                    peer_dict = {p: p.version for p in visible_peers}
+                    new_version = peer_dict[peer]

451-451: Remove unnecessary empty line

Consider removing this extra empty line to maintain consistent spacing.

ct-app/core/api/hoprd_api.py (2)

58-63: Simplify nested context managers.

Consider using a single with statement with multiple contexts for better readability.

-            async with aiohttp.ClientSession(headers=self.headers) as s:
-                async with getattr(s, method.value)(
-                    url=f"{self.host}{self.prefix}{endpoint}",
-                    json={} if data is None else data.as_dict,
-                    headers=headers,
-                ) as res:
+            async with (
+                aiohttp.ClientSession(headers=self.headers) as s,
+                getattr(s, method.value)(
+                    url=f"{self.host}{self.prefix}{endpoint}",
+                    json={} if data is None else data.as_dict,
+                    headers=headers,
+                ) as res
+            ):
🧰 Tools
🪛 Ruff (0.8.2)

58-63: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)


64-68: Enhance JSON parsing error handling.

The current error handling catches all exceptions. Consider catching specific JSON-related exceptions for better error reporting.

-                    try:
-                        data = await res.json()
-                    except Exception:
-                        data = await res.text()
+                    try:
+                        data = await res.json()
+                    except aiohttp.ContentTypeError:
+                        data = await res.text()
+                        self.warning(f"Failed to parse JSON response: {data}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 520ebde and 1fc3092.

📒 Files selected for processing (12)
  • ct-app/.configs/core_prod_config.yaml (4 hunks)
  • ct-app/.configs/core_staging_config.yaml (2 hunks)
  • ct-app/.env.example (1 hunks)
  • ct-app/core/api/hoprd_api.py (8 hunks)
  • ct-app/core/api/protocol.py (1 hunks)
  • ct-app/core/api/request_objects.py (1 hunks)
  • ct-app/core/components/peer.py (2 hunks)
  • ct-app/core/components/session_to_socket.py (1 hunks)
  • ct-app/core/components/utils.py (4 hunks)
  • ct-app/core/core.py (10 hunks)
  • ct-app/core/node.py (13 hunks)
  • ct-app/tests_endurance/test_fund_channels.py (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • ct-app/.env.example
🚧 Files skipped from review as they are similar to previous changes (2)
  • ct-app/core/components/utils.py
  • ct-app/tests_endurance/test_fund_channels.py
🧰 Additional context used
📓 Learnings (2)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
ct-app/core/api/hoprd_api.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/api/hoprd_api.py:0-0
Timestamp: 2024-11-26T15:39:26.752Z
Learning: In `ct-app/core/api/hoprd_api.py`, the `relayer` parameter in the `post_session` method is expected to be a string, as the application handles only one relayer. The `SessionPathBodyRelayers` class expects a list because the API requires an array of strings.
🪛 Ruff (0.8.2)
ct-app/core/api/protocol.py

29-31: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/core/components/session_to_socket.py

16-16: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/core/api/hoprd_api.py

58-63: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)

🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml

[error] 103-103: trailing spaces

(trailing-spaces)


[error] 113-113: trailing spaces

(trailing-spaces)


[error] 119-119: trailing spaces

(trailing-spaces)

ct-app/.configs/core_staging_config.yaml

[error] 114-114: trailing spaces

(trailing-spaces)

🔇 Additional comments (10)
ct-app/.configs/core_prod_config.yaml (1)

115-118: Add documentation for sessions configuration

I understand these parameters will be defined later, but let's add documentation placeholders to track what needs to be defined:

  1. Document the significance of packetSize: 462
  2. Explain the impact of numPackets: 10
  3. Consider environment-specific port configuration
 sessions:
+  # Size of each packet in bytes
+  # TODO: Document why 462 bytes and any min/max constraints
   packetSize: 462
+  # Number of packets per session
+  # TODO: Document the impact on performance/reliability
   numPackets: 10
+  # Port for session communication
+  # TODO: Use environment-specific port
   port: 1234
ct-app/core/api/request_objects.py (1)

43-43: Use locals() instead of vars() in constructors for clarity

In the __init__ methods, using locals() makes it clear that you are capturing the constructor arguments. Using vars() can be misleading, as it's often used to get the __dict__ of an object.

Apply this diff:

    def __init__(self, amount: str, peer_address: str):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, amount: float):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, full_topology: str, including_closed: str):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, quality: float):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(
        self,
        capabilities: list,
        destination: str,
        listen_host: str,
        path: str,
        target: str,
    ):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, retransmission: bool, segmentation: bool):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, relayers: list[str]):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, hops: int = 0):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, service: int = 0):
-       super().__init__(vars())
+       super().__init__(locals())

    def __init__(self, ip: str, port: str):
-       super().__init__(vars())
+       super().__init__(locals())

Also applies to: 50-50, 63-63, 70-70, 90-90, 100-100, 115-115, 124-124, 136-136, 143-143

ct-app/.configs/core_staging_config.yaml (1)

38-38: Verify the value of observeSessions

The observeSessions parameter is set to 60, whereas similar parameters are set to On or Off. Please confirm if 60 is the intended value.

ct-app/core/core.py (3)

7-13: LGTM: Import reorganization aligns with HOPRd 2.2 changes

The import changes properly reflect the new dependencies needed for session management and API handling.


447-448: LGTM: Socket handling implementation for HOPRd 2.2

The implementation correctly adds TCP/UDP socket listeners for the new session management feature.


445-445: Verify backward compatibility of message queue observation

The change from watch_message_queue to observe_message_queue needs verification for backward compatibility with existing integrations.

ct-app/core/components/peer.py (1)

148-152: LGTM! Good improvement in message relay request handling.

The changes properly utilize the configured packet size and include a helpful comment explaining the doubled delay for loopback sessions.

ct-app/core/api/hoprd_api.py (1)

225-258: LGTM! Well-structured session management implementation.

The implementation correctly handles the relayer parameter as a string and provides comprehensive session creation functionality with proper parameter validation.

ct-app/core/node.py (2)

18-24: LGTM! Well-structured metrics implementation.

The metrics are properly defined with clear labels and are consistently used throughout the code for tracking channel operations and message statistics.


422-430: ⚠️ Potential issue

Improve session cleanup process.

The session cleanup process should handle errors during socket closure and session deletion.

-        to_remove = []
-        for peer, s in self.session_management.items():
-            if s.session in active_sessions:
-                continue
-            s.socket.close()
-            to_remove.append(peer)
-
-        for peer in to_remove:
-            await self.api.close_session(self.session_management.pop(peer).session)
+        to_remove = []
+        for peer, s in self.session_management.items():
+            if s.session in active_sessions:
+                continue
+            try:
+                s.socket.close()
+                to_remove.append(peer)
+            except Exception as e:
+                self.error(f"Error closing socket for peer {peer}: {e}")
+                to_remove.append(peer)
+
+        for peer in to_remove:
+            try:
+                session_management = self.session_management.pop(peer)
+                await self.api.close_session(session_management.session)
+            except Exception as e:
+                self.error(f"Error closing session for peer {peer}: {e}")

Likely invalid or redundant comment.

ct-app/core/components/session_to_socket.py Outdated Show resolved Hide resolved
ct-app/core/api/protocol.py Outdated Show resolved Hide resolved
ct-app/core/core.py Outdated Show resolved Hide resolved
ct-app/core/core.py Outdated Show resolved Hide resolved
ct-app/core/node.py Outdated Show resolved Hide resolved
ct-app/core/node.py Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (15)
ct-app/core/components/address.py (3)

2-4: Fix docstring typo

The class docstring has a typo: "native and native address" should be "hopr and native address".

-    Class that represents an address with a native and native address.
+    Class that represents an address with a hopr and native address.

7-11: Update parameter documentation

The parameter documentation references "id" but the parameter is named "hopr".

-        :param id: The id of the peer.
-        :param address: The address of the peer.
+        :param hopr: The peer ID.
+        :param native: The native address of the peer.

15-22: Add type hints to methods

Consider adding type hints to the comparison and representation methods for better code maintainability.

-    def __eq__(self, other):
+    def __eq__(self, other: 'Address') -> bool:
         return self.hopr == other.hopr and self.native == other.native

-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash((self.hopr, self.native))

-    def __repr__(self):
+    def __repr__(self) -> str:
         return f"Address({self.hopr}, {self.native})"
ct-app/core/api/protocol.py (1)

4-8: Fix class name typo

The class name has a typo: "ProcolTemplate" should be "ProtocolTemplate".

-class ProcolTemplate:
+class ProtocolTemplate:
ct-app/test/model/test_peer.py (3)

61-62: Fix line continuation indentation

The line continuation indentation is inconsistent. It should use 4 spaces for consistency with Python style guidelines.

-        rand_delay = round(random.random() *
-                           (max_range - min_range) + min_range, 1)
+        rand_delay = round(random.random() *
+                         (max_range - min_range) + min_range, 1)

Line range hint 11-45: Add test cases for version comparison edge cases

The version comparison tests should include additional edge cases:

  1. Invalid version strings
  2. None values
  3. Pre-release versions vs. final releases

Consider adding these test cases:

def test_peer_version_edge_cases():
    peer = Peer("some_id", "some_address", "1.0.0")
    
    # Invalid version
    with pytest.raises(ValueError):
        peer.version = "invalid"
        
    # None handling
    peer.version = "1.0.0"
    with pytest.raises(TypeError):
        peer.is_old(None)
        
    # Pre-release vs final
    peer.version = "1.0.0-alpha"
    assert peer.is_old("1.0.0")

Line range hint 46-80: Consider using fixed seeds for randomized tests

The test uses random values which could make it flaky. Consider:

  1. Using a fixed seed for reproducibility
  2. Using predetermined test values instead of random ones

Example approach:

@pytest.mark.parametrize(
    "exc_time,min_range,max_range",
    [
        (5, 0.1, 0.3),
        (8, 0.2, 0.4),
        (10, 0.15, 0.35)
    ]
)
ct-app/core/api/response_objects.py (3)

26-60: LGTM! The ApiResponseObject base class implementation is well-structured.

The changes introduce robust nested key retrieval and useful utility methods. The implementation correctly handles None values and type conversions.

Consider these minor optimizations based on static analysis:

- return all(getattr(self, key) is None for key in self.keys.keys())
+ return all(getattr(self, key) is None for key in self.keys)

- return {key: getattr(self, key) for key in self.keys.keys()}
+ return {key: getattr(self, key) for key in self.keys}

- return all(
-     getattr(self, key) == getattr(other, key) for key in self.keys.keys()
- )
+ return all(
+     getattr(self, key) == getattr(other, key) for key in self.keys
+ )
🧰 Tools
🪛 Ruff (0.8.2)

44-44: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


48-48: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


58-58: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


113-115: Consider adding documentation about the expected probability range.

While the implementation is correct, it would be helpful to document the expected range of the probability value (e.g., 0.0 to 1.0) in the class docstring.


134-140: Consider adding port number validation.

The Session class should validate that the port number is within the valid range (0-65535) in the post_init method.

 class Session(ApiResponseObject):
     keys = {
         "ip": "ip",
         "port": "port",
         "protocol": "protocol",
         "target": "target",
     }
+
+    def post_init(self):
+        if self.port is not None and not (0 <= self.port <= 65535):
+            raise ValueError(f"Invalid port number: {self.port}")
ct-app/.configs/core_staging_config.yaml (1)

116-119: Document rationale for different packet configurations.

The staging environment uses different session packet configuration (size: 462, count: 10) compared to test (size: 472, count: 3). Please document the reasoning behind these differences to ensure maintainability.

ct-app/.configs/core_prod_config.yaml (1)

115-117: Document the significance of session parameters

While these parameters will be defined later, it would be helpful to add comments explaining:

  • The significance of the 462-byte packet size
  • Why 10 packets were chosen
  • Any implications these values have on performance or reliability
ct-app/core/node.py (3)

23-23: Add a descriptive help text for the MESSAGES_STATS metric.

The empty description string ("") makes it harder for operators to understand what this metric represents. Consider adding a meaningful description.

-MESSAGES_STATS = Gauge("ct_messages_stats", "", ["type", "sender", "relayer"])
+MESSAGES_STATS = Gauge("ct_messages_stats", "Statistics for message operations", ["type", "sender", "relayer"])

52-52: Document the session management dictionary.

Add a docstring to explain the purpose and structure of the session management dictionary.

+        # Maps peer IDs to their corresponding socket sessions for message handling
         self.session_management = dict[str, SessionToSocket]()

74-77: Enhance error message for address retrieval failure.

The current implementation silently returns when address retrieval fails. Consider adding more context to help with debugging.

         if addresses is None:
+            self.warning("Failed to retrieve node addresses from API")
             return
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1fc3092 and 3b8206f.

📒 Files selected for processing (13)
  • ct-app/.configs/core_prod_config.yaml (4 hunks)
  • ct-app/.configs/core_staging_config.yaml (2 hunks)
  • ct-app/core/api/hoprd_api.py (8 hunks)
  • ct-app/core/api/protocol.py (1 hunks)
  • ct-app/core/api/response_objects.py (4 hunks)
  • ct-app/core/components/address.py (1 hunks)
  • ct-app/core/components/peer.py (7 hunks)
  • ct-app/core/core.py (12 hunks)
  • ct-app/core/node.py (20 hunks)
  • ct-app/test/conftest.py (4 hunks)
  • ct-app/test/model/test_peer.py (3 hunks)
  • ct-app/test/test_config.yaml (3 hunks)
  • ct-app/test/test_node.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • ct-app/test/test_node.py
  • ct-app/core/core.py
  • ct-app/test/conftest.py
🧰 Additional context used
📓 Learnings (2)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
ct-app/core/api/hoprd_api.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/api/hoprd_api.py:0-0
Timestamp: 2024-11-26T15:39:26.752Z
Learning: In `ct-app/core/api/hoprd_api.py`, the `relayer` parameter in the `post_session` method is expected to be a string, as the application handles only one relayer. The `SessionPathBodyRelayers` class expects a list because the API requires an array of strings.
🪛 Ruff (0.8.2)
ct-app/core/api/hoprd_api.py

60-65: Use a single with statement with multiple contexts instead of nested with statements

(SIM117)

ct-app/core/api/response_objects.py

44-44: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


48-48: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


58-58: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)

ct-app/core/api/protocol.py

29-31: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml

[error] 103-103: trailing spaces

(trailing-spaces)


[error] 113-113: trailing spaces

(trailing-spaces)


[error] 118-118: trailing spaces

(trailing-spaces)

ct-app/.configs/core_staging_config.yaml

[error] 114-114: trailing spaces

(trailing-spaces)

ct-app/test/test_config.yaml

[error] 113-113: trailing spaces

(trailing-spaces)


[error] 118-118: trailing spaces

(trailing-spaces)

🔇 Additional comments (12)
ct-app/core/api/response_objects.py (1)

106-111: LGTM! TicketProbability class correctly handles probability values.

The implementation properly converts the probability value to float and follows the established pattern.

ct-app/test/test_config.yaml (2)

100-102: Document initialSleep configuration and consider reducing mean value.

The implementation shows that initialSleep is used with asyncio.sleep() and random.normalvariate() to add a random delay during peer initialization.


115-118: Verify session packet configuration values.

The new sessions configuration introduces packet size and count parameters. Please verify:

  1. Is 472 bytes an optimal packet size for the network conditions?
  2. Is 3 packets sufficient for reliable message delivery?
🧰 Tools
🪛 yamllint (1.35.1)

[error] 118-118: trailing spaces

(trailing-spaces)

ct-app/.configs/core_prod_config.yaml (2)

36-38: LGTM: Flag changes align with session management updates

The renaming of message queue and relay flags, along with the addition of session observation with a 60-second interval, aligns well with the HOPRd 2.2 session management updates.


92-92: LGTM: Improved documentation clarity

The added comment clarifies the relationship between the count parameter and wxHOPR, enhancing maintainability.

ct-app/core/components/peer.py (2)

5-11: LGTM: Consistent address handling improvements

The changes improve consistency in address handling:

  • Better organized imports
  • Consistent use of native address in node_address
  • Standardized use of hopr address in metrics

Also applies to: 70-70, 82-82


148-152: Verify the 2x delay multiplier for relay sessions

While the null check and packet size changes look good, please verify that doubling the delay is the correct approach for relay sessions. Consider:

  • Is this multiplier documented in the protocol specification?
  • Could this cause performance issues in high-latency scenarios?
✅ Verification successful

Let me gather more context about the relay sessions and delay handling.


Let me search for more context about the message handling and session implementation.


Based on the evidence gathered, I can now generate the final response:

The 2x delay multiplier for relay sessions is justified

The code shows that relay sessions involve two distinct hops: first to the relay node and then back through the loopback session, as evidenced by the message tracking in ct-app/core/node.py. The delay multiplication accounts for this two-hop nature of relay sessions, making it a correct implementation.

  • The metrics tracking in node.py confirms separate "relayed" and "sent" message states, supporting the two-hop pattern
  • The delay multiplication is specifically scoped to loopback sessions with relay, not affecting other message paths
  • The implementation aligns with the protocol's relay mechanism as shown by the session management code
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other instances of delay multiplication in the codebase
rg -A 2 "delay.*[*].*2" 

Length of output: 208


Script:

#!/bin/bash
# Search for relay-related code and documentation
rg -l "relay" --type md

# Search for relay session handling
ast-grep --pattern 'class $_ {
  $$$
  async def message_relay_request($$$) {
    $$$
  }
  $$$
}'

# Search for delay-related comments and documentation
rg -B 2 -A 2 "delay.*relay"

Length of output: 558


Script:

#!/bin/bash
# Look for the full implementation of the message relay method
ast-grep --pattern 'async def message_relay_request($$$) {
  $$$
}'

# Search for session/hop related documentation or comments
rg -B 2 -A 2 "hop.*relay|relay.*hop"

# Look for MessageQueue implementation to understand the flow
ast-grep --pattern 'class MessageQueue {
  $$$
}'

Length of output: 1488

ct-app/core/api/hoprd_api.py (2)

66-70: LGTM: Improved error handling for API responses

The nested try-except block with text fallback improves robustness when handling non-JSON responses.


227-259: Verify protocol support in session creation

The session creation implementation looks good, but please verify:

  • Support for both UDP and TCP protocols
  • Handling of retransmission and segmentation capabilities
✅ Verification successful

Let me gather more information about the Protocol enum and its capabilities.


Let me check the protocol implementations (TCPProtocol and UDPProtocol) to verify the capabilities.


Let me check the base ProcolTemplate class to complete the verification.


Protocol support is correctly implemented

The implementation correctly handles both TCP and UDP protocols with appropriate capabilities:

  • TCP protocol is configured with retransmission and segmentation enabled
  • UDP protocol is configured with both capabilities disabled
  • Session creation properly uses these capabilities in the SessionCapabilitiesBody
  • Socket creation and data transmission correctly handle the protocol differences
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check protocol handling across the codebase
rg -A 5 "Protocol\.(UDP|TCP)"

Length of output: 2262


Script:

#!/bin/bash
# Check Protocol enum definition and capabilities
rg -A 10 "class Protocol"

Length of output: 678


Script:

#!/bin/bash
# Check TCP and UDP protocol implementations
rg -A 10 "class (TCP|UDP)Protocol"

Length of output: 848


Script:

#!/bin/bash
# Check ProcolTemplate implementation
rg -A 10 "class ProcolTemplate"

Length of output: 607

ct-app/core/node.py (3)

417-425: ⚠️ Potential issue

Fix potential race condition in session cleanup.

The current implementation modifies the dictionary while iterating over it, which could lead to a race condition. Consider using a safer approach.

-        to_remove = []
-        for peer, s in self.session_management.items():
-            if s.session in active_sessions:
-                continue
-            s.socket.close()
-            to_remove.append(peer)
-
-        for peer in to_remove:
-            await self.api.close_session(self.session_management.pop(peer).session)
+        # Create a copy of keys to avoid modification during iteration
+        peers = list(self.session_management.keys())
+        for peer in peers:
+            s = self.session_management.get(peer)
+            if s is None or s.session in active_sessions:
+                continue
+            try:
+                s.socket.close()
+                await self.api.close_session(s.session)
+                self.session_management.pop(peer, None)
+            except Exception as e:
+                self.error(f"Error closing session for peer {peer}: {e}")

Likely invalid or redundant comment.


439-441: Consider task ordering for proper initialization.

The new observation tasks are added at the end of the list. Consider if these tasks have dependencies that require them to run in a specific order.

✅ Verification successful

Looking at the implementation of these observation methods, I can now generate a final response:

Task ordering appears appropriate for initialization

The observation methods are independent tasks that monitor different aspects of the system:

  • observe_message_queue: Processes messages from the queue, checking peer and channel validity
  • observe_relayed_messages: Monitors the inbox for messages based on session management
  • observe_sessions: Maintains session cleanup by checking active sessions

These methods don't show direct dependencies on each other's execution order, as they handle separate concerns and don't share state modifications that would require specific sequencing.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for method dependencies in the codebase
ast-grep --pattern 'def observe_$_($$$) {
  $$$
  self.$_($$$)
  $$$
}'

Length of output: 73


Script:

#!/bin/bash
# Let's try a different approach to check for dependencies between these observation methods
# First, let's look at the implementation of these methods
rg "def observe_(message_queue|relayed_messages|sessions)" -A 10

Length of output: 1839


379-385: ⚠️ Potential issue

Add error handling for socket operations.

Socket operations can fail and should include proper error handling to prevent crashes and maintain stability.

         for relayer, s in self.session_management.items():
-            buffer_size: int = (
-                self.params.sessions.packetSize * self.params.sessions.numPackets
-            )
-            messages = s.receive(buffer_size).decode().split("\n")
-            MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
-                len(messages)
-            )
+            try:
+                buffer_size: int = (
+                    self.params.sessions.packetSize * self.params.sessions.numPackets
+                )
+                data = s.receive(buffer_size)
+                if not data:
+                    self.warning(f"No data received from {relayer}")
+                    continue
+                
+                messages = data.decode().split("\n")
+                if messages:
+                    MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
+                        len(messages)
+                    )
+            except Exception as e:
+                self.error(f"Error receiving messages from {relayer}: {e}")
+                continue

Likely invalid or redundant comment.

ct-app/core/api/protocol.py Show resolved Hide resolved
ct-app/core/api/protocol.py Show resolved Hide resolved
ct-app/core/api/hoprd_api.py Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (6)
ct-app/.configs/core_prod_config.yaml (2)

36-38: LGTM! Consider documenting the observeSessions interval.

The flag naming changes improve consistency. The 60-second interval for observeSessions aligns with other monitoring intervals like healthcheck.

Consider adding a comment to document the unit (seconds) and rationale for the 60-second interval for observeSessions, similar to other timing configurations.


103-103: Remove trailing spaces.

There are trailing spaces on these lines that should be removed for consistency.

-    
+
-# 
+#
-  
+

Also applies to: 113-113, 118-118

🧰 Tools
🪛 yamllint (1.35.1)

[error] 103-103: trailing spaces

(trailing-spaces)

ct-app/core/components/session_to_socket.py (4)

21-23: Enhance socket cleanup in __exit__.

The socket cleanup could be more robust by handling potential errors during close and ensuring the connection is also cleaned up.

     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.socket.close()
-        self.socket = None
+        try:
+            if self.socket:
+                self.socket.close()
+        finally:
+            self.socket = None
+            self.conn = None

32-38: Remove unnecessary blank line in address property.

For consistency with other properties, remove the blank line between the docstring and return statement.

     @property
     def address(self):
         """
         Returns the socket address tuple.
         """
-
         return (self.connect_address, self.session.port)

52-69: Enhance method documentation with detailed docstrings.

The send and receive methods would benefit from more detailed documentation including parameters, return types, and possible exceptions.

     def send(self, data: bytes) -> int:
         """
-        Sends data to the peer.
+        Sends data to the peer using UDP.
+
+        Args:
+            data (bytes): The data to send
+
+        Returns:
+            int: Number of bytes sent
+
+        Raises:
+            ValueError: If protocol is not UDP
+            socket.error: If sending fails
         """
         if self.session.protocol == Protocol.UDP:
             return self.socket.sendto(data, self.address)
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")

     def receive(self, size: int) -> bytes:
         """
-        Receives data from the peer.
+        Receives data from the peer using UDP.
+
+        Args:
+            size (int): Maximum number of bytes to receive
+
+        Returns:
+            bytes: Received data
+
+        Raises:
+            ValueError: If protocol is not UDP
+            socket.error: If receiving fails or times out
         """
         if self.session.protocol == Protocol.UDP:
             data, _ = self.socket.recvfrom(size)
             return data
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")

1-69: Implementation aligns well with HOPRd 2.2 session management requirements.

The SessionToSocket class provides a clean abstraction for managing UDP socket connections within sessions. The context manager pattern ensures proper resource cleanup, and the implementation correctly handles the socket lifecycle.

Consider adding retry logic or connection pooling in future iterations to handle network instability and improve performance.

🧰 Tools
🪛 Ruff (0.8.2)

16-16: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b8206f and 60bdce0.

📒 Files selected for processing (2)
  • ct-app/.configs/core_prod_config.yaml (4 hunks)
  • ct-app/core/components/session_to_socket.py (1 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
ct-app/.configs/core_prod_config.yaml

[error] 103-103: trailing spaces

(trailing-spaces)


[error] 113-113: trailing spaces

(trailing-spaces)


[error] 118-118: trailing spaces

(trailing-spaces)

🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

16-16: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (3)
ct-app/.configs/core_prod_config.yaml (2)

115-117: Document session configuration parameters.

The fixed values for packetSize and numPackets need documentation to explain:

  1. The significance of 462 bytes packet size
  2. The rationale for 10 packets
  3. Any HOPRd 2.2 compatibility requirements

Consider adding comments to explain these parameters, for example:

sessions:
  packetSize: 462  # Maximum safe packet size for HOPRd 2.2 compatibility
  numPackets: 10   # Optimal packet count for reliable message delivery

92-92: Verify storage count for wxHOPR requirements.

The comment indicates this count is specific to wxHOPR, but the rationale for the value of 100 isn't documented.

✅ Verification successful

Based on the search results, I can now generate a final response as we have enough context about wxHOPR's purpose and usage in the codebase.

Storage count of 100 appears reasonable for wxHOPR token distribution system

The value of 100 for storage count appears appropriate given that:

  • The ct-app's primary purpose is to distribute wxHOPR tokens through HOP messages in the Dufour network
  • The system handles token balances and allowances for safes
  • The application serves as a replacement for staking rewards distribution

Since this is a configuration value for a production system that's already handling token distribution functionality, and there's no evidence of any reported issues with the current count value, we can consider this verification complete.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for wxHOPR-related storage requirements or documentation
rg -i "wxhopr.*storage" --type md
rg -i "storage.*count.*100" --type md

Length of output: 75


Script:

#!/bin/bash
# Let's try broader searches to understand the context of wxHOPR and storage configurations
rg -i "wxhopr" -A 5 -B 5
rg "count:\s*100" -A 5 -B 5

Length of output: 3755

ct-app/core/components/session_to_socket.py (1)

1-8: LGTM! Clean imports and class definition.

The imports are minimal and appropriate for the functionality. The class name follows proper naming conventions.

Comment on lines +40 to +51
def create_socket(self, timeout: int = 1):
if self.session.protocol == Protocol.UDP:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
raise ValueError(f"Invalid protocol: {self.session.protocol}")

s.settimeout(timeout)

conn = None

return s, conn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor socket creation method for clarity and remove unused variable.

The method initializes an unused conn variable and could be more explicit about UDP-only support in its documentation.

     def create_socket(self, timeout: int = 1):
+        """
+        Creates a UDP socket with the specified timeout.
+        
+        Args:
+            timeout (int): Socket timeout in seconds
+        
+        Returns:
+            tuple: (socket, None) tuple where socket is the UDP socket
+        
+        Raises:
+            ValueError: If protocol is not UDP
+        """
         if self.session.protocol == Protocol.UDP:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")
 
         s.settimeout(timeout)
-
-        conn = None
-
-        return s, conn
+        return s, None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def create_socket(self, timeout: int = 1):
if self.session.protocol == Protocol.UDP:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
raise ValueError(f"Invalid protocol: {self.session.protocol}")
s.settimeout(timeout)
conn = None
return s, conn
def create_socket(self, timeout: int = 1):
"""
Creates a UDP socket with the specified timeout.
Args:
timeout (int): Socket timeout in seconds
Returns:
tuple: (socket, None) tuple where socket is the UDP socket
Raises:
ValueError: If protocol is not UDP
"""
if self.session.protocol == Protocol.UDP:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
raise ValueError(f"Invalid protocol: {self.session.protocol}")
s.settimeout(timeout)
return s, None

Comment on lines +13 to +16
try:
self.socket, self.conn = self.create_socket(timeout)
except (socket.error, ValueError) as e:
raise ValueError(f"Error while creating socket: {e}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve exception handling with explicit exception chaining.

Use explicit exception chaining to preserve the original error context.

         try:
             self.socket, self.conn = self.create_socket(timeout)
         except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try:
self.socket, self.conn = self.create_socket(timeout)
except (socket.error, ValueError) as e:
raise ValueError(f"Error while creating socket: {e}")
try:
self.socket, self.conn = self.create_socket(timeout)
except (socket.error, ValueError) as e:
raise ValueError(f"Error while creating socket: {e}") from e
🧰 Tools
🪛 Ruff (0.8.2)

16-16: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
helm/values-staging.yaml (1)

12-12: Consider high availability for session management.

Enabling the core service with one replica is necessary for the new session management features in HOPRd 2.2. However, with five nodes in the configuration, consider increasing the replica count to ensure high availability and better load distribution for session management.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 60bdce0 and fe7596a.

📒 Files selected for processing (1)
  • helm/values-staging.yaml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and push container image

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
ct-app/.configs/core_staging_config.yaml (2)

116-118: Document the session parameters.

The new sessions configuration lacks documentation explaining:

  • The significance of the packet size (462 bytes)
  • The rationale behind the number of packets (10)
  • The impact on network performance and resource usage

Consider adding comments to explain these values:

 sessions:
+  # Maximum size of each packet in bytes
   packetSize: 462
+  # Number of packets per session for optimal performance
   numPackets: 10

114-114: Fix trailing spaces.

Remove trailing spaces to maintain consistent formatting.

-# 
+#
🧰 Tools
🪛 yamllint (1.35.1)

[error] 114-114: trailing spaces

(trailing-spaces)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fe7596a and e76c7bb.

📒 Files selected for processing (1)
  • ct-app/.configs/core_staging_config.yaml (3 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
ct-app/.configs/core_staging_config.yaml

[error] 114-114: trailing spaces

(trailing-spaces)

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and push container image
🔇 Additional comments (2)
ct-app/.configs/core_staging_config.yaml (2)

36-38: LGTM! Verify impact on existing configurations.

The renamed observation flags and new session monitoring parameter align well with the HOPRd 2.2 session management changes. The naming is now more consistent.

Let's verify if these configuration changes are consistently applied across environments:

✅ Verification successful

Configuration changes are consistently implemented

The new observation parameters are consistently defined across configurations with proper On/Off values and identical structure. No legacy parameter names were found in the codebase, indicating this is a new feature rather than a rename.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if the new configuration keys are consistently used across all config files
rg -l "watchMessageQueue|relayedMessagesToDB" .configs/

Length of output: 107


Script:

#!/bin/bash
# Find all YAML config files
echo "=== Finding YAML config files ==="
fd -e yaml -e yml

# Search for old parameter names
echo -e "\n=== Searching for old parameter names ==="
rg "watchMessageQueue|relayedMessagesToDB" -l

# Search for new parameter names
echo -e "\n=== Searching for new parameter names ==="
rg "observeMessageQueue|observeRelayedMessages" -l

Length of output: 1376


Script:

#!/bin/bash
# Check test config content
echo "=== Content of test_config.yaml ==="
cat ct-app/test/test_config.yaml

# Search for similar parameters in helm templates
echo -e "\n=== Searching for observe* parameters in helm templates ==="
rg "observe[A-Za-z]+" helm/ctdapp/templates/

Length of output: 3882


52-52: Verify the significant APR increase.

The APR value has been increased by 20x (from 0.000005 to 0.0001). This could have significant economic implications. Please confirm if this increase is intentional and has been properly validated.

Let's check if this change is documented or discussed elsewhere:

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
ct-app/core/node.py (4)

18-24: Consider adding more descriptive help text for metrics.

The metrics labels could be more descriptive. For example, ct_messages_stats has an empty help text.

-MESSAGES_STATS = Gauge("ct_messages_stats", "", ["type", "sender", "relayer"])
+MESSAGES_STATS = Gauge("ct_messages_stats", "Message statistics by type and peers", ["type", "sender", "relayer"])

52-52: Add type annotation for session management dictionary value.

The type annotation for the dictionary value should be more explicit.

-        self.session_management = dict[str, SessionToSocket]()
+        self.session_management: dict[str, SessionToSocket] = {}

438-441: Consider task ordering for session management.

The order of tasks might affect session management. Consider moving observe_sessions before observe_message_queue to ensure sessions are cleaned up before new messages are processed.

         callbacks = [
             self.healthcheck,
             self.retrieve_peers,
             self.retrieve_balances,
             self.retrieve_channels,
             self.open_channels,
             self.fund_channels,
             self.close_old_channels,
             self.close_incoming_channels,
             self.close_pending_channels,
             self.get_total_channel_funds,
+            self.observe_sessions,
             self.observe_message_queue,
             self.observe_relayed_messages,
-            self.observe_sessions,
         ]

392-399: Add early return optimization.

The peer and channel checks can be combined for better readability and performance.

-        peers = [peer.address.hopr for peer in await self.peers.get()]
-        if message.relayer not in peers:
-            return
-
-        channels = [
-            channel.destination_peer_id for channel in self.channels.outgoing]
-        if message.relayer not in channels:
+        peers = {peer.address.hopr for peer in await self.peers.get()}
+        channels = {channel.destination_peer_id for channel in self.channels.outgoing}
+        if message.relayer not in (peers & channels):
             return
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e76c7bb and 37c7f8d.

📒 Files selected for processing (1)
  • ct-app/core/node.py (20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and push container image
🔇 Additional comments (4)
ct-app/core/node.py (4)

6-11: LGTM! Clean import organization.

The imports are well-organized and the new imports for session management are properly placed.


379-385: ⚠️ Potential issue

Add robust error handling for socket operations.

The socket receive operation needs proper error handling to handle connection issues and partial reads.

         for relayer, s in self.session_management.items():
             buffer_size: int = (
                 self.params.sessions.packetSize * self.params.sessions.numPackets
             )
-            messages = s.receive(buffer_size).decode().split("\n")
-            MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
-                len(messages)
-            )
+            try:
+                data = s.receive(buffer_size)
+                if not data:
+                    self.warning(f"Connection closed by peer {relayer}")
+                    continue
+                
+                messages = data.decode().split("\n")
+                if messages:
+                    MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
+                        len(messages)
+                    )
+            except ConnectionError as e:
+                self.error(f"Socket error for {relayer}: {e}")
+                continue

Likely invalid or redundant comment.


417-425: ⚠️ Potential issue

Prevent race condition in session cleanup.

The session cleanup has potential issues with dictionary modification during iteration.

-        to_remove = []
-        for peer, s in self.session_management.items():
-            if s.session in active_sessions:
-                continue
-            s.socket.close()
-            to_remove.append(peer)
-
-        for peer in to_remove:
-            await self.api.close_session(self.session_management.pop(peer).session)
+        peers_to_remove = []
+        for peer, s in self.session_management.items():
+            if s.session in active_sessions:
+                continue
+            try:
+                s.socket.close()
+                peers_to_remove.append(peer)
+            except Exception as e:
+                self.warning(f"Error closing socket for peer {peer}: {e}")
+                peers_to_remove.append(peer)
+        
+        for peer in peers_to_remove:
+            try:
+                session = self.session_management.pop(peer, None)
+                if session:
+                    await self.api.close_session(session.session)
+            except Exception as e:
+                self.error(f"Error closing session for peer {peer}: {e}")

Likely invalid or redundant comment.


401-411: ⚠️ Potential issue

Add validation and error handling for session initialization.

The session creation lacks proper validation and error handling.

         if message.relayer not in self.session_management:
-            session = await self.api.post_session(
-                self.address.hopr, message.relayer
-            )
-            self.session_management[message.relayer] = SessionToSocket(
-                session, self.url.split(':')[0]
-            )
-
-        self.session_management[message.relayer].send(message.bytes)
-        MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+            try:
+                session = await self.api.post_session(
+                    self.address.hopr, message.relayer
+                )
+                if session is None:
+                    self.error(f"Failed to create session for {message.relayer}")
+                    return
+                self.session_management[message.relayer] = SessionToSocket(
+                    session, self.url.split(':')[0]
+                )
+            except Exception as e:
+                self.error(f"Error creating session for {message.relayer}: {e}")
+                return
+
+        try:
+            bytes_sent = self.session_management[message.relayer].send(message.bytes)
+            if bytes_sent == len(message.bytes):
+                MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+            else:
+                self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes")
+        except ConnectionError as e:
+            self.error(f"Failed to send message to {message.relayer}: {e}")
+            self.session_management.pop(message.relayer, None)

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
ct-app/core/components/session_to_socket.py (1)

39-50: Remove unused variable and improve documentation.

The method initializes an unused conn variable and could be more explicit about UDP-only support.

     def create_socket(self, timeout: int = 1):
+        """
+        Creates a UDP socket with the specified timeout.
+        
+        Args:
+            timeout (int): Socket timeout in seconds
+        
+        Returns:
+            tuple: (socket, None) tuple where socket is the UDP socket
+        
+        Raises:
+            ValueError: If protocol is not UDP
+        """
         if self.session.protocol == Protocol.UDP:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
         else:
             raise ValueError(f"Invalid protocol: {self.session.protocol}")
 
         s.settimeout(timeout)
-
-        conn = None
-
-        return s, conn
+        return s, None
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 37c7f8d and fbc7a2a.

📒 Files selected for processing (2)
  • ct-app/core/components/session_to_socket.py (1 hunks)
  • ct-app/core/node.py (20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
🪛 Ruff (0.8.2)
ct-app/core/components/session_to_socket.py

15-15: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and push container image
🔇 Additional comments (8)
ct-app/core/components/session_to_socket.py (4)

17-23: LGTM! Well-implemented context management.

The implementation of __enter__ and __exit__ methods ensures proper resource cleanup.


24-38: LGTM! Clear and concise property implementations.

The properties port and address are well-documented and correctly implemented.


51-59: LGTM! Robust protocol validation in data transmission methods.

The send and receive methods correctly validate the protocol and use appropriate socket operations for UDP.

Also applies to: 60-68


12-15: 🛠️ Refactor suggestion

Improve exception handling with explicit exception chaining.

Use explicit exception chaining to preserve the original error context.

         try:
             self.socket, self.conn = self.create_socket(timeout)
         except (socket.error, ValueError) as e:
-            raise ValueError(f"Error while creating socket: {e}")
+            raise ValueError(f"Error while creating socket: {e}") from e

Likely invalid or redundant comment.

🧰 Tools
🪛 Ruff (0.8.2)

15-15: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

ct-app/core/node.py (4)

52-52: LGTM! Clear type annotation for session management.

The session management dictionary is well-typed and initialized correctly.


372-385: 🛠️ Refactor suggestion

Add robust error handling for socket operations.

The message observation lacks error handling for socket operations and message parsing.

     async def observe_relayed_messages(self):
         if self.address is None:
             return
 
         for relayer, s in self.session_management.items():
             buffer_size: int = (
                 self.params.sessions.packetSize * self.params.sessions.numPackets
             )
-            messages = s.receive(buffer_size).decode().split("\n")
-            MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
-                len(messages)
-            )
+            try:
+                data = s.receive(buffer_size)
+                if not data:
+                    self.warning(f"Connection closed by peer {relayer}")
+                    continue
+                
+                messages = data.decode().split("\n")
+                if messages:
+                    MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
+                        len(messages)
+                    )
+            except ConnectionError as e:
+                self.error(f"Socket error for {relayer}: {e}")
+                continue

Likely invalid or redundant comment.


415-428: 🛠️ Refactor suggestion

Add error handling and prevent race condition.

The session cleanup has potential issues with error handling and dictionary modification during iteration.

     @formalin
     @connectguard
     async def observe_sessions(self):
         active_sessions = await self.api.get_sessions(Protocol.UDP)
 
-        to_remove = []
+        peers_to_remove = []
         for peer, s in self.session_management.items():
             if s.session in active_sessions:
                 continue
-            s.socket.close()
-            to_remove.append(peer)
+            try:
+                s.socket.close()
+                peers_to_remove.append(peer)
+            except Exception as e:
+                self.warning(f"Error closing socket for peer {peer}: {e}")
+                peers_to_remove.append(peer)
 
-        for peer in to_remove:
-            await self.api.close_session(self.session_management.pop(peer).session)
+        for peer in peers_to_remove:
+            try:
+                session = self.session_management.pop(peer, None)
+                if session:
+                    await self.api.close_session(session.session)
+            except Exception as e:
+                self.error(f"Error closing session for peer {peer}: {e}")

Likely invalid or redundant comment.


389-414: 🛠️ Refactor suggestion

Add validation and error handling for session creation.

The session initialization lacks proper validation and error handling.

         if message.relayer not in self.session_management:
+            try:
                 session = await self.api.post_session(
                     self.address.hopr, message.relayer
                 )
                 if session is None:
+                    self.error(f"Failed to create session for {message.relayer}")
                     return
                 
                 self.session_management[message.relayer] = SessionToSocket(
                     session, self.url.split(':')[0]
                 )
+            except Exception as e:
+                self.error(f"Error creating session for {message.relayer}: {e}")
+                return

-        self.session_management[message.relayer].send(message.bytes)
-        MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+        try:
+            bytes_sent = self.session_management[message.relayer].send(message.bytes)
+            if bytes_sent == len(message.bytes):
+                MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+            else:
+                self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes")
+        except ConnectionError as e:
+            self.error(f"Failed to send message to {message.relayer}: {e}")
+            self.session_management.pop(message.relayer, None)

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
helm/values-staging-green.yaml (1)

13-14: Consider security implications of fixed port range.

The fixed port range (10000-11000) might make the service predictable for potential attackers. Consider:

  1. Randomizing the port range on deployment
  2. Implementing rate limiting for connection attempts
  3. Adding network policies to restrict access to these ports only from trusted peers
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbc7a2a and d1dbb84.

📒 Files selected for processing (1)
  • helm/values-staging-green.yaml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Build and push container image

Comment on lines +13 to +14
- name: HOPRD_SESSION_PORT_RANGE
value: 10000:11000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Service configuration missing for session port range

The Kubernetes service configuration in helm/ctdapp/templates/service.yaml needs to be updated to expose the session port range (10000-11000) for peer-to-peer connections. Currently, only the HTTP port is exposed.

  • Add port range configuration to the service template to allow peer connections through ports 10000-11000
🔗 Analysis chain

Verify Kubernetes networking configuration for session ports.

The new session port range (10000-11000) needs to be properly exposed in the Kubernetes service configuration to allow peer connections.

Run this script to check the service configuration:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the port range is properly exposed in the Kubernetes service
# and if there are any potential port conflicts.

# Look for service configurations
fd -e yaml -e yml | xargs rg -l 'kind:\s*Service' | xargs rg -A 10 'ports:'

# Check for other port range configurations that might conflict
rg -i 'port.*range|port.*from|port.*to'

Length of output: 2020

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
ct-app/core/node.py (1)

72-77: Add error logging for address retrieval failure.

When address retrieval fails, we should log the reason for better debugging.

     if addresses is None:
+        self.error("Failed to retrieve addresses")
         return
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d1dbb84 and 8ef76a1.

📒 Files selected for processing (1)
  • ct-app/core/node.py (20 hunks)
🧰 Additional context used
📓 Learnings (1)
ct-app/core/node.py (1)
Learnt from: jeandemeusy
PR: hoprnet/ct-research#583
File: ct-app/core/node.py:472-476
Timestamp: 2024-11-19T16:47:51.812Z
Learning: The `create_socket` function in `ct-app/core/node.py` is intentionally synchronous and should not be converted to an asynchronous function.
🔇 Additional comments (7)
ct-app/core/node.py (7)

6-11: LGTM! Clean import organization.

The imports are well-organized and the new imports for Protocol and SessionToSocket are properly placed.


18-24: LGTM! Well-structured metrics.

The metrics are properly defined with clear labels and descriptions.


52-52: Initialize session_management with proper typing.

The session management dictionary is properly typed, which is good for type safety.


442-444: LGTM! Task list properly updated.

The new observation methods have been correctly added to the task list.


379-385: ⚠️ Potential issue

Add robust error handling for socket operations.

The socket receive operation needs proper error handling to handle connection issues and partial reads.

     for relayer, s in self.session_management.items():
         buffer_size: int = (
             self.params.sessions.packetSize * self.params.sessions.numPackets
         )
-        messages = s.receive(buffer_size).decode().split("\n")
-        MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
-            len(messages)
-        )
+        try:
+            data = s.receive(buffer_size)
+            if not data:
+                self.warning(f"Connection closed by peer {relayer}")
+                continue
+            
+            messages = data.decode().split("\n")
+            if messages:
+                MESSAGES_STATS.labels("relayed", self.address.hopr, relayer).inc(
+                    len(messages)
+                )
+        except ConnectionError as e:
+            self.error(f"Socket error for {relayer}: {e}")
+            continue

Likely invalid or redundant comment.


420-428: ⚠️ Potential issue

Prevent race condition in session cleanup.

The current implementation modifies the dictionary while iterating over it and lacks error handling for socket operations.

-    to_remove = []
-    for peer, s in self.session_management.items():
-        if s.session in active_sessions:
-            continue
-        s.socket.close()
-        to_remove.append(peer)
-
-    for peer in to_remove:
-        await self.api.close_session(self.session_management.pop(peer).session)
+    peers_to_remove = []
+    for peer, s in self.session_management.items():
+        if s.session in active_sessions:
+            continue
+        try:
+            s.socket.close()
+            peers_to_remove.append(peer)
+        except Exception as e:
+            self.warning(f"Error closing socket for peer {peer}: {e}")
+            peers_to_remove.append(peer)
+    
+    for peer in peers_to_remove:
+        try:
+            session = self.session_management.pop(peer, None)
+            if session:
+                await self.api.close_session(session.session)
+        except Exception as e:
+            self.error(f"Error closing session for peer {peer}: {e}")

Likely invalid or redundant comment.


401-413: ⚠️ Potential issue

Add validation and error handling for session management.

The session initialization and message sending lack proper validation and error handling.

     if message.relayer not in self.session_management:
         try:
             session = await self.api.post_session(
                 self.address.hopr, message.relayer
             )
             if session is None:
+                self.error(f"Failed to create session for {message.relayer}")
                 return
             
             self.session_management[message.relayer] = SessionToSocket(
                 session, session.ip
             )
+        except Exception as e:
+            self.error(f"Error creating session for {message.relayer}: {e}")
+            return

-    self.session_management[message.relayer].send(message.bytes)
-    MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+    try:
+        bytes_sent = self.session_management[message.relayer].send(message.bytes)
+        if bytes_sent == len(message.bytes):
+            MESSAGES_STATS.labels("sent", self.address.hopr, message.relayer).inc()
+        else:
+            self.warning(f"Partial send to {message.relayer}: {bytes_sent}/{len(message.bytes)} bytes")
+    except ConnectionError as e:
+        self.error(f"Failed to send message to {message.relayer}: {e}")
+        self.session_management.pop(message.relayer, None)

Likely invalid or redundant comment.

@jeandemeusy jeandemeusy changed the title Make CT HORPd 2.2 compliant Use sessions to send messages Jan 15, 2025
@jeandemeusy jeandemeusy changed the title Use sessions to send messages Use Sessions to send messages Jan 15, 2025
@jeandemeusy jeandemeusy linked an issue Jan 15, 2025 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Send message through sessions instead of using the API
2 participants