Skip to content

Commit

Permalink
fix(whisper): Fixes leaking whisper connections. (#511)
Browse files Browse the repository at this point in the history
* fix(whisper): Fixes leaking whisper connections.

Moves setting transcriber room name earlier, so we can use it in creating the transcriber sessions.

When a participant never talked, is missing in one of the participants lists and those got desync and we never remove the websocket from the pool, which results in reusing it for the same room (the socket has timeouted and disconnected) and there is a NPE that prevents transcriptions in the meeting.
2023-11-30 13:09:17.645 SEVERE: [188854] org.jivesoftware.smack.AbstractXMPPConnection.lambda$invokeStanzaCollectorsAndNotifyRecvListeners$8: Exception in packet listener
java.lang.NullPointerException
	at org.jitsi.jigasi.transcription.WhisperWebsocket.addParticipantIfNotExists(WhisperWebsocket.java:362)
	at org.jitsi.jigasi.transcription.WhisperWebsocket.addListener(WhisperWebsocket.java:373)
	at org.jitsi.jigasi.transcription.WhisperTranscriptionService$WhisperWebsocketStreamingSession.addTranscriptionListener(WhisperTranscriptionService.java:170)
	at org.jitsi.jigasi.transcription.Participant.joined(Participant.java:524)

* squash: Fixes some warnings.
  • Loading branch information
damencho authored Dec 1, 2023
1 parent 38030bd commit 86273ae
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public TranscriptionGatewaySession(AbstractGateway gateway,
this.transcriber.addTranscriptionEventListener(
(TranscriptionEventListener)this.service);
}

transcriber.setRoomName(this.getCallContext().getRoomJid().toString());
}

@Override
Expand All @@ -137,7 +139,6 @@ void onConferenceCallInvited(Call incomingCall)
// the room name, url and start listening
transcriber.addTranscriptionListener(this);
transcriber.addTranslationListener(this);
transcriber.setRoomName(this.getCallContext().getRoomJid().toString());
transcriber.setRoomUrl(getMeetingUrl());

// We create a MediaWareCallConference whose MediaDevice
Expand Down Expand Up @@ -318,7 +319,7 @@ void notifyChatRoomMemberUpdated(ChatRoomMember chatMember, Presence presence)
}
catch (InterruptedException e)
{
e.printStackTrace();
logger.error(e);
}

if (!transcriber.isAnyParticipantRequestingTranscription())
Expand Down Expand Up @@ -563,7 +564,7 @@ private String getConferenceMemberResourceID(ConferenceMember member)
}
catch (XmppStringprepException e)
{
e.printStackTrace();
logger.error(e);
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,19 @@ public class WhisperConnectionPool
/**
* A hashmap holding the state for each connection
*/
private final Map<String, ConnectionState> pool = new ConcurrentHashMap<>();
private final Map<String, WhisperWebsocket> pool = new ConcurrentHashMap<>();

/**
* The thread pool to serve all connect disconnect operations.
*/
private static ExecutorService threadPool = Util.createNewThreadPool("jigasi-whisper-ws");


private static class ConnectionState
{
public WhisperWebsocket wsConn;
public HashSet<String> participants = new HashSet<>();

public ConnectionState(WhisperWebsocket ws)
{
wsConn = ws;
}

}
private static final ExecutorService threadPool = Util.createNewThreadPool("jigasi-whisper-ws");

/**
* Gets a connection if it exists, creates one if it doesn't.
* @param roomId The room jid.
* @param participantId The participant id.
* @return The websocket.
* @throws Exception When fail to initialize the socket.
*/
public WhisperWebsocket getConnection(String roomId, String participantId)
throws Exception
public WhisperWebsocket getConnection(String roomId)
{
if (!pool.containsKey(roomId))
{
Expand All @@ -85,11 +69,10 @@ public WhisperWebsocket getConnection(String roomId, String participantId)
// connect socket in new thread to not block Smack threads
threadPool.execute(socket::connect);

pool.put(roomId, new ConnectionState(socket));
pool.put(roomId, socket);
}

pool.get(roomId).participants.add(participantId);
return pool.get(roomId).wsConn;
return pool.get(roomId);
}

/**
Expand All @@ -99,38 +82,23 @@ public WhisperWebsocket getConnection(String roomId, String participantId)
*/
public void end(String roomId, String participantId)
{
// execute thi in new thread to not block Smack
threadPool.execute(() -> {
this.endInternal(roomId, participantId);
});
// execute this in new thread to not block Smack
threadPool.execute(() -> this.endInternal(roomId, participantId));
}

private void endInternal(String roomId, String participantId)
{
ConnectionState state = pool.getOrDefault(roomId, null);
if (state == null)
{
return;
}

if (!state.participants.contains(participantId))
{
return;
}

state.participants.remove(participantId);

if (!state.participants.isEmpty())
WhisperWebsocket wsConn = pool.getOrDefault(roomId, null);
if (wsConn == null)
{
return;
}

boolean isEverybodyDisconnected = false;
try
{
isEverybodyDisconnected = state.wsConn.disconnectParticipant(participantId);
if (isEverybodyDisconnected)
if (wsConn.disconnectParticipant(participantId))
{
// remove from the pull if everyone is disconnected
pool.remove(roomId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ public static class WhisperWebsocketStreamingSession


WhisperWebsocketStreamingSession(Participant participant)
throws Exception
{
this.participant = participant;
String[] debugName = this.participant.getDebugName().split("/");
participantId = debugName[1];
roomId = debugName[0];
roomId = participant.getTranscriber().getRoomName();
connectionPool = WhisperConnectionPool.getInstance();
wsClient = connectionPool.getConnection(roomId, participantId);
wsClient = connectionPool.getConnection(roomId);
wsClient.setTranscriptionTag(transcriptionTag);
}

Expand Down

0 comments on commit 86273ae

Please sign in to comment.