WebSocket API Chat Room using JSR 356
From Resin 4.0 Wiki
(One intermediate revision by one user not shown) | |||
Line 1: | Line 1: | ||
− | |||
− | + | This version will scale to however many threads your box can run (64 bit OS running a multi core machine can run thousands of threads). | |
− | + | ||
− | + | There is another version that will run to however many FILE_DESCRIPTORS your OS supports using a poor man's actor model. | |
− | + | ||
− | + | [[WebSocket API Chat Room using JSR 356 and poor man's actor model]] | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | '''ChatServerWebSocketEndpoint''' | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
<pre> | <pre> | ||
− | package com.example; | + | package com.example.websocket; |
+ | import javax.enterprise.context.ApplicationScoped; | ||
+ | import javax.inject.Inject; | ||
import javax.net.websocket.Endpoint; | import javax.net.websocket.Endpoint; | ||
import javax.net.websocket.Session; | import javax.net.websocket.Session; | ||
− | + | import com.example.ChatRoom; | |
+ | |||
+ | public class ChatServerWebSocketEndpoint extends Endpoint{ | ||
+ | |||
+ | @Inject @ApplicationScoped | ||
ChatRoom room; | ChatRoom room; | ||
− | |||
− | |||
− | |||
− | |||
@Override | @Override | ||
public void onOpen(Session session) { | public void onOpen(Session session) { | ||
− | session.addMessageHandler(new | + | session.addMessageHandler(new ChatWebsocketMessageHandler(session, room)); |
} | } | ||
} | } | ||
− | |||
</pre> | </pre> | ||
− | The ''' | + | The '''ChatServerWebSocketEndpoint''' binds a '''ChatWebsocketMessageHandler''' to the WebSocket session. |
− | The ''' | + | The '''ChatWebsocketMessageHandler''' uses a chatRoom to send a message to other clients, when it gets a message. |
<pre> | <pre> | ||
− | package com.example; | + | package com.example.websocket; |
Line 84: | Line 46: | ||
import javax.net.websocket.Session; | import javax.net.websocket.Session; | ||
− | public class | + | import com.example.ChatMessage; |
+ | import com.example.ChatRoom; | ||
+ | |||
+ | public class ChatWebsocketMessageHandler implements MessageHandler.Text { | ||
− | |||
ChatRoom chatRoom; | ChatRoom chatRoom; | ||
+ | Session session; | ||
− | public | + | public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) { |
this.session = session; | this.session = session; | ||
− | this.chatRoom = | + | this.chatRoom = chatRoom; |
} | } | ||
Line 102: | Line 67: | ||
</pre> | </pre> | ||
− | The ''' | + | The '''ChatWebsocketMessageHandler''' wraps the message in a '''ChatMessage'''. |
+ | The '''ChatMessage''' parses what type of chat message this is. | ||
Line 112: | Line 78: | ||
public class ChatMessage { | public class ChatMessage { | ||
+ | static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; | ||
+ | |||
String message; | String message; | ||
Session session; | Session session; | ||
+ | MessageType type; | ||
+ | String clientId; | ||
+ | |||
− | public ChatMessage(String | + | public MessageType getType() { |
− | + | return type; | |
+ | } | ||
+ | |||
+ | public String getClientId() { | ||
+ | return clientId; | ||
+ | } | ||
+ | |||
+ | public ChatMessage(MessageType type, String clientId) { | ||
+ | |||
+ | } | ||
+ | public ChatMessage(String strMessage, Session session) { | ||
+ | |||
+ | String[] strings = strMessage.split("::::"); | ||
+ | clientId = strings[1]; | ||
this.session = session; | this.session = session; | ||
+ | |||
+ | |||
+ | if (strMessage.startsWith("remove client::::")) { | ||
+ | type = MessageType.REMOVE_CLIENT; | ||
+ | this.session = session; | ||
+ | } else if (strMessage.startsWith("send message::::")) { | ||
+ | type = MessageType.SEND_MESSAGE; | ||
+ | message = strings[2]; | ||
+ | } else if (strMessage.startsWith("add client::::")) { | ||
+ | type = MessageType.ADD_CLIENT; | ||
+ | } else { | ||
+ | System.err.println("ACK... Don't understand your message!!!!! " | ||
+ | + message); | ||
+ | } | ||
} | } | ||
Line 125: | Line 123: | ||
public final Session getSession() { | public final Session getSession() { | ||
return session; | return session; | ||
+ | } | ||
+ | |||
+ | @Override | ||
+ | public String toString() { | ||
+ | return "ChatMessage [message=" + message + ", session=" + session | ||
+ | + ", type=" + type + ", clientId=" + clientId + "]"; | ||
} | } | ||
} | } | ||
+ | |||
</pre> | </pre> | ||
Line 135: | Line 140: | ||
import java.io.IOException; | import java.io.IOException; | ||
− | import java. | + | import java.net.URI; |
import java.util.Iterator; | import java.util.Iterator; | ||
import java.util.Map; | import java.util.Map; | ||
import java.util.concurrent.ArrayBlockingQueue; | import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.BlockingQueue; | import java.util.concurrent.BlockingQueue; | ||
+ | import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | import java.util.concurrent.Executor; | ||
− | |||
− | |||
import java.util.concurrent.TimeUnit; | import java.util.concurrent.TimeUnit; | ||
+ | import javax.annotation.PostConstruct; | ||
import javax.ejb.Startup; | import javax.ejb.Startup; | ||
import javax.enterprise.context.ApplicationScoped; | import javax.enterprise.context.ApplicationScoped; | ||
+ | import javax.inject.Inject; | ||
+ | import javax.net.websocket.ContainerProvider; | ||
+ | import javax.net.websocket.DefaultServerConfiguration; | ||
+ | import javax.net.websocket.ServerContainer; | ||
+ | import javax.net.websocket.ServerEndpointConfiguration; | ||
+ | |||
+ | import com.example.ChatMessage.MessageType; | ||
+ | import com.example.websocket.ChatServerWebSocketEndpoint; | ||
@ApplicationScoped | @ApplicationScoped | ||
Line 152: | Line 165: | ||
public class ChatRoom { | public class ChatRoom { | ||
− | Map<String, | + | Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>(); |
+ | |||
BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( | BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( | ||
2000); | 2000); | ||
− | Executor executor; | + | @Inject Executor executor; |
− | + | @Inject ChatServerWebSocketEndpoint endpoint; | |
− | + | ||
− | + | @PostConstruct | |
− | + | void init() throws Exception { | |
− | + | System.out.println("POST CONSTRUCT CALLED"); | |
− | ( | + | ServerEndpointConfiguration serverConfiguration; |
+ | serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); | ||
+ | ServerContainer serverContainer = ContainerProvider.getServerContainer(); | ||
+ | serverContainer.publishServer(endpoint,serverConfiguration); | ||
+ | |||
+ | executor.execute(new Runnable() { | ||
public void run() { | public void run() { | ||
ChatRoom.this.run(); | ChatRoom.this.run(); | ||
− | } | + | }}); |
− | + | ||
} | } | ||
+ | |||
private void run() { | private void run() { | ||
while (true) { | while (true) { | ||
try { | try { | ||
− | |||
ChatMessage message = readQueue | ChatMessage message = readQueue | ||
.poll(500, TimeUnit.MILLISECONDS); | .poll(500, TimeUnit.MILLISECONDS); | ||
Line 179: | Line 197: | ||
} | } | ||
} catch (InterruptedException e) { | } catch (InterruptedException e) { | ||
− | + | if (Thread.currentThread().isInterrupted()) { | |
+ | Thread.interrupted(); | ||
+ | } | ||
} | } | ||
} | } | ||
} | } | ||
− | private void launchNewClient(ChatMessage message | + | private void launchNewClient(ChatMessage message) { |
− | System.out.println("launchNewClient::::" + | + | System.out.println("launchNewClient::::" + message.getClientId()); |
− | + | ||
− | message.getSession(), | + | ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler( |
+ | message.getSession(), message.getClientId()); | ||
chatClients.put(chatClientHandler.getName(), chatClientHandler); | chatClients.put(chatClientHandler.getName(), chatClientHandler); | ||
executor.execute(chatClientHandler); | executor.execute(chatClientHandler); | ||
Line 195: | Line 216: | ||
private void dispatchMessage(ChatMessage message) { | private void dispatchMessage(ChatMessage message) { | ||
+ | System.out.println("Dispatch Message::" + message); | ||
+ | |||
+ | switch(message.getType()) { | ||
+ | case ADD_CLIENT: | ||
+ | launchNewClient(message); | ||
+ | break; | ||
+ | case REMOVE_CLIENT: | ||
+ | doRemoveClient(message.getClientId()); | ||
+ | break; | ||
+ | case SEND_MESSAGE: | ||
+ | doSendMessage(message.getClientId(), message.getMessage()); | ||
+ | break; | ||
+ | default: | ||
+ | System.err.println("ACK... Don't understand your message!!!!! " | ||
+ | + message); | ||
+ | |||
+ | } | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
} | } | ||
Line 215: | Line 239: | ||
String sendMessage = String.format("%s : %s", client, message); | String sendMessage = String.format("%s : %s", client, message); | ||
− | System.out.printf(" | + | System.out.printf("SendMessage::Sending message %s\n", sendMessage); |
− | Iterator< | + | Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); |
while (iterator.hasNext()) { | while (iterator.hasNext()) { | ||
− | + | ChatClientPeerHandler chatClientHandler = iterator.next(); | |
− | + | ||
− | + | //prevents sending messages to yourself | |
− | + | if (client.equals(chatClientHandler.getName())){ | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | if ( | + | |
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
continue; | continue; | ||
} | } | ||
System.out.printf("sendMessage::Sending message %s to %s\n", | System.out.printf("sendMessage::Sending message %s to %s\n", | ||
− | + | sendMessage, chatClientHandler.getName()); | |
− | + | ||
chatClientHandler.sendMessage(sendMessage); | chatClientHandler.sendMessage(sendMessage); | ||
− | |||
} | } | ||
} | } | ||
− | private void | + | private void doRemoveClient(String client) { |
System.out.println("removeClient::::[" + client + "]::::"); | System.out.println("removeClient::::[" + client + "]::::"); | ||
− | + | ChatClientPeerHandler chatClientHandler = chatClients.get(client); | |
if (chatClientHandler != null) { | if (chatClientHandler != null) { | ||
Line 280: | Line 281: | ||
public void sendMessage(ChatMessage message) { | public void sendMessage(ChatMessage message) { | ||
try { | try { | ||
− | readQueue.offer(message, | + | readQueue.offer(message, 10, TimeUnit.SECONDS); |
} catch (InterruptedException e) { | } catch (InterruptedException e) { | ||
− | + | if (Thread.currentThread().isInterrupted()) { | |
+ | Thread.interrupted(); | ||
+ | } | ||
+ | e.printStackTrace(); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | public void removeClient(String clientId) { | ||
+ | ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId); | ||
+ | |||
+ | try { | ||
+ | readQueue.offer(message, 100, TimeUnit.SECONDS); | ||
+ | } catch (InterruptedException e) { | ||
+ | if (Thread.currentThread().isInterrupted()) { | ||
+ | Thread.interrupted(); | ||
+ | } | ||
+ | e.printStackTrace(); | ||
} | } | ||
} | } | ||
} | } | ||
− | |||
</pre> | </pre> | ||
− | ''' | + | The '''ChatClientPeerHandler''' manages sending messages back to the clients. |
+ | |||
+ | '''ChatClientPeerHandler''' | ||
<pre> | <pre> | ||
Line 301: | Line 319: | ||
import java.util.concurrent.TimeUnit; | import java.util.concurrent.TimeUnit; | ||
+ | import javax.enterprise.context.ApplicationScoped; | ||
+ | import javax.inject.Inject; | ||
import javax.net.websocket.Session; | import javax.net.websocket.Session; | ||
− | public class | + | public class ChatClientPeerHandler implements Runnable, Closeable { |
/** Name of the client. */ | /** Name of the client. */ | ||
− | String | + | String clientId; |
/** Session to send messages to the browser client. */ | /** Session to send messages to the browser client. */ | ||
Line 316: | Line 336: | ||
*/ | */ | ||
BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); | BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); | ||
+ | |||
+ | @Inject @ApplicationScoped | ||
+ | ChatRoom room; | ||
+ | |||
/** Keeps track if the thread for this ChatClientHandler is alive. */ | /** Keeps track if the thread for this ChatClientHandler is alive. */ | ||
volatile boolean alive; | volatile boolean alive; | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
− | + | ||
/** Flag to indicate whether we should close or not. */ | /** Flag to indicate whether we should close or not. */ | ||
volatile boolean close; | volatile boolean close; | ||
− | public | + | public ChatClientPeerHandler(Session session, String name) { |
this.session = session; | this.session = session; | ||
− | this. | + | this.clientId = name; |
} | } | ||
Line 343: | Line 361: | ||
public void run() { | public void run() { | ||
alive = true; | alive = true; | ||
− | |||
while (true) { | while (true) { | ||
try { | try { | ||
− | if ( | + | |
+ | if (close) { | ||
alive = false; | alive = false; | ||
− | |||
− | |||
− | |||
− | |||
break; | break; | ||
} | } | ||
/* Get a message, send a message to client if not null. */ | /* Get a message, send a message to client if not null. */ | ||
− | String message = writeQueue.poll( | + | String message = writeQueue.poll(60, TimeUnit.SECONDS); |
if (message != null) { | if (message != null) { | ||
System.out | System.out | ||
.printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", | .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", | ||
− | message, | + | message, clientId); |
session.getRemote().sendString(message); | session.getRemote().sendString(message); | ||
} | } | ||
} catch (InterruptedException e) { | } catch (InterruptedException e) { | ||
− | + | if (Thread.currentThread().isInterrupted()) { | |
− | + | Thread.interrupted(); | |
+ | } | ||
} catch (IOException e) { | } catch (IOException e) { | ||
alive = false; | alive = false; | ||
− | |||
− | |||
e.printStackTrace(); | e.printStackTrace(); | ||
break; | break; | ||
Line 379: | Line 392: | ||
public String getName() { | public String getName() { | ||
− | return | + | return clientId; |
} | } | ||
Line 386: | Line 399: | ||
} | } | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
@Override | @Override | ||
Line 400: | Line 406: | ||
session.close(); | session.close(); | ||
} | } | ||
+ | room.removeClient(this.clientId); | ||
} | } | ||
Latest revision as of 00:00, 14 November 2012
This version will scale to however many threads your box can run (64 bit OS running a multi core machine can run thousands of threads).
There is another version that will run to however many FILE_DESCRIPTORS your OS supports using a poor man's actor model.
WebSocket API Chat Room using JSR 356 and poor man's actor model
ChatServerWebSocketEndpoint
package com.example.websocket; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Endpoint; import javax.net.websocket.Session; import com.example.ChatRoom; public class ChatServerWebSocketEndpoint extends Endpoint{ @Inject @ApplicationScoped ChatRoom room; @Override public void onOpen(Session session) { session.addMessageHandler(new ChatWebsocketMessageHandler(session, room)); } }
The ChatServerWebSocketEndpoint binds a ChatWebsocketMessageHandler to the WebSocket session.
The ChatWebsocketMessageHandler uses a chatRoom to send a message to other clients, when it gets a message.
package com.example.websocket; import javax.net.websocket.MessageHandler; import javax.net.websocket.Session; import com.example.ChatMessage; import com.example.ChatRoom; public class ChatWebsocketMessageHandler implements MessageHandler.Text { ChatRoom chatRoom; Session session; public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) { this.session = session; this.chatRoom = chatRoom; } @Override public void onMessage(String message) { chatRoom.sendMessage(new ChatMessage(message, session)); } }
The ChatWebsocketMessageHandler wraps the message in a ChatMessage. The ChatMessage parses what type of chat message this is.
ChatMessage
package com.example; import javax.net.websocket.Session; public class ChatMessage { static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; String message; Session session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(MessageType type, String clientId) { } public ChatMessage(String strMessage, Session session) { String[] strings = strMessage.split("::::"); clientId = strings[1]; this.session = session; if (strMessage.startsWith("remove client::::")) { type = MessageType.REMOVE_CLIENT; this.session = session; } else if (strMessage.startsWith("send message::::")) { type = MessageType.SEND_MESSAGE; message = strings[2]; } else if (strMessage.startsWith("add client::::")) { type = MessageType.ADD_CLIENT; } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } public final String getMessage() { return message; } public final Session getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } }
ChatRoom
package com.example; import java.io.IOException; import java.net.URI; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.ContainerProvider; import javax.net.websocket.DefaultServerConfiguration; import javax.net.websocket.ServerContainer; import javax.net.websocket.ServerEndpointConfiguration; import com.example.ChatMessage.MessageType; import com.example.websocket.ChatServerWebSocketEndpoint; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( 2000); @Inject Executor executor; @Inject ChatServerWebSocketEndpoint endpoint; @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); ServerEndpointConfiguration serverConfiguration; serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); ServerContainer serverContainer = ContainerProvider.getServerContainer(); serverContainer.publishServer(endpoint,serverConfiguration); executor.execute(new Runnable() { public void run() { ChatRoom.this.run(); }}); } private void run() { while (true) { try { ChatMessage message = readQueue .poll(500, TimeUnit.MILLISECONDS); if (message != null) { dispatchMessage(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler( message.getSession(), message.getClientId()); chatClients.put(chatClientHandler.getName(), chatClientHandler); executor.execute(chatClientHandler); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has join the chat room"); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has become bored with this chat room"); chatClients.remove(client); try { chatClientHandler.close(); } catch (IOException e) { } } } public void sendMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } public void removeClient(String clientId) { ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId); try { readQueue.offer(message, 100, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
The ChatClientPeerHandler manages sending messages back to the clients.
ChatClientPeerHandler
package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Session; public class ChatClientPeerHandler implements Runnable, Closeable { /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue = new ArrayBlockingQueue<String>(2000); @Inject @ApplicationScoped ChatRoom room; /** Keeps track if the thread for this ChatClientHandler is alive. */ volatile boolean alive; /** Flag to indicate whether we should close or not. */ volatile boolean close; public ChatClientPeerHandler(Session session, String name) { this.session = session; this.clientId = name; } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { System.out.println("ChatClientHandler::::" + sendMessage); this.writeQueue.offer(sendMessage); } @Override public void run() { alive = true; while (true) { try { if (close) { alive = false; break; } /* Get a message, send a message to client if not null. */ String message = writeQueue.poll(60, TimeUnit.SECONDS); if (message != null) { System.out .printf("ChatClientHandler::::run loop::::SENDING MESSAGE %s to %s\n", message, clientId); session.getRemote().sendString(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } catch (IOException e) { alive = false; e.printStackTrace(); break; } } } public String getName() { return clientId; } public boolean isAlive() { return alive; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); } room.removeClient(this.clientId); } }
The HTML 5 / JavaScript client that uses this chat room.....
<html> <head> <title>Chat Client</title> <link rel="stylesheet" type="text/css" href="css/style.css" media="screen" /> <script type="text/javascript" src="scripts/jquery-1.7.1.js"></script> <script type="text/javascript"> var ENTER_KEY = '13'; var TOKEN_DELIM = "::::"; function buildWebSocketURL() { var url = document.URL; var parts = url.split('/'); var scheme = parts[0]; var hostPort = parts[2]; var wssScheme = null; if (scheme=="http:") { wssScheme="ws:"; } else if (scheme=="https:") { wssScheme="wss:"; } wssUrl = wssScheme + "//" + hostPort + "/chat/chat"; return wssUrl; } var chatUserSession = { version : "1.0", webSocketProtocol : "caucho-example-chat-protocol", webSocketURL : buildWebSocketURL(), webSocket : null,//WebSocket userName : null }; function chat_sendMessage(message) { $("#chatHistory").prepend("<p style='color:green'> ME : " + message + "</p>"); chatUserSession.webSocket.send("send message" + TOKEN_DELIM + chatUserSession.userName + TOKEN_DELIM + message); } function chat_joinChat() { chatUserSession.webSocket.send("add client" + TOKEN_DELIM + chatUserSession.userName); } function chat_leaveChat() { chatUserSession.status(chatUserSession.userName + " is leaving chat"); chatUserSession.webSocket.send("remove client" + TOKEN_DELIM + chatUserSession.userName); } function chat_openWebSocket() { chatUserSession.webSocket = new WebSocket(chatUserSession.webSocketURL, chatUserSession.webSocketProtocol); var socket = chatUserSession.webSocket; socket.onmessage = function(msg) { chatUserSession.onMessage(msg); } socket.onerror = function(errorEvent) { chatUserSession.onError(errorEvent); } socket.onopen = function() { chatUserSession.onOpen(); } socket.onclose = function(closeEvent) { chatUserSession.onClose(closeEvent); } } function chat_onMessage(msgEvent) { chatUserSession.status("New Message :" + msgEvent.data); $("#chatHistory").prepend("<p style='color:blue'>" + msgEvent.data + "</p>"); } function chat_Login() { chatUserSession.userName = $("#userName").val(); $("#loginDiv").hide(500); $('#header').text( "Chat Client (logging in...) : " + chatUserSession.userName); chatUserSession.status(chatUserSession.userName + " is logging in..."); chatUserSession.open(); } function chat_onOpen() { chatUserSession.joinChat(); chatUserSession.status("Chat Client (logged in) : " + chatUserSession.userName); $('#header').text( "Chat Client (logged in...) : " + chatUserSession.userName); $("#inputArea").show(500); $("#statusBar").show(500); $("#chatInput").focus(); } function chat_Status(message) { $('#statusBarPara1').text(message); $("#statusBar").show(500); } function chat_onClose(closeEvent) { $("#loginDiv").show(500); $('#header').text( "Chat Client (not connected) : " + chatUserSession.userName); $('#statusBarPara1').text(chatUserSession.userName + " not logged in. " + ":: Reason: " + closeEvent.reason + " Code: " + closeEvent.code); $("#inputArea").hide(500); $("#statusBar").show(500); $("#userName").val(chatUserSession.userName); $("#userName").focus(); } function chat_onError(msg) { $('#statusBarPara1').text(" Websocket error :" + JSON.stringfy(msg)); $("#statusBar").show(500); } chatUserSession.open = chat_openWebSocket; chatUserSession.onMessage = chat_onMessage; chatUserSession.onOpen = chat_onOpen; chatUserSession.login = chat_Login; chatUserSession.onClose = chat_onClose; chatUserSession.onError = chat_onError; chatUserSession.joinChat = chat_joinChat; chatUserSession.sendMessage = chat_sendMessage; chatUserSession.leaveChat = chat_leaveChat; chatUserSession.status = chat_Status; $(document).ready(function() { $("#inputArea").hide(); $("#userName").focus(); $("#statusBar").click(function() { $("#statusBar").hide(300); }); $("#chatInput").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { var textMessage = $("#chatInput").val(); if (textMessage=="bye!") { chatUserSession.leaveChat(); } else { $("#chatInput").val(""); $("#hint").hide(500); chatUserSession.sendMessage(textMessage); } } event.stopPropagation(); }); $("#login").click(function(event) { chatUserSession.login(); event.stopPropagation(); }); $("#userName").keypress(function(event) { var keycode = (event.keyCode ? event.keyCode : event.which); if (keycode == ENTER_KEY) { chatUserSession.login() event.stopPropagation(); } }); }); </script> </head> <body> <h1 id="header">Chat Client</h1> <div id="statusBar"> <p id="statusBarPara1">Welcome to Chat App, Click to hide</p> </div> <div id="loginDiv"> User name <input id="userName" type="text" /> <input id="login" type="submit" value="Login" /> </div> <div id="inputArea"> <p id="hint">Type your message here and then hit return (entering in 'bye!' logs out)</p> <input id="chatInput" type="text" value="" /> </div> <div id="chatHistoryDiv"> <p id="chatHistory"></p> </div> </body> </html>