Websocket JSR 356 API Chat ROOM using latest spec changes, client API, improved session handling, and more

From Resin 4.0 Wiki

Revision as of 00:00, 25 November 2012 by Rick (Talk | contribs)
Jump to: navigation, search

Did some cleanup. Moved more of the WebSocket handling into ChatClientPeerHandler, and session management is now more robust. Wrote a test client that sent around 400,000 messages over 500 connections; our implementation appears to be fast. :)

[Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client]


Contents


Server Side

ChatClientPeerHandler

package com.example.server;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;

import com.example.concurrent.ThreadedQueue;

@WebSocketEndpoint("/chat")
public class ChatClientPeerHandler implements Closeable {
    
    public ChatClientPeerHandler() {
    }

    @Inject @ApplicationScoped 
    ChatRoom chatRoom;
    
    @Inject Executor executor;


    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    
    ChatRoom room;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close = true;

    
    @WebSocketOpen
    public void onOpen(Session session) {
        System.out.println("@WebSocketOpen " + session);
        this.session = session;
        init();
    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        System.out.println("@WebSocketClose " + session);
        this.close();
    }


    @WebSocketMessage
    public void onMessage(String message) {
        
        String[] strings = message.split("::::");
        String clientId = strings[1];
        
        if (this.clientId!=null) {
            if (!this.clientId.equals(clientId)) {
                //This should never happen, but it proves our threads/client handling is correct.
                throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId);
            }
        }
 

        if (message.startsWith("remove client::::")) {
            chatRoom.sendMessage(clientId, clientId
                    + " has become bored and left");
            chatRoom.removeClient(clientId);

        } else if (message.startsWith("send message::::")) {
            String m = strings[2];
            chatRoom.sendMessage(clientId, m);
        } else if (message.startsWith("add client::::")) {
            this.clientId = clientId;
            chatRoom.addClient(clientId, this);
            chatRoom.sendMessage(clientId, clientId
                    + " has join the chat room");

        } else {
            System.err.println("ACK... Don't understand your message!!!!! " + message);
        }
    }
    
    
    private void init() {
        close = false;
        if (session == null) {
            throw new IllegalStateException("session is null");
        }
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), 
                new Runnable() {                
                    @Override
                    public void run() {
                        ChatClientPeerHandler.this.run();                   
                    }
                },  executor);

    }


    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        if (close) {
            return;
        }
        
        System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    private void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                try {
                    this.close();
                } catch (IOException e1) {
                    //ignore so it does not mask original exception
                    //once you install loggers, make this an info and the other an error.
                }
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
            session=null;
        }
        room.removeClient(this.clientId);
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }

}

ChatRoom

package com.example.server;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import com.example.concurrent.ThreadedQueue;

@ApplicationScoped
@Startup
public class ChatRoom {

    Map<String, ChatClientPeerHandler> chatClients = new HashMap<String, ChatClientPeerHandler>();
    BlockingQueue<ChatMessage> readQueue ;
    @Inject Executor executor;

    
    public void addClient(String clientId, ChatClientPeerHandler session) {
        offerMessage(new ChatMessage(session, clientId));        

    }

