Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat thread for both Kafka brokers #89

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 91 additions & 51 deletions gtecs/alert/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self):

# sentinel variables
self.running = False
self.latest_message_time = time.time()
self.notice_queue = []
self.latest_notice = None
self.received_notices = 0
Expand All @@ -49,25 +50,34 @@ def run(self, host, port, timeout=5):
"""Run the sentinel as a Pyro daemon."""
self.running = True

# Start threads
# TODO: Switch between socket & kafka, or even have both (as a backup)?
t1 = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
# t1 = threading.Thread(target=self._socket_listener_thread)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=self._handler_thread)
t2.daemon = True
t2.start()
# Start listener and handler threads
listener_mode = 'KAFKA' # TODO: params (or have both?)
if listener_mode == 'SOCKET':
listener_thread = threading.Thread(target=self._socket_listener_thread)
listener_thread.daemon = True
listener_thread.start()
elif listener_mode == 'KAFKA':
listener_thread = threading.Thread(
target=self._kafka_listener_thread,
args=(
params.KAFKA_USER,
params.KAFKA_PASSWORD,
params.KAFKA_BROKER,
params.KAFKA_GROUP_ID,
params.KAFKA_BACKDATE,
),
)
listener_thread.daemon = True
listener_thread.start()
# Also start a thread to monitor the latest message time
heartbeat_thread = threading.Thread(target=self._kafka_heartbeat_thread)
heartbeat_thread.daemon = True
heartbeat_thread.start()
else:
raise ValueError('Listener mode must be "SOCKET" or "KAFKA"')
handler_thread = threading.Thread(target=self._handler_thread)
handler_thread.daemon = True
handler_thread.start()

# Check the Pyro address is available
try:
Expand Down Expand Up @@ -122,6 +132,7 @@ def _socket_listener_thread(self):

# Define basic handler function to create a Notice instance and add it to the queue
def _handler(payload, _root):
self.latest_message_time = time.time()
notice = Notice.from_payload(payload)
self.notice_queue.append(notice)

