WebSocket API Chat Room using JSR 356
From Resin 4.0 Wiki
(Difference between revisions)
(Created page with " '''ChatServer''' <pre> package com.example; import java.net.URI; import java.net.URISyntaxException; import javax.inject.Inject; import javax.net.websocket.ContainerProvi...") |
|||
Line 42: | Line 42: | ||
} | } | ||
+ | </pre> | ||
+ | |||
+ | The Chat server injects a '''ChatRoom'''. Then it binds the ChatServerEndpoint to the URI "/chat". | ||
+ | |||
+ | '''ChatServerEndpoint''' | ||
+ | <pre> | ||
+ | package com.example; | ||
+ | |||
+ | import javax.net.websocket.Endpoint; | ||
+ | import javax.net.websocket.Session; | ||
+ | |||
+ | public class ChatServerEndpoint extends Endpoint{ | ||
+ | |||
+ | ChatRoom room; | ||
+ | |||
+ | public ChatServerEndpoint(ChatRoom room) { | ||
+ | this.room = room; | ||
+ | |||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public void onOpen(Session session) { | ||
+ | session.addMessageHandler(new ChatMessageHandler(room, session)); | ||
+ | } | ||
+ | |||
+ | |||
+ | } | ||
+ | |||
+ | </pre> | ||
+ | |||
+ | |||
+ | The '''ChatServerEndpoint''' binds a '''ChatMessageHandler''' to the WebSocket session. | ||
+ | |||
+ | The '''ChatMessageHandler''' uses a chatRoom to send a message to other clients, when it gets a message. | ||
+ | |||
+ | <pre> | ||
+ | package com.example; | ||
+ | |||
+ | |||
+ | import javax.net.websocket.MessageHandler; | ||
+ | import javax.net.websocket.Session; | ||
+ | |||
+ | public class ChatMessageHandler implements MessageHandler.Text { | ||
+ | |||
+ | Session session; | ||
+ | ChatRoom chatRoom; | ||
+ | |||
+ | public ChatMessageHandler(ChatRoom room, Session session) { | ||
+ | this.session = session; | ||
+ | this.chatRoom = room; | ||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public void onMessage(String message) { | ||
+ | chatRoom.sendMessage(new ChatMessage(message, session)); | ||
+ | } | ||
+ | |||
+ | } | ||
+ | </pre> | ||
+ | |||
+ | The '''ChatMessageHandler''' wraps the message in a '''ChatMessage'''. | ||
+ | |||
+ | |||
+ | '''ChatMessage''' | ||
+ | <pre> | ||
+ | package com.example; | ||
+ | |||
+ | import javax.net.websocket.Session; | ||
+ | |||
+ | public class ChatMessage { | ||
+ | String message; | ||
+ | Session session; | ||
+ | |||
+ | public ChatMessage(String message, Session session) { | ||
+ | this.message = message; | ||
+ | this.session = session; | ||
+ | } | ||
+ | |||
+ | public final String getMessage() { | ||
+ | return message; | ||
+ | } | ||
+ | public final Session getSession() { | ||
+ | return session; | ||
+ | } | ||
+ | |||
+ | } | ||
+ | </pre> | ||
+ | |||
+ | '''ChatRoom''' | ||
+ | <pre> | ||
+ | package com.example; | ||
+ | |||
+ | import java.io.IOException; | ||
+ | import java.util.HashMap; | ||
+ | import java.util.Iterator; | ||
+ | import java.util.Map; | ||
+ | import java.util.concurrent.ArrayBlockingQueue; | ||
+ | import java.util.concurrent.BlockingQueue; | ||
+ | import java.util.concurrent.Executor; | ||
+ | import java.util.concurrent.RejectedExecutionException; | ||
+ | import java.util.concurrent.ThreadPoolExecutor; | ||
+ | import java.util.concurrent.TimeUnit; | ||
+ | |||
+ | import javax.ejb.Startup; | ||
+ | import javax.enterprise.context.ApplicationScoped; | ||
+ | |||
+ | @ApplicationScoped | ||
+ | @Startup | ||
+ | public class ChatRoom { | ||
+ | |||
+ | Map<String, ChatClientHandler> chatClients = new HashMap<String, ChatClientHandler>(); | ||
+ | BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( | ||
+ | 2000); | ||
+ | |||
+ | Executor executor; | ||
+ | |||
+ | public ChatRoom() { | ||
+ | executor = new ThreadPoolExecutor(10, 20, 90, TimeUnit.SECONDS, | ||
+ | new ArrayBlockingQueue<Runnable>(1)); | ||
+ | |||
+ | (new Thread(new Runnable() { | ||
+ | public void run() { | ||
+ | ChatRoom.this.run(); | ||
+ | } | ||
+ | })).start(); | ||
+ | } | ||
+ | |||
+ | private void run() { | ||
+ | while (true) { | ||
+ | try { | ||
+ | |||
+ | ChatMessage message = readQueue | ||
+ | .poll(500, TimeUnit.MILLISECONDS); | ||
+ | if (message != null) { | ||
+ | dispatchMessage(message); | ||
+ | } | ||
+ | } catch (InterruptedException e) { | ||
+ | break; | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | private void launchNewClient(ChatMessage message, String client) { | ||
+ | System.out.println("launchNewClient::::" + client); | ||
+ | ChatClientHandler chatClientHandler = new ChatClientHandler( | ||
+ | message.getSession(), client); | ||
+ | chatClients.put(chatClientHandler.getName(), chatClientHandler); | ||
+ | executor.execute(chatClientHandler); | ||
+ | doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() | ||
+ | + " has join the chat room"); | ||
+ | } | ||
+ | |||
+ | private void dispatchMessage(ChatMessage message) { | ||
+ | |||
+ | String strMessage = message.getMessage(); | ||
+ | String[] strings = strMessage.split("::::"); | ||
+ | String clientId = strings[1]; | ||
+ | |||
+ | if (strMessage.startsWith("remove client::::")) { | ||
+ | removeClient(clientId); | ||
+ | } else if (strMessage.startsWith("send message::::")) { | ||
+ | doSendMessage(clientId, strings[2]); | ||
+ | } else if (strMessage.startsWith("add client::::")) { | ||
+ | launchNewClient(message, clientId); | ||
+ | } else { | ||
+ | System.err.println("ACK... Don't understand your message!!!!! " | ||
+ | + message); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | private void doSendMessage(String client, String message) { | ||
+ | |||
+ | String sendMessage = String.format("%s : %s", client, message); | ||
+ | System.out.printf("sendMessage::Sending message %s\n", sendMessage); | ||
+ | Iterator<ChatClientHandler> iterator = chatClients.values().iterator(); | ||
+ | while (iterator.hasNext()) { | ||
+ | ChatClientHandler chatClientHandler = iterator.next(); | ||
+ | |||
+ | if (client.equals(chatClientHandler.getName())) // comment this if | ||
+ | // you don't want to | ||
+ | // echo back | ||
+ | // messages | ||
+ | continue; | ||
+ | |||
+ | if (chatClientHandler.isError()) { | ||
+ | iterator.remove(); | ||
+ | try { | ||
+ | chatClientHandler.close(); | ||
+ | } catch (IOException e) { | ||
+ | } | ||
+ | continue; | ||
+ | } | ||
+ | |||
+ | if (!chatClientHandler.isAlive()) { // Could be kicked out of | ||
+ | // executor pool. Kick it back | ||
+ | // in. | ||
+ | try { | ||
+ | executor.execute(chatClientHandler); | ||
+ | } catch (RejectedExecutionException ree) { | ||
+ | iterator.remove(); | ||
+ | } | ||
+ | continue; | ||
+ | } | ||
+ | |||
+ | System.out.printf("sendMessage::Sending message %s to %s\n", | ||
+ | sendMessage, chatClientHandler.getName()); | ||
+ | |||
+ | chatClientHandler.sendMessage(sendMessage); | ||
+ | |||
+ | } | ||
+ | } | ||
+ | |||
+ | private void removeClient(String client) { | ||
+ | |||
+ | System.out.println("removeClient::::[" + client + "]::::"); | ||
+ | |||
+ | ChatClientHandler chatClientHandler = chatClients.get(client); | ||
+ | |||
+ | if (chatClientHandler != null) { | ||
+ | |||
+ | System.out.println("removeClient:::: found " + client | ||
+ | + " to remove."); | ||
+ | |||
+ | doSendMessage(chatClientHandler.getName(), | ||
+ | chatClientHandler.getName() | ||
+ | + " has become bored with this chat room"); | ||
+ | |||
+ | chatClients.remove(client); | ||
+ | try { | ||
+ | chatClientHandler.close(); | ||
+ | } catch (IOException e) { | ||
+ | } | ||
+ | } | ||
+ | |||
+ | } | ||
+ | |||
+ | public void sendMessage(ChatMessage message) { | ||
+ | try { | ||
+ | readQueue.offer(message, 100, TimeUnit.MILLISECONDS); | ||
+ | } catch (InterruptedException e) { | ||
+ | throw new IllegalStateException("Unable to add message to queue"); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | } | ||
+ | |||
+ | </pre> | ||
+ | |||
+ | '''ChatClientHandler''' | ||
+ | |||
+ | <pre> | ||
+ | package com.example; | ||
+ | |||
+ | import java.io.Closeable; | ||
+ | import java.io.IOException; | ||
+ | import java.util.concurrent.ArrayBlockingQueue; | ||
+ | import java.util.concurrent.BlockingQueue; | ||
+ | import java.util.concurrent.TimeUnit; | ||
+ | |||
+ | import javax.net.websocket.Session; | ||
+ | |||
+ | public class ChatClientHandler implements Runnable, Closeable { | ||
+ | |||
+ | /** Name of the client. */ | ||
+ | String name; | ||
+ | |||
+ | /** Session to send messages to the browser client. */ | ||
+ | Session session; | ||
+ | |||
+ | /** | ||
+ | * Queue that ChatClientHandler are monitoring. When a message comes in on | ||
+ | * this queue, ChatClientHandler sends it. | ||
+ | */ | ||
+ | BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); | ||
+ | |||
+ | /** Keeps track if the thread for this ChatClientHandler is alive. */ | ||
+ | volatile boolean alive; | ||
+ | /** | ||
+ | * Keeps track if their has been an error sending the message so the | ||
+ | * ChatRoom can clean it up. | ||
+ | */ | ||
+ | volatile boolean error; | ||
+ | /** The reason for the error */ | ||
+ | String reason; | ||
+ | /** Flag to indicate whether we should close or not. */ | ||
+ | volatile boolean close; | ||
+ | |||
+ | public ChatClientHandler(Session session, String name) { | ||
+ | this.session = session; | ||
+ | this.name = name; | ||
+ | } | ||
+ | |||
+ | /** Chat room calls this method to send a message. */ | ||
+ | public void sendMessage(String sendMessage) { | ||
+ | System.out.println("ChatClientHandler::::" + sendMessage); | ||
+ | this.writeQueue.offer(sendMessage); | ||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public void run() { | ||
+ | alive = true; | ||
+ | reason = ""; | ||
+ | while (true) { | ||
+ | try { | ||
+ | if (Thread.currentThread().isInterrupted()) { | ||
+ | alive = false; | ||
+ | break; | ||
+ | } | ||
+ | |||
+ | if (close) { | ||
+ | break; | ||
+ | } | ||
+ | |||
+ | /* Get a message, send a message to client if not null. */ | ||
+ | String message = writeQueue.poll(1000, TimeUnit.SECONDS); | ||
+ | |||
+ | if (message != null) { | ||
+ | System.out | ||
+ | .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", | ||
+ | message, name); | ||
+ | session.getRemote().sendString(message); | ||
+ | } | ||
+ | } catch (InterruptedException e) { | ||
+ | alive = false; // Our thread is interrupted. We can be resumed. | ||
+ | break; | ||
+ | } catch (IOException e) { | ||
+ | alive = false; | ||
+ | error = true; | ||
+ | reason = e.getMessage(); | ||
+ | e.printStackTrace(); | ||
+ | break; | ||
+ | } | ||
+ | |||
+ | } | ||
+ | } | ||
+ | |||
+ | public String getName() { | ||
+ | return name; | ||
+ | } | ||
+ | |||
+ | public boolean isAlive() { | ||
+ | return alive; | ||
+ | } | ||
+ | |||
+ | public boolean isError() { | ||
+ | return error; | ||
+ | } | ||
+ | |||
+ | public String getReason() { | ||
+ | return reason; | ||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public void close() throws IOException { | ||
+ | close = true; | ||
+ | if (session != null) { | ||
+ | session.close(); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | } | ||
+ | </pre> | ||
+ | |||
+ | The HTML 5 / JavaScript client that uses this chat room..... | ||
+ | |||
+ | <pre> | ||
+ | <html> | ||
+ | |||
+ | <head> | ||
+ | |||
+ | <title>Chat Client</title> | ||
+ | |||
+ | <link rel="stylesheet" type="text/css" href="css/style.css" | ||
+ | media="screen" /> | ||
+ | |||
+ | |||
+ | <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> | ||
+ | |||
+ | |||
+ | <script type="text/javascript"> | ||
+ | var ENTER_KEY = '13'; | ||
+ | var TOKEN_DELIM = "::::"; | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | function buildWebSocketURL() { | ||
+ | var url = document.URL; | ||
+ | var parts = url.split('/'); | ||
+ | var scheme = parts[0]; | ||
+ | var hostPort = parts[2]; | ||
+ | var wssScheme = null; | ||
+ | |||
+ | |||
+ | if (scheme=="http:") { | ||
+ | wssScheme="ws:"; | ||
+ | } else if (scheme=="https:") { | ||
+ | wssScheme="wss:"; | ||
+ | } | ||
+ | |||
+ | wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; | ||
+ | |||
+ | return wssUrl; | ||
+ | |||
+ | } | ||
+ | |||
+ | |||
+ | var chatUserSession = { | ||
+ | version : "1.0", | ||
+ | webSocketProtocol : "caucho-example-chat-protocol", | ||
+ | webSocketURL : buildWebSocketURL(), | ||
+ | webSocket : null,//WebSocket | ||
+ | userName : null | ||
+ | }; | ||
+ | |||
+ | function chat_sendMessage(message) { | ||
+ | $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); | ||
+ | |||
+ | chatUserSession.webSocket.send("send message" + TOKEN_DELIM | ||
+ | + chatUserSession.userName + TOKEN_DELIM + message); | ||
+ | } | ||
+ | |||
+ | function chat_joinChat() { | ||
+ | chatUserSession.webSocket.send("add client" + TOKEN_DELIM | ||
+ | + chatUserSession.userName); | ||
+ | } | ||
+ | |||
+ | function chat_leaveChat() { | ||
+ | chatUserSession.status(chatUserSession.userName + " is leaving chat"); | ||
+ | chatUserSession.webSocket.send("remove client" + TOKEN_DELIM | ||
+ | + chatUserSession.userName); | ||
+ | } | ||
+ | |||
+ | function chat_openWebSocket() { | ||
+ | chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, | ||
+ | chatUserSession.webSocketProtocol); | ||
+ | var socket = chatUserSession.webSocket; | ||
+ | |||
+ | socket.onmessage = function(msg) { | ||
+ | chatUserSession.onMessage(msg); | ||
+ | } | ||
+ | |||
+ | socket.onerror = function(errorEvent) { | ||
+ | chatUserSession.onError(errorEvent); | ||
+ | } | ||
+ | |||
+ | socket.onopen = function() { | ||
+ | chatUserSession.onOpen(); | ||
+ | } | ||
+ | socket.onclose = function(closeEvent) { | ||
+ | chatUserSession.onClose(closeEvent); | ||
+ | } | ||
+ | |||
+ | } | ||
+ | |||
+ | function chat_onMessage(msgEvent) { | ||
+ | chatUserSession.status("New Message :" + msgEvent.data); | ||
+ | |||
+ | $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); | ||
+ | } | ||
+ | |||
+ | function chat_Login() { | ||
+ | |||
+ | chatUserSession.userName = $("#userName").val(); | ||
+ | $("#loginDiv").hide(500); | ||
+ | $('#header').text( | ||
+ | "Chat Client (logging in...) : " + chatUserSession.userName); | ||
+ | |||
+ | chatUserSession.status(chatUserSession.userName + " is logging in..."); | ||
+ | chatUserSession.open(); | ||
+ | |||
+ | } | ||
+ | |||
+ | function chat_onOpen() { | ||
+ | chatUserSession.joinChat(); | ||
+ | chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); | ||
+ | $('#header').text( | ||
+ | "Chat Client (logged in...) : " + chatUserSession.userName); | ||
+ | |||
+ | $("#inputArea").show(500); | ||
+ | $("#statusBar").show(500); | ||
+ | $("#chatInput").focus(); | ||
+ | } | ||
+ | |||
+ | function chat_Status(message) { | ||
+ | $('#statusBarPara1').text(message); | ||
+ | $("#statusBar").show(500); | ||
+ | |||
+ | } | ||
+ | |||
+ | function chat_onClose(closeEvent) { | ||
+ | $("#loginDiv").show(500); | ||
+ | $('#header').text( | ||
+ | "Chat Client (not connected) : " + chatUserSession.userName); | ||
+ | $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + | ||
+ | ":: Reason: " + closeEvent.reason + | ||
+ | " Code: " + closeEvent.code); | ||
+ | |||
+ | $("#inputArea").hide(500); | ||
+ | $("#statusBar").show(500); | ||
+ | |||
+ | $("#userName").val(chatUserSession.userName); | ||
+ | $("#userName").focus(); | ||
+ | } | ||
+ | |||
+ | function chat_onError(msg) { | ||
+ | $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); | ||
+ | $("#statusBar").show(500); | ||
+ | } | ||
+ | |||
+ | chatUserSession.open = chat_openWebSocket; | ||
+ | chatUserSession.onMessage = chat_onMessage; | ||
+ | chatUserSession.onOpen = chat_onOpen; | ||
+ | chatUserSession.login = chat_Login; | ||
+ | chatUserSession.onClose = chat_onClose; | ||
+ | chatUserSession.onError = chat_onError; | ||
+ | chatUserSession.joinChat = chat_joinChat; | ||
+ | chatUserSession.sendMessage = chat_sendMessage; | ||
+ | chatUserSession.leaveChat = chat_leaveChat; | ||
+ | chatUserSession.status = chat_Status; | ||
+ | |||
+ | $(document).ready(function() { | ||
+ | |||
+ | $("#inputArea").hide(); | ||
+ | $("#userName").focus(); | ||
+ | |||
+ | |||
+ | |||
+ | $("#statusBar").click(function() { | ||
+ | $("#statusBar").hide(300); | ||
+ | }); | ||
+ | |||
+ | $("#chatInput").keypress(function(event) { | ||
+ | |||
+ | var keycode = (event.keyCode ? event.keyCode : event.which); | ||
+ | if (keycode == ENTER_KEY) { | ||
+ | var textMessage = $("#chatInput").val(); | ||
+ | |||
+ | if (textMessage=="bye!") { | ||
+ | chatUserSession.leaveChat(); | ||
+ | } else { | ||
+ | $("#chatInput").val(""); | ||
+ | $("#hint").hide(500); | ||
+ | chatUserSession.sendMessage(textMessage); | ||
+ | } | ||
+ | } | ||
+ | event.stopPropagation(); | ||
+ | }); | ||
+ | |||
+ | $("#login").click(function(event) { | ||
+ | chatUserSession.login(); | ||
+ | event.stopPropagation(); | ||
+ | }); | ||
+ | |||
+ | $("#userName").keypress(function(event) { | ||
+ | var keycode = (event.keyCode ? event.keyCode : event.which); | ||
+ | if (keycode == ENTER_KEY) { | ||
+ | chatUserSession.login() | ||
+ | event.stopPropagation(); | ||
+ | } | ||
+ | }); | ||
+ | }); | ||
+ | </script> | ||
+ | |||
+ | </head> | ||
+ | |||
+ | <body> | ||
+ | |||
+ | <h1 id="header">Chat Client</h1> | ||
+ | |||
+ | |||
+ | <div id="statusBar"> | ||
+ | <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> | ||
+ | </div> | ||
+ | |||
+ | <div id="loginDiv"> | ||
+ | User name <input id="userName" type="text" /> <input | ||
+ | id="login" type="submit" value="Login" /> | ||
+ | </div> | ||
+ | |||
+ | |||
+ | <div id="inputArea"> | ||
+ | <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> | ||
+ | <input id="chatInput" type="text" value="" /> | ||
+ | </div> | ||
+ | |||
+ | <div id="chatHistoryDiv"> | ||
+ | <p id="chatHistory"></p> | ||
+ | </div> | ||
+ | |||
+ | |||
+ | </body> | ||
+ | |||
+ | </html> | ||
</pre> | </pre> |
Revision as of 00:00, 9 November 2012
ChatServer
package com.example; import java.net.URI; import java.net.URISyntaxException; import javax.inject.Inject; import javax.net.websocket.ContainerProvider; import javax.net.websocket.DefaultServerConfiguration; import javax.net.websocket.ServerContainer; import javax.net.websocket.ServerEndpointConfiguration; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; @WebListener public class ChatServer implements ServletContextListener{ @Inject ChatRoom room; @Override public void contextDestroyed(ServletContextEvent arg0) { } @Override public void contextInitialized(ServletContextEvent arg0) { ServerEndpointConfiguration serverConfiguration; try { serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); ServerContainer serverContainer = ContainerProvider.getServerContainer(); serverContainer.publishServer(new ChatServerEndpoint(room),serverConfiguration); } catch (URISyntaxException e) { e.printStackTrace(); } } }
The Chat server injects a ChatRoom. Then it binds the ChatServerEndpoint to the URI "/chat".
ChatServerEndpoint
package com.example; import javax.net.websocket.Endpoint; import javax.net.websocket.Session; public class ChatServerEndpoint extends Endpoint{ ChatRoom room; public ChatServerEndpoint(ChatRoom room) { this.room = room; } @Override public void onOpen(Session session) { session.addMessageHandler(new ChatMessageHandler(room, session)); } }
The ChatServerEndpoint binds a ChatMessageHandler to the WebSocket session.
The ChatMessageHandler uses a chatRoom to send a message to other clients, when it gets a message.
package com.example; import javax.net.websocket.MessageHandler; import javax.net.websocket.Session; public class ChatMessageHandler implements MessageHandler.Text { Session session; ChatRoom chatRoom; public ChatMessageHandler(ChatRoom room, Session session) { this.session = session; this.chatRoom = room; } @Override public void onMessage(String message) { chatRoom.sendMessage(new ChatMessage(message, session)); } }
The ChatMessageHandler wraps the message in a ChatMessage.
ChatMessage
package com.example; import javax.net.websocket.Session; public class ChatMessage { String message; Session session; public ChatMessage(String message, Session session) { this.message = message; this.session = session; } public final String getMessage() { return message; } public final Session getSession() { return session; } }
ChatRoom
package com.example; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientHandler> chatClients = new HashMap<String, ChatClientHandler>(); BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( 2000); Executor executor; public ChatRoom() { executor = new ThreadPoolExecutor(10, 20, 90, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1)); (new Thread(new Runnable() { public void run() { ChatRoom.this.run(); } })).start(); } private void run() { while (true) { try { ChatMessage message = readQueue .poll(500, TimeUnit.MILLISECONDS); if (message != null) { dispatchMessage(message); } } catch (InterruptedException e) { break; } } } private void launchNewClient(ChatMessage message, String client) { System.out.println("launchNewClient::::" + client); ChatClientHandler chatClientHandler = new ChatClientHandler( message.getSession(), client); chatClients.put(chatClientHandler.getName(), chatClientHandler); executor.execute(chatClientHandler); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has join the chat room"); } private void dispatchMessage(ChatMessage message) { String strMessage = message.getMessage(); String[] strings = strMessage.split("::::"); String clientId = strings[1]; if (strMessage.startsWith("remove client::::")) { removeClient(clientId); } else if (strMessage.startsWith("send message::::")) { doSendMessage(clientId, strings[2]); } else if (strMessage.startsWith("add client::::")) { launchNewClient(message, clientId); } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("sendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientHandler chatClientHandler = iterator.next(); if (client.equals(chatClientHandler.getName())) // comment this if // you don't want to // echo back // messages continue; if (chatClientHandler.isError()) { iterator.remove(); try { chatClientHandler.close(); } catch (IOException e) { } continue; } if (!chatClientHandler.isAlive()) { // Could be kicked out of // executor pool. Kick it back // in. try { executor.execute(chatClientHandler); } catch (RejectedExecutionException ree) { iterator.remove(); } continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void removeClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has become bored with this chat room"); chatClients.remove(client); try { chatClientHandler.close(); } catch (IOException e) { } } } public void sendMessage(ChatMessage message) { try { readQueue.offer(message, 100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new IllegalStateException("Unable to add message to queue"); } } }
ChatClientHandler
package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.net.websocket.Session; public class ChatClientHandler implements Runnable, Closeable { /** Name of the client. */ String name; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); /** Keeps track if the thread for this ChatClientHandler is alive. */ volatile boolean alive; /** * Keeps track if their has been an error sending the message so the * ChatRoom can clean it up. */ volatile boolean error; /** The reason for the error */ String reason; /** Flag to indicate whether we should close or not. */ volatile boolean close; public ChatClientHandler(Session session, String name) { this.session = session; this.name = name; } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { System.out.println("ChatClientHandler::::" + sendMessage); this.writeQueue.offer(sendMessage); } @Override public void run() { alive = true; reason = ""; while (true) { try { if (Thread.currentThread().isInterrupted()) { alive = false; break; } if (close) { break; } /* Get a message, send a message to client if not null. */ String message = writeQueue.poll(1000, TimeUnit.SECONDS); if (message != null) { System.out .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", message, name); session.getRemote().sendString(message); } } catch (InterruptedException e) { alive = false; // Our thread is interrupted. We can be resumed. break; } catch (IOException e) { alive = false; error = true; reason = e.getMessage(); e.printStackTrace(); break; } } } public String getName() { return name; } public boolean isAlive() { return alive; } public boolean isError() { return error; } public String getReason() { return reason; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); } } }
The HTML 5 / JavaScript client that uses this chat room.....
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>