WebSocket JSR 356 API chat room using the latest spec changes, client API, improved session handling, and more
From Resin 4.0 Wiki
Did some cleanup: moved more of the WebSocket handling into ChatClientPeerHandler and improved session management. Also wrote a test client that sent around 400,000 messages over 500 connections; our implementation appears to be fast. :)
[Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client]
Contents |
Server Side
ChatClientPeerHandler
package com.example.server; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; import com.example.concurrent.ThreadedQueue; @WebSocketEndpoint("/chat") public class ChatClientPeerHandler implements Closeable { public ChatClientPeerHandler() { } @Inject @ApplicationScoped ChatRoom chatRoom; @Inject Executor executor; /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; ChatRoom room; /** Flag to indicate whether we should close or not. */ volatile boolean close = true; @WebSocketOpen public void onOpen(Session session) { System.out.println("@WebSocketOpen " + session); this.session = session; init(); } @WebSocketClose public void onClose() throws IOException { System.out.println("@WebSocketClose " + session); this.close(); } @WebSocketMessage public void onMessage(String message) { String[] strings = message.split("::::"); String clientId = strings[1]; if (this.clientId!=null) { if (!this.clientId.equals(clientId)) { //This should never happen, but it proves our threads/client handling is correct. 
throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId); } } if (message.startsWith("remove client::::")) { chatRoom.sendMessage(clientId, clientId + " has become bored and left"); chatRoom.removeClient(clientId); } else if (message.startsWith("send message::::")) { String m = strings[2]; chatRoom.sendMessage(clientId, m); } else if (message.startsWith("add client::::")) { this.clientId = clientId; chatRoom.addClient(clientId, this); chatRoom.sendMessage(clientId, clientId + " has join the chat room"); } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void init() { close = false; if (session == null) { throw new IllegalStateException("session is null"); } writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), new Runnable() { @Override public void run() { ChatClientPeerHandler.this.run(); } }, executor); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { if (close) { return; } System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } private void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); try { this.close(); } catch (IOException e1) { //ignore so it does not mask original exception //once you install loggers, make this an info and the other an error. 
} break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); session=null; } room.removeClient(this.clientId); } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } }
ChatRoom
package com.example.server; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import com.example.concurrent.ThreadedQueue; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new HashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue ; @Inject Executor executor; public void addClient(String clientId, ChatClientPeerHandler session) { offerMessage(new ChatMessage(session, clientId)); } public void sendMessage(String clientId, String message) { offerMessage(new ChatMessage(clientId, message)); } public void removeClient(String clientId) { offerMessage(new ChatMessage(clientId)); } @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); readQueue = new ThreadedQueue<ChatMessage>(new ArrayBlockingQueue<ChatMessage>(2000), new Runnable() { public void run() { ChatRoom.this.run(); }}, executor); } private void run() { ChatMessage message; while ((message = readQueue.poll()) != null) { dispatchMessage(message); } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); chatClients.put(message.getSession().getName(), message.getSession()); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! 
" + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); chatClients.remove(client); } } static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; static class ChatMessage { String message; ChatClientPeerHandler session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(String clientId) { this.clientId = clientId; this.type=MessageType.REMOVE_CLIENT; } public ChatMessage(ChatClientPeerHandler session, String clientId) { this.session = session; this.clientId = clientId; this.type=MessageType.ADD_CLIENT; } public ChatMessage(String clientId, String message) { this.clientId = clientId; this.message = message; this.type=MessageType.SEND_MESSAGE; } public final String getMessage() { return message; } public final ChatClientPeerHandler getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } } private void offerMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch 
(InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
Fixed timing issue in ThreadedQueue.
ThreadedQueue
package com.example.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * A Queue that wakes up a runnable every time something is added to it. * This is a poor man's Actor. The thread is tied to the queue. * If you put an item in and there is not a thread, one will be launched. */ public class ThreadedQueue<T> implements BlockingQueue<T> { BlockingQueue<T> queue; Executor executor; Runnable queueProcessor; AtomicBoolean active = new AtomicBoolean(); public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor, Executor executor) { this.queue = queue; this.queueProcessor = queueProcessor; this.executor = executor; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public int remainingCapacity() { return queue.remainingCapacity(); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } @Override public boolean offer(T item) { boolean result = queue.offer(item); launchRunnable(); return result; } @Override public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException { boolean result = queue.offer(item); launchRunnable(); if (result) { return result; } else { result = queue.offer(item, timeout, unit); launchRunnable(); } return result; } @Override public boolean add(T item) { boolean result = queue.add(item); launchRunnable(); return result; } @Override public void put(T item) throws InterruptedException { /* Offer it if you can, this is non-blocking. */ if (queue.offer(item)) { launchRunnable(); return; } /* If you were unable to offer it, then go ahead and block. */ queue.put(item); launchRunnable(); } @Override public boolean addAll(Collection<? 
extends T> c) { boolean result = queue.addAll(c); launchRunnable(); return result; } @Override public T peek() { return queue.peek(); } @Override public T poll() { return queue.poll(); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } @Override public T element() { return queue.element(); } @Override public T take() throws InterruptedException { return queue.take(); } @Override public T remove() { return queue.remove(); } @Override public Iterator<T> iterator() { return queue.iterator(); } @Override public Object[] toArray() { return queue.toArray(); } @Override public <X> X[] toArray(X[] array) { return queue.toArray(array); } @Override public boolean removeAll(Collection<?> c) { return queue.removeAll(c); } @Override public boolean retainAll(Collection<?> c) { return queue.retainAll(c); } @Override public void clear() { queue.clear(); } @Override public boolean remove(Object o) { return queue.remove(o); } @Override public int drainTo(Collection<? super T> c) { return queue.drainTo(c); } @Override public int drainTo(Collection<? super T> c, int maxElements) { return queue.drainTo(c, maxElements); } public void launchRunnable() { if (queue.isEmpty()) { return; } if (active.compareAndSet(false, true)) { executor.execute( new Runnable() { @Override public void run() { try { queueProcessor.run(); } finally { active.set(false); launchRunnable();//just in case someone was trying to get in this block before we set the flag. } } }); } } }
Client Side Java
Java Websocket client
package com.example.client; import java.io.IOException; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; try { session.getRemote().sendString("add client::::" + this.hashCode()); session.getRemote().sendString("send message::::" + this.hashCode() + "::::hello from HELLO HELLO HELLO " + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { System.out.println(this.hashCode() + " got message " + message); } public static void main (String [] args) throws InterruptedException { ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < 500; index++) { container.connectToServer(new TestClient(), "ws://localhost:8080/chat/chat"); } Thread.sleep(100000); } }
Client Side JavaScript
JavaScript Websocket client
<html>
<head>
<title>Chat Client</title>
<link rel="stylesheet" type="text/css" href="css/style.css" media="screen" />
<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>
<script type="text/javascript">
    // Key code reported by keypress events for the Enter key.
    var ENTER_KEY = 13;
    // Separator between command, user name and text in the chat protocol.
    var TOKEN_DELIM = "::::";

    // Derives the WebSocket URL from the page URL: same host and port,
    // "wss:" for pages served over https and "ws:" otherwise.
    function buildWebSocketURL() {
        var parts = document.URL.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];
        var wssScheme = (scheme == "https:") ? "wss:" : "ws:";
        // Declared locally; the original leaked wssUrl as an implicit global.
        var wssUrl = wssScheme + "//" + hostPort + "/chat/chat";
        return wssUrl;
    }

    // State of the current user's chat session; the handlers are attached below.
    var chatUserSession = {
        version : "1.0",
        webSocketProtocol : "caucho-example-chat-protocol",
        webSocketURL : buildWebSocketURL(),
        webSocket : null, // WebSocket
        userName : null
    };

    // Adds one line to the chat history. The text is inserted with .text()
    // so that markup typed by any user is displayed, never interpreted.
    function chat_appendHistory(color, text) {
        $("<p>").css("color", color).text(text).prependTo("#chatHistory");
    }

    // Shows the user's own message and sends it to the server.
    function chat_sendMessage(message) {
        chat_appendHistory("green", " ME : " + message);
        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    function chat_joinChat() {
        chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName);
    }

    function chat_leaveChat() {
        chatUserSession.status(chatUserSession.userName + " is leaving chat");
        chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName);
    }

    // Opens the socket and routes its events to the chatUserSession handlers.
    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var socket = chatUserSession.webSocket;

        socket.onmessage = function(msg) {
            chatUserSession.onMessage(msg);
        };
        socket.onerror = function(errorEvent) {
            chatUserSession.onError(errorEvent);
        };
        socket.onopen = function() {
            chatUserSession.onOpen();
        };
        socket.onclose = function(closeEvent) {
            chatUserSession.onClose(closeEvent);
        };
    }

    // Shows a message received from the server. msgEvent.data was typed by
    // another user, so it is treated as plain text.
    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);
        chat_appendHistory("blue", msgEvent.data);
    }

    function chat_Login() {
        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text("Chat Client (logging in...) : " + chatUserSession.userName);
        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();
    }

    function chat_onOpen() {
        chatUserSession.joinChat();
        chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName);
        $('#header').text("Chat Client (logged in...) : " + chatUserSession.userName);
        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }

    function chat_Status(message) {
        $('#statusBarPara1').text(message);
        $("#statusBar").show(500);
    }

    // Returns the page to the login state and reports why the socket closed.
    function chat_onClose(closeEvent) {
        $("#loginDiv").show(500);
        $('#header').text("Chat Client (not connected) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " not logged in. "
                + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code);
        $("#inputArea").hide(500);
        $("#statusBar").show(500);
        $("#userName").val(chatUserSession.userName);
        $("#userName").focus();
    }

    function chat_onError(msg) {
        // Was JSON.stringfy, which threw a TypeError and hid the real error.
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringify(msg));
        $("#statusBar").show(500);
    }

    chatUserSession.open = chat_openWebSocket;
    chatUserSession.onMessage = chat_onMessage;
    chatUserSession.onOpen = chat_onOpen;
    chatUserSession.login = chat_Login;
    chatUserSession.onClose = chat_onClose;
    chatUserSession.onError = chat_onError;
    chatUserSession.joinChat = chat_joinChat;
    chatUserSession.sendMessage = chat_sendMessage;
    chatUserSession.leaveChat = chat_leaveChat;
    chatUserSession.status = chat_Status;

    $(document).ready(function() {
        $("#inputArea").hide();
        $("#userName").focus();

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        // Enter sends the typed message; the text 'bye!' leaves the chat instead.
        $("#chatInput").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();
                if (textMessage == "bye!") {
                    chatUserSession.leaveChat();
                } else {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(textMessage);
                }
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chatUserSession.login();
                event.stopPropagation();
            }
        });
    });
