Understanding WebSockets versus Ajax/REST for Java EE Developers Part II
From Resin 4.0 Wiki
Under Review and under construction...
WebSocket API Chat Room using JSR 356
This is a continuation of the first tutorial in this series (Part 1). In this tutorial, we show how to build a jQuery chat client and a fully working, scalable chat server using Resin 4.0.26.
- ChatServlet
- (same as before)
- ChatWebSocketListener
- super simple, just posts ChatMessage to queue for ChatRoom
- ChatRoom
- Uses CDI (Java EE 6 Dependency injection)
- @ApplicationScoped, @Startup,
- manages a collection of ChatClientHandler, sends messages to them
- ChatMessage
- holds Message and WebSocket for current message
- ChatClientHandler
- waits on its own write queue for messages from the ChatRoom; one queue (and one handler) per HTML5 client
ChatWebSocketListener (* to 1) -> Read Queue (1) -> ChatRoom (1) -> Write Queue (*) -> ChatClientHandler (*)
Only ChatClientHandler uses WebSocketContext to write.
Only ChatWebSocketListener reads from WebSocket.
The ChatRoom responds to three messages: add client, remove client, and send message.
void dispatchMessage(ChatMessage message) { if (message.startsWith("remove client::::")) { removeClient(clientId); } else if (message.startsWith("send message::::")) { doSendMessage(clientId, payload); } else if (message.startsWith("add client::::")) { launchNewClient(chatMessage, clientId); }
Resin 4.0.27 and 4.0.28 should have some enhancements to further ease WebSocket development.
Registering WebSocket Protocol
ChatServlet.java
package com.caucho.websocket.example;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import javax.inject.Inject;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.caucho.websocket.WebSocketServletRequest;

/**
 * Servlet mapped to "/chat" that performs the WebSocket handshake for the
 * chat example and hands the connection to a {@link ChatWebSocketListener}.
 */
@WebServlet("/chat")
public class ChatServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    /** Sub-protocol name the chat clients must request. */
    private static final String CHAT_PROTOCOL = "caucho-example-chat-protocol";

    /** Chat room shared by every listener created by this servlet. */
    @Inject
    ChatRoom chatRoom;

    /** Injected executor; not used by this servlet's own code. */
    @Inject
    ExecutorService executor;

    /**
     * Handles the WebSocket handshake.
     *
     * If the request carries the chat sub-protocol in its
     * Sec-WebSocket-Protocol header, the header is echoed back and the
     * request is upgraded. Any other request is answered with 400 so the
     * caller gets an explicit failure instead of an empty 200 response.
     *
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse response)
     */
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String protocol = request.getHeader("Sec-WebSocket-Protocol");
        System.out.println("___________ doGet MEHTOD ________________ " + protocol);

        if (!CHAT_PROTOCOL.equals(protocol)) {
            response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                    "Expected WebSocket sub-protocol " + CHAT_PROTOCOL);
            return;
        }

        System.out.println("___________ WEBSOCKET Handshake ________________ " + protocol);
        response.setHeader("Sec-WebSocket-Protocol", CHAT_PROTOCOL);
        WebSocketServletRequest wsRequest = (WebSocketServletRequest) request;
        wsRequest.startWebSocket(new ChatWebSocketListener(chatRoom));
    }
}
WebSocketListener, delegating to queues
ChatWebSocketListener.java
package com.caucho.websocket.example;

import java.io.IOException;
import java.io.Reader;

import com.caucho.websocket.WebSocketContext;
import com.caucho.websocket.AbstractWebSocketListener;

/**
 * Reads text messages from one WebSocket connection and posts each of them,
 * wrapped in a {@link ChatMessage}, to the {@link ChatRoom}.
 */
public class ChatWebSocketListener extends AbstractWebSocketListener {

    /** Set to true once {@link #onDisconnect} has been called. */
    volatile boolean close;

    /** Chat room that receives every message read by this listener. */
    ChatRoom chatRoom;

