WebSocket API Chat Room using JSR 356 and poor man's actor model

From Resin 4.0 Wiki

Revision as of 00:00, 14 November 2012 by Rick (Talk | contribs)
Jump to: navigation, search

This version should scale to as many connections as your OS allows open file handles, so 50,000 or 100,000 simultaneous connections should be achievable.

ChatClientPeerHandler

package com.example;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

import javax.net.websocket.Session;

import com.example.concurrent.ThreadedQueue;

public class ChatClientPeerHandler implements Runnable, Closeable {

    /** Name of the client. */
    String clientId;

    /** Session to send messages to the browser client. */
    Session session;

    /**
     * Queue that ChatClientHandler are monitoring. When a message comes in on
     * this queue, ChatClientHandler sends it.
     */
    BlockingQueue<String> writeQueue;
    
    ChatRoom room;

    /** Flag to indicate whether we should close or not. */
    volatile boolean close;


    public ChatClientPeerHandler(ChatRoom room, Session session, String name, Executor executor) {
        this.session = session;
        this.clientId = name;
        this.room = room;
        
        writeQueue = new ThreadedQueue<String>(new ArrayBlockingQueue<String>(2000), this,  executor);
        
    }

    /** Chat room calls this method to send a message. */
    public void sendMessage(String sendMessage) {
        System.out.println("" + this + "sendMessage(" + sendMessage);
        try {
            /* We could do some error handling here
             * We could try to writeQueue.offer() or writeQueue.add() and if the queue is full, kill this client.
             * We could assume if you are backed up 2000 messages, that you are not active for example
             * This would be a biz logic decision and since this is a sample app... 
             */
            this.writeQueue.put(sendMessage);
        } catch (InterruptedException e) {
            //this should never happen
        }
    }

    @Override
    public void run() {
        String message;

        /* Get messages, send a message to client if not null, if message is null break. */
        while ((message = writeQueue.poll()) != null) {
            try {
                if (close) {
                    break;
                }
                session.getRemote().sendString(message);
            }  catch (IOException e) {
                e.printStackTrace();
                break;
            }
        }
    }

    public String getName() {
        return clientId;
    }



    @Override
    public void close() throws IOException {
        close = true;
        if (session != null) {
            session.close();
        }
        room.removeClient(this.clientId);
    }

    @Override
    public String toString() {
        return "ChatClientPeerHandler [clientId=" + clientId + "]";
    }
    
    

}


ChatMessage

package com.example;

import javax.net.websocket.Session;

/**
 * A command travelling through the chat room: add a client, remove a client,
 * or broadcast a message.
 *
 * <p>Wire format is {@code <command>::::<clientId>[::::<body>]}, where the
 * command is one of {@code add client}, {@code remove client} or
 * {@code send message}.
 */
public class ChatMessage {
    static enum MessageType { REMOVE_CLIENT, SEND_MESSAGE, ADD_CLIENT }

    /** Separator between the command, the client id and the body. */
    private static final String SEPARATOR = "::::";

    /** Body of a SEND_MESSAGE command; null for the other types. */
    String message;
    /** Session the command arrived on; null for internally created commands. */
    Session session;
    /** Command type; null when the text could not be understood. */
    MessageType type;
    /** Client the command refers to; null when the text carried no client id. */
    String clientId;


    public MessageType getType() {
        return type;
    }

    public String getClientId() {
        return clientId;
    }

    /**
     * Creates a command that did not originate from a WebSocket frame.
     *
     * @param type     command type
     * @param clientId client the command refers to
     */
    public ChatMessage(MessageType type, String clientId) {
        this.type = type;
        this.clientId = clientId;
    }

    /**
     * Parses a command received from a client.
     *
     * <p>Text that cannot be understood is reported on stderr and leaves
     * {@link #getType()} returning null instead of throwing.
     *
     * @param strMessage raw text in the wire format described on the class
     * @param session    session the text arrived on
     */
    public ChatMessage(String strMessage, Session session) {
        this.session = session;

        if (strMessage == null) {
            System.err.println("ACK... Don't understand your message!!!!! "
                    + strMessage);
            return;
        }

        /* Limit of 3 keeps any "::::" inside the body and keeps an empty body. */
        String[] strings = strMessage.split(SEPARATOR, 3);
        String command = strings[0];
        if (strings.length > 1) {
            clientId = strings[1];
        }

        if (clientId == null) {
            /* No separator at all, so there is no client to act on. */
            System.err.println("ACK... Don't understand your message!!!!! "
                    + strMessage);
        } else if ("remove client".equals(command)) {
            type = MessageType.REMOVE_CLIENT;
        } else if ("send message".equals(command)) {
            type = MessageType.SEND_MESSAGE;
            message = strings.length > 2 ? strings[2] : "";
        } else if ("add client".equals(command)) {
            type = MessageType.ADD_CLIENT;
        } else {
            System.err.println("ACK... Don't understand your message!!!!! "
                    + strMessage);
        }
    }

    public final String getMessage() {
        return message;
    }
    public final Session getSession() {
        return session;
    }

