Websocket JSR 356 API Chat ROOM using latest spec changes, client API, improved session handling, and more
From Resin 4.0 Wiki
(Difference between revisions)
Line 1: | Line 1: | ||
Did some cleanup. Moved more websocket stuff into '''ChatClientPeerHandler'''. Now doing better session management. Wrote a test client that sent around 400,000 messages over 500 connections. It seems like our implementation is fast. :) | Did some cleanup. Moved more websocket stuff into '''ChatClientPeerHandler'''. Now doing better session management. Wrote a test client that sent around 400,000 messages over 500 connections. It seems like our implementation is fast. :) | ||
+ | |||
+ | [[Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client]] | ||
+ | |||
Latest revision as of 00:00, 25 November 2012
Did some cleanup. Moved more websocket stuff into ChatClientPeerHandler. Now doing better session management. Wrote a test client that sent around 400,000 messages over 500 connections. It seems like our implementation is fast. :)
Websocket JSR 356 API Chat Room using latest spec, added performance testing and a Java client
Contents |
[edit] Server Side
ChatClientPeerHandler
package com.example.server; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; import com.example.concurrent.ThreadedQueue; @WebSocketEndpoint("/chat") public class ChatClientPeerHandler implements Closeable { public ChatClientPeerHandler() { } @Inject @ApplicationScoped ChatRoom chatRoom; @Inject Executor executor; /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; ChatRoom room; /** Flag to indicate whether we should close or not. */ volatile boolean close = true; @WebSocketOpen public void onOpen(Session session) { System.out.println("@WebSocketOpen " + session); this.session = session; init(); } @WebSocketClose public void onClose() throws IOException { System.out.println("@WebSocketClose " + session); this.close(); } @WebSocketMessage public void onMessage(String message) { String[] strings = message.split("::::"); String clientId = strings[1]; if (this.clientId!=null) { if (!this.clientId.equals(clientId)) { //This should never happen, but it proves our threads/client handling is correct. throw new IllegalStateException("client id " + this.clientId + " does not match " + clientId); } } if (message.startsWith("remove client::::")) { chatRoom.sendMessage(clientId, clientId + " has become bored and left"); chatRoom.removeClient(clientId); } else if (message.startsWith("send message::::")) { String m = strings[2]; chatRoom.sendMessage(clientId, m); } else if (message.startsWith("add client::::")) { this.clientId = clientId; chatRoom.addClient(clientId, this); chatRoom.sendMessage(clientId, clientId + " has join the chat room"); } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void init() { close = false; if (session == null) { throw new IllegalStateException("session is null"); } writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), new Runnable() { @Override public void run() { ChatClientPeerHandler.this.run(); } }, executor); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { if (close) { return; } System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } private void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); try { this.close(); } catch (IOException e1) { //ignore so it does not mask original exception //once you install loggers, make this an info and the other an error. } break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); session=null; } room.removeClient(this.clientId); } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } }
ChatRoom
package com.example.server; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import com.example.concurrent.ThreadedQueue; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new HashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue ; @Inject Executor executor; public void addClient(String clientId, ChatClientPeerHandler session) { offerMessage(new ChatMessage(session, clientId)); } public void sendMessage(String clientId, String message) { offerMessage(new ChatMessage(clientId, message)); } public void removeClient(String clientId) { offerMessage(new ChatMessage(clientId)); } @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); readQueue = new ThreadedQueue<ChatMessage>(new ArrayBlockingQueue<ChatMessage>(2000), new Runnable() { public void run() { ChatRoom.this.run(); }}, executor); } private void run() { ChatMessage message; while ((message = readQueue.poll()) != null) { dispatchMessage(message); } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); chatClients.put(message.getSession().getName(), message.getSession()); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); chatClients.remove(client); } } static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; static class ChatMessage { String message; ChatClientPeerHandler session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(String clientId) { this.clientId = clientId; this.type=MessageType.REMOVE_CLIENT; } public ChatMessage(ChatClientPeerHandler session, String clientId) { this.session = session; this.clientId = clientId; this.type=MessageType.ADD_CLIENT; } public ChatMessage(String clientId, String message) { this.clientId = clientId; this.message = message; this.type=MessageType.SEND_MESSAGE; } public final String getMessage() { return message; } public final ChatClientPeerHandler getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } } private void offerMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
Fixed timing issue in ThreadedQueue.
ThreadedQueue
package com.example.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * A Queue that wakes up a runnable every time something is added to it. * This is a poor man's Actor. The thread is tied to the queue. * If you put an item in and there is not a thread, one will be launched. */ public class ThreadedQueue<T> implements BlockingQueue<T> { BlockingQueue<T> queue; Executor executor; Runnable queueProcessor; AtomicBoolean active = new AtomicBoolean(); public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor, Executor executor) { this.queue = queue; this.queueProcessor = queueProcessor; this.executor = executor; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public int remainingCapacity() { return queue.remainingCapacity(); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } @Override public boolean offer(T item) { boolean result = queue.offer(item); launchRunnable(); return result; } @Override public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException { boolean result = queue.offer(item); launchRunnable(); if (result) { return result; } else { result = queue.offer(item, timeout, unit); launchRunnable(); } return result; } @Override public boolean add(T item) { boolean result = queue.add(item); launchRunnable(); return result; } @Override public void put(T item) throws InterruptedException { /* Offer it if you can, this is non-blocking. */ if (queue.offer(item)) { launchRunnable(); return; } /* If you were unable to offer it, then go ahead and block. */ queue.put(item); launchRunnable(); } @Override public boolean addAll(Collection<? extends T> c) { boolean result = queue.addAll(c); launchRunnable(); return result; } @Override public T peek() { return queue.peek(); } @Override public T poll() { return queue.poll(); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } @Override public T element() { return queue.element(); } @Override public T take() throws InterruptedException { return queue.take(); } @Override public T remove() { return queue.remove(); } @Override public Iterator<T> iterator() { return queue.iterator(); } @Override public Object[] toArray() { return queue.toArray(); } @Override public <X> X[] toArray(X[] array) { return queue.toArray(array); } @Override public boolean removeAll(Collection<?> c) { return queue.removeAll(c); } @Override public boolean retainAll(Collection<?> c) { return queue.retainAll(c); } @Override public void clear() { queue.clear(); } @Override public boolean remove(Object o) { return queue.remove(o); } @Override public int drainTo(Collection<? super T> c) { return queue.drainTo(c); } @Override public int drainTo(Collection<? super T> c, int maxElements) { return queue.drainTo(c, maxElements); } public void launchRunnable() { if (queue.isEmpty()) { return; } if (active.compareAndSet(false, true)) { executor.execute( new Runnable() { @Override public void run() { try { queueProcessor.run(); } finally { active.set(false); launchRunnable();//just in case someone was trying to get in this block before we set the flag. } } }); } } }
[edit] Client Side Java
Java Websocket client
package com.example.client; import java.io.IOException; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; try { session.getRemote().sendString("add client::::" + this.hashCode()); session.getRemote().sendString("send message::::" + this.hashCode() + "::::hello from HELLO HELLO HELLO " + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { System.out.println(this.hashCode() + " got message " + message); } public static void main (String [] args) throws InterruptedException { ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < 500; index++) { container.connectToServer(new TestClient(), "ws://localhost:8080/chat/chat"); } Thread.sleep(100000); } }
[edit] Client Side JavaScript
JavaScript Websocket client
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client 8</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>
[edit] Java Client that measures time it takes to send a message to N clients
Command line arguments: address ws://localhost:8080/chat/chat numClients 900 message hello
package com.example.client; import java.io.IOException; import java.util.HashSet; import java.util.Set; import javax.websocket.ClientContainer; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketClose; import javax.websocket.WebSocketEndpoint; import javax.websocket.WebSocketMessage; import javax.websocket.WebSocketOpen; @WebSocketEndpoint("") public class TestClient { static int numClients; volatile static int actualCount; volatile static int messageCount; volatile static long timeTestStarted; volatile static long timeTestEnded; static String address; static String message; static String[] COMMANDS = {"numClients", "nc", "address", "a", "message", "m", "help", "--help", "h", "-h"}; static Set<String> commands = new HashSet<String>(COMMANDS.length); static { for (String command : COMMANDS) { commands.add(command); } } /** Session to send messages to the server. */ Session session; @WebSocketOpen public void onOpen(Session session) { this.session = session; actualCount++; try { session.getRemote().sendString("add client::::" + this.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @WebSocketClose public void onClose() throws IOException { if (session!=null) { session.close(); } session=null; } @WebSocketMessage public void onMessage(String message) { synchronized (TestClient.class) { if (message.contains("has join the chat room")){ return; } messageCount++; if (messageCount==numClients) { timeTestEnded = System.currentTimeMillis(); } } } public static void main (String [] args) throws InterruptedException, IOException { boolean wasCommand=false; String command = null; actualCount = 0; messageCount = 0; timeTestStarted = 0; timeTestEnded = 0; if (args.length==0) { printHelp(); } for (int index=0; index < args.length; index++) { String value = args[index]; if (commands.contains(value)) { command = value; wasCommand = true; continue; } if (wasCommand) { wasCommand=false; if ("numClients".equals(command) || "nc".equals(command)) { numClients = Integer.parseInt(value); } else if ("address".equals(command) || "a".equals(command)) { address=value; } else if ("message".equals(command) || "m".equals(command)) { message=value; } else if ("help".equals(command) ||"--help".equals(command) || "h".equals(command) || "-h".equals(command)) { printHelp(); } } } ClientContainer container = ContainerProvider.getClientContainer(); for (int index = 0; index < numClients; index++) { container.connectToServer(new TestClient(), address); } Thread.sleep(10000); //wait for all of the clients to connect while (actualCount < numClients) { Thread.sleep(10); System.out.print("."); } System.out.println("\nAll clients LAUNCHED!"); TestClient client = new TestClient(); container.connectToServer(client, address); Thread.sleep(1000); if (message!=null) { messageCount = 0; Thread.sleep(1000); timeTestStarted = System.currentTimeMillis(); client.session.getRemote().sendString("send message::::" + client.hashCode() + ":::: " + message); System.out.println("Message just sent "); Thread.sleep(1000); int timeout=10; int retry=0; while(messageCount < numClients) { retry++; Thread.sleep(1000); System.out.printf("messageCount = %s, numClients=%s \n", messageCount, numClients); if (retry>timeout) { break; } } System.out.println("Messages recieved " + messageCount); System.out.println("Time it took in miliseconds " + (timeTestEnded - timeTestStarted)); } } private static void printHelp() { System.out.println("available commands"); for (String command : COMMANDS) { System.out.println(command); } System.out.println("Example usage ./tc.sh address ws://localhost:8080/chat/chat numClients 5 message hello"); System.out.println("The above would connect five clients and send the message hello once to the chat server"); } }