Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Sessions to send messages #583

Open
wants to merge 66 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
e6a6157
improvemnts
jeandemeusy Sep 24, 2024
44968be
reduced calls to channels endpoint
jeandemeusy Sep 25, 2024
36692fa
Merge branch 'main' into jean/various-code-improvements
jeandemeusy Sep 25, 2024
bdfee82
fix
jeandemeusy Sep 25, 2024
c9d17ad
Merge branch 'jean/various-code-improvements' of https://github.com/h…
jeandemeusy Sep 25, 2024
1ac53be
typos
jeandemeusy Sep 25, 2024
c7ca8f1
typo
jeandemeusy Sep 25, 2024
73dc2b8
typos
jeandemeusy Oct 4, 2024
a0a84b0
stop storing messages
jeandemeusy Oct 4, 2024
6c9c51e
imports refactoring
jeandemeusy Oct 4, 2024
7947c0e
enum for subgraphs
jeandemeusy Oct 4, 2024
daf0967
remove extra code
jeandemeusy Oct 4, 2024
0e2614a
Refactor subgraph mode variable names and comments
jeandemeusy Oct 4, 2024
f3f7ccb
Update ct-app/core/model/subgraph/entries/balance.py
jeandemeusy Oct 4, 2024
80cbaff
avoid calling enum vars
jeandemeusy Oct 4, 2024
1ae6a88
reduce channels sg calls
jeandemeusy Oct 7, 2024
a35b316
imports handling
jeandemeusy Oct 7, 2024
93ddebc
typos
jeandemeusy Oct 10, 2024
5ab3005
async api calls
jeandemeusy Oct 28, 2024
d4e4970
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Oct 29, 2024
70a0938
fmt
jeandemeusy Oct 29, 2024
f1921c3
reset staging params
jeandemeusy Oct 29, 2024
d98a825
extract messages only if call is successfull
jeandemeusy Oct 29, 2024
b82d8d2
typo in pop messages
jeandemeusy Oct 29, 2024
c98a75a
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Nov 12, 2024
7e02e30
missing api result conversions
jeandemeusy Nov 12, 2024
d20968c
fix parsing popped messages
jeandemeusy Nov 12, 2024
64776a0
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Nov 12, 2024
de3040c
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Nov 12, 2024
b0ce815
increase prod batch to db size
jeandemeusy Nov 12, 2024
e941a52
change storage values on staging
jeandemeusy Nov 12, 2024
5cf24d5
applied rabbit's recommendations
jeandemeusy Nov 12, 2024
9eb51fa
simplified model's reward calculation
jeandemeusy Nov 12, 2024
13b883e
endurance tests
jeandemeusy Nov 13, 2024
225ccb6
Update ct-app/tests_endurance/test_send_messages.py
jeandemeusy Nov 13, 2024
061aa07
dynamic message in load testing
jeandemeusy Nov 13, 2024
ae7876b
Merge branch 'jean/make-api-calls-really-async' of https://github.com…
jeandemeusy Nov 13, 2024
ce069bb
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Nov 14, 2024
58c3b40
typos
jeandemeusy Nov 14, 2024
dc19d26
store logs in file
jeandemeusy Nov 14, 2024
dd99d35
Merge branch 'main' into jean/make-api-calls-really-async
jeandemeusy Nov 15, 2024
674a723
use dynamic win prob
jeandemeusy Nov 15, 2024
88109b8
session api wrapper and object (de)serialization
jeandemeusy Nov 15, 2024
c8c3891
parameter enhancement
jeandemeusy Nov 15, 2024
235224e
format
jeandemeusy Nov 15, 2024
33d85c2
using sessions
jeandemeusy Nov 18, 2024
2bac89b
Merge branch 'main' into jean/hoprd-2-2-compliance
jeandemeusy Nov 19, 2024
4b751fc
restructure, no-db anymore, and sessions
jeandemeusy Nov 26, 2024
a9ac929
format
jeandemeusy Nov 26, 2024
7ea31b2
bots comments addressed
jeandemeusy Nov 26, 2024
cf4560e
channel endurance test fix
jeandemeusy Nov 26, 2024
f539187
Merge branch 'main' into jean/hoprd-2-2-compliance
jeandemeusy Nov 27, 2024
689a58d
Refactor channel status properties and improve peer session management
jeandemeusy Dec 5, 2024
520ebde
reduce ticket winn prob
jeandemeusy Dec 6, 2024
1fc3092
using loopback sessions
jeandemeusy Dec 12, 2024
3b8206f
gets winn prob from config and unify addresses format
jeandemeusy Dec 12, 2024
01fa187
use udp receive method
jeandemeusy Dec 12, 2024
60bdce0
Merge branch 'main' into jean/hoprd-2-2-compliance
jeandemeusy Dec 12, 2024
fe7596a
deploy staging
jeandemeusy Jan 6, 2025
e76c7bb
increase apr on staging
jeandemeusy Jan 8, 2025
37c7f8d
specify session target
jeandemeusy Jan 8, 2025
fbc7a2a
session creation guard
jeandemeusy Jan 8, 2025
d1dbb84
specify port range
jeandemeusy Jan 9, 2025
8ef76a1
use detected ip by session
jeandemeusy Jan 9, 2025
fa82376
Merge branch 'main' into jean/hoprd-2-2-compliance
jeandemeusy Jan 21, 2025
06b5e9d
class ordering
jeandemeusy Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions ct-app/.configs/core_prod_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ flags:
getTotalChannelFunds: 300