Expand Down Expand Up @@ -179,7 +190,6 @@ def _listen(vo_socket, handler):
self.log.info('Closed socket connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_listener_thread(
self,
Expand All @@ -204,7 +214,7 @@ def _kafka_listener_thread(
# There's also no way to select specific notice types
# SCIMMA have said they will be reworking their GCN system at some point...
topics = ['gcn.notice']
# Also subscribe to the heartbeat topic, do we can check if we're still connected
# Also subscribe to the heartbeat topic, so we can check if we're still connected
topics += ['sys.heartbeat']
elif broker == 'NASA':
broker_url = 'kafka://kafka.gcn.nasa.gov/'
Expand Down Expand Up @@ -247,6 +257,8 @@ def _kafka_listener_thread(
'gcn.classic.voevent.ICECUBE_ASTROTRACK_GOLD',
'gcn.classic.voevent.ICECUBE_CASCADE',
]
# Also subscribe to the heartbeat topic, so we can check if we're still connected
topics += ['gcn.heartbeat']
else:
raise ValueError('Broker must be "SCIMMA" or "NASA"')

Expand All @@ -271,21 +283,23 @@ def _kafka_listener_thread(
# # but it doesn't seem to be implemented in hop-client yet.
# start_position = datetime.now() - timedelta(hours=12)

# One of the advantages of the newer brokers is monitoring the heartbeat topic
# to make sure we're connected.
# However, if we backdate with a new group ID we'll get weeks and weeks of
# heartbeat messages, which is very annoying.
# So instead we'll sneaky read the latest heartbeat message right now.
# That gives the starting point for that topic, so when we start the stream
# below it'll only have to handle a few seconds of heartbeat messages
# rather than weeks.
if broker == 'SCIMMA':
# One of the advantages of the SCIMMA broker is monitoring the heartbeat topic
# to make sure we're connected.
# However, if we backdate with a new group ID we'll get weeks and weeks of
# heartbeat messages, which is very annoying.
# So instead we'll sneakily read the latest heartbeat message right now.
# That gives the starting point for that topic, so when we start the stream
# below it'll only have to handle a few seconds of heartbeat messages
# rather than weeks.
url = broker_url + 'sys.heartbeat'
stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True)
consumer = stream.open(url, mode='r', group_id=group_id)
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if metadata.topic == 'sys.heartbeat':
break
elif broker == 'NASA':
url = broker_url + 'gcn.heartbeat'
stream = Stream(auth=auth, start_at=StartPosition.LATEST, until_eos=True)
consumer = stream.open(url, mode='r', group_id=group_id)
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if 'heartbeat' in metadata.topic:
break
else:
self.log.debug('Starting Kafka stream from latest message')
start_position = StartPosition.LATEST
Expand All @@ -306,22 +320,14 @@ def _kafka_listener_thread(
for payload, metadata in consumer.read_raw(metadata=True, autocommit=True):
if not self.running:
break
# Because of the heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
self.latest_message_time = time.time()

if broker == 'SCIMMA':
# We can use the system heartbeat to check if we're still connected
heartbeat_timeout = 60
latest_message_time = 0
# Because of the sys.heartbeat messages we should be getting a message
# every few seconds, so we can use this timestamp to check if we're
# still connected.
if (latest_message_time and
time.time() - latest_message_time > heartbeat_timeout):
raise TimeoutError(f'No heartbeat in {heartbeat_timeout}s')
latest_message_time = time.time()

if metadata.topic == 'sys.heartbeat':
# No need to process heartbeat messages
continue
if 'heartbeat' in metadata.topic:
# No need to process heartbeat messages
continue

# Create the notice and add it to the queue
try:
Expand Down Expand Up @@ -362,7 +368,38 @@ def _kafka_listener_thread(
self.log.info('Closed connection')

self.log.info('Alert listener thread stopped')
return

def _kafka_heartbeat_thread(self, timeout=60):
    """Monitor the latest message timestamp to check if we're still connected.

    While ``self.running`` is true, compare the current time against
    ``self.latest_message_time`` every 5 seconds. If no message has arrived
    for more than ``timeout`` seconds a warning is logged on every check and
    a single Slack message is sent; once messages resume, a second Slack
    message reports how long the outage was observed for.

    This thread only reports the problem, it does not restart the listener.

    Parameters
    ----------
    timeout : float, default=60
        Seconds without a new message before the connection is considered lost.

    """
    self.log.info('Heartbeat thread started')

    def _notify(message):
        """Send a Slack message, logging (not raising) if it can't be sent."""
        # The most likely reason for a timeout is the network being down,
        # which is exactly when sending to Slack will also fail.
        # An uncaught exception here would silently end this monitoring thread,
        # so treat the notification as best-effort.
        try:
            send_slack_msg(message)
        except Exception as err:
            self.log.warning(f'Failed to send Slack message: {err}')

    timed_out = False
    timed_out_time = None
    while self.running:
        # Because of the heartbeat messages we should be getting a message
        # every few seconds, so we can use this timestamp to check if we're
        # still connected.
        time_delta = time.time() - self.latest_message_time
        if time_delta > timeout:
            # We haven't received a message in a while.
            # We can't kill the listener thread from here, this is just for our own logging.
            self.log.warning(f'No new messages for {time_delta:.0f}s')
            if not timed_out:
                # Record the state change *before* notifying, so a failed
                # notification can't leave us re-sending on every loop.
                timed_out = True
                timed_out_time = time.time()
                # Only send the Slack message once.
                # It's more useful if there's an issue on the broker's end.
                _notify(f'Sentinel reports no new messages for {time_delta:.0f}s')
        elif timed_out:
            # We've started receiving messages again!
            self.log.info('Connection restored')
            # Measured from when the timeout was first detected,
            # not from when the last message arrived.
            time_delta = time.time() - timed_out_time
            timed_out = False
            _notify(f'Sentinel connection restored after {time_delta:.0f}s')
        time.sleep(5)

    self.log.info('Heartbeat thread stopped')

def _handler_thread(self):
"""Monitor the notice queue and handle any new notices."""
Expand Down Expand Up @@ -417,7 +454,6 @@ def _handler_thread(self):
time.sleep(0.1)

self.log.info('Alert handler thread stopped')
return

def _fermi_skymap_thread(self, notice, timeout=600):
"""Listen for the official skymap for Fermi notices."""
Expand Down Expand Up @@ -489,6 +525,10 @@ def get_queue(self):
# We could return raw payloads I guess...
return [notice.ivorn for notice in self.notice_queue]

def clear_queue(self):
    """Discard every notice currently waiting in the queue.

    The queue attribute is rebound to a brand new empty list; the previous
    list object is left untouched rather than being emptied in place.
    """
    self.notice_queue = list()


def run():
"""Start the sentinel."""
Expand Down
19 changes: 14 additions & 5 deletions scripts/sentinel
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,19 @@ def query(command, args):
print(proxy.ingest_from_file(args[0]))

elif command == 'queue':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
queue = proxy.get_queue()
print(f'There are {len(queue)} notices currently in the queue')
for i, ivorn in enumerate(queue):
print(i, ivorn)
if len(args) == 0:
with Pyro4.Proxy(params.PYRO_URI) as proxy:
queue = proxy.get_queue()
print(f'There are {len(queue)} notices currently in the queue')
for i, ivorn in enumerate(queue):
print(i, ivorn)
elif len(args) == 1 and args[0] == 'clear':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
proxy.clear_queue()
print('Queue cleared')
else:
print('ERROR: Invalid arguments for "queue" command')
print('Usage: sentinel queue [clear]')

elif command == 'topics':
with Pyro4.Proxy(params.PYRO_URI) as proxy:
Expand All @@ -97,6 +105,7 @@ def print_instructions():
' shutdown shut down the sentinel',
' ingest [path|ivorn] add the given notice to the queue',
' queue print the notices currently in the queue',
' queue clear clear all notices currently in the queue',
' topics print all subscribed Kafka topics',
' log [tail args] print sentinel log (alias for tail)',
' help print these instructions',
Expand Down