Menu Search

4.2. Publish/subscribe example

In this second example, we illustrate publish/subscribe messaging. Again, we create a JNDI context using a properties file, use the context to lookup a connection factory, create and start a connection, create a session, and lookup a destination (a topic) from the JNDI context. Then we create a producer and two durable subscribers , send a message with the producer. Both subscribers receive the same message.

Example 4.3. JMS Example - Publish/subscribe Messaging

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

import java.util.Properties;

public class StocksExample {

    public StocksExample() {
    }

    public static void main(String[] args) throws Exception {
      StocksExample stocks = new StocksExample();
      stocks.runTest();
    }

    private void runTest() throws Exception {
      Properties properties = new Properties();
      properties.load(this.getClass().getResourceAsStream("stocks.properties"));
      Context context = new InitialContext(properties);

      ConnectionFactory connectionFactory
          = (ConnectionFactory) context.lookup("qpidConnectionFactory");
      Connection connection = connectionFactory.createConnection();
      connection.start();

      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      Topic priceTopic = (Topic) context.lookup("myprices");                             1

      MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic, "sub1"); 2
      MessageConsumer subscriber2 = session.createDurableSubscriber(priceTopic, "sub2" /*, "price > 150", false*/ );
      MessageProducer messageProducer = session.createProducer(priceTopic);

      Message message = session.createMessage();
      message.setStringProperty("instrument", "IBM");
      message.setIntProperty("price", 100);
      messageProducer.send(message);
      session.commit();

      message = subscriber1.receive(1000);
      session.commit();
      System.out.println("Subscriber 1 received : " + message);

      message = subscriber2.receive(1000);
      session.commit();
      System.out.println("Subscriber 2 received : " + message);

      session.unsubscribe("sub1");                                                       3
      session.unsubscribe("sub2");
      connection.close();
      context.close();
    }
}
	

1

Looks up a destination for the topic with JNDI name myprices.

2

Creates two durable subscribers, sub1 and sub2. Durable subscriptions retain messages for the client even when the client is disconnected, until the subscription is unsubscribed. Subscription 2 has a (commented out) message selector argument so you can conveniently experiement with the effect of those. [3]

3

Unsubscribes the two durable subscribers, permanently removing the knowledge of the subscriptions from the system. An application would normally NOT do this. The typical use-case for durable subsciption is one where the subscription exists over an extended period of time.

The contents of the stocks.properties file are shown below.

Example 4.4. JMS Example - Publish/subscribe Messaging - JNDI Properties

java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionFactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
topic.myprices = prices 1
	

1

Defines a topic for which MessageProducers and/or MessageConsumers send and receive messages. The format of this entry is described in Section 6.3, “Topic”.



[3] Each durable subscription is implemented as a queue on the Broker. See Section 5.6.2, “Topic Subscriptions” for details.