data:image/s3,"s3://crabby-images/92d5d/92d5d088794b66adfe8f0a39aecb5b6b10feadd6" alt="Learning RabbitMQ"
Point-to-point communication
The following diagram provides an overview of the scenario that we will implement:
data:image/s3,"s3://crabby-images/fe48e/fe48e347f0bd8bdcb1cd90c292de74a22c9f72e9" alt=""
For point-to-point communication, the sender can use either the default exchange or a direct exchange (that uses the routing key to determine to which queue a message must be sent; the routing key should match the binding key between the exchange and the queue). The CompetingReceiver
class can be used to subscribe to a particular queue and receive messages from that queue:
import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class CompetingReceiver { private final static String QUEUE_NAME = "event_queue"; private final static Logger LOGGER = LoggerFactory.getLogger(Sender.class); private Connection connection = null; private Channel channel = null; public void initialize() { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } } }
The receive()
method is used to receive a message from the queue named event_queue
by doing the following:
- Creating the
event_queue
in the message broker, if not already created, using thequeueDeclare()
method - Creating a
QueueingConsumer
instance that is used as the handler for messages from theevent_queue
queue - Registering the
QueueingConsumer
as a message consumer using thebasicConsume()
method of theChannel
instance that represents the AMQP channel to the message broker - Consuming a message from the
event_queue
queue using thenextDeliver()
method of theQueueingConsumer
instance, which blocks until a message arrives on the queue;QueueingConsumer.Delivery
represents the received message:public String receive() { if (channel == null) { initialize(); } String message = null; try { channel.queueDeclare(QUEUE_NAME, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); message = new String(delivery.getBody()); LOGGER.info("Message received: " + message); return message; } catch (IOException e) { LOGGER.error(e.getMessage(), e); } catch (ShutdownSignalException e) { LOGGER.error(e.getMessage(), e); } catch (ConsumerCancelledException e) { LOGGER.error(e.getMessage(), e); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } return message; }
The destroy()
method closes the AMQP connection and must be called explicitly when needed; closing the connection closes all AMQP channels created in that connection:
public void destroy() { if (connection != null) { try { connection.close(); } catch (IOException e) { LOGGER.warn(e.getMessage(), e); } } }
In order to demonstrate the usage of the CompetingConsumer
class in a point-to-point channel, we can use the DefaultExchangeSenderDemo
class to send a message to the default exchange:
public class DefaultExchangeSenderDemo { public static void sendToDefaultExchange() { Sender sender = new Sender(); sender.initialize(); sender.send("Test message."); sender.destroy(); } public static void main(String[] args) { sendToDefaultExchange(); } }
When invoking the main()
method, a message is sent to the RabbitMQ server instance running on localhost; if no instance is running then a java.net.ConnectionException
is thrown from the client. Assuming that there are no defined queues yet in the message broker, if you open the RabbitMQ management console you will notice the following before invoking the main()
method:
data:image/s3,"s3://crabby-images/1db27/1db277709204502cbf79a49e8523b1cb5bead434" alt=""
After invoking the main()
method, you will notice that the event_queue
is created:
data:image/s3,"s3://crabby-images/85dc3/85dc3d61e161c8c0a112db114008a9d7ad9c7615" alt=""
Moreover, there is one unprocessed message in the queue; the Ready
section gives the number of unprocessed messages on the particular queue. In order to consume the message CompetingReceiverDemo
class, perform the following:
public class CompetingReceiverDemo { public static void main(String[] args) throws InterruptedException { final CompetingReceiver receiver1 = new CompetingReceiver(); receiver1.initialize(); final CompetingReceiver receiver2 = new CompetingReceiver(); receiver2.initialize(); Thread t1 = new Thread(new Runnable() { public void run() { receiver1.receive(); } }); Thread t2 = new Thread(new Runnable() { public void run() { receiver2.receive(); } }); t1.start(); t2.start(); t1.join(); t2.join(); receiver1.destroy(); receiver2.destroy(); } }
We create two CompetingReceiver
instances and invoke the receive()
methods of the two instances in separate threads so that we have two subscribers for the same queue waiting for a message. The two threads are joined to the main application thread so that method execution continues once both consumers receive a message from the queue. Since our queue already has one message, one of the two consumers will receive the message while the other will continue to wait for a message. If we invoke the main()
method of the DefaultExchangeSenderDemo
class once again, the other consumer will also receive a message from the queue and the main()
method of CompetingReceiverDemo()
will terminate.