    public ChatWebSocketListener(ChatRoom chatRoom) {
        this.chatRoom = chatRoom;
    }

    /**
     * Reads the complete text message and forwards it to the chat room.
     *
     * The reader is drained until end-of-stream, so messages longer than one
     * buffer, or delivered across several reads, are not truncated.
     *
     * @param context the WebSocket context the message arrived on
     * @param reader  reader positioned at the start of the text message
     * @throws IOException if reading from the reader fails
     */
    @Override
    public void onReadText(WebSocketContext context, Reader reader) throws IOException {
        System.out.println("___________ onReadText ________________ " );

        StringBuilder text = new StringBuilder();
        char[] buffer = new char[4096];
        int count;
        while ((count = reader.read(buffer)) != -1) {
            text.append(buffer, 0, count);
        }

        // trim() is kept so short messages produce the same string as before.
        String message = text.toString().trim();
        System.out.println("ChatWebSocketListener::::" + message );
        chatRoom.sendMessage(new ChatMessage(message, context));
    }

    /** Records that the connection has been disconnected. */
    @Override
    public void onDisconnect(WebSocketContext context) throws IOException {
        close = true;
    }
}
Chat Room, handling message coordination
ChatRoom.java
package com.caucho.websocket.example;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;

/**
 * Coordinates the chat: takes {@link ChatMessage}s off a read queue on a
 * single dispatcher thread and forwards them to the per-client
 * {@link ChatClientHandler}s.
 *
 * Messages are strings of the form "<command>::::<clientId>", with an extra
 * "::::<payload>" for the "send message" command. The recognised commands
 * are "add client", "remove client" and "send message".
 *
 * Within this class the {@code chatClients} map is only touched from the
 * dispatcher thread; other threads interact through {@link #sendMessage}.
 */
@ApplicationScoped
@Startup
public class ChatRoom {

    /** Separator between the command, client id and payload tokens. */
    private static final String TOKEN_DELIM = "::::";

    /** Registered handlers keyed by client name. */
    Map<String, ChatClientHandler> chatClients = new HashMap<String, ChatClientHandler>();

    /** Incoming messages waiting to be dispatched. */
    BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>(2000);

    /** Runs the ChatClientHandler write loops. */
    Executor executor;