observeMessageQueue: On # should only be On / Off
observeRelayedMessages: 15
observeRelayedMessages: On # should only be On / Off
observeSessions: 60

peer:
messageRelayRequest: On # should only be On / Off
Expand Down Expand Up @@ -84,7 +85,6 @@ economicModel:
upperbound: 1.0
offset: 4.08


# =============================================================================
#
# =============================================================================
Expand All @@ -102,6 +102,13 @@ channel:
fundingAmount: 30
maxAgeSeconds: 172800

# =============================================================================
#
# =============================================================================
sessions:
packetSize: 462
numPackets: 10

# =============================================================================
#
# =============================================================================
Expand Down
10 changes: 9 additions & 1 deletion ct-app/.configs/core_staging_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ flags:
getTotalChannelFunds: 300

observeMessageQueue: On # should only be On / Off
observeRelayedMessages: 15
observeRelayedMessages: On # should only be On / Off
observeSessions: 60

peer:
messageRelayRequest: On # should only be On / Off
Expand Down Expand Up @@ -102,6 +103,13 @@ channel:
fundingAmount: 0.1
maxAgeSeconds: 86400

# =============================================================================
#
# =============================================================================
sessions:
packetSize: 462
numPackets: 10

# =============================================================================
#
# =============================================================================
Expand Down
3 changes: 2 additions & 1 deletion ct-app/core/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .hoprd_api import HoprdAPI
from .protocol import Protocol

__all__ = ["HoprdAPI"]
__all__ = ["HoprdAPI", "Protocol"]
99 changes: 68 additions & 31 deletions ct-app/core/api/hoprd_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
from typing import Optional

import aiohttp

from core.baseclass import Base

from .http_method import HTTPMethod
from .protocol import Protocol
from .request_objects import (
ApiRequestObject,
CreateSessionBody,
DeleteSessionBody,

FundChannelBody,
GetChannelsBody,
GetPeersBody,
OpenChannelBody,
PopMessagesBody,
SendMessageBody,
SessionCapabilitiesBody,
SessionPathBodyRelayers,
SessionTargetBody,
)
from .response_objects import (
Addresses,
Expand All @@ -23,8 +27,8 @@
Configuration,
ConnectedPeer,
Infos,
Message,
OpenedChannel,
Session,
TicketPrice,
TicketProbability,
)
Expand Down Expand Up @@ -185,22 +189,6 @@ async def get_address(self) -> Optional[Addresses]:

return Addresses(response) if is_ok else None

