Asynchronous Web-Worker Model Using RabbitMQ in Java

Last Updated: 28 October 2013

async java rabbit-mq worker

Table of Contents

It is a best practice to process long running web requests asynchronously through a worker process. For a more in-depth understanding of this architectural pattern read the Worker Dynos, Background Jobs and Queueing article.

The article demonstrates this pattern using an sample Java application with Spring MVC and RabbitMQ. It leverages CloudAMQP add-on which is one of the RabbitMQ addons in the Heroku add-ons catalog.

If you have questions about Java on Heroku, consider discussing them in the Java on Heroku forums.

Getting started

Follow the below steps below to clone this application into your Heroku account:

  1. Clone this sample application into your Heroku account
  2. Go to http://yourappname.herokuapp.com/spring/bigOp to try out the application
  3. Following the instructions at http://yourappname.herokuapp.com/ to make changes using Eclipse or Heroku toolbelt.

You can get the sample code of this application on GitHub. The rest of this article goes into detail about some of the key Java classes in the application, that are used to implement this pattern.

Application overview

The application is comprised of two processes: web and worker.

  • web : A simple Spring MVC app that receives web requests and queues them in RabbitMQ for processing.
  • worker : A standalone Java application using Spring AMQP to read & processes messages from RabbitMQ.

Because these are separate processes, they can be scaled independently based on specific application needs. Read the Process Model article for a more in-depth understanding of Heroku’s process model.

RabbitMQ configuration

The RabbitMQ configuration is done through RabbitConfiguration.java which reads the CLOUDAMQP_URL environment variable provided by the CloudAMQP add-on, and makes it available to the rest of the application.

 @Bean
 public ConnectionFactory connectionFactory() {
     final URI rabbitMqUrl;
     try {
         rabbitMqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
     } catch (URISyntaxException e) {
         throw new RuntimeException(e);
     }

     final CachingConnectionFactory factory = new CachingConnectionFactory();
     factory.setUsername(rabbitMqUrl.getUserInfo().split(":")[0]);
     factory.setPassword(rabbitMqUrl.getUserInfo().split(":")[1]);
     factory.setHost(rabbitMqUrl.getHost());
     factory.setPort(rabbitMqUrl.getPort());
     factory.setVirtualHost(rabbitMqUrl.getPath().substring(1));

     return factory;
 }

If you want to to add the Cloud AMQP addon to an existing Heroku application, use the following command:

$ heroku addons:add cloudamqp

Web process

BigOperationWebController.java, a Spring MVC controller queues up the web requests into RabbitMQ. This class has the the Spring AMQP Template and queue configuration @autowired.

@Autowired private AmqpTemplate amqpTemplate;
@Autowired private Queue rabbitQueue;

When web requests are received by the controller, they are coverted to AMPQ messages and sent to RabbitMQ. The AmqpTemplate makes this easy by including the following line:

amqpTemplate.convertAndSend(rabbitQueue.getName(), bigOp);

The web process immediately returns a confirmation page to the user.

Worker process

The worker process is running as a separate process outside of the Spring web application context. Hence the configuration must be explicitly wired from RabbitConfiguration. BigOperationWorker is the main Java class executed in the worker processes and loads the RabbitMQ configuration as below:

ApplicationContext rabbitConfig = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
ConnectionFactory rabbitConnectionFactory = rabbitConfig.getBean(ConnectionFactory.class);
Queue rabbitQueue = rabbitConfig.getBean(Queue.class);
MessageConverter messageConverter = new SimpleMessageConverter();

Spring provides a convenience class SimpleMessageListenerContainer to receive messages from a queue and delegate it to the MessageListener that is injected into it. The RabbitMQ connection for SimpleMessageListenerContainer is setup as follows:

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(rabbitConnectionFactory);
listenerContainer.setQueueNames(rabbitQueue.getName());

The listener is defined by implementing the MessageListener interface. The long running BigOperation is invoked from the listener.

 listenerContainer.setMessageListener(new MessageListener() {
         public void onMessage(Message message) {
             // message is converted back into model object
             final BigOperation bigOp = (BigOperation) messageConverter.fromMessage(message);

             // simply printing out the operation, but expensive computation would happen here
             System.out.println("Received from RabbitMQ: " + bigOp);
         }
     });

To start listening for messages on the queue, the listener container needs to be started.

listenerContainer.start();

Scaling your worker process

The Procfile for this application defines the web and worker as below:

web: java $JAVA_OPTS -jar web/target/dependency/webapp-runner.jar --port $PORT web/target/*.war
worker: sh worker/target/bin/worker

You can scale your worker process independent of the web process using the heroku scale command as below:

$ heroku scale worker=1

To test out this application go to http://yourappname.herokuapp.com/spring/bigOp and enter anything in the text field. Once you submit, you will notice the web request returns immediately and you will observe the following the application logs:

$ heroku logs -t
16:50:29 web.1 | Sent to RabbitMQ: BigOperation{name='text you entered'}
16:50:30 worker.1 | Received from RabbitMQ: BigOperation{name='text you entered'}

Maven project setup

The application is structured as a Maven multi-module project with 3 modules: web, worker (for each of the two processes) and common module which contains the BigOperation.java model class and the RabbitConfiguration.java configuration class.