An address is the name of a message target or message source. [1] The methods that create senders and receivers require an address. The details of sending to a particular target or receiving from a particular source are then handled by the sender or receiver. A different target or source can be used simply by using a different address.
An address resolves to a node. The Qpid Messaging API recognises two kinds of nodes, queues and topics [2]. A queue stores each message until it has been received and acknowledged, and only one receiver can receive a given message [3]. A topic immediately delivers a message to all eligible receivers; if there are no eligible receivers, it discards the message. In the AMQP 0-10 implementation of the API, [4] queues map to AMQP queues, and topics map to AMQP exchanges. [5]
In the rest of this tutorial, we present many examples using two programs that take an address as a command line parameter. spout sends messages to the target address, drain receives messages from the source address. The source code is available in C++, Python, and .NET C# and can be found in the examples directory for each language. These programs can use any address string as a source or a destination, and have many command line options to configure behavior—use the -h option for documentation on these options. [6] The examples in this tutorial also use the qpid-config utility to configure AMQP 0-10 queues and exchanges on a Qpid broker.
Example 2.3. Queues
Create a queue with qpid-config, send a message using spout, and read it using drain:
$ qpid-config add queue hello-world $ ./spout hello-world $ ./drain hello-world Message(properties={spout-id:c877e622-d57b-4df2-bf3e-6014c68da0ea:0}, content='')
The queue stored the message sent by spout and delivered it to drain when requested.
Once the message has been delivered and and acknowledged by drain, it is no longer available on the queue. If we run drain one more time, no messages will be retrieved.
$ ./drain hello-world $
Example 2.4. Topics
This example is similar to the previous example, but it uses a topic instead of a queue.
First, use qpid-config to remove the queue and create an exchange with the same name:
$ qpid-config del queue hello-world $ qpid-config add exchange topic hello-world
Now run drain and spout the same way we did in the previous example:
$ ./spout hello-world $ ./drain hello-world $
Topics deliver messages immediately to any interested receiver, and do not store messages. Because there were no receivers at the time spout sent the message, it was simply discarded. When we ran drain, there were no messages to receive.
Now let's run drain first, using the
-t
option to specify a timeout in seconds.
While drain is waiting for messages,
run spout in another window.
First Window:
$ ./drain -t 30 hello-word
Second Window:
$ ./spout hello-word
Once spout has sent a message, return to the first window to see the output from drain:
Message(properties={spout-id:7da2d27d-93e6-4803-8a61-536d87b8d93f:0}, content='')
You can run drain in several separate windows; each creates a subscription for the exchange, and each receives all messages sent to the exchange.
So far, our examples have used address strings that contain only the name of a node. An address string can also contain a subject and options.
The syntax for an address string is:
address_string ::= <address> [ / <subject> ] [ ; <options> ] options ::= { <key> : <value>, ... }
Addresses, subjects, and keys are strings. Values can be numbers, strings (with optional single or double quotes), maps, or lists. A complete BNF for address strings appears in Section 2.4.4, “Address String Grammar”.
So far, the address strings in this tutorial have only used simple names. The following sections show how to use subjects and options.
Every message has a property called subject, which is analogous to the subject on an email message. If no subject is specified, the message's subject is null. For convenience, address strings also allow a subject. If a sender's address contains a subject, it is used as the default subject for the messages it sends.
If a receiver's address contains a subject, it is used to select only messages that match the subject—the matching algorithm depends on the message source. In AMQP 0-10, each exchange type has its own matching algorithm.
Currently, a receiver bound to a queue ignores subjects, receiving messages from the queue without filtering. Support for subject filtering on queues will be implemented soon.
Example 2.5. Using subjects
In this example we show how subjects affect message flow.
First, let's use qpid-config to create a topic exchange.
$ qpid-config add exchange topic news-service
Now we use drain to receive messages from news-service
that match the subject sports
.
First Window:
$ ./drain -t 30 news-service/sports
In a second window, let's send messages to news-service
using two different subjects:
Second Window:
$ ./spout news-service/sports $ ./spout news-service/news
Now look at the first window, the message with the
subject sports
has been received, but not
the message with the subject news
:
Message(properties={qpid.subject:sports, spout-id:9441674e-a157-4780-a78e-f7ccea998291:0}, content='')
If you run drain in multiple windows using the same subject, all instances of drain receive the messages for that subject.
The AMQP exchange type we are using here,
amq.topic
, can also do more sophisticated
matching.
A sender's subject can contain multiple words separated by a
“.” delimiter. For instance, in a news
application, the sender might use subjects like
usa.news
, usa.weather
,
europe.news
, or
europe.weather
.
The receiver's subject can include wildcard characters—
“#” matches one or more words in the message's
subject, “*” matches a single word.
For instance, if the subject in the source address is
*.news
, it matches messages with the
subject europe.news
or
usa.news
; if it is
europe.#
, it matches messages with subjects
like europe.news
or
europe.pseudo.news
.
Example 2.6. Subjects with multi-word keys
This example uses drain and spout to demonstrate the use of subjects with two-word keys.
Let's use drain with the subject
*.news
to listen for messages in which
the second word of the key is
news
.
First Window:
$ ./drain -t 30 news-service/*.news
Now let's send messages using several different two-word keys:
Second Window:
$ ./spout news-service/usa.news $ ./spout news-service/usa.sports $ ./spout news-service/europe.sports $ ./spout news-service/europe.news
In the first window, the messages with
news
in the second word of the key have
been received:
Message(properties={qpid.subject:usa.news, spout-id:73fc8058-5af6-407c-9166-b49a9076097a:0}, content='') Message(properties={qpid.subject:europe.news, spout-id:f72815aa-7be4-4944-99fd-c64c9747a876:0}, content='')
Next, let's use drain with the
subject #.news
to match any sequence of
words that ends with news
.
First Window:
$ ./drain -t 30 news-service/#.news
In the second window, let's send messages using a variety of different multi-word keys:
Second Window:
$ ./spout news-service/news $ ./spout news-service/sports $ ./spout news-service/usa.news $ ./spout news-service/usa.sports $ ./spout news-service/usa.faux.news $ ./spout news-service/usa.faux.sports
In the first window, messages with
news
in the last word of the key have been
received:
Message(properties={qpid.subject:news, spout-id:cbd42b0f-c87b-4088-8206-26d7627c9640:0}, content='') Message(properties={qpid.subject:usa.news, spout-id:234a78d7-daeb-4826-90e1-1c6540781eac:0}, content='') Message(properties={qpid.subject:usa.faux.news, spout-id:6029430a-cfcb-4700-8e9b-cbe4a81fca5f:0}, content='')
The options in an address string can contain additional information for the senders or receivers created for it, including:
Policies for assertions about the node to which an address refers.
For instance, in the address string my-queue;
{assert: always, node:{ type: queue }}
, the node
named my-queue
must be a queue; if not,
the address does not resolve to a node, and an exception
is raised.
Policies for automatically creating or deleting the node to which an address refers.
For instance, in the address string xoxox ; {create: always}
,
the queue xoxox
is created, if it does
not exist, before the address is resolved.
Extension points that can be used for sender/receiver configuration.
For instance, if the address for a receiver is
my-queue; {mode: browse}
, the receiver
works in browse
mode, leaving messages
on the queue so other receivers can receive them.
Extension points providing more direct control over the underlying protocol.
For instance, the x-bindings
property
allows greater control over the AMQP 0-10 binding process
when an address is resolved.
Let's use some examples to show how these different kinds of address string options affect the behavior of senders and receives.
In this section, we use the assert
option
to ensure that the address resolves to a node of the required
type.
Example 2.7. Assertions on Nodes
Let's use qpid-config to create a queue and a topic.
$ qpid-config add queue my-queue $ qpid-config add exchange topic my-topic
We can now use the address specified to drain to assert that it is of a particular type:
$ ./drain 'my-queue; {assert: always, node:{ type: queue }}' $ ./drain 'my-queue; {assert: always, node:{ type: topic }}' 2010-04-20 17:30:46 warning Exception received from broker: not-found: not-found: Exchange not found: my-queue (../../src/qpid/broker/ExchangeRegistry.cpp:92) [caused by 2 \x07:\x01] Exchange my-queue does not exist
The first attempt passed without error as my-queue is indeed a queue. The second attempt however failed; my-queue is not a topic.
We can do the same thing for my-topic:
$ ./drain 'my-topic; {assert: always, node:{ type: topic }}' $ ./drain 'my-topic; {assert: always, node:{ type: queue }}' 2010-04-20 17:31:01 warning Exception received from broker: not-found: not-found: Queue not found: my-topic (../../src/qpid/broker/SessionAdapter.cpp:754) [caused by 1 \x08:\x01] Queue my-topic does not exist
Now let's use the create
option to
create the queue xoxox
if it does not already
exist:
In previous examples, we created the queue before
listening for messages on it. Using create:
always
, the queue is automatically created if it
does not exist.
Example 2.8. Creating a Queue Automatically
First Window:
$ ./drain -t 30 "xoxox ; {create: always}"
Now we can send messages to this queue:
Second Window:
$ ./spout "xoxox ; {create: always}"
Returning to the first window, we see that drain has received this message:
Message(properties={spout-id:1a1a3842-1a8b-4f88-8940-b4096e615a7d:0}, content='')
The details of the node thus created can be controlled by further options within the node. See Table 2.15, “Node Properties” for details.
Some options specify message transfer semantics; for
instance, they may state whether messages should be consumed or
read in browsing mode, or specify reliability
characteristics. The following example uses the
browse
option to receive messages without
removing them from a queue.
Example 2.9. Browsing a Queue
Let's use the browse mode to receive messages without removing them from the queue. First we send three messages to the queue:
$ ./spout my-queue --content one $ ./spout my-queue --content two $ ./spout my-queue --content three
Now we use drain to get those messages, using the browse option:
$ ./drain 'my-queue; {mode: browse}' Message(properties={spout-id:fbb93f30-0e82-4b6d-8c1d-be60eb132530:0}, content='one') Message(properties={spout-id:ab9e7c31-19b0-4455-8976-34abe83edc5f:0}, content='two') Message(properties={spout-id:ea75d64d-ea37-47f9-96a9-d38e01c97925:0}, content='three')
We can confirm the messages are still on the queue by repeating the drain:
$ ./drain 'my-queue; {mode: browse}' Message(properties={spout-id:fbb93f30-0e82-4b6d-8c1d-be60eb132530:0}, content='one') Message(properties={spout-id:ab9e7c31-19b0-4455-8976-34abe83edc5f:0}, content='two') Message(properties={spout-id:ea75d64d-ea37-47f9-96a9-d38e01c97925:0}, content='three')
Greater control over the AMQP 0-10 binding process can
be achieved by including an x-bindings
option in an address string.
For instance, the XML Exchange is an AMQP 0-10 custom exchange
provided by the Apache Qpid C++ broker. It allows messages to
be filtered using XQuery; queries can address either message
properties or XML content in the body of the message. The
xquery is specified in the arguments field of the AMQP 0-10
command. When using the messaging API an xquery can be
specified in and address that resolves to an XML exchange by
using the x-bindings property.
An instance of the XML Exchange must be added before it can be used:
$ qpid-config add exchange xml xml
When using the XML Exchange, a receiver provides an
XQuery as an x-binding argument. If the query contains a
context item (a path starting with “.”), then it
is applied to the content of the message, which must be
well-formed XML. For instance, ./weather
is
a valid XQuery, which matches any message in which the root
element is named weather
. Here is an
address string that contains this query:
xml; { link: { x-bindings: [{exchange:xml, key:weather, arguments:{xquery:"./weather"} }] } }
When using longer queries with drain, it is often useful to place the query in a file, and use cat in the command line. We do this in the following example.
Example 2.10. Using the XML Exchange
This example uses an x-binding that contains queries, which filter based on the content of XML messages. Here is an XQuery that we will use in this example:
let $w := ./weather return $w/station = 'Raleigh-Durham International Airport (KRDU)' and $w/temperature_f > 50 and $w/temperature_f - $w/dewpoint > 5 and $w/wind_speed_mph > 7 and $w/wind_speed_mph < 20
We can specify this query in an x-binding to listen to messages that meet the criteria specified by the query:
First Window:
$ ./drain -f "xml; {link:{x-bindings:[{key:'weather', arguments:{xquery:\"$(cat rdu.xquery )\"}}]}}"
In another window, let's create an XML message that meets the criteria in the query, and place it in the file rdu.xml
:
<weather> <station>Raleigh-Durham International Airport (KRDU)</station> <wind_speed_mph>16</wind_speed_mph> <temperature_f>70</temperature_f> <dewpoint>35</dewpoint> </weather>
Now let's use spout to send this message to the XML exchange:
Second Window:
spout --content "$(cat rdu.xml)" xml/weather
Returning to the first window, we see that the message has been received:
$ ./drain -f "xml; {link:{x-bindings:[{exchange:'xml', key:'weather', arguments:{xquery:\"$(cat rdu.xquery )\"}}]}}" Message(properties={qpid.subject:weather, spout-id:31c431de-593f-4bec-a3dd-29717bd945d3:0}, content='<weather> <station>Raleigh-Durham International Airport (KRDU)</station> <wind_speed_mph>16</wind_speed_mph> <temperature_f>40</temperature_f> <dewpoint>35</dewpoint> </weather>')
Table 2.14. Address String Options
option | value | semantics |
---|---|---|
assert | one of: always, never, sender or receiver | Asserts that the properties specified in the node option match whatever the address resolves to. If they do not, resolution fails and an exception is raised. |
create | one of: always, never, sender or receiver | Creates the node to which an address refers if it does not exist. No error is raised if the node does exist. The details of the node may be specified in the node option. |
delete | one of: always, never, sender or receiver | Delete the node when the sender or receiver is closed. |
node | A nested map containing the entries shown in Table 2.15, “Node Properties”. | Specifies properties of the node to which the address refers. These are used in conjunction with the assert or create options. |
link | A nested map containing the entries shown in Table 2.16, “Link Properties”. | Used to control the establishment of a conceptual link from the client application to or from the target/source address. |
mode | one of: browse, consume | This option is only of relevance for source addresses that resolve to a queue. If browse is specified the messages delivered to the receiver are left on the queue rather than being removed. If consume is specified the normal behaviour applies; messages are removed from the queue once the client acknowledges their receipt. |
Table 2.15. Node Properties
property | value | semantics |
---|---|---|
type | topic, queue | Indicates the type of the node. |
durable | True, False | Indicates whether the node survives a loss of volatile storage e.g. if the broker is restarted. |
x-declare | A nested map whose values correspond to the valid fields on an AMQP 0-10 queue-declare or exchange-declare command. | These values are used to fine tune the creation or assertion process. Note however that they are protocol specific. |
x-bindings |
A nested list in which each binding is represented by
a map. The entries of the map for a binding contain
the fields that describe an AMQP 0-10 binding. Here is
the format for x-bindings:
[ { exchange: <exchange>, queue: <queue>, key: <key>, arguments: { <key_1>: <value_1>, ..., <key_n>: <value_n> } }, ... ] | In conjunction with the create option, each of these bindings is established as the address is resolved. In conjunction with the assert option, the existence of each of these bindings is verified during resolution. Again, these are protocol specific. |
Table 2.16. Link Properties
option | value | semantics |
---|---|---|
reliability | one of: unreliable, at-least-once, at-most-once, exactly-once |
Reliability indicates the level of reliability that
the sender or receiver. unreliable
and at-most-once are currently
treated as synonyms, and allow messages to be lost if
a broker crashes or the connection to a broker is
lost. at-least-once guarantees that
a message is not lost, but duplicates may be
received. exactly-once guarantees
that a message is not lost, and is delivered precisely
once. Currently only unreliable
and at-least-once are supported.
[a]
|
durable | True, False | Indicates whether the link survives a loss of volatile storage e.g. if the broker is restarted. |
x-declare | A nested map whose values correspond to the valid fields of an AMQP 0-10 queue-declare command. | These values can be used to customise the subscription queue in the case of receiving from an exchange. Note however that they are protocol specific. |
x-subscribe | A nested map whose values correspond to the valid fields of an AMQP 0-10 message-subscribe command. | These values can be used to customise the subscription. |
x-bindings | A nested list each of whose entries is a map that may contain fields (queue, exchange, key and arguments) describing an AMQP 0-10 binding. | These bindings are established during resolution independent of the create option. They are considered logically part of the linking process rather than of node creation. |
delay | long | The delay (in milliseconds) between the time a message is sent by a MessageProducer, and the earliest time it becomes visible to consumers on any queue onto which it has been placed. Note that this value only has an affect on brokers which support the feature (currently only the Apache Qpid Broker-J), and only on queues where delivery delay has been enabled. |
[a] If at-most-once is requested, unreliable will be used and for durable messages on durable queues there is the possibility that messages will be redelivered; if exactly-once is requested, at-least-once will be used and the application needs to be able to deal with duplicates. |
This section provides a formal grammar for address strings.
Tokens. The following regular expressions define the tokens used to parse address strings:
LBRACE: \\{ RBRACE: \\} LBRACK: \\[ RBRACK: \\] COLON: : SEMI: ; SLASH: / COMMA: , NUMBER: [+-]?[0-9]*\\.?[0-9]+ ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] SYM: [.#*%@$^!+-] WSPACE: [ \\n\\r\\t]+
Grammar. The formal grammar for addresses is given below:
address := name [ SLASH subject ] [ ";" options ] name := ( part | quoted )+ subject := ( part | quoted | SLASH )* quoted := STRING / ESC part := LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM options := map map := "{" ( keyval ( "," keyval )* )? "}" keyval "= ID ":" value value := NUMBER / STRING / ID / map / list list := "[" ( value ( "," value )* )? "]"
Address String Options. The address string options map supports the following parameters:
<name> [ / <subject> ] ; { create: always | sender | receiver | never, delete: always | sender | receiver | never, assert: always | sender | receiver | never, mode: browse | consume, node: { type: queue | topic, durable: True | False, x-declare: { ... <declare-overrides> ... }, x-bindings: [<binding_1>, ... <binding_n>] }, link: { name: <link-name>, durable: True | False, reliability: unreliable | at-most-once | at-least-once | exactly-once, x-declare: { ... <declare-overrides> ... }, x-bindings: [<binding_1>, ... <binding_n>], x-subscribe: { ... <subscribe-overrides> ... } } }
Create, Delete, and Assert Policies
The create, delete, and assert policies specify who should perfom the associated action:
always: the action is performed by any messaging client
sender: the action is only performed by a sender
receiver: the action is only performed by a receiver
never: the action is never performed (this is the default)
Node-Type
The node-type is one of:
topic: in the AMQP 0-10 mapping, a topic node defaults to the topic exchange, x-declare may be used to specify other exchange types
queue: this is the default node-type
[1] In the programs we have just seen, we used
amq.topic
as the default address if none is
passed in. This is the name of a standard exchange that always
exists on an AMQP 0-10 messaging broker.
[2] The terms queue and topic here were chosen to align with their meaning in JMS. These two addressing 'patterns', queue and topic, are sometimes refered as point-to-point and publish-subscribe. AMQP 0-10 has an exchange type called a topic exchange. When the term topic occurs alone, it refers to a Messaging API topic, not the topic exchange.
[3] There are exceptions to this rule; for instance,
a receiver can use browse
mode, which leaves
messages on the queue for other receivers to
read.
[4] The AMQP 0-10 implementation is the only one that currently exists.
[5] In AMQP 0-10, messages are sent to exchanges, and read from queues. The Messaging API also allows a sender to send messages to a queue; internally, Qpid implements this by sending the message to the default exchange, with the name of the queue as the routing key. The Messaging API also allows a receiver to receive messages from a topic; internally, Qpid implements this by setting up a private subscription queue for the receiver and binding the subscription queue to the exchange that corresponds to the topic.
[6] Currently, the C++, Python, and .NET C# implementations of drain and spout have slightly different options. This tutorial uses the C++ implementation. The options will be reconciled in the near future.
Apache Qpid, Messaging built on AMQP; Copyright © 2015 The Apache Software Foundation; Licensed under the Apache License, Version 2.0; Apache Qpid, Qpid, Qpid Proton, Proton, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation; All other marks mentioned may be trademarks or registered trademarks of their respective owners