    public void sendMessage(String clientId, String message) {
        offerMessage(new ChatMessage(clientId, message));        
    }

    
    public void removeClient(String clientId) {
        offerMessage(new ChatMessage(clientId));                
    }
    


    
    @PostConstruct
    void init() throws Exception {
        System.out.println("POST CONSTRUCT CALLED");
        
        readQueue = new ThreadedQueue<ChatMessage>(new ArrayBlockingQueue<ChatMessage>(2000), new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }},  executor);
        
    }


    private void run() {
        ChatMessage message;
        
        while ((message = readQueue.poll()) != null) {
             dispatchMessage(message);
        }

    }

    private void launchNewClient(ChatMessage message) {
        System.out.println("launchNewClient::::" + message.getClientId());
        chatClients.put(message.getSession().getName(), message.getSession());        
    }

    private void dispatchMessage(ChatMessage message) {
        System.out.println("Dispatch Message::" + message);
        
        switch(message.getType()) {
        case ADD_CLIENT:
            launchNewClient(message);
            break;
        case REMOVE_CLIENT:
            doRemoveClient(message.getClientId());
            break;
        case SEND_MESSAGE:
            doSendMessage(message.getClientId(), message.getMessage());
            break;
        default:
             System.err.println("ACK... Don't understand your message!!!!! "
                     + message);
             
        }

    }

    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s\n", sendMessage);
        Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientPeerHandler chatClientHandler = iterator.next();
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                        sendMessage, chatClientHandler.getName());
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    private void doRemoveClient(String client) {

        System.out.println("removeClient::::[" + client + "]::::");

        ChatClientPeerHandler chatClientHandler = chatClients.get(client);

        if (chatClientHandler != null) {

            System.out.println("removeClient:::: found " + client
                    + " to remove.");

            chatClients.remove(client);
        }

    }

    static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT};
    
    static class ChatMessage {

        String message;
        ChatClientPeerHandler session;
        MessageType type;
        String clientId;
        
     
        public MessageType getType() {
            return type;
        }

        public String getClientId() {
            return clientId;
        }

        public ChatMessage(String clientId) {
            this.clientId = clientId;
            this.type=MessageType.REMOVE_CLIENT;
        }
        
        public ChatMessage(ChatClientPeerHandler session, String clientId) {
            this.session = session;
            this.clientId = clientId;
            this.type=MessageType.ADD_CLIENT;
        }


        public ChatMessage(String clientId, String message) {
            this.clientId = clientId;
            this.message = message;
            this.type=MessageType.SEND_MESSAGE;
        }

        public final String getMessage() {
            return message;
        }
        public final ChatClientPeerHandler getSession() {
            return session;
        }

        @Override
        public String toString() {
            return "ChatMessage [message=" + message + ", session=" + session
                    + ", type=" + type + ", clientId=" + clientId + "]";
        }

    }
    
    private void offerMessage(ChatMessage message) {
        try {
            readQueue.offer(message, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }




}


Fixed timing issue in ThreadedQueue.

ThreadedQueue

package com.example.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A queue that wakes up a runnable every time something is added to it.
 * This is a poor man's Actor: the processor is tied to the queue, and an
 * {@link AtomicBoolean} ensures at most one processor run is scheduled at a time.
 * If you put an item in and no processor is active, one is launched on the executor.
 *
 * <p>All other operations delegate directly to the wrapped queue.
 */
public class ThreadedQueue<T> implements BlockingQueue<T> {
    BlockingQueue<T> queue;
    Executor executor;
    Runnable queueProcessor;
    /** True while a processor run is scheduled or executing. */
    AtomicBoolean active = new AtomicBoolean();

    /**
     * @param queue          backing queue that stores the items
     * @param queueProcessor task that drains the queue; run on {@code executor}
     * @param executor       executor used to run {@code queueProcessor}
     */
    public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor,
            Executor executor) {
        this.queue = queue;
        this.queueProcessor = queueProcessor;
        this.executor = executor;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    @Override
    public boolean offer(T item) {
        boolean result = queue.offer(item);

        launchRunnable();

        return result;
    }

    @Override
    public boolean offer(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        /* Try the non-blocking path first. */
        boolean result = queue.offer(item);

        /* Launch even on failure: a full queue needs a processor to make room. */
        launchRunnable();

        if (!result) {
            result = queue.offer(item, timeout, unit);
            launchRunnable();
        }

        return result;
    }

    @Override
    public boolean add(T item) {
        boolean result = queue.add(item);

        launchRunnable();

        return result;
    }

    @Override
    public void put(T item) throws InterruptedException {
        
        /* Offer it if you can, this is non-blocking. */
        if (queue.offer(item)) {
            launchRunnable();
            return;
        }

        /* The queue is full: make sure a processor is draining it before blocking. */
        launchRunnable();

        /* If you were unable to offer it, then go ahead and block. */
        queue.put(item);

        launchRunnable();
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        boolean result = queue.addAll(c);

        launchRunnable();

        return result;
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <X> X[] toArray(X[] array) {
        return queue.toArray(array);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    /**
     * Schedules the queue processor if the queue is non-empty and no processor
     * run is currently scheduled or executing.
     *
     * @throws java.util.concurrent.RejectedExecutionException if the executor
     *         refuses the task; the queue stays usable and a later insertion
     *         will try to launch the processor again
     */
    public void launchRunnable() {
        if (queue.isEmpty()) {
            return;
        }

        if (!active.compareAndSet(false, true)) {
            return;
        }

        try {
            executor.execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            try {
                                queueProcessor.run();
                            } finally {
                              active.set(false);
                              launchRunnable();//just in case someone was trying to get in this block before we set the flag.
                            }
                        }
                    });
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The task never ran, so its finally block will not clear the flag.
            // Clear it here, otherwise no processor could ever be launched again.
            active.set(false);
            throw e;
        }
    }

 }


