Skip to content

Latest commit

 

History

History
182 lines (147 loc) · 5.75 KB

README.adoc

File metadata and controls

182 lines (147 loc) · 5.75 KB

Simple Stampy - JSR 356 (WebSocket) integration

Server

Basic usage

To define a stampy endpoint extend com.github.rmannibucau.websocket.stomp.BaseStampyEndpoint and override method getStampyListeners().

Here is a sample:

@ServerEndpoint(
    value = "/touch", // your endpoint
    subprotocols = "stomp" // what you want actually
)
public class Server extends BaseStampyEndpoint {
    @Override
    public StampyMessageListener[] getStampyListeners() {
        return new StampyMessageListener[]{
            new DebugListener(Logger.getLogger("debug-server")),
            new SubscribeListener(gateway)
        };
    }
}

Subscription

To ease subscription com.github.rmannibucau.websocket.stomp.listener.BaseSubscribeListener is provided. Inheriting from it you can define a custom asia.stampy.common.gateway.StampyMessageListener dedicated to SUBSCRIBE/UNSUBSCRIBE commands:

public class SubscribeListener extends BaseSubscribeListener {
    public SubscribeListener(final WebSocketGateway webSocketGateway) {
        super(webSocketGateway);
    }

    @Override
    protected void onSessionClose(final HostPort hostPort) {
        // clean up any state you used
    }

    @Override
    protected void onUnsubscribe(final HostPort hostPort, final UnsubscribeMessage unsubscribe) {
        // TODO: often just calling onSessionClose(hostPort);
    }

    @Override
    protected void onSubscribe(final HostPort hostPort, final SubscribeMessage subscribe) {
        // TODO
    }
}

A more concrete sample is:

public class SubscribeListener extends BaseSubscribeListener {
    private final AtomicLong idGenerator = new AtomicLong(Long.MIN_VALUE);

    // in practise key would be hostport + subscription but fine for a test
    private final Map<HostPort, Sender> threads = new ConcurrentHashMap<>();

    public SubscribeListener(final WebSocketGateway webSocketGateway) {
        super(webSocketGateway);
    }

    @Override
    protected void onSessionClose(final HostPort hostPort) {
        final Sender sender = threads.remove(hostPort);
        if (sender != null) {
            sender.running.set(false);
            try {
                sender.join();
            } catch (final InterruptedException e) {
                Thread.interrupted();
            }
        }
    }

    @Override
    protected void onUnsubscribe(final HostPort hostPort, final UnsubscribeMessage unsubscribe) {
        onSessionClose(hostPort);
    }

    @Override // while not unsubscribed send message
    protected void onSubscribe(final HostPort hostPort, final SubscribeMessage subscribe) {
        final Sender sender = new Sender(subscribe.getHeader(), hostPort);
        threads.put(hostPort, sender);
        sender.start();
    }

    private class Sender extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final SubscribeHeader headers;
        private final HostPort hostPort;

        private Sender(final SubscribeHeader headers, final HostPort hostPort) {
            this.headers = headers;
            this.hostPort = hostPort;
        }

        @Override
        public void run() {
            int i = 0;
            while (running.get()) {
                try {
                    final String msg = "{ \"message\":\"test " + i++ + "\"}";
                    final MessageMessage pushMessage = new MessageMessage(headers.getDestination(), Long.toString(idGenerator.incrementAndGet()), headers.getId());
                    pushMessage.setBody(msg);
                    pushMessage.getHeader().setContentLength(msg.length());
                    pushMessage.getHeader().setContentType("text/json"); // application/json but if we want it we need to subsclass MessageMessage
                    gateway.sendMessage(pushMessage, hostPort);
                    if (i > 5) { // don't use CPU that much
                        try {
                            sleep(100);
                        } catch (final InterruptedException e) {
                            Thread.interrupted();
                        }
                    }
                } catch (final InterceptException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Client

Not yet done

Tip: here is a subscription sample (test in the project actually)

@ClientEndpoint(subprotocols = "stomp")
public class Client {
    private final Semaphore latch;

    public Client(final Semaphore latch) {
        this.latch = latch;
    }

    @OnMessage
    public void onMessage(final String msg) {
        latch.release();
    }

    @OnClose
    public void close() {
        latch.release();
    }
}

// Usage:
final WebSocketContainer container = ContainerProvider.getWebSocketContainer();

final Semaphore latch = new Semaphore(0);
final Session session = container.connectToServer(
        new Client(latch),
        new URI("ws", null, context.getHost(), context.getPort(), context.getPath() + "touch", null, null));

final RemoteEndpoint.Basic remote = session.getBasicRemote();

remote.sendText(new ConnectMessage("1.2", context.getHost()).toStompMessage(false));

final String id = UUID.randomUUID().toString();
remote.sendText(new SubscribeMessage("/endpoint", id).toStompMessage(false));
assertTrue(latch.tryAcquire(1 + 10, 1, TimeUnit.MINUTES));  // CONNECTED + n MESSAGEs
remote.sendText(new UnsubscribeMessage(id).toStompMessage(false));

final DisconnectMessage disconnectMessage = new DisconnectMessage();
disconnectMessage.getHeader().setReceipt("end");
remote.sendText(disconnectMessage.toStompMessage(false));
assertTrue(latch.tryAcquire(1, 1, TimeUnit.MINUTES));  // RECEIPT

session.close(new CloseReason(GOING_AWAY, "bye"));
assertTrue(latch.tryAcquire(1, 1, TimeUnit.MINUTES));  // server closes the connection