    @Override
    public String toString() {
        return "ChatMessage [message=" + message + ", session=" + session
                + ", type=" + type + ", clientId=" + clientId + "]";
    }

}

ChatRoom

package com.example;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.ejb.Startup;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.ContainerProvider;
import javax.net.websocket.DefaultServerConfiguration;
import javax.net.websocket.ServerContainer;
import javax.net.websocket.ServerEndpointConfiguration;

import com.example.ChatMessage.MessageType;
import com.example.websocket.ChatServerWebSocketEndpoint;

@ApplicationScoped
@Startup
public class ChatRoom {

    Map<String, ChatClientPeerHandler> chatClients = new ConcurrentHashMap<String, ChatClientPeerHandler>();

    BlockingQueue<ChatMessage> readQueue = new ArrayBlockingQueue<ChatMessage>(
            2000);

    @Inject Executor executor;
    @Inject ChatServerWebSocketEndpoint endpoint;
    
    @PostConstruct
    void init() throws Exception {
        System.out.println("POST CONSTRUCT CALLED");
        ServerEndpointConfiguration serverConfiguration;
        serverConfiguration = new DefaultServerConfiguration(new URI("/chat"));
        ServerContainer serverContainer = ContainerProvider.getServerContainer();
        serverContainer.publishServer(endpoint,serverConfiguration);
        
        executor.execute(new Runnable() {
            public void run() {
                ChatRoom.this.run();
            }});
    }


    private void run() {
        while (true) {
            try {
                ChatMessage message = readQueue
                        .poll(500, TimeUnit.MILLISECONDS);
                if (message != null) {
                    dispatchMessage(message);
                }
            } catch (InterruptedException e) {
                if (Thread.currentThread().isInterrupted()) {
                    Thread.interrupted();
                }
            }
        }
    }

    private void launchNewClient(ChatMessage message) {
        System.out.println("launchNewClient::::" + message.getClientId());
        
        ChatClientPeerHandler chatClientHandler = new ChatClientPeerHandler(this,
                message.getSession(), message.getClientId(), executor);
        chatClients.put(chatClientHandler.getName(), chatClientHandler);
        doSendMessage(chatClientHandler.getName(), chatClientHandler.getName()
                + " has join the chat room");
    }

    private void dispatchMessage(ChatMessage message) {
        System.out.println("Dispatch Message::" + message);
        
        switch(message.getType()) {
        case ADD_CLIENT:
            launchNewClient(message);
            break;
        case REMOVE_CLIENT:
            doRemoveClient(message.getClientId());
            break;
        case SEND_MESSAGE:
            doSendMessage(message.getClientId(), message.getMessage());
            break;
        default:
             System.err.println("ACK... Don't understand your message!!!!! "
                     + message);
             
        }

    }

    private void doSendMessage(String client, String message) {

        String sendMessage = String.format("%s : %s", client, message);
        System.out.printf("SendMessage::Sending message %s\n", sendMessage);
        Iterator<ChatClientPeerHandler> iterator = chatClients.values().iterator();
        while (iterator.hasNext()) {
            ChatClientPeerHandler chatClientHandler = iterator.next();
            
            //prevents sending messages to yourself
            if (client.equals(chatClientHandler.getName())){
                continue;
            }

            System.out.printf("sendMessage::Sending message %s to %s\n",
                        sendMessage, chatClientHandler.getName());
            chatClientHandler.sendMessage(sendMessage);
        }
    }

    private void doRemoveClient(String client) {

        System.out.println("removeClient::::[" + client + "]::::");

        ChatClientPeerHandler chatClientHandler = chatClients.get(client);

        if (chatClientHandler != null) {

            System.out.println("removeClient:::: found " + client
                    + " to remove.");

            doSendMessage(chatClientHandler.getName(),
                    chatClientHandler.getName()
                            + " has become bored with this chat room");

            chatClients.remove(client);
            try {
                chatClientHandler.close();
            } catch (IOException e) {
            }
        }

    }

