WebSocket API Chat Room using JSR 356 and poor man's actor model
From Resin 4.0 Wiki
This one should scale to however many FILE handles your OS allows. You should be able to have 50,000 or 100,000 connections.
WebSocket API Chat Room using JSR 356, poor man's actor model and WebSocketEndpoint annotation
ChatClientPeerHandler
package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import javax.net.websocket.Session; import com.example.concurrent.ThreadedQueue; public class ChatClientPeerHandler implements Runnable, Closeable { /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; ChatRoom room; /** Flag to indicate whether we should close or not. */ volatile boolean close; public ChatClientPeerHandler(ChatRoom room, Session session, String name, Executor executor) { this.session = session; this.clientId = name; this.room = room; writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), this, executor); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } @Override public void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); } room.removeClient(this.clientId); } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } }
ChatMessage
package com.example; import javax.net.websocket.Session; public class ChatMessage { static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; String message; Session session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(MessageType type, String clientId) { } public ChatMessage(String strMessage, Session session) { String[] strings = strMessage.split("::::"); clientId = strings[1]; this.session = session; if (strMessage.startsWith("remove client::::")) { type = MessageType.REMOVE_CLIENT; this.session = session; } else if (strMessage.startsWith("send message::::")) { type = MessageType.SEND_MESSAGE; message = strings[2]; } else if (strMessage.startsWith("add client::::")) { type = MessageType.ADD_CLIENT; } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } public final String getMessage() { return message; } public final Session getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } }
ChatRoom
package com.example; import java.io.IOException; import java.net.URI; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.ContainerProvider; import javax.net.websocket.DefaultServerConfiguration; import javax.net.websocket.ServerContainer; import javax.net.websocket.ServerEndpointConfiguration; import com.example.ChatMessage.MessageType; import com.example.websocket.ChatServerWebSocketEndpoint; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( 2000); @Inject Executor executor; @Inject ChatServerWebSocketEndpoint endpoint; @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); ServerEndpointConfiguration serverConfiguration; serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); ServerContainer serverContainer = ContainerProvider.getServerContainer(); serverContainer.publishServer(endpoint,serverConfiguration); executor.execute(new Runnable() { public void run() { ChatRoom.this.run(); }}); } private void run() { while (true) { try { ChatMessage message = readQueue .poll(500, TimeUnit.MILLISECONDS); if (message != null) { dispatchMessage(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler(this, message.getSession(), message.getClientId(), executor); chatClients.put(chatClientHandler.getName(), chatClientHandler); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has join the chat room"); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has become bored with this chat room"); chatClients.remove(client); try { chatClientHandler.close(); } catch (IOException e) { } } } public void sendMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } public void removeClient(String clientId) { ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId); try { readQueue.offer(message, 100, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
ThreadedQueue AKA poor man's Actor model
package com.example.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * A Queue that wakes up a runnable every time something is added to it. * This is a poor man's Actor. The thread is tied to the queue. * If you put an item in and there is not a thread, one will be launched. */ public class ThreadedQueue<T> implements BlockingQueue<T> { BlockingQueue<T> queue; Executor executor; Runnable queueProcessor; AtomicBoolean active = new AtomicBoolean(); public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor, Executor executor) { this.queue = queue; this.queueProcessor = queueProcessor; this.executor = executor; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public int remainingCapacity() { return queue.remainingCapacity(); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } @Override public boolean offer(T item) { boolean result = queue.offer(item); launchRunnable(); return result; } @Override public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException { boolean result = queue.offer(item); launchRunnable(); if (result) { return result; } else { result = queue.offer(item, timeout, unit); launchRunnable(); } return result; } @Override public boolean add(T item) { boolean result = queue.add(item); launchRunnable(); return result; } @Override public void put(T item) throws InterruptedException { System.out.println("PUT CALLED " + item); /* Offer it if you can, this is non-blocking. */ if (queue.offer(item)) { launchRunnable(); return; } /* If you were unable to offer it, then go ahead and block. */ queue.put(item); launchRunnable(); } @Override public boolean addAll(Collection<? extends T> c) { boolean result = queue.addAll(c); launchRunnable(); return result; } @Override public T peek() { return queue.peek(); } @Override public T poll() { return queue.poll(); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } @Override public T element() { return queue.element(); } @Override public T take() throws InterruptedException { return queue.take(); } @Override public T remove() { return queue.remove(); } @Override public Iterator<T> iterator() { return queue.iterator(); } @Override public Object[] toArray() { return queue.toArray(); } @Override public <X> X[] toArray(X[] array) { return queue.toArray(array); } @Override public boolean removeAll(Collection<?> c) { return queue.removeAll(c); } @Override public boolean retainAll(Collection<?> c) { return queue.retainAll(c); } @Override public void clear() { queue.clear(); } @Override public boolean remove(Object o) { return queue.remove(o); } @Override public int drainTo(Collection<? super T> c) { return queue.drainTo(c); } @Override public int drainTo(Collection<? super T> c, int maxElements) { return queue.drainTo(c, maxElements); } public void launchRunnable() { System.out.println("LAUNCH RUNNABLE"); if (queue.isEmpty()) { return; } if (active.compareAndSet(false, true)) { executor.execute( new Runnable() { @Override public void run() { try { queueProcessor.run(); } finally { active.set(false); } } }); } } }
ChatServerWebSocketEndpoint
package com.example.websocket; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Endpoint; import javax.net.websocket.Session; import com.example.ChatRoom; public class ChatServerWebSocketEndpoint extends Endpoint{ @Inject @ApplicationScoped ChatRoom room; @Override public void onOpen(Session session) { session.addMessageHandler(new ChatWebsocketMessageHandler(session, room)); } }
ChatWebsocketMessageHandler
package com.example.websocket; import javax.net.websocket.MessageHandler; import javax.net.websocket.Session; import com.example.ChatMessage; import com.example.ChatRoom; public class ChatWebsocketMessageHandler implements MessageHandler.Text { ChatRoom chatRoom; Session session; public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) { this.session = session; this.chatRoom = chatRoom; } @Override public void onMessage(String message) { chatRoom.sendMessage(new ChatMessage(message, session)); } }
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client 6</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>