async def send_message(
self, destination: str, message: str, hops: list[str], tag: int = MESSAGE_TAG
) -> bool:
"""
Sends a message to the given destination.
:param: destination: str
:param: message: str
:param: hops: list[str]
:param: tag: int = 0x0320
:return: bool
"""
data = SendMessageBody(message, hops, destination, tag)
is_ok, _ = await self.__call_api(HTTPMethod.POST, "messages", data=data)

return is_ok

async def node_info(self) -> Optional[Infos]:
"""
Gets informations about the HOPRd node.
Expand All @@ -217,17 +205,6 @@ async def ticket_price(self) -> Optional[TicketPrice]:
is_ok, response = await self.__call_api(HTTPMethod.GET, "network/price")
return TicketPrice(response) if is_ok else None

async def messages_pop_all(self, tag: int = MESSAGE_TAG) -> list:
"""
Pop all messages from the inbox
:param: tag = 0x0320
:return: list
"""
is_ok, response = await self.__call_api(
HTTPMethod.POST, "messages/pop-all", data=PopMessagesBody(tag)
)
return [Message(item) for item in response.get("messages", [])] if is_ok else []

async def winning_probability(self) -> Optional[TicketProbability]:
"""
Gets the winning probability set in the HOPRd node configuration file.
Expand All @@ -236,6 +213,66 @@ async def winning_probability(self) -> Optional[TicketProbability]:
is_ok, response = await self.__call_api(HTTPMethod.GET, "node/configuration")
return TicketProbability(Configuration(json.loads(response)).as_dict) if is_ok else None

async def get_sessions(self, protocol: Protocol = Protocol.UDP) -> list[Session]:
"""
Lists existing Session listeners for the given IP protocol
:param: protocol: Protocol
:return: list[Session]
"""
is_ok, response = await self.__call_api(
HTTPMethod.GET, f"session/{protocol.name.lower()}"
)

return [Session(s) for s in response] if is_ok else None

async def post_session(
self,
destination: str,
relayer: str,
listen_host: str = ":0",
protocol: Protocol = Protocol.UDP
) -> Optional[Session]:
"""
Creates a new client session returning the given session listening host & port over TCP or UDP.
:param: destination: PeerID of the recipient
:param: relayer: PeerID of the relayer
:param: listen_host: str
:param: protocol: Protocol (UDP or TCP)
:return: Session
"""
capabilities_body = SessionCapabilitiesBody(
protocol.retransmit, protocol.segment)
target_body = SessionTargetBody()
path_body = SessionPathBodyRelayers([relayer])

data = CreateSessionBody(
capabilities_body.as_array,
destination,
listen_host,
path_body.as_dict,
target_body.as_dict,
)

is_ok, response = await self.__call_api(
HTTPMethod.POST, f"session/{protocol.name.lower()}", data
)

return Session(response) if is_ok else None

async def close_session(self, session: Session) -> bool:
"""
Closes an existing Sessino listener for the given IP protocol, IP and port.
:param: session: Session
"""
data = DeleteSessionBody(session.ip, session.port)

is_ok, _ = await self.__call_api(
HTTPMethod.DELETE, f"session/{session.protocol}", data
)

return is_ok