Client Side Java

Java Websocket client

package com.example.client;


import java.io.IOException;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;


/**
 * Minimal load-test client: each instance joins the chat room under its own
 * hash code and sends one greeting; {@link #main} opens 500 such connections.
 */
@WebSocketEndpoint("")
public class TestClient {
    /** Session to send messages to the server. */
    Session session;

    /** Remembers the session, then joins the room and sends a single greeting. */
    @WebSocketOpen
    public void onOpen(Session session) {
        this.session = session;
        final int id = this.hashCode();
        try {
            session.getRemote().sendString("add client::::" + id);
            session.getRemote().sendString("send message::::" + id + "::::hello from HELLO HELLO HELLO " + id);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the stored session, if any, and forgets it. */
    @WebSocketClose
    public void onClose() throws IOException {
        if (session == null) {
            return;
        }
        session.close();
        session = null;
    }

    /** Prints every message received, prefixed with this client's hash code. */
    @WebSocketMessage
    public void onMessage(String message) {
        System.out.println(this.hashCode() + " got message " + message);
    }

    /** Opens 500 connections to the local chat server and keeps the JVM alive for 100 seconds. */
    public static void main (String [] args) throws InterruptedException {
        ClientContainer container = ContainerProvider.getClientContainer();
        int remaining = 500;
        while (remaining > 0) {
            container.connectToServer(new TestClient(),
                "ws://localhost:8080/chat/chat");
            remaining--;
        }
        Thread.sleep(100000);
    }

}


Client Side JavaScript

JavaScript Websocket client

<html>

<head>

<title>Chat Client</title>

<link rel="stylesheet" type="text/css" href="css/style.css"
    media="screen" />


<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>


<script type="text/javascript">
    // Key code of the Enter key. Stored as a string and compared with == against
    // the numeric key code in the keypress handlers.
    var ENTER_KEY = '13';
    // Separator placed between the command, the user name and the message text
    // in every frame sent to the server.
    var TOKEN_DELIM = "::::";
    
    

    
    /**
     * Derives the chat WebSocket URL from the URL of the current page.
     *
     * Pages served over "https:" get the secure "wss:" scheme; every other
     * scheme falls back to "ws:". The host and port are taken from the page URL
     * and the path is always "/chat/chat".
     *
     * @return {string} the WebSocket URL, e.g. "ws://localhost:8080/chat/chat"
     */
    function buildWebSocketURL() {
        // "http://host:port/path" splits into ["http:", "", "host:port", ...]
        var parts = document.URL.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];

        var wsScheme = (scheme == "https:") ? "wss:" : "ws:";

        // Returned directly: no variable is leaked into the global scope.
        return wsScheme + "//" + hostPort + "/chat/chat";
    }
    

    // State shared by all chat_* functions for the current browser session.
    var chatUserSession = {
                version : "1.0",
                // Sub-protocol name passed to the WebSocket constructor.
                webSocketProtocol : "caucho-example-chat-protocol",
                // Computed once, when this object is created.
                webSocketURL : buildWebSocketURL(),
                webSocket : null,// WebSocket instance; assigned in chat_openWebSocket
                // Set from the #userName input in chat_Login.
                userName : null
    };

    /**
     * Shows the user's own message at the top of the chat history and sends it
     * to the server as "send message::::<userName>::::<message>".
     *
     * @param {string} message text typed by the user
     */
    function chat_sendMessage(message) {
        // Insert the text with .text() so markup typed by the user is displayed
        // literally instead of being parsed as HTML.
        $("<p>").css("color", "green")
                .text(" ME : " + message)
                .prependTo("#chatHistory");

        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    // Announces this user to the server with an "add client" frame.
    function chat_joinChat() {
        var joinFrame = ["add client", chatUserSession.userName].join(TOKEN_DELIM);
        chatUserSession.webSocket.send(joinFrame);
    }

    // Updates the status bar, then tells the server this user is leaving.
    function chat_leaveChat() {
        var user = chatUserSession.userName;
        chatUserSession.status(user + " is leaving chat");

        var leaveFrame = "remove client" + TOKEN_DELIM + chatUserSession.userName;
        chatUserSession.webSocket.send(leaveFrame);
    }

    // Opens the WebSocket and forwards each of its events to the matching
    // chatUserSession callback. The callbacks are looked up at event time.
    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var ws = chatUserSession.webSocket;

        ws.onopen = function() {
            chatUserSession.onOpen();
        };

        ws.onmessage = function(messageEvent) {
            chatUserSession.onMessage(messageEvent);
        };

        ws.onclose = function(event) {
            chatUserSession.onClose(event);
        };

        ws.onerror = function(event) {
            chatUserSession.onError(event);
        };
    }

