This add-on is operated by 84codes AB
RabbitMQ as a Service
CloudAMQP
Last updated December 06, 2022
CloudAMQP is an add-on providing RabbitMQ as a service. RabbitMQ is a high performance message broker, built in Erlang, which implements the AMQP protocol.
Messaging is the easiest and most efficient way to decouple, distribute and scale applications.
All AMQP client libraries work with CloudAMQP, and there are AMQP client libraries for almost every platform out there, including Ruby, Node.js, Java, Python, Clojure and Erlang.
Installing the add-on
CloudAMQP can be installed to a Heroku application via the CLI:
A list of all plans available can be found here.
$ heroku addons:create cloudamqp
-----> Adding cloudamqp to sharp-mountain-4005... done, v18 (free)
Example for Power Panda with one node:
$ heroku addons:create cloudamqp:panda-1
Once CloudAMQP has been added, a CLOUDAMQP_URL setting will be available in the app configuration and will contain the canonical URL used to access the RabbitMQ cluster. This can be confirmed using the heroku config command.
$ heroku config | grep CLOUDAMQP_URL
CLOUDAMQP_URL => amqp://user:pass@ec2.clustername.cloudamqp.com/vhost
After installing CloudAMQP the application should be configured to fully integrate with the add-on.
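The URL stored in CLOUDAMQP_URL follows the amqp://user:pass@host/vhost layout shown above. Most client libraries accept the URL as-is, but some need the individual parts. The following is a minimal sketch of how the URL could be split using only the Python standard library. The helper name parse_amqp_url and the fallback to the standard AMQP ports are assumptions made for illustration, not something the add-on provides.

```python
import os
from urllib.parse import urlparse, unquote


def parse_amqp_url(url):
    """Split an AMQP URL into its connection parts (illustrative helper)."""
    parts = urlparse(url)
    return {
        "host": parts.hostname,
        # Assumption: fall back to the standard AMQP ports when none is given.
        "port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        # The vhost is the URL path without its leading slash ("%2f" decodes to "/").
        "vhost": unquote(parts.path[1:]),
    }


# Falls back to the example URL from above when the config var is not set.
url = os.environ.get(
    "CLOUDAMQP_URL", "amqp://user:pass@ec2.clustername.cloudamqp.com/vhost"
)
settings = parse_amqp_url(url)
print(settings["host"], settings["vhost"])
```

The resulting dictionary can be passed to clients that take host, port, credentials and vhost as separate arguments.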
Local workstation setup
RabbitMQ is easy to install locally for development.
If you have… | Install with… |
---|---|
Mac OS X | brew install rabbitmq |
Windows | The installer |
Ubuntu Linux | |
Other | See RabbitMQ’s installation page |
Use with Ruby
Ruby developers have a number of options for AMQP client libraries:
- amqp-client, a modern AMQP 0-9-1 Ruby client maintained by CloudAMQP.
- Bunny, a synchronous and very well maintained AMQP client.
- AMQP, an evented AMQP client that works well in combination with an evented web server like Thin.
- March Hare, an AMQP client for JRuby that uses the Java RabbitMQ library underneath.
The amqp-client gem has two APIs. The low-level API closely matches the AMQP protocol: it can do everything the protocol allows, but it requires some knowledge of the protocol and does not handle reconnects. The high-level API is easier to get started with and handles reconnection automatically.
Low level API
# Put gem "amqp-client" in your Gemfile
require "amqp-client"
# Opens and establishes a connection
conn = AMQP::Client.new(ENV["CLOUDAMQP_URL"]).connect
# Open a channel
ch = conn.channel
# Create a temporary queue
q = ch.queue_declare
# Publish a message to said queue
ch.basic_publish_confirm "Hello World!", "", q.queue_name, persistent: true
# Poll the queue for a message
msg = ch.basic_get(q.queue_name)
# Print the message's body to STDOUT
puts msg.body
High level API
# Put gem "amqp-client" in your Gemfile
require "amqp-client"
require "json"
require "zlib"
# Start the client. It connects and, once connected, reconnects if the connection is lost.
# Operations pending when the connection is lost raise an exception (they do not time out).
amqp = AMQP::Client.new(ENV["CLOUDAMQP_URL"]).start
# Declares a durable queue
myqueue = amqp.queue("myqueue")
# Bind the queue to any exchange, with any binding key
myqueue.bind("amq.topic", "my.events.*")
# The message will be reprocessed if the client loses connection to the broker
# between message arrival and when the message was supposed to be ack'ed.
myqueue.subscribe(prefetch: 20) do |msg|
  puts JSON.parse(msg.body)
  msg.ack
rescue
  msg.reject(requeue: false)
end
# Publish directly to the queue
myqueue.publish({ foo: "bar" }.to_json, content_type: "application/json")
# Publish to any exchange
amqp.publish("my message", "amq.topic", "topic.foo", headers: { foo: 'bar' })
amqp.publish(Zlib.gzip("an event"), "amq.topic", "my.event", content_encoding: 'gzip')
# Wait for 5 seconds for messages to be consumed
sleep 5
Here’s another example which uses the evented AMQP library, in combination with Sinatra and Server Sent Events. It shows how to build a simple real time application, using CloudAMQP as the backbone.
Find the Ruby Sinatra SSE sample on GitHub.
The application can also be seen live at amqp-sse.herokuapp.com.
Further reading
- RubyAMQP.info is a great resource for working with AMQP in Ruby.
- A high level intro to messaging for rubyists
Use with Node.js
For Node.js we recommend amqplib. We strongly advise against the formerly popular node-amqp library: it is not well maintained and has some serious bugs.
Add amqplib as a dependency in your package.json file.
"dependencies": {
  "amqplib": "*"
}
The following code snippet shows how to connect, publish messages, and subscribe to a queue:
var q = 'tasks';
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";
var open = require('amqplib').connect(url);

// Consumer
open.then(function(conn) {
  return conn.createChannel();
}).then(function(ch) {
  // Make sure the queue exists before consuming from it
  return ch.assertQueue(q).then(function() {
    return ch.consume(q, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  });
}).catch(console.warn);

// Publisher
open.then(function(conn) {
  return conn.createChannel();
}).then(function(ch) {
  return ch.assertQueue(q).then(function() {
    // Buffer.from replaces the deprecated new Buffer() constructor
    return ch.sendToQueue(q, Buffer.from('something to do'));
  });
}).catch(console.warn);
Use with Clojure
Langohr is a full-featured and well-maintained Clojure wrapper for Java’s AMQP library.
To use it, put [com.novemberain/langohr "2.9.0"] in your project.clj file and run lein deps.
The following code snippet shows how to both publish and consume a message via CloudAMQP:
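(ns clojure-amqp-example.core
  (:require [langohr.core :as rmq]
            [langohr.channel :as lch]
            [langohr.queue :as lq]
            [langohr.consumers :as lc]
            [langohr.basic :as lb]))

;; The default exchange is named by the empty string
(def default-exchange-name "")

;; Minimal handler, written here so the snippet is self-contained
(defn message-handler [ch metadata ^bytes payload]
  (println (format "[consumer] Received: %s" (String. payload "UTF-8"))))

(defn start-consumer [ch qname]
  ;; Subscribe on a separate thread so that -main can go on to publish
  (.start (Thread. (fn [] (lc/subscribe ch qname message-handler :auto-ack true)))))

(defn -main [& args]
  (let [;; System/getenv takes a single argument, so fall back with `or`
        url (or (System/getenv "CLOUDAMQP_URL") "amqp://localhost")
        conn (rmq/connect {:uri url})
        ch (lch/open conn)
        qname "langohr.examples.hello-world"]
    (println (format "[main] Connected. Channel id: %d" (.getChannelNumber ch)))
    (lq/declare ch qname :exclusive false :auto-delete true)
    (start-consumer ch qname)
    (println "[main] Publishing...")
    (lb/publish ch default-exchange-name qname "Hello!" :content-type "text/plain")
    (Thread/sleep 2000)
    (println "[main] Disconnecting...")
    (rmq/close ch)
    (rmq/close conn)))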
The full Clojure sample app is available on GitHub.
Use with Python
The recommended library for Python to access RabbitMQ servers is Pika.
Put pika==1.1.0 in your requirements.txt file.
The following code connects to CloudAMQP, declares a queue, publishes a message to it, sets up a subscription and prints messages coming to the queue.
# publish.py
import pika, os

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='hello') # Declare a queue
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello CloudAMQP!')

print(" [x] Sent 'Hello CloudAMQP!'")
connection.close()

# consume.py
import pika, os

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='hello') # Declare a queue

# Called once for every message delivered from the queue
def callback(ch, method, properties, body):
    print(" [x] Received " + str(body))

channel.basic_consume('hello',
                      callback,
                      auto_ack=True)

print(' [*] Waiting for messages:')
channel.start_consuming()
connection.close()
The full code can be seen at github.com/cloudamqp/python-amqp-example.
Celery
Celery is a great task queue library for Python, and the AMQP backend works perfectly with CloudAMQP. If you’re using the free plan, remember to set BROKER_POOL_LIMIT to 1. If you have connection problems, try reducing the concurrency of both your web workers and the Celery worker.
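As a minimal sketch of the BROKER_POOL_LIMIT advice above, a Celery configuration module could look like the following. The module name celeryconfig.py and the local fallback URL are assumptions for illustration. The upper-case setting names are Celery's older style; newer Celery versions also accept the lower-case broker_url and broker_pool_limit.

```python
# celeryconfig.py (hypothetical module name)
import os

# Broker URL taken from the add-on's config var, with a local fallback for development.
BROKER_URL = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672//")

# Keep at most one broker connection in the pool, as advised for the free plan.
BROKER_POOL_LIMIT = 1
```

A Celery app would typically load such a module with app.config_from_object("celeryconfig").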
Use with Java
RabbitMQ has developed an excellent Java AMQP library.
Begin by adding the AMQP library as a dependency in your pom.xml file:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.3.4</version>
</dependency>
A simple publish and subscribe can then look like this:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// The class name is arbitrary
public class App {
  public static void main(String[] args) throws Exception {
    String uri = System.getenv("CLOUDAMQP_URL");
    if (uri == null) uri = "amqp://guest:guest@localhost";
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri(uri);
    factory.setRequestedHeartbeat(30);   // seconds
    factory.setConnectionTimeout(30000); // milliseconds
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    String message = "Hello CloudAMQP!";
    channel.basicPublish("", "hello", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    // QueueingConsumer is part of the 3.x client declared in the pom.xml above
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume("hello", true, consumer);
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String receivedMessage = new String(delivery.getBody());
      System.out.println(" [x] Received '" + receivedMessage + "'");
    }
  }
}
The full Java sample app is available on GitHub.
Java AMQP resources
Use with PHP
For PHP the recommended library is php-amqplib.
Your composer.json should define both php-amqplib and ext-bcmath:
{
  "require": {
    "videlalvaro/php-amqplib": "2.2.*",
    "ext-bcmath": "*"
  }
}
Here’s an example which publishes a message to a queue and then retrieves it.
<?php
require('vendor/autoload.php');
define('AMQP_DEBUG', true);
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Split CLOUDAMQP_URL into host, user, password and vhost
$url = parse_url(getenv('CLOUDAMQP_URL'));
$conn = new AMQPConnection($url['host'], 5672, $url['user'], $url['pass'], substr($url['path'], 1));
$ch = $conn->channel();
$exchange = 'amq.direct';
$queue = 'basic_get_queue';
// Declare a durable queue and bind it to the built-in amq.direct exchange
$ch->queue_declare($queue, false, true, false, false);
$ch->exchange_declare($exchange, 'direct', true, true, false);
$ch->queue_bind($queue, $exchange);
// Publish a persistent message (delivery_mode 2)
$msg_body = 'the body';
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$ch->basic_publish($msg, $exchange);
// Fetch the message, print its body and acknowledge it
$retrieved_msg = $ch->basic_get($queue);
var_dump($retrieved_msg->body);
$ch->basic_ack($retrieved_msg->delivery_info['delivery_tag']);
$ch->close();
$conn->close();
An example PHP app can be found on GitHub.
For more involved examples, see php-amqplib’s demo directory.
Dashboard
The CloudAMQP dashboard shows the current message rate, the queues and exchanges you have, and the bindings between them. You can also queue and pop messages manually, among other things.
The dashboard can be accessed via the CLI:
$ heroku addons:open cloudamqp
Opening cloudamqp for sharp-mountain-4005…
or by visiting the Heroku Dashboard and selecting the application in question. Select CloudAMQP from the Add-ons menu.
Separate applications
Virtual hosts (vhosts) make it possible to separate applications on one broker. You can isolate users, exchanges, queues, etc. to one specific vhost. You can also separate environments, e.g. production on one vhost and staging on another within the same broker, instead of setting up multiple brokers.
Vhosts can be set up on all dedicated instances.
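Because the vhost is the path component of the connection URL, pointing an application at a different vhost can be as simple as swapping that path. The sketch below assumes the vhosts already exist on the broker and that the user in the URL has access to them. The helper name url_for_vhost and the vhost names "production" and "staging" are hypothetical.

```python
import os
from urllib.parse import urlparse, quote


def url_for_vhost(base_url, vhost):
    """Return base_url with its vhost (the URL path) replaced by `vhost`."""
    parts = urlparse(base_url)
    # Percent-encode the name so that a vhost such as "/" becomes "%2F".
    return parts._replace(path="/" + quote(vhost, safe="")).geturl()


base = os.environ.get(
    "CLOUDAMQP_URL", "amqp://user:pass@ec2.clustername.cloudamqp.com/vhost"
)
# "production" and "staging" are hypothetical vhost names.
production_url = url_for_vhost(base, "production")
staging_url = url_for_vhost(base, "staging")
print(staging_url)
```

Each environment can then be given its own URL while sharing the same broker.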
Migrating between plans
Migrating between shared plans
Plan migrations are easy and instant when migrating between shared plans (Little Lemur and Tough Tiger). Use the heroku addons:upgrade command to migrate to a new plan.
$ heroku addons:upgrade cloudamqp:rabbit
-----> Upgrading cloudamqp:rabbit to sharp-mountain-4005... done, v18 ($600/mo)
Your plan has been updated to: cloudamqp:rabbit
Migrating between a shared plan and a dedicated plan
There is no automatic upgrade between a shared plan and a dedicated plan. Instead, we recommend that you create the new plan and point your publishers to it. Let your consumers empty the queues on the old plan, then point them to the new plan, and finally delete the old plan.
Migrating between dedicated plans
You can automatically upgrade between dedicated plans. Your cluster stays live during the upgrade, so there is no downtime.
$ heroku addons:upgrade cloudamqp:rabbit
-----> Upgrading cloudamqp:rabbit to sharp-mountain-4005... done, v18 ($600/mo)
Your plan has been updated to: cloudamqp:rabbit
Removing the add-on
CloudAMQP can be removed via the CLI.
This will destroy all associated data and cannot be undone!
$ heroku addons:destroy cloudamqp
-----> Removing cloudamqp from sharp-mountain-4005... done, v20 (free)
Error codes
We log errors to your Heroku log. The different codes are explained below.
410 - Transfer limit reached
You’ve reached your monthly transfer quota. Upgrade to a larger plan or wait until the next calendar month.
210 - Transfer in compliance
You’ve either upgraded your account (got a higher transfer limit) or it’s a new calendar month and your quota has been reset.
420 - Connection limit reached
You’re using all your connection slots, so new connections will be blocked. Either lower your connection count or upgrade to a larger plan to accommodate more connections.
220 - Connections in compliance
You can now open more connections again because you’re not using all connection slots.
431 - Max channels per connection
One of your connections was closed because you had opened too many channels on it. This is often due to a bug, so check your code and make sure that you close unused channels.
432 - Max consumers per connection
One of your connections was closed because it had opened more than 12000 consumers. This is often due to a bug, so make sure that you close unused consumers.
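If you process your logs programmatically, the codes above can be kept in a small lookup table. The sketch below is only an illustration: the names CLOUDAMQP_CODES and describe are hypothetical, and it makes no assumption about the format of the log line itself, only about the numeric code once you have extracted it.

```python
# Codes and meanings as listed in this section.
CLOUDAMQP_CODES = {
    410: "Transfer limit reached",
    210: "Transfer in compliance",
    420: "Connection limit reached",
    220: "Connections in compliance",
    431: "Max channels per connection",
    432: "Max consumers per connection",
}


def describe(code):
    """Return (meaning, needs_action) for a CloudAMQP log code."""
    meaning = CLOUDAMQP_CODES.get(code, "Unknown code")
    # Assumption drawn from the list above: 4xx codes report a limit being hit,
    # while 2xx codes report a return to compliance.
    needs_action = code in CLOUDAMQP_CODES and 400 <= code < 500
    return meaning, needs_action


print(describe(420))
print(describe(220))
```

A code flagged as needing action could, for example, trigger an alert, while the matching 2xx code clears it.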
GDPR
Information about GDPR can be found here: https://www.cloudamqp.com/legal/gdpr.html.
For all our customers who collect personal data from individuals in the EU, we offer a DPA. Our DPA offers terms that meet GDPR requirements and that reflect our data privacy and security commitments to our customers and their data.
You can view the DPA here: https://www.cloudamqp.com/legal/terms_of_service.html#dpa. If you need a signed version, send us an email.
Support
All CloudAMQP support and runtime issues should be logged with Heroku Support at support.heroku.com. Non-support-related issues and product feedback are welcome at support@cloudamqp.com or via Twitter @CloudAMQP. For our plans Power Panda and larger, we provide 24/7 critical support with a 30-minute maximum initial response time and call-in phone numbers.