-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotification.py
137 lines (101 loc) · 3.7 KB
/
notification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import asyncio
from typing import AsyncIterator
from coagent.core import (
Address,
AgentSpec,
BaseAgent,
Context,
handler,
idle_loop,
logger,
Message,
new,
set_stderr_logger,
)
from coagent.core.messages import ControlMessage
from coagent.runtimes import NATSRuntime
class Notification(Message):
    """Payload delivered to a subscribed user.

    Carried inside ``Notify`` (inbound to the center) and ``_ControlNotify``
    (center to proxy), and yielded to the user by ``Proxy.subscribe``.
    """

    # Label for the kind of notification; the set of valid values is not
    # defined in this file.
    type: str

    # Notification body text.
    content: str
class Subscribe(Message):
    """Request handled by ``Proxy.subscribe`` to start a notification stream."""

    # Identifier of the user whose notifications should be streamed.
    user_id: str
class Notify(Message):
    """Request handled by ``Center.notify`` to deliver a notification to a user."""

    # Identifier of the recipient; looked up in the center's subscriber table.
    user_id: str

    # The notification to forward to that user's proxy agent.
    notification: Notification
class _SubscribeToCenter(Message):
    """Internal message a proxy publishes to the center to register itself."""

    # User the registering proxy is serving.
    user_id: str

    # Address of the proxy agent; the center publishes notifications for
    # ``user_id`` to this address.
    sender: Address
class _UnsubscribeFromCenter(Message):
    """Internal message a proxy publishes to the center to drop its registration."""

    # User whose subscription entry should be removed.
    user_id: str
class _ControlNotify(ControlMessage):
    """CONTROL message that asks a proxy to enqueue a notification.

    Published by ``Center.notify`` and handled by ``Proxy.notify``, which puts
    the wrapped notification into the proxy's queue.
    """

    # The notification to enqueue on the receiving proxy.
    notification: Notification
class Proxy(BaseAgent):
    """Relay between the singleton ``Center`` agent and a subscribing user.

    A ``Subscribe`` request registers this agent's address with the center.
    Notifications the center pushes back as ``_ControlNotify`` messages are
    buffered in an internal queue and streamed to the caller of ``subscribe``.
    """

    def __init__(self):
        # Infinite timeout: the agent is long-running and is only deleted once
        # the user cancels.
        super().__init__(timeout=float("inf"))
        # Buffer between the ``notify`` handler (producer) and the
        # ``subscribe`` stream (consumer).
        self.__queue: asyncio.Queue[Notification] = asyncio.Queue()

    @handler
    async def notify(self, msg: _ControlNotify, ctx: Context) -> None:
        """Enqueue the notification carried by a control message."""
        incoming = msg.notification
        await self.__queue.put(incoming)

    @handler
    async def subscribe(
        self, msg: Subscribe, ctx: Context
    ) -> AsyncIterator[Notification]:
        """Register with the center, then stream notifications to the user.

        Yields:
            Each ``Notification`` taken from the internal queue, in the order
            it was enqueued.

        Raises:
            asyncio.CancelledError: Re-raised after an unsubscribe request has
                been published to the center.
        """
        user_id = msg.user_id

        # Tell the singleton center where this user's notifications should go.
        registration = _SubscribeToCenter(user_id=user_id, sender=self.address)
        await self.channel.publish(Center.SINGLETON_ADDRESS, registration.encode())

        try:
            # Relay queued notifications until the stream is cancelled.
            while True:
                item = await self.__queue.get()
                self.__queue.task_done()
                yield item
        except asyncio.CancelledError:
            # The user cancelled: drop the registration at the center before
            # propagating the cancellation.
            removal = _UnsubscribeFromCenter(user_id=user_id)
            await self.channel.publish(Center.SINGLETON_ADDRESS, removal.encode())
            raise
class Center(BaseAgent):
    """Singleton hub that routes notifications to subscribed proxy agents.

    Proxies register and deregister per user; each ``Notify`` request is
    forwarded to the address currently registered for its ``user_id``.
    """

    # Fixed address that proxies publish their (un)subscribe messages to.
    SINGLETON_ADDRESS = Address(name="center", id="singleton")

    def __init__(self):
        # Infinite timeout: this agent is long-running and shares the
        # application's lifetime.
        super().__init__(timeout=float("inf"))
        # Maps user_id -> address of the proxy agent subscribed for that user.
        self.__subscribers: dict[str, Address] = {}

    @handler
    async def subscribe(self, msg: _SubscribeToCenter, ctx: Context) -> None:
        """Record the delivery address for a user, replacing any previous one."""
        proxy_address = msg.sender
        self.__subscribers[msg.user_id] = proxy_address
        logger.info(f"User {msg.user_id} subscribed")

    @handler
    async def unsubscribe(self, msg: _UnsubscribeFromCenter, ctx: Context) -> None:
        """Forget the delivery address for a user, if one is registered."""
        # A default is supplied so that an unknown user does not raise.
        self.__subscribers.pop(msg.user_id, None)
        logger.info(f"User {msg.user_id} unsubscribed")

    @handler
    async def notify(self, msg: Notify, ctx: Context) -> None:
        """Forward a notification to the proxy subscribed for ``msg.user_id``.

        When no subscriber is registered, a warning is logged and the
        notification is dropped.
        """
        target = self.__subscribers.get(msg.user_id)
        if target:
            # Wrap the notification in a control message and publish it to
            # the subscribed proxy's address.
            control = _ControlNotify(notification=msg.notification)
            await self.channel.publish(target, control.encode())
            logger.info(f"Notification sent to user {msg.user_id}")
        else:
            logger.warning(f"User {msg.user_id} is not subscribed")
# Agent specs registered with the runtime in main(). Each pairs an agent type
# name ("proxy" / "center") with the result of new(<agent class>).
# NOTE(review): new() presumably builds a constructor/factory for the agent
# class -- confirm against coagent.core.
proxy = AgentSpec("proxy", new(Proxy))
center = AgentSpec("center", new(Center))
async def main():
    """Open a NATS runtime, register the proxy and center specs, then idle.

    Everything runs inside the runtime's async context manager, so the
    runtime is exited when ``idle_loop()`` returns or raises.
    """
    async with NATSRuntime.from_servers() as runtime:
        # Register sequentially, proxy first and then center.
        for spec in (proxy, center):
            await runtime.register(spec)
        # NOTE(review): idle_loop() presumably blocks to keep the process (and
        # thus the registered agents) alive -- its implementation is not
        # visible in this file.
        await idle_loop()
if __name__ == "__main__":
    # Script entry point. Logging is set up before the event loop starts
    # (set_stderr_logger presumably routes log output to stderr -- confirm
    # against coagent.core).
    set_stderr_logger()
    asyncio.run(main())