Chapter 9. Queues

9.1. Queue Types

9.1.1. Introduction

In addition to the standard queue type, where messages are delivered in the same order that they were sent, the Java Broker supports three additional queue types which allow for alternative delivery behaviours. These are priority-queues, sorted-queues and last-value-queues (LVQs). Additionally, the Java Broker supports message grouping.

In the following sections, the semantics of each queue type are described, followed by a description of how instances of these queues can be created via configuration, programmatically or through the Web Management Console.

The final section discusses the importance of using a low client pre-fetch with these queues.

9.1.2. Priority Queues

In a priority queue, messages on the queue are delivered in an order determined by the JMS priority message header within the message. By default Qpid supports the 10 priority levels mandated by JMS, with priority value 0 as the lowest priority and 9 as the highest.

It is possible to reduce the effective number of priorities if desired.

JMS defines the default message priority as 4. Messages sent without a specified priority use this default.
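The ordering rule above can be illustrated with a small JDK-only model. This is a sketch of the semantics, not the broker's implementation: the class `PriorityOrderSketch`, its `Msg` type and the `deliveryOrder` method are hypothetical names, and the sketch assumes that messages of equal priority keep their arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of priority-queue delivery order (hypothetical, not broker code). */
class PriorityOrderSketch {

    /** A message with a JMS-style priority (0 lowest, 9 highest) and an arrival sequence. */
    static final class Msg {
        final String body;
        final int priority;
        final long seq;

        Msg(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    /** Returns the message bodies in the order the model would deliver them. */
    static List<String> deliveryOrder(List<Msg> arrivals) {
        // Highest priority first; within one priority, earliest arrival first (assumption).
        Comparator<Msg> order = (a, b) -> a.priority != b.priority
                ? Integer.compare(b.priority, a.priority)
                : Long.compare(a.seq, b.seq);
        PriorityQueue<Msg> queue = new PriorityQueue<>(order);
        queue.addAll(arrivals);

        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Msg> arrivals = List.of(
                new Msg("default-1", 4, 0),   // sent without an explicit priority
                new Msg("urgent", 9, 1),
                new Msg("default-2", 4, 2),
                new Msg("background", 0, 3));
        // Prints [urgent, default-1, default-2, background]
        System.out.println(deliveryOrder(arrivals));
    }
}
```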

9.1.3. Sorted Queues

Sorted queues allow the message delivery order to be determined by the value of an arbitrary JMS message property. Sort order is alpha-numeric and the property value must be of type java.lang.String.

Messages sent to a sorted queue without the specified JMS message property will be inserted into the 'last' position in the queue.
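A minimal JDK-only sketch of these rules follows. It is a model rather than broker code: `SortedQueueSketch` and its members are hypothetical names, "alpha-numeric" is assumed here to mean plain `String` natural ordering, and messages that tie (or that lack the property) are assumed to keep their arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy model of sorted-queue delivery order (hypothetical, not broker code). */
class SortedQueueSketch {

    static final class Msg {
        final String body;
        final String sortKey; // null when the sort property is absent

        Msg(String body, String sortKey) {
            this.body = body;
            this.sortKey = sortKey;
        }
    }

    /** Returns the message bodies in the order the model would deliver them. */
    static List<String> deliveryOrder(List<Msg> arrivals) {
        List<Msg> sorted = new ArrayList<>(arrivals);
        // Natural String order on the key; messages without the key go last.
        Comparator<String> keyOrder = Comparator.nullsLast(Comparator.<String>naturalOrder());
        // List.sort is stable, so equal keys keep their arrival order.
        sorted.sort((a, b) -> keyOrder.compare(a.sortKey, b.sortKey));

        List<String> bodies = new ArrayList<>();
        for (Msg m : sorted) {
            bodies.add(m.body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<Msg> arrivals = List.of(
                new Msg("m1", "pear"),
                new Msg("m2", null),      // no sort property set
                new Msg("m3", "apple"),
                new Msg("m4", "banana"));
        // Prints [m3, m4, m1, m2]
        System.out.println(deliveryOrder(arrivals));
    }
}
```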

9.1.4. Last Value Queues (LVQ)

LVQs (or conflation queues) are special queues that automatically discard any message when a newer message arrives with the same key value. The key is specified by an arbitrary JMS message property.

An example of an LVQ might be where a queue represents prices on a stock exchange: when you first consume from the queue you get the latest quote for each stock, and then as new prices come in you are sent only these updates.

Like other queues, LVQs can either be browsed or consumed from. When browsing, an individual subscriber does not remove the message from the queue when receiving it. This allows many subscriptions to browse the same LVQ (i.e. you do not need to create and bind a separate LVQ for each subscriber who wishes to receive the contents of the LVQ).

Messages sent to an LVQ without the specified property will be delivered as normal and will never be "replaced".
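The conflation behaviour can be modelled in a few lines of plain Java. The sketch below is illustrative only: `LvqSketch` and its methods are hypothetical names, and placing the newer message at the tail of the queue is an assumption of the model, not something stated above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of last-value-queue conflation (hypothetical, not broker code). */
class LvqSketch {

    static final class Msg {
        final String key;  // null when the key property is absent
        final String body;

        Msg(String key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    private final List<Msg> entries = new ArrayList<>();

    /** Adds a message, discarding any held message that carries the same key. */
    void enqueue(Msg incoming) {
        if (incoming.key != null) {
            entries.removeIf(held -> incoming.key.equals(held.key));
        }
        // Messages without a key are simply appended and are never replaced.
        entries.add(incoming);
    }

    /** Returns the bodies currently held, head of the queue first. */
    List<String> contents() {
        List<String> bodies = new ArrayList<>();
        for (Msg m : entries) {
            bodies.add(m.body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        LvqSketch lvq = new LvqSketch();
        lvq.enqueue(new Msg("ACME", "ACME=10.0"));
        lvq.enqueue(new Msg("INIT", "INIT=5.0"));
        lvq.enqueue(new Msg(null, "heartbeat"));
        lvq.enqueue(new Msg("ACME", "ACME=10.5")); // replaces ACME=10.0
        // Prints [INIT=5.0, heartbeat, ACME=10.5]
        System.out.println(lvq.contents());
    }
}
```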

9.1.5. Creating a Priority, Sorted or LVQ Queue

A priority, sorted or LVQ queue can be defined in the virtualhost configuration file, created programmatically from a client via AMQP (using an extension to JMS), created using JMX or the REST interfaces, or created in the Web Management Console. These methods are described below.

Once a queue is created you cannot change its type (without deleting it and re-creating it). Also note that you cannot currently mix the natures of these queue types; for instance, you cannot define a queue which is both an LVQ and a priority-queue.

9.1.5.1. Using Web Management Console

Clicking the "Add Queue" button on the Virtual Host tab displays the pop-up dialog used to create a queue.

For a Simple queue a Queue Type "Standard" should be selected.

For a Priority queue a Queue Type "Priority" and the priority value (10 by default) should be selected.

For a Sorted queue a Queue Type "Sorted" and Sort Message Property should be specified.

For an LVQ queue a Queue Type "LVQ" and LVQ Message Property should be specified.

Additionally, for each queue type, Flow Control Thresholds and Alert Thresholds can be specified in optional fields.

Also, a Dead Letter Queue can be configured for the queue by checking the "Create DLQ" check box. The maximum number of delivery retries before a message is sent to the DLQ can be specified in the "Maximum Delivery Retries" field. However, configuring maximum delivery retries on a queue without a DLQ (AlternateExchange) will result in messages being discarded after the limit is reached.

9.1.5.2. Using JMX or AMQP

To create a priority, sorted or LVQ queue programmatically from JMX or using a Qpid extension to JMS, pass the appropriate queue-declare arguments.

Table 9.1. Queue-declare arguments understood for priority, sorted and LVQ queues

Queue type | Argument name | Argument type | Argument description
priority | x-qpid-priorities | java.lang.Integer | Specifies a priority queue with the given number of priorities
sorted | qpid.queue_sort_key | java.lang.String | Specifies a sorted queue with the given message property used to sort the entries
lvq | qpid.last_value_queue_key | java.lang.String | Specifies an LVQ with the given message property used to conflate the entries
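As a convenience, the argument maps listed in Table 9.1 could be built by small helpers such as the ones sketched below. The class `QueueDeclareArguments` and its method names are hypothetical; only the argument names and value types come from the table. Each helper sets exactly one argument, which reflects the rule that the queue types cannot be mixed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helpers that build the queue-declare argument maps from Table 9.1. */
class QueueDeclareArguments {

    /** Arguments for a priority queue with the given number of priorities. */
    static Map<String, Object> priority(int priorities) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-qpid-priorities", Integer.valueOf(priorities));
        return arguments;
    }

    /** Arguments for a sorted queue ordered by the given message property. */
    static Map<String, Object> sorted(String sortProperty) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("qpid.queue_sort_key", sortProperty);
        return arguments;
    }

    /** Arguments for an LVQ conflated on the given message property. */
    static Map<String, Object> lastValue(String keyProperty) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("qpid.last_value_queue_key", keyProperty);
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(priority(5));
        System.out.println(sorted("myheader"));
        System.out.println(lastValue("ISIN"));
    }
}
```

The resulting map would then be passed as the arguments parameter of the queue-creation calls shown in the examples in this section.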

The following example illustrates the creation of an LVQ from a javax.jms.Session object. Note that this utilises a Qpid-specific extension to JMS and involves casting the session object back to its Qpid base class.

Example 9.1. Creation of an LVQ using the Qpid extension to JMS

Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.last_value_queue_key","ISIN");
AMQDestination amqQueue = (AMQDestination) context.lookup("myqueue");
((AMQSession<?,?>) session).createQueue(
        AMQShortString.valueOf(amqQueue.getQueueName()),
        amqQueue.isAutoDelete(),
        amqQueue.isDurable(),
        amqQueue.isExclusive(),
        arguments);

The following example illustrates the creation of a sorted queue from the JMX interface using the ManagedBroker interface.

Example 9.2. Creation of a sorted queue using JMX

Map<String, Object> environment = new HashMap<String, Object>();
environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","password"});
// Connect to service
JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
MBeanServerConnection mbsc =  jmxConnector.getMBeanServerConnection();
// Object name for ManagedBroker for virtualhost myvhost
ObjectName objectName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=myvhost");
// Get the ManagedBroker object
ManagedBroker managedBroker = JMX.newMBeanProxy(mbsc, objectName, ManagedBroker.class);

// Create the queue passing arguments
Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.queue_sort_key","myheader");
managedBroker.createNewQueue("myqueue", null, true, arguments);

9.1.5.3. Using configuration

How to declare queues in the Virtual Host configuration file is described in Section 14.8, “Configuring Queues”.

9.1.6. Binding queues to exchanges

Queues can be bound to the broker exchanges in the virtualhost configuration file, programmatically from a client using the AMQP bind API (using an extension to JMS), using the JMX API, using the REST interfaces or through the Web Management Console.

A queue can be bound to different exchanges at the same time. Also, a queue can be bound to the same exchange multiple times. Different binding keys can be used to bind a queue to the same topic or direct exchange.

Binding attributes can be specified on binding creation to filter the messages accepted by the queue using a selector expression, to specify whether messages published by the queue's own connection should be delivered to it, or both.

9.1.6.1. Using Web Management Console

A queue can be bound to an exchange by clicking the "Add Binding" button on a Queue tab or an Exchange tab.

9.1.6.2. Using JMX or AMQP

The following example illustrates the creation of a queue binding to a topic exchange with the JMS client.

Example 9.3. Binding a queue using JMS

ConnectionFactory connectionFactory = ...
Connection connection = connectionFactory.createConnection();
AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

...

AMQShortString queueName = new AMQShortString("testQueue");
AMQShortString routingKey = new AMQShortString("testRoutingKey");
AMQDestination destination = (AMQDestination) session.createQueue(queueName.asString());

...

// binding arguments
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-filter-jms-selector", "application='app1'");

// create binding
session.bindQueue(queueName, routingKey, FieldTable.convertToFieldTable(arguments),
    new AMQShortString("amq.topic"), destination);

The following example illustrates the creation of a queue binding to a topic exchange with the JMX interface using the ManagedExchange interface.

Example 9.4. Binding a queue using JMX

Map<String, Object> environment = new HashMap<String, Object>();
environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","password"});

// Connect to service
JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
MBeanServerConnection mbsc =  jmxConnector.getMBeanServerConnection();

// Object name for topic Exchange MBean for virtualhost 'default'
ObjectName objectName = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,"
    + "VirtualHost=\"default\",name=\"amq.topic\",ExchangeType=topic");

// Get the ManagedExchange object
ManagedExchange topicExchange = JMX.newMBeanProxy(mbsc, objectName, ManagedExchange.class);

// Create the binding arguments
Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-filter-jms-selector", "application='app1'");

// create binding
topicExchange.createNewBinding("queue", "testBindingKey", arguments);

9.1.6.3. Using configuration

How to bind queues in the Virtual Host configuration file is shown in Section 14.9, “Queue Binding”.

9.1.7. Message Grouping

The broker allows messaging applications to classify a set of related messages as belonging to a group. This allows a message producer to indicate to the consumer that a group of messages should be considered a single logical operation with respect to the application.

The broker can use this group identification to enforce policies controlling how messages from a given group can be distributed to consumers. For instance, the broker can be configured to guarantee all the messages from a particular group are processed in order across multiple consumers.

For example, assume we have a shopping application that manages items in a virtual shopping cart. A user may add an item to their shopping cart, then change their mind and remove it. If the application sends an add message to the broker, immediately followed by a remove message, they will be queued in the proper order - add, followed by remove.

However, if there are multiple consumers, it is possible that once a consumer acquires the add message, a different consumer may acquire the remove message. This allows both messages to be processed in parallel, which could result in a "race" where the remove operation is incorrectly performed before the add operation.

9.1.7.1.  Grouping Messages

In order to group messages, the application would designate a particular message header as containing a message's group identifier. The group identifier stored in that header field would be a string value set by the message producer. Messages from the same group would have the same group identifier value. The key that identifies the header must also be known to the message consumers. This allows the consumers to determine a message's assigned group.

The header that is used to hold the group identifier, as well as the values used as group identifiers, are entirely under the control of the application.

9.1.7.2.  The Role of the Broker in Message Grouping

The broker will apply the following processing on each grouped message:

  • Enqueue a received message on the destination queue.
  • Determine the message's group by examining the message's group identifier header.
  • Enforce consumption ordering among messages belonging to the same group. Consumption ordering means one of two things depending on how the queue has been configured.
    • In default mode, a group gets assigned to a single consumer for the lifetime of that consumer, and the broker will pass all subsequent messages in the group to that consumer.
    • In 'shared groups' mode (which gives the same behaviour as the Qpid C++ Broker) the broker enforces a looser guarantee, namely that all the currently unacknowledged messages in a group are sent to the same consumer, but the consumer used may change over time even if the consumers do not. This means that only one consumer can be processing messages from a particular group at any given time. However, if the consumer acknowledges all of its acquired messages, then the broker may pass the next pending message in that group to a different consumer.

The absence of a value in the designated group header field of a message is treated as follows:

  • In default mode, a message that does not specify a group is treated as not requiring grouping at all. Such messages will be distributed to any available consumer, without the ordering guarantees imposed by grouping.
  • In 'shared groups' mode (which gives the same behaviour as the Qpid C++ Broker) the broker assigns messages without a group value to a 'default group'. Therefore, all such "unidentified" messages are considered by the broker as part of the same group, which will be handled like any other group. The name of this default group is "qpid.no-group", although it can be customised as detailed below.

Note that message grouping has no effect on queue browsers.

Note well that distinct message groups do not block each other from delivery. For example, assume a queue contains messages from two different message groups - say group "A" and group "B" - and they are enqueued such that "A"'s messages are in front of "B"'s. If the first message of group "A" is in the process of being consumed by a client, then the remaining "A" messages are blocked, but the messages of the "B" group are available for consumption by other consumers - even though they are "behind" group "A" in the queue.
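The default-mode behaviour described above can be sketched as a small routing model. This is an illustration under stated assumptions, not the broker's algorithm: `GroupAssignmentSketch` and `route` are hypothetical names, consumers never go away, and a simple round-robin stands in for "any available consumer".

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of default-mode message grouping (hypothetical, not broker code). */
class GroupAssignmentSketch {

    private final List<String> consumers;
    private final Map<String, String> groupOwner = new HashMap<>();
    private int next = 0;

    GroupAssignmentSketch(List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        this.consumers = consumers;
    }

    /** Returns the consumer chosen for a message; groupId is null when the header is absent. */
    String route(String groupId) {
        if (groupId == null) {
            // Ungrouped message: any consumer may take it.
            return pickNext();
        }
        String owner = groupOwner.get(groupId);
        if (owner == null) {
            // First message of a group: assign the group to a consumer and remember it.
            owner = pickNext();
            groupOwner.put(groupId, owner);
        }
        return owner;
    }

    // Round-robin choice; a stand-in for the broker picking an available consumer.
    private String pickNext() {
        String chosen = consumers.get(next % consumers.size());
        next++;
        return chosen;
    }

    public static void main(String[] args) {
        GroupAssignmentSketch dispatcher = new GroupAssignmentSketch(List.of("c1", "c2"));
        System.out.println(dispatcher.route("cart-42")); // add item    -> c1
        System.out.println(dispatcher.route("cart-7"));  // other cart  -> c2
        System.out.println(dispatcher.route("cart-42")); // remove item -> c1 again
    }
}
```

Because both messages for "cart-42" reach the same consumer, the add and remove operations from the shopping cart example cannot race each other in this model.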

9.1.7.3.  Broker Message Grouping Configuration

In order for the broker to determine a message's group, the key for the header that contains the group identifier must be provided to the broker via configuration. This is done on a per-queue basis, when the queue is first configured.

This means that message group classification is determined by the message's destination queue.

Specifically, the queue "holds" the header key that is used to find the message's group identifier. All messages arriving at the queue are expected to use the same header key for holding the identifier. Once the message is enqueued, the broker looks up the group identifier in the message's header, and classifies the message by its group.

Message group support is specified by providing one or more of the following settings in the arguments map that is used when declaring the queue (e.g. when calling AMQSession.createQueue()).

Table 9.2. Queue Declare Message Group Configuration Arguments

Key | Value
qpid.group_header_key | The name of the message header that holds the group identifier value. The values in this header may be of any supported type (i.e. not just strings).
qpid.shared_msg_group | Provide a value of "1" to switch on 'shared groups' mode.
qpid.default_msg_group | The value to use as the default group when operating in 'shared groups' mode.


The default group used by queues operating in 'shared groups' mode can be set broker-wide using a system property as follows. Note, however, that the queue declaration argument detailed above takes precedence:

-Dqpid.broker_default-shared-message-group="your.default.shared.group"

It is important to note that there is no need to provide the actual group identifier values that will be used. The broker learns these values as messages are received. Also, there is no practical limit - aside from resource limitations - to the number of different groups that the broker can track at run time.
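The settings in Table 9.2 might be assembled into a queue-declare arguments map as sketched below. The class `MessageGroupArguments` and its method names are hypothetical, as is the header name used in the usage example; the argument keys and the "1" value are taken from the table.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helpers that build the message-group arguments from Table 9.2. */
class MessageGroupArguments {

    /** Default mode: only the group header key is needed. */
    static Map<String, Object> defaultMode(String groupHeaderKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("qpid.group_header_key", groupHeaderKey);
        return arguments;
    }

    /** 'Shared groups' mode; defaultGroup may be null to keep the broker's default group name. */
    static Map<String, Object> sharedGroups(String groupHeaderKey, String defaultGroup) {
        Map<String, Object> arguments = defaultMode(groupHeaderKey);
        arguments.put("qpid.shared_msg_group", "1");
        if (defaultGroup != null) {
            arguments.put("qpid.default_msg_group", defaultGroup);
        }
        return arguments;
    }

    public static void main(String[] args) {
        // "cartId" is an illustrative header name chosen by the application.
        System.out.println(defaultMode("cartId"));
        System.out.println(sharedGroups("cartId", "unassigned"));
    }
}
```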

9.1.8. Using low pre-fetch with special queue types

Qpid clients receive buffered messages in batches, sized according to the pre-fetch value. The current default is 500.

However, if you use the default value you will probably not see the desired behaviour when using priority, sorted, LVQ or grouped queues. Once the broker has sent a message to the client, its delivery order is fixed, regardless of the special behaviour of the queue.

For example, suppose a priority queue is consumed with a prefetch of 100, and 100 messages arrive with priority 2. The broker will send these messages to the client. If a new message then arrives with a higher priority, say priority 7, the broker cannot make it leapfrog the lower priority messages it has already sent. The priority 7 message will instead be delivered at the front of the next batch of messages to be sent to the client.
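The effect of the prefetch size on a priority queue can be reproduced with a small simulation. This is a simplified model, not client or broker code: `PrefetchSketch` and its methods are hypothetical, and the broker is assumed to top up the client's buffer whenever there is room. A late, higher-priority message is published after three lower-priority ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Toy simulation of prefetch against a priority queue (hypothetical, not Qpid code). */
class PrefetchSketch {

    static final class Msg {
        final String body;
        final int priority;
        final long seq;

        Msg(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Broker side: highest priority first, then arrival order.
    private final PriorityQueue<Msg> brokerQueue = new PriorityQueue<>((a, b) ->
            a.priority != b.priority
                    ? Integer.compare(b.priority, a.priority)
                    : Long.compare(a.seq, b.seq));
    // Client side: messages already sent, whose order is now fixed.
    private final Deque<Msg> clientBuffer = new ArrayDeque<>();
    private final int prefetch;
    private long nextSeq = 0;

    PrefetchSketch(int prefetch) {
        if (prefetch < 1) {
            throw new IllegalArgumentException("prefetch must be at least 1");
        }
        this.prefetch = prefetch;
    }

    void publish(String body, int priority) {
        brokerQueue.add(new Msg(body, priority, nextSeq++));
        fillClientBuffer();
    }

    /** Returns the next message body seen by the application, or null when none is left. */
    String consume() {
        Msg m = clientBuffer.poll();
        fillClientBuffer();
        return m == null ? null : m.body;
    }

    // The broker sends messages while the client's prefetch buffer has room.
    private void fillClientBuffer() {
        while (clientBuffer.size() < prefetch && !brokerQueue.isEmpty()) {
            clientBuffer.add(brokerQueue.poll());
        }
    }

    /** Publishes three priority 2 messages, then one priority 7 message, then drains. */
    static List<String> run(int prefetch) {
        PrefetchSketch sim = new PrefetchSketch(prefetch);
        sim.publish("low-1", 2);
        sim.publish("low-2", 2);
        sim.publish("low-3", 2);
        sim.publish("high", 7);
        List<String> received = new ArrayList<>();
        for (String body = sim.consume(); body != null; body = sim.consume()) {
            received.add(body);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [low-1, low-2, low-3, high]
        System.out.println(run(1)); // [low-1, high, low-2, low-3]
    }
}
```

With a prefetch of 3 the high-priority message waits behind everything already buffered on the client; with a prefetch of 1 it is delayed by at most one message.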

You therefore need to lower the prefetch value for your client (consumer) to get sensible behaviour from these queue types. To do this, set the Java system property max_prefetch in the client environment (using -D) before creating your consumer.

A default for all client connections can be set via a system property:

-Dmax_prefetch=1

The prefetch can also be adjusted on a per-connection basis by adding a maxprefetch value to the Connection URL:

amqp://guest:guest@client1/development?maxprefetch='1'&brokerlist='tcp://localhost:5672'
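Both settings can also be prepared from code, as in the sketch below. The class `PrefetchSettings` and its methods are hypothetical helpers; only the `max_prefetch` property name and the Connection URL layout are taken from the text above. The system property would need to be set before the connection and consumer are created.

```java
/** Hypothetical helpers for configuring a low prefetch on the client side. */
class PrefetchSettings {

    /** Sets the client-wide default prefetch via the max_prefetch system property. */
    static void setDefaultPrefetch(int prefetch) {
        System.setProperty("max_prefetch", Integer.toString(prefetch));
    }

    /** Builds a Connection URL carrying a per-connection maxprefetch value. */
    static String connectionUrl(String user, String password, String clientId,
                                String virtualHost, String host, int port, int maxPrefetch) {
        return String.format(
                "amqp://%s:%s@%s/%s?maxprefetch='%d'&brokerlist='tcp://%s:%d'",
                user, password, clientId, virtualHost, maxPrefetch, host, port);
    }

    public static void main(String[] args) {
        setDefaultPrefetch(1);
        // Prints amqp://guest:guest@client1/development?maxprefetch='1'&brokerlist='tcp://localhost:5672'
        System.out.println(connectionUrl("guest", "guest", "client1", "development",
                "localhost", 5672, 1));
    }
}
```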

Setting the Qpid pre-fetch to 1 will give exact queue-type semantics as perceived by the client; however, this comes at a performance cost. You could test with a slightly higher pre-fetch to trade off between throughput and exact semantics.