WebSocket API Chat Room using JSR 356
From Resin 4.0 Wiki
ChatServerWebSocketEndpoint
package com.example.websocket; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Endpoint; import javax.net.websocket.Session; import com.example.ChatRoom; public class ChatServerWebSocketEndpoint extends Endpoint{ @Inject @ApplicationScoped ChatRoom room; @Override public void onOpen(Session session) { session.addMessageHandler(new ChatWebsocketMessageHandler(session, room)); } }
The ChatServerWebSocketEndpoint binds a ChatWebsocketMessageHandler to the WebSocket session.
The ChatWebsocketMessageHandler uses a chatRoom to send a message to other clients, when it gets a message.
package com.example.websocket; import javax.net.websocket.MessageHandler; import javax.net.websocket.Session; import com.example.ChatMessage; import com.example.ChatRoom; public class ChatWebsocketMessageHandler implements MessageHandler.Text { ChatRoom chatRoom; Session session; public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) { this.session = session; this.chatRoom = chatRoom; } @Override public void onMessage(String message) { chatRoom.sendMessage(new ChatMessage(message, session)); } }
The ChatWebsocketMessageHandler wraps the message in a ChatMessage. The ChatMessage parses what type of chat message this is.
ChatMessage
package com.example; import javax.net.websocket.Session; public class ChatMessage { static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; String message; Session session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(MessageType type, String clientId) { } public ChatMessage(String strMessage, Session session) { String[] strings = strMessage.split("::::"); clientId = strings[1]; this.session = session; if (strMessage.startsWith("remove client::::")) { type = MessageType.REMOVE_CLIENT; this.session = session; } else if (strMessage.startsWith("send message::::")) { type = MessageType.SEND_MESSAGE; message = strings[2]; } else if (strMessage.startsWith("add client::::")) { type = MessageType.ADD_CLIENT; } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } public final String getMessage() { return message; } public final Session getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } }
ChatRoom
package com.example; import java.io.IOException; import java.net.URI; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.ContainerProvider; import javax.net.websocket.DefaultServerConfiguration; import javax.net.websocket.ServerContainer; import javax.net.websocket.ServerEndpointConfiguration; import com.example.ChatMessage.MessageType; import com.example.websocket.ChatServerWebSocketEndpoint; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( 2000); @Inject Executor executor; @Inject ChatServerWebSocketEndpoint endpoint; @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); ServerEndpointConfiguration serverConfiguration; serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); ServerContainer serverContainer = ContainerProvider.getServerContainer(); serverContainer.publishServer(endpoint,serverConfiguration); executor.execute(new Runnable() { public void run() { ChatRoom.this.run(); }}); } private void run() { while (true) { try { ChatMessage message = readQueue .poll(500, TimeUnit.MILLISECONDS); if (message != null) { dispatchMessage(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler( message.getSession(), message.getClientId()); chatClients.put(chatClientHandler.getName(), chatClientHandler); executor.execute(chatClientHandler); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has join the chat room"); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has become bored with this chat room"); chatClients.remove(client); try { chatClientHandler.close(); } catch (IOException e) { } } } public void sendMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } public void removeClient(String clientId) { ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId); try { readQueue.offer(message, 100, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
The ChatClientPeerHandler manages sending messages back to the clients.
ChatClientPeerHandler
package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Session; public class ChatClientPeerHandler implements Runnable, Closeable { /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); @Inject @ApplicationScoped ChatRoom room; /** Keeps track if the thread for this ChatClientHandler is alive. */ volatile boolean alive; /** Flag to indicate whether we should close or not. */ volatile boolean close; public ChatClientPeerHandler(Session session, String name) { this.session = session; this.clientId = name; } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { System.out.println("ChatClientHandler::::" + sendMessage); this.writeQueue.offer(sendMessage); } @Override public void run() { alive = true; while (true) { try { if (close) { alive = false; break; } /* Get a message, send a message to client if not null. */ String message = writeQueue.poll(60, TimeUnit.SECONDS); if (message != null) { System.out .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", message, clientId); session.getRemote().sendString(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } catch (IOException e) { alive = false; e.printStackTrace(); break; } } } public String getName() { return clientId; } public boolean isAlive() { return alive; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); } room.removeClient(this.clientId); } }
The HTML 5 / JavaScript client that uses this chat room.....
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>