    public void sendMessage(ChatMessage message) {
        try {
            readQueue.offer(message, 10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }
    
    public void removeClient(String clientId) {
        ChatMessage message = new ChatMessage(MessageType.REMOVE_CLIENT, clientId);
        
        try {
            readQueue.offer(message, 100, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.interrupted();
            }
            e.printStackTrace();
        }
    }

}

ThreadedQueue AKA poor man's Actor model

package com.example.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A queue that wakes up a runnable every time something is added to it.
 * This is a poor man's Actor. The processing task is tied to the queue:
 * if you put an item in and no task is currently running, one is handed to
 * the executor. At most one task is scheduled or running at any time.
 *
 * <p>The processor is expected to drain the queue (for example by calling
 * {@link #poll()} until it returns null) and then return.
 *
 * @param <T> type of the queued items
 */
public class ThreadedQueue<T> implements BlockingQueue<T> {
    /** Queue that actually stores the items. */
    BlockingQueue<T> queue;
    /** Executor the processor is run on. */
    Executor executor;
    /** Task that consumes the queue. */
    Runnable queueProcessor;
    /** True while a processor task is scheduled or running. */
    AtomicBoolean active = new AtomicBoolean();

    /**
     * @param queue          backing queue
     * @param queueProcessor task that consumes items from this queue
     * @param executor       executor used to run {@code queueProcessor}
     */
    public ThreadedQueue(BlockingQueue<T> queue, Runnable queueProcessor,
            Executor executor) {
        this.queue = queue;
        this.queueProcessor = queueProcessor;
        this.executor = executor;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }

    @Override
    public boolean offer(T item) {
        boolean result = queue.offer(item);

        launchRunnable();

        return result;
    }

    @Override
    public boolean offer(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        boolean result = queue.offer(item);

        /* Start the consumer before possibly blocking on a full queue. */
        launchRunnable();

        if (!result) {
            result = queue.offer(item, timeout, unit);
            launchRunnable();
        }

        return result;
    }

    @Override
    public boolean add(T item) {
        try {
            return queue.add(item);
        } finally {
            /* Wake the consumer even when add() throws because the queue is full. */
            launchRunnable();
        }
    }

    @Override
    public void put(T item) throws InterruptedException {
        System.out.println("PUT CALLED " + item);

        /* Offer it if you can, this is non-blocking. */
        if (queue.offer(item)) {
            launchRunnable();
            return;
        }

        /*
         * The queue is full. Make sure a consumer is running before blocking,
         * otherwise nothing would ever free up a slot.
         */
        launchRunnable();

        /* If you were unable to offer it, then go ahead and block. */
        queue.put(item);

        launchRunnable();
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        try {
            return queue.addAll(c);
        } finally {
            /* addAll() may throw after adding some items; still wake the consumer. */
            launchRunnable();
        }
    }

    @Override
    public T peek() {
        return queue.peek();
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    @Override
    public T element() {
        return queue.element();
    }

    @Override
    public T take() throws InterruptedException {
        return queue.take();
    }

    @Override
    public T remove() {
        return queue.remove();
    }

    @Override
    public Iterator<T> iterator() {
        return queue.iterator();
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <X> X[] toArray(X[] array) {
        return queue.toArray(array);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return queue.retainAll(c);
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    @Override
    public int drainTo(Collection<? super T> c) {
        return queue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super T> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    /**
     * Schedules the processor on the executor when the queue holds items and
     * no processor task is currently scheduled or running.
     */
    public void launchRunnable() {
        System.out.println("LAUNCH RUNNABLE");
        if (queue.isEmpty()) {
            return;
        }

        if (active.compareAndSet(false, true)) {
            try {
                executor.execute(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    queueProcessor.run();
                                } finally {
                                  active.set(false);
                                }

                                /*
                                 * An item may have been added after the
                                 * processor saw an empty queue but before the
                                 * flag was cleared; that producer could not
                                 * start a task, so check again here. Skipped
                                 * when the processor threw.
                                 */
                                launchRunnable();
                            }
                        });
            } catch (RuntimeException e) {
                /* The task was never accepted, so nothing will clear the flag. */
                active.set(false);
                throw e;
            }
        }
    }

 }

ChatServerWebSocketEndpoint

package com.example.websocket;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.net.websocket.Endpoint;
import javax.net.websocket.Session;

import com.example.ChatRoom;


/**
 * WebSocket endpoint for the chat server. Each session opened on it gets its
 * own message handler that forwards incoming text to the injected chat room.
 */
public class ChatServerWebSocketEndpoint extends Endpoint {

    /** Chat room handed to the message handler of every opened session. */
    @Inject
    @ApplicationScoped
    ChatRoom room;

    /**
     * Attaches a {@link ChatWebsocketMessageHandler} to the newly opened
     * session.
     *
     * @param session the session that was just opened
     */
    @Override
    public void onOpen(Session session) {
        ChatWebsocketMessageHandler handler = new ChatWebsocketMessageHandler(session, room);
        session.addMessageHandler(handler);
    }

}

ChatWebsocketMessageHandler

package com.example.websocket;


import javax.net.websocket.MessageHandler;
import javax.net.websocket.Session;

import com.example.ChatMessage;
import com.example.ChatRoom;

/**
 * Text message handler bound to one session. Every text frame is wrapped in
 * a {@link ChatMessage} together with the session and passed to the chat room.
 */
public class ChatWebsocketMessageHandler implements MessageHandler.Text {

    /** Session this handler was registered on. */
    Session session;

    /** Room that receives the wrapped messages. */
    ChatRoom chatRoom;

    /**
     * @param session  session whose text frames this handler receives
     * @param chatRoom room the frames are forwarded to
     */
    public ChatWebsocketMessageHandler(Session session, ChatRoom chatRoom) {
        this.chatRoom = chatRoom;
        this.session = session;
    }

    /**
     * Wraps the incoming text and hands it to the chat room.
     *
     * @param message raw text received from the client
     */
    @Override
    public void onMessage(String message) {
        ChatMessage parsed = new ChatMessage(message, session);
        chatRoom.sendMessage(parsed);
    }

}

Personal tools
TOOLBOX
LANGUAGES