</script>
</head>
<body>
    <h1 id="header">Chat Client 8</h1>
    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>
    <div id="loginDiv">
        User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" />
    </div>
    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>
    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>
</body>
</html>
Java Client that measures time it takes to send a message to N clients
Command line arguments: address ws://localhost:8080/chat/chat numClients 900 message hello
package com.example.client; import java.io.IOException; import java.util.HashSet; import java.util.Set; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { static int numClients; volatile static int actualCount; volatile static int messageCount; volatile static long timeTestStarted; volatile static long timeTestEnded; static String address; static String message; static String[] COMMANDS = {"numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h"}; static Set<String> commands = new HashSet<String>(COMMANDS.length); static { for (String command : COMMANDS) { commands.add(command); } } /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; actualCount++; try { session.getRemote().sendString("add client::::" + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { synchronized (TestClient.class) { if (message.contains("has join the chat room")){ return; } messageCount++; if (messageCount==numClients) { timeTestEnded = System.currentTimeMillis(); } } } public static void main (String [] args) throws InterruptedException, IOException { boolean wasCommand=false; String command = null; actualCount = 0; messageCount = 0; timeTestStarted = 0; timeTestEnded = 0; if (args.length==0) { printHelp(); } for (int index=0; index < args.length; index++) { String value = args[index]; if (commands.contains(value)) { command = value; wasCommand = true; continue; } if (wasCommand) { wasCommand=false; if 
("numClients".equals(command) || "nc".equals(command)) { numClients = Integer.parseInt(value); } else if ("address".equals(command) || "a".equals(command)) { address=value; } else if ("message".equals(command) || "m".equals(command)) { message=value; } else if ("help".equals(command) ||"--help".equals(command) || "h".equals(command) || "-h".equals(command)) { printHelp(); } } } ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < numClients; index++) { container.connectToServer(new TestClient(), address); } Thread.sleep(10000); //wait for all of the clients to connect while (actualCount < numClients) { Thread.sleep(10); System.out.print("."); } System.out.println("\nAll clients LAUNCHED!"); TestClient client = new TestClient(); container.connectToServer(client, address); Thread.sleep(1000); if (message!=null) { messageCount = 0; Thread.sleep(1000); timeTestStarted = System.currentTimeMillis(); client.session.getRemote().sendString("send message::::" + client.hashCode() + ":::: " + message); System.out.println("Message just sent "); Thread.sleep(1000); int timeout=10; int retry=0; while(messageCount < numClients) { retry++; Thread.sleep(1000); System.out.printf("messageCount = %s, numClients=%s \n", messageCount, numClients); if (retry>timeout) { break; } } System.out.println("Messages recieved " + messageCount); System.out.println("Time it took in miliseconds " + (timeTestEnded - timeTestStarted)); } } private static void printHelp() { System.out.println("available commands"); for (String command : COMMANDS) { System.out.println(command); } System.out.println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello"); System.out.println("The above would connect five clients and send the message hello once to the chat server"); } }