Thursday, December 16, 2010

Sample Spring Publishing/Subscribing Application for ActiveMQ

Here is an example of an application that uses Spring to connect to ActiveMQ messaging service. This code is very similar to the one described in the post about a simple publisher for MQSeries.
The software below allows you to send a text message (sent in the main method of SpringJMSSendTest: sender.send("Test Message 12345");). Once the message is sent, the same code will receive it.
The listener reconnects on exception - i.e. if ActiveMQ is stopped, or has not been started, this program will be trying to reconnect until it connects (this behavior is defined by a property of CachingConnectionFactory <property name="reconnectOnException" value="true" />.


We can start with XML config (jms-config.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
 http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

 <!-- a pool based JMS provider -->
 <bean id="jmsFactory" 
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
  <property name="connectionFactory">
   <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL">   
      <value>tcp://localhost:61616</value>
    </property>
   </bean>
  </property>
 </bean>
 
 <bean id="MQcachedConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsFactory" />
    <property name="reconnectOnException" value="true" />
    <property name="exceptionListener" ref="exceptionListener" />
    <property name="sessionCacheSize" value="1" />
 </bean>
 
 <bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">
    <property name="cachingConnectionFactory" 
        ref="MQcachedConnectionFactory"   
    /> 
 </bean>
 
 <!-- Spring JMS Template -->
 <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory">
     <ref local="MQcachedConnectionFactory" />
   </property>
   <property name="defaultDestination" ref="defaultDestination" />
 </bean>

 <!-- ActiveMQ destination to use by default -->
 <bean id="defaultDestination" 
          class="org.apache.activemq.command.ActiveMQQueue">
   <constructor-arg value="test_queue" />
 </bean>

 <bean id="springJMSListenTest" class="com.amq.test.SpringJMSListenTest">
 </bean>

 <bean id="messageListenerTest" class="com.amq.test.MessageListenerTest" />

 <bean id="listenerContainer"
   class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="MQcachedConnectionFactory" />
  <property name="destination" ref="defaultDestination" />
  <property name="messageListener" ref="messageListenerTest" />
  <property name="concurrentConsumers" value="6" />
  <property name="acceptMessagesWhileStopping" value="false" />
  <property name="recoveryInterval" value="10000" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" /> 
 </bean>

 <!--  Publisher -->
 <bean id="jmsSendTest" class="com.amq.test.SpringJMSSendTest">
    <property name="jmsTemplate" ref="myJmsTemplate"/>
 </bean>

</beans>


********************************* Now the Java part ************************

ExceptionListener - you can live without it, but if you want to be able to process exceptions, it is a good idea to create one. ExceptionListener is referenced in XML in:
<bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">


package com.amq.test;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

import org.springframework.jms.connection.CachingConnectionFactory;

                                                                           
// import com.sssw.jms.api.JMQConnectionLostException;

public class ExceptionListenerTest implements ExceptionListener
{
 CachingConnectionFactory cachingConnectionFactory;
 
 public void onException(JMSException arg0) {
  System.err.println("Exception occurred "+arg0);
  cachingConnectionFactory.onException(arg0);
 }

 public CachingConnectionFactory getCachingConnectionFactory() {
  return cachingConnectionFactory;
 }

 public void setCachingConnectionFactory(
                        CachingConnectionFactory  cachingConnectionFactory) {
  this.cachingConnectionFactory = cachingConnectionFactory;
 }
}



MessageListener - This is the place where the messages we receive are being processed.


package com.amq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MessageListenerTest implements MessageListener {

  public void onMessage(Message message) {

    if (message instanceof TextMessage) {

      try {
         System.out.println("Received Message:["+
                      ((TextMessage) message).getText()+"]");
      } 
      catch (Exception ex) {    
         System.out.println("Exception in onMessage " + ex.toString() + "\n" +
                     ex.getStackTrace());
      }
    } 
    else {
      System.out.println("Message must be of type TextMessage");
    }
  }
}



SpringJMSListen - use it if you do not want to publish anything, just to listen


package com.amq.test;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;


public class SpringJMSListenTest {

  public SpringJMSListenTest() {
  }

  public static void main(String[] args) {
      try {
            System.out.println("trying to get spring context");
            ApplicationContext ctx =  
                 new FileSystemXmlApplicationContext("config/jms-config.xml");  

            System.out.println("got spring context");

            SpringJMSListenTest springJMSTest =   
                (SpringJMSListenTest)ctx.getBean("springJMSListenTest");
         } 
         catch (Exception ex) {
            ex.printStackTrace();
         }
  }
}


SpringJMSSend - use it to start the software if you need to send a test message to the queue and receive it.


package com.amq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SpringJMSSendTest {
 
 private JmsTemplate jmsTemplate;

 public JmsTemplate getJmsTemplate() {
     return jmsTemplate;
 }

 public void setJmsTemplate(JmsTemplate MQjmsTemplate) {
     this.jmsTemplate = MQjmsTemplate;
 }

 public void send(final String txt){

     if(jmsTemplate==null){
        System.out.println("Template is null!!");
     }

     System.out.println("Sending Message:["+txt+"]");
     jmsTemplate.send( 
           new MessageCreator() {
                public Message createMessage(Session session) 
                throws JMSException {
                     return session.createTextMessage(txt);
         }
           }
     );
  
     System.out.println("Message Sent");
 }

 public static void main(String argc[]){
     try {
           System.out.println("trying to get spring context");
           ApplicationContext ctx =  
             new FileSystemXmlApplicationContext("config/jms-config.xml");  
           System.out.println("got spring context");

           SpringJMSSendTest sender = (SpringJMSSendTest)ctx.getBean("jmsSendTest");
            
           sender.send("Test Message 12345");
     } 
     catch (Exception ex) {
           System.out.println("Exception: "+ex.toString());
           ex.printStackTrace();
     }

 }
}

NOTES
- The configuration XML assumes that ActiveMQ server is running locally.
- Publisher will not try to reconnect automatically, only listeners do that
- The code above was tested in Eclpse with Java 1.6

1 comment:

  1. Wow wat a post .. excellent one dude .. !! but i had a small question .. i have a profiler running on my app .. and it shows all the consumers running concurrently but ... all r in waiting state .. all the message i posted round 500 are consumed only by single consumer ...

    ReplyDelete