A Last Value Queue is configured with the name of a message header that is used as a key. The queue behaves as a normal FIFO queue with the exception that when a message is enqueued, any other message in the queue with the same value in the key header is removed and discarded. Thus, for any given key value, the queue holds only the most recent message.
The following example illustrates the operation of a Last Value Queue. The example shows an empty queue with no consumers and a sequence of produced messages. The numbers represent the key for each message.
           <empty queue>
      1 => 1
      2 => 1 2
      3 => 1 2 3
      4 => 1 2 3 4
      2 => 1 3 4 2
      1 => 3 4 2 1
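Note that the first four messages are enqueued normally in FIFO order. The fifth message has key '2' and is also enqueued on the tail of the queue. However, the message already in the queue with the same key is discarded. The sixth message, with key '1', is handled the same way: it is enqueued on the tail and the older message with key '1' is discarded.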
If the set of keys used in the messages in a LVQ is constrained, the number of messages in the queue will not exceed the number of distinct keys in use.
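The behavior described above can be sketched with a small in-memory model. This is only an illustration of the queueing rule, not the broker's implementation; the class `LastValueQueue` and the helper `replay` are hypothetical names introduced for this sketch.

```python
from collections import OrderedDict


class LastValueQueue:
    """Toy model of LVQ semantics (hypothetical, not broker code)."""

    def __init__(self):
        # Maps key -> message body, ordered from head (oldest) to tail.
        self._messages = OrderedDict()

    def enqueue(self, key, body):
        # Discard any message already queued under this key ...
        self._messages.pop(key, None)
        # ... then place the new message on the tail of the queue.
        self._messages[key] = body

    def dequeue(self):
        # Remove and return the (key, body) pair at the head (FIFO).
        return self._messages.popitem(last=False)

    def keys(self):
        # Keys of the queued messages, head first.
        return list(self._messages)

    def __len__(self):
        return len(self._messages)


def replay(keys):
    """Enqueue one message per key, in order, and return the queue."""
    queue = LastValueQueue()
    for n, key in enumerate(keys, start=1):
        queue.enqueue(key, "message %d" % n)
    return queue


# The key sequence from the example above.
queue = replay([1, 2, 3, 4, 2, 1])
print(queue.keys())   # [3, 4, 2, 1]
print(len(queue))     # 4, the number of distinct keys in use
```

Because each key can occupy at most one slot, the queue depth in this model is bounded by the number of distinct keys, however many messages are produced.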
LVQ with zero or one consuming subscriptions - In this case, if the consumer disconnects momentarily or is slower than the producer(s), it will receive only the most recent message for each key rather than every intermediate update.
LVQ with zero or more browsing subscriptions - A browsing consumer can subscribe to the LVQ and get an immediate dump of all of the "current" messages and track updates thereafter. Any number of independent browsers can subscribe to the same LVQ with the same effect. Since messages are never consumed, they only disappear when replaced with a newer message with the same key or when their TTL expires.
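The browsing pattern can be illustrated with another toy in-memory model. It is a sketch under assumptions, not how the broker tracks browsers: the class `BrowsableLVQ`, its per-message sequence numbers, and the `browse(after)` cursor are all hypothetical devices used here to show the "initial dump, then updates" behavior. TTL expiry is not modeled.

```python
from collections import OrderedDict
from itertools import count


class BrowsableLVQ:
    """Toy model of an LVQ read by non-destructive browsers (hypothetical)."""

    def __init__(self):
        self._seq = count(1)
        # Maps key -> (sequence number, value), oldest message first.
        self._by_key = OrderedDict()

    def put(self, key, value):
        # Replace any older message with the same key; the new one goes
        # on the tail with a fresh, larger sequence number.
        self._by_key.pop(key, None)
        self._by_key[key] = (next(self._seq), value)

    def browse(self, after=0):
        """Return (position, messages) for messages newer than `after`.

        Nothing is removed from the queue, so any number of browsers
        can call this independently, each keeping its own position.
        """
        fresh = [(seq, key, value)
                 for key, (seq, value) in self._by_key.items()
                 if seq > after]
        position = fresh[-1][0] if fresh else after
        return position, [(key, value) for _, key, value in fresh]


lvq = BrowsableLVQ()
lvq.put("IBM", 100)
lvq.put("RHT", 50)
lvq.put("IBM", 101)

# A new browser first receives every "current" message.
pos, dump = lvq.browse()
print(dump)      # [('RHT', 50), ('IBM', 101)]

# Two updates arrive for the same key before the browser reads again;
# only the newer one is still in the queue.
lvq.put("RHT", 51)
lvq.put("RHT", 52)
pos, updates = lvq.browse(pos)
print(updates)   # [('RHT', 52)]
```

A second browser starting later with `browse()` would see the same current set, since browsing never removes messages.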
A LVQ may be created using directives in the API's address syntax. The important argument is "qpid.last_value_queue_key". The following Python example shows how a producer of stock price updates can create a LVQ to hold the latest stock prices for each ticker symbol. The message header used to hold the ticker symbol is called "ticker".
    conn = Connection(url)
    conn.open()
    sess = conn.session()
    tx = sess.sender("prices;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key':'ticker'}}}}")
    from qpid.messaging import Connection, Message

    def send(sender, key, message):
        # Set the LVQ key header on the message before sending it.
        message.properties["ticker"] = key
        sender.send(message)

    conn = Connection("localhost")
    conn.open()
    sess = conn.session()
    # Create the "prices" queue as a LVQ keyed on the "ticker" header.
    tx = sess.sender("prices;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key':'ticker'}}}}")

    msg = Message("Content")
    send(tx, "key1", msg)
    send(tx, "key2", msg)
    send(tx, "key3", msg)
    send(tx, "key4", msg)
    send(tx, "key2", msg)   # replaces the earlier "key2" message
    send(tx, "key1", msg)   # replaces the earlier "key1" message

    conn.close()
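Because the address string is long and deeply nested, it can be convenient to build it from its parts. The helper below is a hypothetical convenience function, not part of the Qpid API; it only assembles a string in the same form as the address used above.

```python
def lvq_address(queue_name, key_header):
    """Build an address string that declares `queue_name` as a LVQ.

    Hypothetical helper for illustration; `key_header` is the name of
    the message header used as the LVQ key.
    """
    if "'" in key_header:
        # The header name is embedded in single quotes below.
        raise ValueError("key header must not contain a single quote")
    arguments = "{'qpid.last_value_queue_key': '%s'}" % key_header
    node = "{type: queue, x-declare: {arguments: %s}}" % arguments
    return "%s; {create: always, node: %s}" % (queue_name, node)


print(lvq_address("prices", "ticker"))
```

The returned string could then be passed to `sess.sender(...)` in place of the literal address in the example above.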
Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners