Writing Scalable WebSocket Applications on Heroku with Java and the Play Framework

Last Updated: 09 October 2013



This tutorial describes techniques for designing scalability into Java Play Framework applications that make use of WebSockets on Heroku.

Sample code for the demo application is available on GitHub. Edits and enhancements are welcome. Just fork the repository, make your changes and send us a pull request.

Prerequisites

Principles

The use of WebSockets requires some tweaks to the traditional way of thinking about an application. Typically, the goal is for each application instance (dyno) to be completely stateless, with any state stored in a location all instances can access. Client requests can then be load balanced across all dynos, and the data a client needs will be available regardless of which dyno handles a particular request.

This design is only partially valid when using WebSockets. It's still best to keep all application data in a shared location. However, a client now connects to a single dyno and keeps that affinity: all communication with that client happens through that dyno until the client reconnects. Furthermore, the client will likely expect real-time updates to shared data, so each dyno must be notified when something changes.

This change creates a need to share global state among dynos (application data) while also maintaining local state on each dyno (which clients are connected and who they are).

The sample application

The sample application is a modification of the WebSocket chat sample that ships with Play 2.x distributions. Out of the box, the chat sample works only as a single-instance application. The changes below describe how it can be modified to share state across multiple nodes.

Architecture

In its original form, the chat application deals with two pieces of data: the list of members in the room and the chat messages. The member list is stored on the actor that handles the room; it maps each member's name to a handle on the output stream of the WebSocket that member is connected through. The chat messages aren't stored at all: as soon as one comes in, it is sent to every member in the member list.

When deployed and scaled as-is, this design causes each dyno to act as its own chat room rather than having all dynos work together to host a single shared room. To scale, data must be shared among dynos. The two pieces that must be shared are the global list of members in the room and the chat messages. The member list is kept in a Redis set and the chat messages are passed through a Redis pub/sub channel, so all dynos share and communicate through a single Redis instance.

There is also a third piece of data that must be kept: each dyno must track which members are connected to it and keep a handle to each of those members' WebSockets. This data is local to the dyno. There is no need to keep a global picture of which dyno each member is connected to.

[Figure: Architecture Diagram, illustrating the design described above]
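To make the split between global and local state concrete, here is a minimal in-memory sketch. It assumes a concurrent set standing in for the Redis set of member names and a `Consumer<String>` standing in for each WebSocket output stream; the class names `SharedRoster` and `Dyno` are hypothetical and not part of the sample.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the Redis set that every dyno can see.
class SharedRoster {
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    boolean add(String name) { return names.add(name); }
    void remove(String name) { names.remove(name); }
    Set<String> snapshot() { return Set.copyOf(names); }
}

// One application instance: it only holds the sockets connected to it.
class Dyno {
    private final SharedRoster roster;
    // Local state: member name -> handle on that member's socket output.
    private final Map<String, Consumer<String>> localMembers = new ConcurrentHashMap<>();

    Dyno(SharedRoster roster) { this.roster = roster; }

    void connect(String name, Consumer<String> out) {
        localMembers.put(name, out); // local: this dyno owns the socket
        roster.add(name);            // global: every dyno can see the member
    }

    void disconnect(String name) {
        localMembers.remove(name);
        roster.remove(name);
    }

    Set<String> localNames() { return Set.copyOf(localMembers.keySet()); }
}
```

Connecting `alice` to one dyno and `bob` to another leaves both names in the shared roster, while each dyno's local map holds only its own socket.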

Connecting

The application is a simple Play 2.2 app. The interesting entry point is the WebSocket that is opened once a user is signed into the chat room. On the server side this is a controller method in Application.java that returns a WebSocket.

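import com.fasterxml.jackson.databind.JsonNode;
import models.ChatRoom;
import play.mvc.Controller;
import play.mvc.WebSocket;

public class Application extends Controller {

    public static WebSocket<JsonNode> chat(final String username) {
        return new WebSocket<JsonNode>() {
            // Called when the WebSocket handshake is done.
            public void onReady(WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out){
                // Join the chat room.
                try {
                    ChatRoom.join(username, in, out);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
    }
}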

On the client side, the connection is created by JavaScript in chatRoom.scala.js, a Play template that renders JavaScript.

var WS = window['MozWebSocket'] ? MozWebSocket : WebSocket
var chatSocket = new WS("@routes.Application.chat(username).webSocketURL(request)")
...
var receiveEvent = function(event) {
    var data = JSON.parse(event.data)

    // Handle errors
    if(data.error) {
        chatSocket.close()
        $("#onError span").text(data.error)
        $("#onError").show()
        return
    } else {
        $("#onChat").show()
    }

    // Create the message element
    var el = $('<div class="message"><span></span><p></p></div>')
    $("span", el).text(data.user)
    $("p", el).text(data.message)
    $(el).addClass(data.kind)
    if(data.user == '@username') $(el).addClass('me')
    $('#messages').append(el)

    // Update the members list
    $("#members").html('')
    $(data.members).each(function() {
        var li = document.createElement('li');
        li.textContent = this;
        $("#members").append(li);
    })
}
...
chatSocket.onmessage = receiveEvent

Here the connection to the WebSocket is opened and receiveEvent is registered as the callback for messages arriving on the socket. receiveEvent displays each message in the chat room. Every message also carries the full member list in its payload, so the client can keep its list of members up to date.

Sending Messages

ChatRoom is an actor that handles all of the details of the chat interaction; a single instance of it, defaultRoom, serves each dyno. Three events happen in the chat room: join, quit, and chat message. The simplest of the three is sending a chat message.

Chat messages arrive over each user's WebSocket connection. When a user is added to the room, an onMessage callback is registered on the input stream of their WebSocket:

in.onMessage(new Callback<JsonNode>() {
   public void invoke(JsonNode event) {
       Talk talk = new Talk(username, event.get("text").asText());
       Jedis j = play.Play.application().plugin(RedisPlugin.class).jedisPool().getResource();
       try {
           //All messages are pushed through the pub/sub channel
           j.publish(ChatRoom.CHANNEL, Json.stringify(Json.toJson(talk)));
       } finally {
           play.Play.application().plugin(RedisPlugin.class).jedisPool().returnResource(j);
       }
   }
});

Each time a message is received over the WebSocket, this method will create a new Talk object and put a JSON representation of it onto a Redis pub/sub channel. All messages go over this channel and all instances of the chat application subscribe to it. This is how message state is shared across dynos.
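The fan-out behavior of the channel can be illustrated with a small in-memory stand-in for Redis pub/sub. This is a sketch under the assumption that each subscriber represents one dyno; `InMemoryChannel` is a hypothetical class, while the sample itself uses Jedis `publish` and `subscribe` against Redis. Like the Redis `PUBLISH` command, `publish` here returns the number of subscribers that received the message.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a Redis pub/sub channel.
class InMemoryChannel {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    // Each dyno registers one subscriber for the shared channel.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver one message to every subscriber; return how many received it.
    int publish(String message) {
        int delivered = 0;
        for (Consumer<String> s : subscribers) {
            s.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

A message published by whichever dyno received it over a WebSocket reaches every subscribed dyno, including the publisher's own, which is why the sender also sees their own message in the room.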

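The Jedis subscribe call blocks the calling thread for as long as the subscription is active, so it has to run on a separate thread. You can use the Akka scheduler to start the subscription on another thread: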

//subscribe to the message channel
Akka.system().scheduler().scheduleOnce(
        Duration.create(10, TimeUnit.MILLISECONDS),
        new Runnable() {
            public void run() {
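                // subscribe() blocks this thread until the subscription ends, so this
                // pooled connection stays dedicated to pub/sub and is never returned.
                Jedis j = play.Play.application().plugin(RedisPlugin.class).jedisPool().getResource();
                j.subscribe(new MyListener(), CHANNEL);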
            }
        },
        Akka.system().dispatcher()
);

This subscribe call registers a JedisPubSub object called MyListener to handle callbacks when events happen on the channel.

public static void remoteMessage(Object message) {
    defaultRoom.tell(message, null);
}

public static class MyListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String messageBody) {
        //Process messages from the pub/sub channel
        JsonNode parsedMessage = Json.parse(messageBody);
        String type = parsedMessage.get("type").asText();
        Object message = null;
        if("talk".equals(type)) {
            message = new Talk(
                    parsedMessage.get("username").asText(),
                    parsedMessage.get("text").asText()
                    );
        } else if("rosterNotify".equals(type)) {
            message = new RosterNotification(
                    parsedMessage.get("username").asText(),
                    parsedMessage.get("direction").asText()
                    );
        } else if("quit".equals(type)) {
            message = new Quit(
                    parsedMessage.get("username").asText()
                    );
        }
        // Akka rejects null messages, so ignore unrecognized types
        if(message != null) {
            ChatRoom.remoteMessage(message);
        }
    }
    ...
}

This onMessage method will read each message that comes over the pub/sub channel, create the appropriate object from it, and tell the singleton actor defaultRoom about it. The message processing logic is in the onReceive method of the actor. Here the message type is determined and the message logic is executed:

public void onReceive(Object message) throws Exception {
    ...
    if(message instanceof Talk)  {
        // Received a Talk message
        Talk talk = (Talk)message;
        notifyAll("talk", talk.username, talk.text);
    }
    ...
}
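Two details of this path are worth modeling: the subscription blocks its thread for as long as it lasts, and each raw message must be turned into a typed object before it reaches the actor. The sketch below assumes a `BlockingQueue` in place of the Redis connection, a `Map<String, String>` in place of the parsed JSON, and a `Consumer<Object>` in place of the actor. `SubscriberLoop`, `TalkMsg`, `RosterMsg`, and `QuitMsg` are hypothetical names mirroring `Talk`, `RosterNotification`, and `Quit`; unknown types are skipped rather than forwarded as `null`.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical typed messages mirroring Talk, RosterNotification and Quit.
final class TalkMsg {
    final String username, text;
    TalkMsg(String username, String text) { this.username = username; this.text = text; }
}
final class RosterMsg {
    final String username, direction;
    RosterMsg(String username, String direction) { this.username = username; this.direction = direction; }
}
final class QuitMsg {
    final String username;
    QuitMsg(String username) { this.username = username; }
}

class SubscriberLoop {
    // Sentinel that stops the loop (stands in for unsubscribing).
    static final Map<String, String> STOP = Map.of("type", "__stop__");

    // Turn a parsed message into a typed object, or null if the type is unknown.
    static Object toMessage(Map<String, String> m) {
        switch (m.getOrDefault("type", "")) {
            case "talk":         return new TalkMsg(m.get("username"), m.get("text"));
            case "rosterNotify": return new RosterMsg(m.get("username"), m.get("direction"));
            case "quit":         return new QuitMsg(m.get("username"));
            default:             return null;
        }
    }

    // Run the blocking receive loop on its own thread so callers are never blocked.
    static Thread start(BlockingQueue<Map<String, String>> incoming, Consumer<Object> room) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Map<String, String> raw = incoming.take(); // blocks until a message arrives
                    if (raw == STOP) return;
                    Object msg = toMessage(raw);
                    if (msg != null) room.accept(msg); // never forward null to the actor
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "subscriber");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

Keeping the blocking receive on its own thread is the same reason the article starts `subscribe` from the Akka scheduler instead of calling it inline.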

notifyAll is where the message is sent to the members that are connected to this dyno:

public void notifyAll(String kind, String user, String text) {
    ObjectNode event = Json.newObject();
    event.put("kind", kind);
    event.put("user", user);
    event.put("message", text);

    ArrayNode m = event.putArray("members");
    //Go to Redis once to read the full roster of members. Push it down with the message.
    Jedis j = play.Play.application().plugin(RedisPlugin.class).jedisPool().getResource();
    try {
        for(String u: j.smembers(MEMBERS)) {
            m.add(u);
        }
    } finally {
        play.Play.application().plugin(RedisPlugin.class).jedisPool().returnResource(j);
    }

    //Send the same event to every member connected to this dyno
    for(WebSocket.Out<JsonNode> channel: members.values()) {
        channel.write(event);
    }
}

When a dyno reads a message off the pub/sub channel, it loops through the members connected to it and sends the message to each of them. Each message also contains the list of all members in the room.

An important thing to note here is that each dyno deals with two lists of members. The instance variable members holds only the members connected to this dyno, along with their WebSocket output streams. The list of all members in the room, which is sent down with each message, comes from Redis instead. This is how a group of dynos is able to present a global picture of who's in the room while each dyno communicates only with the subset of users connected to it.
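The interplay of the two lists can be sketched without Redis. In this version the global roster is read once per notification rather than once per connected member, and the same payload is written to every local socket. `LocalFanOut`, the `Consumer<String>` socket stand-ins, the `Supplier` standing in for `smembers`, and the hand-built JSON string are assumptions for illustration; the sample itself uses Jackson `ObjectNode` and Jedis.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class LocalFanOut {
    // Minimal JSON string quoting: escapes only backslashes and double quotes (sketch only).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Build one payload with the message and the full (global) roster, sorted for stable output.
    static String payload(String kind, String user, String text, Set<String> roster) {
        String members = new TreeSet<>(roster).stream()
                .map(LocalFanOut::quote)
                .collect(Collectors.joining(","));
        return "{\"kind\":" + quote(kind) + ",\"user\":" + quote(user)
                + ",\"message\":" + quote(text) + ",\"members\":[" + members + "]}";
    }

    // Write to local sockets only; the roster comes from the shared store, read once.
    static int notifyLocal(Map<String, Consumer<String>> localMembers,
                           Supplier<Set<String>> globalRoster,
                           String kind, String user, String text) {
        String event = payload(kind, user, text, globalRoster.get());
        int sent = 0;
        for (Consumer<String> socket : localMembers.values()) {
            socket.accept(event);
            sent++;
        }
        return sent;
    }
}
```

If `bob` is connected to some other dyno, his name still appears in the roster of every payload, but only members connected locally, such as `alice`, receive a write from this dyno.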

Joining and leaving

When the user connects they’re added as a member of the room by calling ChatRoom.join.

public static void join(final String username, WebSocket.In<JsonNode> in, WebSocket.Out<JsonNode> out) throws Exception{
    String result = (String)Await.result(ask(defaultRoom,new Join(username, out), 3000), Duration.create(3, SECONDS));
    if("OK".equals(result)) {
        // For each event received on the socket,
        in.onMessage(new Callback<JsonNode>() {
           public void invoke(JsonNode event) {
               ...
           }
        });

        // When the socket is closed.
        in.onClose(new Callback0() {
           public void invoke() {
               // Send a Quit message to the room.
               defaultRoom.tell(new Quit(username), null);
           }
        });
    } else {
    ...
    }
}

Joining does two things: it sends a message to the defaultRoom actor, which adds the member to the room, and it sets up the appropriate callbacks on the user's WebSocket. The onMessage callback was explained above. The onClose callback is where the quit message is sent to the actor. Messages sent to the defaultRoom actor are handled by its onReceive method.

public void onReceive(Object message) throws Exception {
    Jedis j = play.Play.application().plugin(RedisPlugin.class).jedisPool().getResource();
    try {
        if(message instanceof Join) {
            // Received a Join message
            Join join = (Join)message;
            // Claim the username in the global roster. SADD returns 0 if the name
            // is already present, so the check and the add are one atomic step
            // and two dynos cannot both accept the same username.
            if(j.sadd(MEMBERS, join.username) == 0L) {
                getSender().tell("This username is already used", getSelf());
            } else {
                //Add the member to this node's local list
                members.put(join.username, join.channel);

                //Publish the join notification to all nodes
                RosterNotification rosterNotify = new RosterNotification(join.username, "join");
                j.publish(ChatRoom.CHANNEL, Json.stringify(Json.toJson(rosterNotify)));
                getSender().tell("OK", getSelf());
            }

        } else if(message instanceof Quit)  {
            // Received a Quit message
            Quit quit = (Quit)message;
            //Remove the member from this node and the global roster
            members.remove(quit.username);
            j.srem(MEMBERS, quit.username);

            //Publish the quit notification to all nodes
            RosterNotification rosterNotify = new RosterNotification(quit.username, "quit");
            j.publish(ChatRoom.CHANNEL, Json.stringify(Json.toJson(rosterNotify)));
        } else if(message instanceof RosterNotification) {
            //Received a roster notification
            RosterNotification rosterNotify = (RosterNotification) message;
            if("join".equals(rosterNotify.direction)) {
                notifyAll("join", rosterNotify.username, "has entered the room");
            } else if("quit".equals(rosterNotify.direction)) {
                notifyAll("quit", rosterNotify.username, "has left the room");
            }
        }
        ...
    } finally {
        play.Play.application().plugin(RedisPlugin.class).jedisPool().returnResource(j);
    }
}

The portion of onReceive that handles Talk events was already covered above and has been omitted here. Joining and leaving each have to manage the state of who is in the room and who is connected to this dyno. Each of them will manipulate the local and global member lists accordingly.

Joining and leaving also generate a special kind of chat message called a RosterNotification, which notifies all users when someone joins or quits. When a join or quit occurs, the RosterNotification is published on the pub/sub channel so that all dynos handle it the same way. The message is handled much like a normal talk message: each dyno sends it to the members connected to it via the notifyAll method.
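The join handshake combines a request/reply with a timeout (the `ask` plus `Await.result` call) and a check that the username is free. The sketch below models both with the JDK only: a single-threaded executor stands in for the room actor's mailbox, `Future.get` with a timeout stands in for `Await.result`, and a shared concurrent set stands in for the Redis roster. The name is claimed with one atomic `add`, whose boolean result plays the role of the count that Redis `SADD` returns, so two dynos cannot both accept the same name. `Room` and its methods are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class Room {
    private final Set<String> globalRoster; // stand-in for the Redis MEMBERS set
    private final Set<String> localMembers = ConcurrentHashMap.newKeySet();
    // One thread processes requests in order, like an actor's mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "room");
        t.setDaemon(true);
        return t;
    });

    Room(Set<String> globalRoster) { this.globalRoster = globalRoster; }

    // Runs on the room thread. add() returns false if the name is already taken.
    private String handleJoin(String username) {
        if (!globalRoster.add(username)) {
            return "This username is already used";
        }
        localMembers.add(username);
        return "OK";
    }

    // Caller side: send the request and wait at most timeoutMillis for the reply.
    String join(String username, long timeoutMillis) {
        Future<String> reply = mailbox.submit(() -> handleJoin(username));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            reply.cancel(true);
            return "Timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Interrupted";
        } catch (ExecutionException e) {
            return "Failed: " + e.getCause();
        }
    }
}
```

Against Redis, the equivalent of the boolean check is treating an `SADD` result of 0 as "name already taken", which avoids the window between a separate membership check and the add.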

Keeping the connection open

The last important piece of logic is a keep-alive for the WebSocket connection. The request timeout window on Heroku still applies to WebSocket connections: data must be sent across the connection at least once every 55 seconds, or the router will close it.

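The original chat sample included a robot that sends a message to the room every 30 seconds, which fulfills the keep-alive requirement. Because the robot's Talk message is sent straight to the room actor rather than published on the Redis channel, each dyno's robot reaches only the members connected to that dyno, which is enough since every dyno must keep its own connections alive.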

// Make the robot talk every 30 seconds
Akka.system().scheduler().schedule(
    Duration.create(30, SECONDS),
    Duration.create(30, SECONDS),
    chatRoom, // ActorRef of the room actor (defaultRoom)
    new ChatRoom.Talk("Robot", "I'm still alive"),
    Akka.system().dispatcher(),
    /** sender **/ null
);

This could easily be replaced with a true keep-alive message that the client ignores, achieving the same effect without anything being visible to the user.
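A hidden keep-alive could look like the following sketch. It assumes a hypothetical message kind `keepalive` that the client-side `receiveEvent` would drop without rendering, and it uses a `ScheduledExecutorService` in place of the Akka scheduler; `KeepAlive` is a hypothetical class name. The period only needs to sit comfortably below the 55-second limit.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class KeepAlive {
    // Payload the client is assumed to recognize and ignore.
    static final String PING = "{\"kind\":\"keepalive\"}";

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "keepalive");
                t.setDaemon(true);
                return t;
            });

    // Periodically write a ping to every socket connected to this dyno.
    ScheduledFuture<?> start(Map<String, Consumer<String>> localMembers, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            for (Consumer<String> socket : localMembers.values()) {
                try {
                    socket.accept(PING);
                } catch (RuntimeException e) {
                    // One broken socket must not cancel the schedule for everyone else.
                }
            }
        }, period, period, unit);
    }

    void stop() { scheduler.shutdownNow(); }
}
```

In production the call would be something like `start(members, 30, TimeUnit.SECONDS)`, with the client returning early from `receiveEvent` when `data.kind` is `"keepalive"`.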

Deploying the sample to Heroku

Clone the app

Clone the sample app from GitHub:

$ git clone git@github.com:heroku-examples/play-websockets-chat-sample.git
$ cd play-websockets-chat-sample

Create the app

$ heroku create
Creating high-lightning-129... done, stack is cedar
http://high-lightning-129.herokuapp.com/ | git@heroku.com:high-lightning-129.git
Git remote heroku added

Enable websockets

$ heroku labs:enable websockets
Enabling websockets for morning-taiga-2382... done
WARNING: This feature is experimental and may change or be removed without notice.
For more information see: https://devcenter.heroku.com/articles/heroku-labs-websockets

Add Redis

Heroku has a number of Redis add-ons available. The sample app uses RedisCloud out of the box, but this is easy to change.

$ heroku addons:add rediscloud
Adding rediscloud on damp-badlands-1414... done, v8 (free)
Use `heroku addons:docs rediscloud` to view documentation.

The Redis configuration is in conf/application.conf:

# Redis configuration
redis.uri=${REDISCLOUD_URL}

To switch to another provider, replace REDISCLOUD_URL there with the name of the config variable that provider's add-on sets.

Deploy the code

$ git push heroku master
Counting objects: 31, done.
Delta compression using up to 8 threads.
Compressing objects: 100% (24/24), done.
Writing objects: 100% (31/31), 38.33 KiB | 0 bytes/s, done.
Total 31 (delta 0), reused 0 (delta 0)

-----> Play 2.x - Java app detected
-----> Installing OpenJDK 1.6...done
-----> Building app with sbt
-----> Running: sbt clean compile stage
       Getting org.scala-sbt sbt 0.13.0 ...
       downloading http://s3pository.heroku.com/ivy-typesafe-releases/org.scala-sbt/sbt/0.13.0/jars/sbt.jar ...
  [SUCCESSFUL ] org.scala-sbt#sbt;0.13.0!sbt.jar (416ms)
...
       [info] Done packaging.
       [success] Total time: 7 s, completed Sep 29, 2013 7:51:08 PM
-----> Dropping ivy cache from the slug
-----> Discovering process types
       Procfile declares types -> web

-----> Compiled slug size: 117.3MB
-----> Launching... done, v6
       http://still-hamlet-4310.herokuapp.com deployed to Heroku

Congratulations! Your web app should now be up and running on Heroku. Visit the application to see it in action:

$ heroku open

This app can be scaled to more than one dyno and will still work as intended (charges may apply):

$ heroku scale web=2
Scaling web processes... done, now running 2