Menu Search

7.2. Other Queue Types

7.2.1. Introduction

In addition to the standard queue type where messages are delivered in the same order that they were sent, the Java Broker supports three additional queue types which allows for alternative delivery behaviours. These are priority-queues, sorted-queues-, and last-value-queues (LVQs).

In the following sections, the semantics of each queue type is described, followed by a description of how instances of these queue can be created via configuration or programmatically.

The final section discusses the importance of using a low client pre-fetch with these queued.

7.2.2. Priority Queues

In a priority queue, messages on the queue are delivered in an order determined by the JMS priority message header within the message. By default Qpid supports the 10 priority levels mandated by JMS, with priority value 0 as the lowest priority and 9 as the highest.

It is possible to reduce the effective number of priorities if desired.

JMS defines the default message priority as 4. Messages sent without a specified priority use this default.

7.2.3. Sorted Queues

Sorted queues allow the message delivery order to be determined by value of an arbitrary JMS message property. Sort order is alpha-numeric and the property value must have a type java.lang.String.

Messages sent to a sorted queue without the specified JMS message property will be inserted into the 'last' position in the queue.

7.2.4. Last Value Queues (LVQ)

LVQs (or conflation queues) are special queues that automatically discard any message when a newer message arrives with the same key value. The key is specified by arbitrary JMS message property.

An example of an LVQ might be where a queue represents prices on a stock exchange: when you first consume from the queue you get the latest quote for each stock, and then as new prices come in you are sent only these updates.

Like other queues, LVQs can either be browsed or consumed from. When browsing an individual subscriber does not remove the message from the queue when receiving it. This allows for many subscriptions to browse the same LVQ (i.e. you do not need to create and bind a separate LVQ for each subscriber who wishes to receive the contents of the LVQ).

Messages sent to an LVQ without the specified property will be delivered as normal and will never be "replaced".

7.2.5. Creating a Priority, Sorted or LVQ Queue

To create a priority, sorted or LVQ queue, it can be defined in the virtualhost configuration file, or the queue can be created programmtically from a client via AMQP (using an extension to JMS), or using JMX. These methods are described below.

Once a queue is created you cannot change its type (without deleting it and re-creating). Also note you cannot currently mix the natures of these queue types, for instance, you cannot define a queue which it both an LVQ and a priority-queue.

7.2.5.1. Using configuration

To create a priority, sorted or LVQ queue within configuration, add the appropriate xml to the virtualhost.xml configuration file within the queues element.

Priority

To defining a priority queue, add a <priority>true</priority> element. By default the queue will have 10 distinct priorities.

Example 7.1. Configuring a priority queue

<queue>
    <name>myqueue</name>
    <myqueue>
        <exchange>amq.direct</exchange>
        <priority>true</priority>
    </myqueue>
</queue>

If you require fewer priorities, it is possible to specify a priorities element (whose value is a integer value between 2 and 10 inclusive) which will give the queue that number of distinct priorities. When messages are sent to that queue, their effective priority will be calculated by partitioning the priority space. If the number of effective priorities is 2, then messages with priority 0-4 are treated the same as "lower priority" and messages with priority 5-9 are treated equivalently as "higher priority".

Example 7.2. Configuring a priority queue with fewer priorities

<queue>
    <name>myqueue</name>
    <myqueue>
        <exchange>amq.direct</exchange>
        <priority>true</priority>
        <priorities>4</priorities>
    </myqueue>
</queue>

Sorted

To define a sorted queue, add a sortKey element. The value of the sortKey element defines the message property to use the value of when sorting the messages put onto the queue.

Example 7.3. Configuring a sorted queue

<queue>
    <name>myqueue</name>
    <myqueue>
        <exchange>amq.direct</exchange>
        <sortKey>message-property-to-sort-by</sortKey>
    </myqueue>
</queue>

LVQ

To define a LVQ, add a lvq element with the value true. Without any further configuration this will define an LVQ which uses the JMS message property qpid.LVQ_key as the key for replacement.

Example 7.4. Configuring a LVQ queue

<queue>
    <name>myqueue</name>
    <myqueue>
        <exchange>amq.direct</exchange>
        <lvq>true</lvq>
    </myqueue>
</queue>

If you wish to define your own property then you can do so using the lvqKey element.

Example 7.5. Configuring a LVQ queue with custom message property name

<queue>
    <name>myqueue</name>
    <myqueue>
        <exchange>amq.direct</exchange>
        <lvq>true</lvq>
        <lvqKey>ISIN</lvqKey>
    </myqueue>
</queue>

7.2.5.2. Using JMX or AMQP

To create a priority, sorted or LVQ queue programmatically from JMX or using a Qpid extension to JMS, pass the appropriate queue-declare arguments.

Table 7.1. Queue-declare arguments understood for priority, sorted and LVQ queues

Queue typeArgument nameArgument nameArgument Description
priorityprioritiesjava.lang.IntegerSpecifies a priority queue with given number priorities
sortedqpid.queue_sort_keyjava.lang.StringSpecifies sorted queue with given message property used to sort the entries
lvqqpid.last_value_queue_keyjava.lang.StringSpecifies lvq queue with given message property used to conflate the entries

The following example illustrates the creation of the a LVQ queue from a javax.jms.Session object. Note that this utilises a Qpid specific extension to JMS and involves casting the session object back to its Qpid base-class.

Example 7.6. Creation of an LVQ using the Qpid extension to JMS

Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.last_value_queue_key","ISIN");
((AMQSession<?,?>) session).createQueue(queueName, autoDelete, durable, exclusive, arguments);

The following example illustrates the creation of the sorted queue from a the JMX interface using the ManagedBroker interface.

Example 7.7. Creation of a sorted queue using JMX

Map<String, Object> environment = new HashMap<String, Object>();
environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","password"});
// Connect to service
JMXServiceURL url =  new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
MBeanServerConnection mbsc =  jmxConnector.getMBeanServerConnection();
// Object name for ManagedBroker for virtualhost myvhost
ObjectName objectName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=myvhost");
// Get the ManagedBroker object
ManagedBroker managedBroker = JMX.newMBeanProxy(mbsc, objectName, ManagedBroker.class);;

// Create the queue passing arguments
Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.queue_sort_key","myheader");
managedBroker.createNewQueue("myqueue", null, true, arguments);

7.2.6. Low pre-fetch

Qpid clients receive buffered messages in batches, sized according to the pre-fetch value. The current default is 500.

However, if you use the default value you will probably not see desirable behaviour when using priority, sorted or lvq queues. Once the broker has sent a message to the client its delivery order is then fixed, regardless of the special behaviour of the queue.

For example, if using a priority queue and a prefetch of 100, and 100 messages arrive with priority 2, the broker will send these messages to the client. If then a new message arrives will priority 1, the broker cannot leap frog messages of lower priority. The priority 1 will be delivered at the front of the next batch of messages to be sent to the client.

So, you need to set the prefetch values for your client (consumer) to make this sensible. To do this set the Java system property max_prefetch on the client environment (using -D) before creating your consumer.

A default for all client connections can be set via a system property:

-Dmax_prefetch=1

The prefetch can be also be adjusted on a per connection basis by adding a maxprefetch value to the Connection URLs

amqp://guest:guest@client1/development?maxprefetch='1'&brokerlist='tcp://localhost:5672'

Setting the Qpid pre-fetch to 1 will give exact queue-type semantics as perceived by the client however, this brings a performance cost. You could test with a slightly higher pre-fetch to trade-off between throughput and exact semantics.