    /**
     * Handles a message pushed by the server: shows it in the status bar and
     * adds it to the top of the chat history.
     *
     * @param {MessageEvent} msgEvent WebSocket message event; its data is the chat text
     */
    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);

        // The text comes from other users, so insert it with .text(): any markup
        // or script it contains is displayed literally rather than executed.
        $("<p>").css("color", "blue")
                .text(msgEvent.data)
                .prependTo("#chatHistory");
    }

    // Reads the user name from the form, switches the page to its
    // "logging in" state and opens the WebSocket.
    function chat_Login() {
        var enteredName = $("#userName").val();
        chatUserSession.userName = enteredName;

        $("#loginDiv").hide(500);

        var headerText = "Chat Client (logging in...) : " + chatUserSession.userName;
        $('#header').text(headerText);

        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();
    }

    // Called once the socket is open: joins the chat room and reveals the
    // message input area.
    function chat_onOpen() {
        chatUserSession.joinChat();

        var statusText = "Chat Client (logged in) : " + chatUserSession.userName;
        chatUserSession.status(statusText);

        var headerText = "Chat Client (logged in...) : " + chatUserSession.userName;
        $('#header').text(headerText);

        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }
    
    // Puts the given text in the status bar and makes the bar visible.
    function chat_Status(message) {
        var statusText = $('#statusBarPara1');
        statusText.text(message);

        var statusBar = $("#statusBar");
        statusBar.show(500);
    }

    // Called when the socket closes: returns the page to the login state and
    // reports the close reason and code in the status bar.
    function chat_onClose(closeEvent) {
        var user = chatUserSession.userName;

        $("#loginDiv").show(500);
        $('#header').text("Chat Client (not connected) : " + user);

        var closeSummary = user + " not logged in. "
                + ":: Reason: " + closeEvent.reason
                + " Code: " + closeEvent.code;
        $('#statusBarPara1').text(closeSummary);

        $("#inputArea").hide(500);
        $("#statusBar").show(500);

        // Pre-fill the previous name so the user can log in again quickly.
        var nameInput = $("#userName");
        nameInput.val(chatUserSession.userName);
        nameInput.focus();
    }

    /**
     * Shows a WebSocket error in the status bar.
     *
     * @param {Event} msg error event delivered by the WebSocket
     */
    function chat_onError(msg) {
        // JSON.stringify (not "stringfy"): the misspelled name is undefined and
        // threw a TypeError, so the error was never displayed.
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringify(msg));
        $("#statusBar").show(500);
    }

    // Attach the chat_* functions to the session object as its methods.
    $.extend(chatUserSession, {
        open : chat_openWebSocket,
        onMessage : chat_onMessage,
        onOpen : chat_onOpen,
        login : chat_Login,
        onClose : chat_onClose,
        onError : chat_onError,
        joinChat : chat_joinChat,
        sendMessage : chat_sendMessage,
        leaveChat : chat_leaveChat,
        status : chat_Status
    });

    // Page start-up: set the initial visibility and focus, then wire the
    // status bar, the chat input and the login controls.
    $(document).ready(function() {

        // Key code of a keypress event, preferring keyCode over which.
        function keyOf(event) {
            return event.keyCode || event.which;
        }

        // Enter in the chat box sends the text; the text "bye!" logs out instead.
        function onChatInputKey(event) {
            if (keyOf(event) == ENTER_KEY) {
                var typed = $("#chatInput").val();

                if (typed != "bye!") {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(typed);
                } else {
                    chatUserSession.leaveChat();
                }
            }
            event.stopPropagation();
        }

        // Enter in the user-name box behaves like clicking Login.
        function onUserNameKey(event) {
            if (keyOf(event) != ENTER_KEY) {
                return;
            }
            chatUserSession.login();
            event.stopPropagation();
        }

        $("#inputArea").hide();
        $("#userName").focus();

        // Clicking the status bar dismisses it.
        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(onChatInputKey);

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(onUserNameKey);
    });
