Menu Search

4.2. Publish/subscribe example

In this second example, we illustrate publish/subscribe messaging. Again, we create a JNDI context using a properties file, use the context to lookup a connection factory, create and start a connection, create a session, and lookup a destination (a topic) from the JNDI context. Then we create a producer and two durable subscribers , send a message with the producer. Both subscribers receive the same message.

Example 4.3. JMS Example - Publish/subscribe Messaging

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

import java.util.Properties;

public class StocksExample {

    public StocksExample() {
    }

    public static void main(String[] args) throws Exception {
      StocksExample stocks = new StocksExample();
      stocks.runTest();
    }

    private void runTest() throws Exception {
      Properties properties = new Properties();
      properties.load(this.getClass().getResourceAsStream("stocks.properties"));
      Context context = new InitialContext(properties);

      ConnectionFactory connectionFactory
          = (ConnectionFactory) context.lookup("qpidConnectionFactory");
      Connection connection = connectionFactory.createConnection();
      connection.start();

      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      Topic priceTopic = (Topic) context.lookup("myprices");                             (1)

      MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic, "sub1"); (2)
      MessageConsumer subscriber2 = session.createDurableSubscriber(priceTopic, "sub2" /*, "price > 150", false*/ );
      MessageProducer messageProducer = session.createProducer(priceTopic);

      Message message = session.createMessage();
      message.setStringProperty("instrument", "IBM");
      message.setIntProperty("price", 100);
      messageProducer.send(message);
      session.commit();

      message = subscriber1.receive(1000);
      session.commit();
      System.out.println("Subscriber 1 received : " + message);

      message = subscriber2.receive(1000);
      session.commit();
      System.out.println("Subscriber 2 received : " + message);

      session.unsubscribe("sub1");                                                       (3)
      session.unsubscribe("sub2");
      connection.close();
      context.close();
    }
}
	

(1)

Looks up a destination for the topic with JNDI name myprices.

(2)

Creates two durable subscribers, sub1 and sub2. Durable subscriptions retain messages for the client even when the client is disconnected, until the subscription is unsubscribed. Subscription 2 has a (commented out) message selector argument so you can conveniently experiement with the effect of those. [2]

(3)

Unsubscribes the two durable subscribers, permanently removing the knowledge of the subscriptions from the system. An application would normally NOT do this. The typical use-case for durable subsciption is one where the subscription exists over an extended period of time.

The contents of the stocks.properties file are shown below.

Example 4.4. JMS Example - Publish/subscribe Messaging - JNDI Properties

java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionFactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
topic.myprices = prices (1)
	

(1)

Defines a topic for which MessageProducers and/or MessageConsumers send and receive messages. The format of this entry is described in Section 6.3, “Topic”.



[2] Each durable subscription is implemented as a queue on the Broker. See Section 5.6.2, “Topic Subscriptions” for details.