    /**
     * Creates the handler thread pool and starts the dispatcher thread.
     *
     * NOTE(review): the thread is started from the constructor and is never
     * stopped; consider moving this to a container lifecycle callback.
     */
    public ChatRoom() {
        executor = new ThreadPoolExecutor(10, 20, 90, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        (new Thread(new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }
        })).start();
    }

    /**
     * Dispatcher loop: polls the read queue and dispatches each message
     * until the thread is interrupted.
     */
    private void run() {
        while (true) {
            try {
                ChatMessage message = readQueue.poll(500, TimeUnit.MILLISECONDS);
                if (message != null) {
                    dispatchMessage(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (RuntimeException e) {
                // A single bad message must not kill the only dispatcher thread.
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers a handler for a new client, starts it on the executor and
     * announces the client to the other participants.
     */
    private void launchNewClient(ChatMessage message, String client) {
        System.out.println("launchNewClient::::" + client);
        ChatClientHandler chatClientHandler = new ChatClientHandler(message.getContext(), client);
        chatClients.put(chatClientHandler.getName(), chatClientHandler);
        executor.execute(chatClientHandler);
        doSendMessage(chatClientHandler.getName(),
                chatClientHandler.getName() + " has join the chat room");
    }

    /**
     * Parses a raw message and routes it to the add/remove/send action.
     * Messages that lack a client id, lack a payload for "send message", or
     * use an unknown command are reported on stderr and ignored.
     */
    private void dispatchMessage(ChatMessage message) {
        String strMessage = message.getMessage();
        // Limit of 3 keeps any delimiter inside the payload intact.
        String[] strings = strMessage == null
                ? new String[0]
                : strMessage.split(TOKEN_DELIM, 3);

        if (strings.length < 2 || strings[1].isEmpty()) {
            System.err.println("ACK... Don't understand your message!!!!! " + strMessage);
            return;
        }
        String clientId = strings[1];

        if (strMessage.startsWith("remove client::::")) {
            removeClient(clientId);
        } else if (strMessage.startsWith("send message::::") && strings.length == 3) {
            doSendMessage(clientId, strings[2]);
        } else if (strMessage.startsWith("add client::::")) {
            launchNewClient(message, clientId);
        } else {
            System.err.println("ACK... Don't understand your message!!!!! " + strMessage);
        }
    }

    /**
     * Sends "client : message" to every registered handler except the
     * sender's own. Handlers in the error state are removed and closed;
     * handlers whose run loop is not alive are resubmitted to the executor.
     */
    private void doSendMessage(String client, String message) {
        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("sendMessage::Sending message %s\n", sendMessage );

        Iterator<ChatClientHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientHandler chatClientHandler = iterator.next();

            // Skip the sender; remove this check to echo messages back to their author.
            if (client.equals(chatClientHandler.getName())) {
                continue;
            }

            if (chatClientHandler.isError()) {
                iterator.remove();
                closeHandler(chatClientHandler);
                continue;
            }

            if (!chatClientHandler.isAlive()) {
                // Could be kicked out of executor pool. Kick it back in.
                // NOTE(review): isAlive() is also false before a freshly
                // submitted handler starts running, so this can submit twice.
                try {
                    executor.execute(chatClientHandler);
                } catch (RejectedExecutionException ree) {
                    iterator.remove();
                    continue;
                }
                // Fall through: the message is still queued for the re-launched handler.
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                    sendMessage, chatClientHandler.getName() );
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    /**
     * Announces that the client is leaving, unregisters its handler and
     * closes it. Does nothing if the client is not registered.
     */
    private void removeClient(String client) {
        System.out.println("removeClient::::[" + client + "]::::");
        ChatClientHandler chatClientHandler = chatClients.get(client);
        if (chatClientHandler != null) {
            System.out.println("removeClient:::: found " + client + " to remove.");
            doSendMessage(chatClientHandler.getName(),
                    chatClientHandler.getName() + " has become bored with this chat room");
            chatClients.remove(client);
            closeHandler(chatClientHandler);
        }
    }

    /** Closes a handler, reporting (not propagating) any IOException. */
    private void closeHandler(ChatClientHandler chatClientHandler) {
        try {
            chatClientHandler.close();
        } catch (IOException e) {
            System.err.println("Unable to close handler for "
                    + chatClientHandler.getName() + ": " + e);
        }
    }

    /**
     * Queues a message for the dispatcher thread, waiting up to 100 ms for
     * space. A message that cannot be queued in time is reported on stderr
     * and dropped.
     *
     * @param message the message to queue
     * @throws IllegalStateException if the calling thread is interrupted
     *         while waiting; the interrupt status is restored
     */
    public void sendMessage(ChatMessage message) {
        try {
            if (!readQueue.offer(message, 100, TimeUnit.MILLISECONDS)) {
                System.err.println("Read queue full, dropping message");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Unable to add message to queue", e);
        }
    }
}
ChatMessage.java
package com.caucho.websocket.example;

import com.caucho.websocket.WebSocketContext;

/**
 * Pairs the text of one chat message with the {@link WebSocketContext}
 * that was supplied alongside it.
 */
public class ChatMessage {

    /** Raw message text. */
    String message;

    /** WebSocket context supplied with the message. */
    WebSocketContext context;

    /**
     * @param message the raw message text
     * @param context the WebSocket context to carry with the message
     */
    public ChatMessage(String message, WebSocketContext context) {
        this.context = context;
        this.message = message;
    }

    /** @return the WebSocket context given at construction */
    public final WebSocketContext getContext() {
        return this.context;
    }

    /** @return the raw message text given at construction */
    public final String getMessage() {
        return this.message;
    }
}
Managing the outgoing WebSocket streams
ChatClientHandler.java
package com.caucho.websocket.example;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.caucho.websocket.WebSocketContext;

/**
 * ChatClientHandler handles queued messages that are going to the client.
 * Each handler owns one write queue and one WebSocketContext; its
 * {@link #run()} loop takes messages off the queue and writes them out.
 *
 * @author r hightower
 */
public class ChatClientHandler implements Runnable, Closeable {

    /** Name of the client. */
    String name;

    /** WebSocketContext to send messages to the browser client. */
    WebSocketContext context;

    /** Queue this handler monitors. When a message arrives on it, the handler sends it. */
    BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000);

    /** True while {@link #run()} is executing its loop. */
    volatile boolean alive;

    /** Set when sending failed with an IOException so the ChatRoom can clean up. */
    volatile boolean error;

    /** The reason for the error. */
    String reason;

    /** Flag asking the run loop to stop. */
    volatile boolean close;

    public ChatClientHandler(WebSocketContext context, String name) {
        this.context = context;
        this.name = name;
    }

    /**
     * Chat room calls this method to send a message. The message is queued
     * without blocking; if the queue is full it is reported on stderr and
     * dropped.
     */
    public void sendMessage(String sendMessage) {
        System.out.println("ChatClientHandler::::" + sendMessage);
        if (!this.writeQueue.offer(sendMessage)) {
            System.err.println("ChatClientHandler::::write queue full for " + name
                    + ", dropping message");
        }
    }

    /**
     * Write loop: polls the write queue and sends each message as one text
     * frame until the handler is closed, the thread is interrupted, or a
     * write fails. {@code alive} is cleared on every exit path.
     */
    @Override
    public void run() {
        alive = true;
        reason = "";
        try {
            while (!close && !Thread.currentThread().isInterrupted()) {
                // Short timeout so a close() request is noticed promptly
                // instead of pinning a pool thread on an idle queue.
                String message = writeQueue.poll(1000, TimeUnit.MILLISECONDS);
                if (message != null) {
                    System.out.printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n",
                            message, name);
                    PrintWriter out = context.startTextMessage();
                    out.print(message);
                    out.close(); // Close sends the message to browser.
                }
            }
        } catch (InterruptedException e) {
            // Our thread is interrupted. We can be resumed.
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // Write reason before the volatile flag so readers of isError() see it.
            reason = e.getMessage();
            error = true;
            e.printStackTrace();
        } finally {
            alive = false;
        }
    }

    public String getName() {
        return name;
    }

    public boolean isAlive() {
        return alive;
    }

    public boolean isError() {
        return error;
    }

    public String getReason() {
        return reason;
    }

    /**
     * Asks the run loop to stop and releases the WebSocket: disconnects it
     * if an error was recorded, otherwise closes it.
     */
    @Override
    public void close() throws IOException {
        close = true;
        if (context != null) {
            if (error) {
                context.disconnect();
            } else {
                context.close();
            }
        }
    }
}
Simple JavaScript clients that use WebSocket to send messages to chat
rick.html
<html>
<head>
<title>RICK TEST CLIENT</title>
<script language="javascript" type="text/javascript">
    /* http://dev.w3.org/html5/websockets/ */
    /* Test client for user "Rick Hightower". Commands sent to the server use
       the form "<command>::::<client>[::::<text>]". */
    var socket = null;

    /* Sends a fixed chat message as Rick. */
    function sendChatMessage() {
        document.getElementById("span_result").innerHTML = "";
        socket.send("send message::::Rick Hightower::::Hi Bob");
    }

    /* Shows the text of the last message received from the server. */
    function handleChatMessageWebSocketResponse(msg) {
        document.getElementById("span_result").innerHTML = msg.data;
    }

    /* Opens the WebSocket once and wires the status callbacks. */
    function initSocket() {
        if (socket == null) {
            socket = new WebSocket("ws://localhost:8080/web-socket-example/chat",
                    "caucho-example-chat-protocol");
            socket.onmessage = handleChatMessageWebSocketResponse;
        }
        socket.onerror = function(msg) {
            document.getElementById("error_result").innerHTML = "ERROR:" + msg;
        };
        socket.onopen = function() {
            document.getElementById("span_result").innerHTML =
                    "Socket Status: " + socket.readyState + " (open)";
        };
        socket.onclose = function() {
            document.getElementById("span_result").innerHTML =
                    "Socket Status: " + socket.readyState + " (Closed)";
        };
    }

    initSocket();

    /* Registers Rick with the chat room. */
    function connectRick() {
        document.getElementById("span_result").innerHTML = "CONNECT RICK";
        socket.send("add client::::Rick Hightower");
    }

    function clearSend() {
        document.getElementById("span_result").innerHTML = "";
    }

    /* Removes Rick from the chat room. The delimiter must be four colons,
       the same as in the other commands. */
    function disconnectRick() {
        document.getElementById("span_result").innerHTML = "DISCONNECT RICK";
        socket.send("remove client::::Rick Hightower");
    }
</script>
</head>
<body>
    <h1>RICK TEST CHAT</h1>
    <br />
    <a href="javascript:connectRick();">CONNECT RICK</a>
    <br />
    <a href="javascript:sendChatMessage();">Send Hi Bob</a>
    <br />
    <a href="javascript:disconnectRick();">DISCONNECT RICK</a>
    <br />
    <a href="javascript:clearSend();">Clear send results</a>
    <br />
    <span id="span_result"></span>
    <span id="error_result"></span>
</body>
</html>
bob.html
<html>
<head>
<title>BOB TEST CLIENT</title>
<script language="javascript" type="text/javascript">
    /* http://dev.w3.org/html5/websockets/ */
    /* Test client for user "Bob Deanna". Commands sent to the server use
       the form "<command>::::<client>[::::<text>]". */
    var socket = null;

    /* Sends a fixed chat message as Bob. */
    function sendChatMessage() {
        document.getElementById("span_result").innerHTML = "";
        socket.send("send message::::Bob Deanna::::Hi Rick");
    }

    /* Shows the text of the last message received from the server. */
    function handleChatMessageWebSocketResponse(msg) {
        document.getElementById("span_result").innerHTML = msg.data;
    }

    /* Opens the WebSocket once and wires the status callbacks. */
    function initSocket() {
        if (socket == null) {
            socket = new WebSocket("ws://localhost:8080/web-socket-example/chat",
                    "caucho-example-chat-protocol");
            socket.onmessage = handleChatMessageWebSocketResponse;
        }
        socket.onerror = function(msg) {
            document.getElementById("error_result").innerHTML = "ERROR:" + msg;
        };
        socket.onopen = function() {
            document.getElementById("span_result").innerHTML =
                    "Socket Status: " + socket.readyState + " (open)";
        };
        socket.onclose = function() {
            document.getElementById("span_result").innerHTML =
                    "Socket Status: " + socket.readyState + " (Closed)";
        };
    }

    initSocket();

    /* Registers Bob with the chat room. */
    function connectBob() {
        document.getElementById("span_result").innerHTML = "CONNECT BOB";
        socket.send("add client::::Bob Deanna");
    }

    function clearSend() {
        document.getElementById("span_result").innerHTML = "";
    }

    /* Removes Bob from the chat room. */
    function disconnectBob() {
        document.getElementById("span_result").innerHTML = "DISCONNECT BOB";
        socket.send("remove client::::Bob Deanna");
    }
</script>
</head>
<body>
    <h1>BOB TEST CHAT</h1>
    <br />
    <a href="javascript:connectBob();">CONNECT BOB</a>
    <br />
    <a href="javascript:sendChatMessage();">Send Hi Rick</a>
    <br />
    <a href="javascript:disconnectBob();">DISCONNECT BOB</a>
    <br />
    <a href="javascript:clearSend();">Clear send results</a>
    <br />
    <span id="span_result"></span>
    <span id="error_result"></span>
</body>
</html>
Creating a jQuery prototype to get the user interactions down
chat_proto_no_web_socket.html
<html>
<head>
<title>Chat Client</title>
<link rel="stylesheet" type="text/css" href="css/style.css" media="screen" />
<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>
<script type="text/javascript">
    /* UI-only prototype of the chat client: no WebSocket is opened here. */
    var ENTER_KEY = '13';
    var chatUserSession = {"version":"1.0"};

    /* Stores the entered user name and switches from the login view to the chat view. */
    function chat_Login() {
        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text("Chat Client (logging in...) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " is logging in...");
        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }

    $(document).ready(function() {
        $("#inputArea").hide();
        $("#userName").focus();
        /* "1px" must not contain a space or the declaration is invalid. */
        $("#statusBar").css("border", "1px solid black");

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();
                $("#chatInput").val("");
                $("#hint").hide(500);
                /* text() inserts the input as plain text, not as markup. */
                $("#chatHistory").prepend($("<p>").text(textMessage));
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chat_Login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chat_Login();
                event.stopPropagation();
            }
        });
    });
</script>
</head>
<body>
    <h1 id="header">Chat Client</h1>
    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>
    <div id="loginDiv">
        User name <input id="userName" type="text" />
        <input id="login" type="submit" value="Login" />
    </div>
    <div id="inputArea">
        <p id="hint">Type your message here and then hit return</p>
        <input id="chatInput" type="text" value="" />
    </div>
    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>
</body>
</html>
Creating a jQuery and WebSocket full featured chat application
chat.html (Uses full websocket API)
<html>
<head>
<title>Chat Client</title>
<link rel="stylesheet" type="text/css" href="css/style.css" media="screen" />
<script type="text/javascript" src="scripts/jquery-1.7.1.js"></script>
<script type="text/javascript">
    var ENTER_KEY = '13';
    /* Separator between command, user name and message text on the wire. */
    var TOKEN_DELIM = "::::";

    /* Derives the ws:/wss: chat URL from the scheme and host of the current page. */
    function buildWebSocketURL() {
        var url = document.URL;
        var parts = url.split('/');
        var scheme = parts[0];
        var hostPort = parts[2];
        var wssScheme = null;
        if (scheme == "http:") {
            wssScheme = "ws:";
        } else if (scheme == "https:") {
            wssScheme = "wss:";
        }
        var wssUrl = wssScheme + "//" + hostPort + "/web-socket-example/chat";
        return wssUrl;
    }

    var chatUserSession = {
        version : "1.0",
        webSocketProtocol : "caucho-example-chat-protocol",
        webSocketURL : buildWebSocketURL(),
        webSocket : null, //WebSocket
        userName : null
    };

    /* Shows the user's own message in the history and sends it to the server. */
    function chat_sendMessage(message) {
        /* text() keeps the message from being interpreted as markup. */
        $("#chatHistory").prepend(
                $("<p>").css("color", "green").text(" ME : " + message));
        chatUserSession.webSocket.send("send message" + TOKEN_DELIM
                + chatUserSession.userName + TOKEN_DELIM + message);
    }

    function chat_joinChat() {
        chatUserSession.webSocket.send("add client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    function chat_leaveChat() {
        chatUserSession.status(chatUserSession.userName + " is leaving chat");
        chatUserSession.webSocket.send("remove client" + TOKEN_DELIM
                + chatUserSession.userName);
    }

    /* Opens the WebSocket and forwards its events to the session callbacks. */
    function chat_openWebSocket() {
        chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL,
                chatUserSession.webSocketProtocol);
        var socket = chatUserSession.webSocket;

        socket.onmessage = function(msg) {
            chatUserSession.onMessage(msg);
        };
        socket.onerror = function(errorEvent) {
            chatUserSession.onError(errorEvent);
        };
        socket.onopen = function() {
            chatUserSession.onOpen();
        };
        socket.onclose = function(closeEvent) {
            chatUserSession.onClose(closeEvent);
        };
    }

    /* Displays a message received from the server. */
    function chat_onMessage(msgEvent) {
        chatUserSession.status("New Message :" + msgEvent.data);
        /* The data comes from other users: insert it as text, never as HTML. */
        $("#chatHistory").prepend(
                $("<p>").css("color", "blue").text(msgEvent.data));
    }

    /* Stores the entered user name and opens the WebSocket. */
    function chat_Login() {
        chatUserSession.userName = $("#userName").val();
        $("#loginDiv").hide(500);
        $('#header').text(
                "Chat Client (logging in...) : " + chatUserSession.userName);
        chatUserSession.status(chatUserSession.userName + " is logging in...");
        chatUserSession.open();
    }

    /* Joins the chat once the socket is open and shows the input area. */
    function chat_onOpen() {
        chatUserSession.joinChat();
        chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName);
        $('#header').text(
                "Chat Client (logged in...) : " + chatUserSession.userName);
        $("#inputArea").show(500);
        $("#statusBar").show(500);
        $("#chatInput").focus();
    }

    function chat_Status(message) {
        $('#statusBarPara1').text(message);
        $("#statusBar").show(500);
    }

    /* Returns to the login view and shows the close reason and code. */
    function chat_onClose(closeEvent) {
        $("#loginDiv").show(500);
        $('#header').text(
                "Chat Client (not connected) : " + chatUserSession.userName);
        $('#statusBarPara1').text(chatUserSession.userName + " not logged in. "
                + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code);
        $("#inputArea").hide(500);
        $("#statusBar").show(500);
        $("#userName").val(chatUserSession.userName);
        $("#userName").focus();
    }

    function chat_onError(msg) {
        $('#statusBarPara1').text(" Websocket error :" + JSON.stringify(msg));
        $("#statusBar").show(500);
    }

    chatUserSession.open = chat_openWebSocket;
    chatUserSession.onMessage = chat_onMessage;
    chatUserSession.onOpen = chat_onOpen;
    chatUserSession.login = chat_Login;
    chatUserSession.onClose = chat_onClose;
    chatUserSession.onError = chat_onError;
    chatUserSession.joinChat = chat_joinChat;
    chatUserSession.sendMessage = chat_sendMessage;
    chatUserSession.leaveChat = chat_leaveChat;
    chatUserSession.status = chat_Status;

    $(document).ready(function() {
        $("#inputArea").hide();
        $("#userName").focus();

        $("#statusBar").click(function() {
            $("#statusBar").hide(300);
        });

        $("#chatInput").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                var textMessage = $("#chatInput").val();
                if (textMessage == "bye!") {
                    chatUserSession.leaveChat();
                } else {
                    $("#chatInput").val("");
                    $("#hint").hide(500);
                    chatUserSession.sendMessage(textMessage);
                }
            }
            event.stopPropagation();
        });

        $("#login").click(function(event) {
            chatUserSession.login();
            event.stopPropagation();
        });

        $("#userName").keypress(function(event) {
            var keycode = (event.keyCode ? event.keyCode : event.which);
            if (keycode == ENTER_KEY) {
                chatUserSession.login();
                event.stopPropagation();
            }
        });
    });
</script>
</head>
<body>
    <h1 id="header">Chat Client</h1>
    <div id="statusBar">
        <p id="statusBarPara1">Welcome to Chat App, Click to hide</p>
    </div>
    <div id="loginDiv">
        User name <input id="userName" type="text" />
        <input id="login" type="submit" value="Login" />
    </div>
    <div id="inputArea">
        <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p>
        <input id="chatInput" type="text" value="" />
    </div>
    <div id="chatHistoryDiv">
        <p id="chatHistory"></p>
    </div>
</body>
</html>