WebSocket API Chat Room using JSR 356 and poor man's actor model
From Resin 4.0 Wiki
(Difference between revisions)
(Created page with " '''ChatClientPeerHandler''' <pre> package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.uti...") |
|||
Line 1: | Line 1: | ||
− | + | This one should scale to however many FILE handles your OS allows. | |
+ | You should be able to have 50,000 or 100,000 connections. | ||
'''ChatClientPeerHandler''' | '''ChatClientPeerHandler''' | ||
Line 62: | Line 63: | ||
public void run() { | public void run() { | ||
String message; | String message; | ||
− | + | ||
− | + | ||
/* Get messages, send a message to client if not null, if message is null break. */ | /* Get messages, send a message to client if not null, if message is null break. */ | ||
− | while ((writeQueue. | + | while ((message = writeQueue.poll()) != null) { |
try { | try { | ||
− | |||
if (close) { | if (close) { | ||
break; | break; | ||
Line 75: | Line 74: | ||
e.printStackTrace(); | e.printStackTrace(); | ||
break; | break; | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
} | } | ||
− | |||
} | } | ||
} | } |
Revision as of 00:00, 14 November 2012
This one should scale to however many FILE handles your OS allows. You should be able to have 50,000 or 100,000 connections.
ChatClientPeerHandler
package com.example; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import javax.net.websocket.Session; import com.example.concurrent.ThreadedQueue; public class ChatClientPeerHandler implements Runnable, Closeable { /** Name of the client. */ String clientId; /** Session to send messages to the browser client. */ Session session; /** * Queue that ChatClientHandler are monitoring. When a message comes in on * this queue, ChatClientHandler sends it. */ BlockingQueue<String> writeQueue; ChatRoom room; /** Flag to indicate whether we should close or not. */ volatile boolean close; public ChatClientPeerHandler(ChatRoom room, Session session, String name, Executor executor) { this.session = session; this.clientId = name; this.room = room; writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), this, executor); } /** Chat room calls this method to send a message. */ public void sendMessage(String sendMessage) { System.out.println("" + this + "sendMessage(" + sendMessage); try { /* We could do some error handling here * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client. * We could assume if you are backed up 2000 messages, that you are not active for example * This would be a biz logic decision and since this is a sample app... */ this.writeQueue.put(sendMessage); } catch (InterruptedException e) { //this should never happen } } @Override public void run() { String message; /* Get messages, send a message to client if not null, if message is null break. */ while ((message = writeQueue.poll()) != null) { try { if (close) { break; } session.getRemote().sendString(message); } catch (IOException e) { e.printStackTrace(); break; } } } public String getName() { return clientId; } @Override public void close() throws IOException { close = true; if (session != null) { session.close(); } room.removeClient(this.clientId); } @Override public String toString() { return "ChatClientPeerHandler [clientId=" + clientId + "]"; } }
ChatMessage
package com.example; import javax.net.websocket.Session; public class ChatMessage { static enum MessageType{REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT}; String message; Session session; MessageType type; String clientId; public MessageType getType() { return type; } public String getClientId() { return clientId; } public ChatMessage(MessageType type, String clientId) { } public ChatMessage(String strMessage, Session session) { String[] strings = strMessage.split("::::"); clientId = strings[1]; this.session = session; if (strMessage.startsWith("remove client::::")) { type = MessageType.REMOVE_CLIENT; this.session = session; } else if (strMessage.startsWith("send message::::")) { type = MessageType.SEND_MESSAGE; message = strings[2]; } else if (strMessage.startsWith("add client::::")) { type = MessageType.ADD_CLIENT; } else { System.err.println("ACK... Don't understand your message!!!!! " + message); } } public final String getMessage() { return message; } public final Session getSession() { return session; } @Override public String toString() { return "ChatMessage [message=" + message + ", session=" + session + ", type=" + type + ", clientId=" + clientId + "]"; } }
ChatRoom
package com.example; import java.io.IOException; import java.net.URI; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.ejb.Startup; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.ContainerProvider; import javax.net.websocket.DefaultServerConfiguration; import javax.net.websocket.ServerContainer; import javax.net.websocket.ServerEndpointConfiguration; import com.example.ChatMessage.MessageType; import com.example.websocket.ChatServerWebSocketEndpoint; @ApplicationScoped @Startup public class ChatRoom { Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>(); BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>( 2000); @Inject Executor executor; @Inject ChatServerWebSocketEndpoint endpoint; @PostConstruct void init() throws Exception { System.out.println("POST CONSTRUCT CALLED"); ServerEndpointConfiguration serverConfiguration; serverConfiguration = new DefaultServerConfiguration(new URI("/chat")); ServerContainer serverContainer = ContainerProvider.getServerContainer(); serverContainer.publishServer(endpoint,serverConfiguration); executor.execute(new Runnable() { public void run() { ChatRoom.this.run(); }}); } private void run() { while (true) { try { ChatMessage message = readQueue .poll(500, TimeUnit.MILLISECONDS); if (message != null) { dispatchMessage(message); } } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } } } } private void launchNewClient(ChatMessage message) { System.out.println("launchNewClient::::" + message.getClientId()); ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler(this, message.getSession(), message.getClientId(), executor); chatClients.put(chatClientHandler.getName(), chatClientHandler); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has join the chat room"); } private void dispatchMessage(ChatMessage message) { System.out.println("Dispatch Message::" + message); switch(message.getType()) { case ADD_CLIENT: launchNewClient(message); break; case REMOVE_CLIENT: doRemoveClient(message.getClientId()); break; case SEND_MESSAGE: doSendMessage(message.getClientId(), message.getMessage()); break; default: System.err.println("ACK... Don't understand your message!!!!! " + message); } } private void doSendMessage(String client, String message) { String sendMessage = String.format("%s : %s", client, message); System.out.printf("SendMessage::Sending message %s\n", sendMessage); Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator(); while (iterator.hasNext()) { ChatClientPeerHandler chatClientHandler = iterator.next(); //prevents sending messages to yourself if (client.equals(chatClientHandler.getName())){ continue; } System.out.printf("sendMessage::Sending message %s to %s\n", sendMessage, chatClientHandler.getName()); chatClientHandler.sendMessage(sendMessage); } } private void doRemoveClient(String client) { System.out.println("removeClient::::[" + client + "]::::"); ChatClientPeerHandler chatClientHandler = chatClients.get(client); if (chatClientHandler != null) { System.out.println("removeClient:::: found " + client + " to remove."); doSendMessage(chatClientHandler.getName(), chatClientHandler.getName() + " has become bored with this chat room"); chatClients.remove(client); try { chatClientHandler.close(); } catch (IOException e) { } } } public void sendMessage(ChatMessage message) { try { readQueue.offer(message, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } public void removeClient(String clientId) { ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId); try { readQueue.offer(message, 100, TimeUnit.SECONDS); } catch (InterruptedException e) { if (Thread.currentThread().isInterrupted()) { Thread.interrupted(); } e.printStackTrace(); } } }
ThreadedQueue AKA poor man's Actor model
package com.example.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * A Queue that wakes up a runnable every time something is added to it. * This is a poor man's Actor. The thread is tied to the queue. * If you put an item in and there is not a thread, one will be launched. */ public class ThreadedQueue<T> implements BlockingQueue<T> { BlockingQueue<T> queue; Executor executor; Runnable queueProcessor; AtomicBoolean active = new AtomicBoolean(); public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor, Executor executor) { this.queue = queue; this.queueProcessor = queueProcessor; this.executor = executor; } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public int remainingCapacity() { return queue.remainingCapacity(); } @Override public boolean contains(Object o) { return queue.contains(o); } @Override public boolean containsAll(Collection<?> c) { return queue.containsAll(c); } @Override public boolean offer(T item) { boolean result = queue.offer(item); launchRunnable(); return result; } @Override public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException { boolean result = queue.offer(item); launchRunnable(); if (result) { return result; } else { result = queue.offer(item, timeout, unit); launchRunnable(); } return result; } @Override public boolean add(T item) { boolean result = queue.add(item); launchRunnable(); return result; } @Override public void put(T item) throws InterruptedException { System.out.println("PUT CALLED " + item); /* Offer it if you can, this is non-blocking. */ if (queue.offer(item)) { launchRunnable(); return; } /* If you were unable to offer it, then go ahead and block. */ queue.put(item); launchRunnable(); } @Override public boolean addAll(Collection<? extends T> c) { boolean result = queue.addAll(c); launchRunnable(); return result; } @Override public T peek() { return queue.peek(); } @Override public T poll() { return queue.poll(); } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { return queue.poll(timeout, unit); } @Override public T element() { return queue.element(); } @Override public T take() throws InterruptedException { return queue.take(); } @Override public T remove() { return queue.remove(); } @Override public Iterator<T> iterator() { return queue.iterator(); } @Override public Object[] toArray() { return queue.toArray(); } @Override public <X> X[] toArray(X[] array) { return queue.toArray(array); } @Override public boolean removeAll(Collection<?> c) { return queue.removeAll(c); } @Override public boolean retainAll(Collection<?> c) { return queue.retainAll(c); } @Override public void clear() { queue.clear(); } @Override public boolean remove(Object o) { return queue.remove(o); } @Override public int drainTo(Collection<? super T> c) { return queue.drainTo(c); } @Override public int drainTo(Collection<? super T> c, int maxElements) { return queue.drainTo(c, maxElements); } public void launchRunnable() { System.out.println("LAUNCH RUNNABLE"); if (queue.isEmpty()) { return; } if (active.compareAndSet(false, true)) { executor.execute( new Runnable() { @Override public void run() { try { queueProcessor.run(); } finally { active.set(false); } } }); } } }
ChatServerWebSocketEndpoint
package com.example.websocket; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.net.websocket.Endpoint; import javax.net.websocket.Session; import com.example.ChatRoom; public class ChatServerWebSocketEndpoint extends Endpoint{ @Inject @ApplicationScoped ChatRoom room; @Override public void onOpen(Session session) { session.addMessageHandler(new ChatWebsocketMessageHandler(session, room)); } }
ChatWebsocketMessageHandler
package com.example.websocket; import javax.net.websocket.MessageHandler; import javax.net.websocket.Session; import com.example.ChatMessage; import com.example.ChatRoom; public class ChatWebsocketMessageHandler implements MessageHandler.Text { ChatRoom chatRoom; Session session; public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) { this.session = session; this.chatRoom = chatRoom; } @Override public void onMessage(String message) { chatRoom.sendMessage(new ChatMessage(message, session)); } }