</script>

</head>

<body>

    <h1 id="header">Chat Client 8</h1>


    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>

    <div id="loginDiv">
        User name   <input id="userName" type="text" /> <input
            id="login" type="submit" value="Login" />
    </div>


    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>

    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>


</body>

</html>


Java Client that measures time it takes to send a message to N clients

Command line arguments: address ws://localhost:8080/chat/chat numClients 900 message hello
package com.example.client;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import javax.websocket.ClientContainer;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketClose;
import javax.websocket.WebSocketEndpoint;
import javax.websocket.WebSocketMessage;
import javax.websocket.WebSocketOpen;


@WebSocketEndpoint("")
public class TestClient {
    
    static int numClients;
    volatile static int actualCount;
    volatile static int messageCount;
    volatile static long timeTestStarted;
    volatile static long timeTestEnded;
    
    static String address;
    static String message;
    static String[] COMMANDS = {"numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h"};
    static Set<String> commands = new HashSet<String>(COMMANDS.length);
    static
    {
        for (String command : COMMANDS) {
            commands.add(command);
        }
    }
    
    /** Session to send messages to the server. */
    Session session;

    @WebSocketOpen
    public void onOpen(Session session) {
        this.session = session;
        actualCount++;
        try {
            session.getRemote().sendString("add client::::" + this.hashCode());
        } catch (IOException e) {
            e.printStackTrace();
        }
        

    }
    
    @WebSocketClose
    public void onClose() throws IOException {
        if (session!=null) {
            session.close();
        }
        session=null;
    }

    @WebSocketMessage
    public void onMessage(String message) {
        synchronized (TestClient.class) {            
            if (message.contains("has join the chat room")){
                return;
            }
            messageCount++;
            if (messageCount==numClients) {
                timeTestEnded = System.currentTimeMillis();
            }
        }

    }

    
    public static void main (String [] args) throws InterruptedException, IOException {
        boolean wasCommand=false;
        String command = null;
        
        actualCount = 0;
        messageCount = 0;
        timeTestStarted = 0;
        timeTestEnded = 0;

        
        if (args.length==0) {
            printHelp();            
        }
        
        for (int index=0; index < args.length; index++) {
            String value = args[index];
            if (commands.contains(value)) {
                command = value;
                wasCommand = true;
                continue;
            }
            if (wasCommand) {
                wasCommand=false;
                if ("numClients".equals(command) || "nc".equals(command)) {
                    numClients = Integer.parseInt(value);
                } else if ("address".equals(command) || "a".equals(command)) {
                    address=value;
                } else if ("message".equals(command) || "m".equals(command)) {
                    message=value;
                } else if ("help".equals(command) ||"--help".equals(command) || "h".equals(command) || "-h".equals(command)) {
                    printHelp();
                }
            }
        }
        
        ClientContainer container = ContainerProvider.getClientContainer();
        for (int index = 0; index < numClients; index++) {
            container.connectToServer(new TestClient(), address);
        }
        Thread.sleep(10000);


        //wait for all of the clients to connect
        while (actualCount < numClients) {
            Thread.sleep(10);
            System.out.print(".");
        }
        System.out.println("\nAll clients LAUNCHED!");
               
        TestClient client = new TestClient();
        container.connectToServer(client, address);
        Thread.sleep(1000);

        
        if (message!=null) { 
            messageCount = 0;
            Thread.sleep(1000);

            
            timeTestStarted = System.currentTimeMillis();
            client.session.getRemote().sendString("send message::::" + client.hashCode() + ":::: " + message);
            System.out.println("Message just sent ");
            Thread.sleep(1000);
            
            int timeout=10;
            int retry=0;
            while(messageCount < numClients) {
                retry++;
                Thread.sleep(1000);
                
                System.out.printf("messageCount = %s, numClients=%s \n", messageCount, numClients);
                if (retry>timeout) {
                    break;
                }
            }
            
            System.out.println("Messages recieved " + messageCount);
            System.out.println("Time it took in miliseconds " + (timeTestEnded - timeTestStarted));
        }
        
        
    }

    private static void printHelp() {
        System.out.println("available commands");
        for (String command : COMMANDS) {
            System.out.println(command);
        }
        
        System.out.println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello");
        System.out.println("The above would connect five clients and send the message hello once to the chat server");

    }

}

Personal tools
TOOLBOX
LANGUAGES