async def healthyz(self, timeout: int = 20) -> bool:
"""
Checks if the node is healthy. Return True if `healthyz` returns 200 after max `timeout` seconds.
Expand Down
45 changes: 45 additions & 0 deletions ct-app/core/api/protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from enum import Enum


class ProcolTemplate:
def __init__(self, retransmit: bool, segment: bool):
self.retransmit = retransmit
self.segment = segment


class TCPProtocol(ProcolTemplate):
def __init__(self):
super().__init__(retransmit=True, segment=True)


class UDPProtocol(ProcolTemplate):
def __init__(self):
super().__init__(retransmit=False, segment=False)


class Protocol(Enum):
TCP = TCPProtocol()
UDP = UDPProtocol()

@classmethod
def fromString(cls, protocol: str):
try:
return getattr(cls, protocol.upper())
except AttributeError:
raise ValueError(
f"Invalid protocol: {protocol}. Valid values are: {[p.name for p in cls]}"
)
jeandemeusy marked this conversation as resolved.
Show resolved Hide resolved

@property
def segment(self):
return self.value.segment

@property
def retransmit(self):
return self.value.retransmit

def __eq__(self, other):
if isinstance(other, str):
return other == self.name

return self.name == other.name
jeandemeusy marked this conversation as resolved.
Show resolved Hide resolved
82 changes: 70 additions & 12 deletions ct-app/core/api/request_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,83 @@ class GetChannelsBody(ApiRequestObject):
def __init__(self, full_topology: str, including_closed: str):
super().__init__(vars())

class SendMessageBody(ApiRequestObject):

class GetPeersBody(ApiRequestObject):
keys = {"quality": "quality"}

def __init__(self, quality: float):
super().__init__(vars())


class CreateSessionBody(ApiRequestObject):
keys = {
"body": "body",
"capabilities": "capabilities",
"destination": "destination",
"listen_host": "listenHost",
"path": "path",
"destination": "peerId",
"tag": "tag",
"target": "target",
}

def __init__(self, body: str, path: list[str], destination: str, tag: int):
def __init__(
self,
capabilities: list,
destination: str,
listen_host: str,
path: str,
target: str,
):
super().__init__(vars())

class PopMessagesBody(ApiRequestObject):
keys = {"tag": "tag"}

def __init__(self, tag: int):
class SessionCapabilitiesBody(ApiRequestObject):
keys = {
"retransmission": "Retransmission",
"segmentation": "Segmentation",
}

def __init__(self, retransmission: bool, segmentation: bool):
super().__init__(vars())

class GetPeersBody(ApiRequestObject):
keys = {"quality": "quality"}
@property
def as_array(self) -> list:
return [
self.keys[var] for var in vars(self) if var in self.keys and vars(self)[var]
]


class SessionPathBodyRelayers(ApiRequestObject):
keys = {
"relayers": "IntermediatePath",
}

def __init__(self, relayers: list[str]):
super().__init__(vars())


class SessionPathBodyHops(ApiRequestObject):
keys = {
"hops": "Hops",
}

def __init__(self, hops: int = 0):
super().__init__(vars())

def post_init(self):
self.hops = int(self.hops)


class SessionTargetBody(ApiRequestObject):
keys = {
"service": "Service",
}

def __init__(self, service: int = 0):
super().__init__(vars())


class DeleteSessionBody(ApiRequestObject):
keys = {"ip": "listeningIp", "port": "port"}

def __init__(self, ip: str, port: str):
super().__init__(vars())

def __init__(self, quality: float):
super().__init__(vars())
15 changes: 11 additions & 4 deletions ct-app/core/api/response_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ class Channel(ApiResponseObject):

def post_init(self):
self.status = ChannelStatus.fromString(self.status)

if isinstance(self.destination_address, str):
self.destination_address = self.destination_address.lower()
if isinstance(self.source_address, str):
self.source_address = self.source_address.lower()


class TicketPrice(ApiResponseObject):
keys = {"value": "price"}
Expand All @@ -121,8 +121,6 @@ class Configuration(ApiResponseObject):
class OpenedChannel(ApiResponseObject):
keys = {"channel_id": "channelId", "receipt": "transactionReceipt"}

class Message(ApiResponseObject):
keys = {"body": "body", "timestamp": "timestamp"}

class Channels:
def __init__(self, data: dict):
Expand All @@ -134,4 +132,13 @@ def __str__(self):
return str(self.__dict__)

def __repr__(self):
return str(self)
return str(self)


class Session(ApiResponseObject):
keys = {
"ip": "ip",
"port": "port",
"protocol": "protocol",
"target": "target",
}
1 change: 0 additions & 1 deletion ct-app/core/components/asyncloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def __init__(self):
self.loop.add_signal_handler(SIGINT, self.stop)
self.loop.add_signal_handler(SIGTERM, self.stop)

@classmethod
def run(cls, process: Callable, stop_callback: Callable):
try:
cls().loop.run_until_complete(process())
Expand Down
Loading
Loading