The code below sends a text message (from the main method of SpringJMSSendTest: sender.send("Test Message 12345");). Once the message is sent, the same program receives it.
The listener reconnects on exception: if ActiveMQ is stopped, or has not been started yet, the program keeps trying to reconnect until it succeeds. This behavior is enabled by a property of CachingConnectionFactory: <property name="reconnectOnException" value="true" />. The listener container's recoveryInterval property (10000 ms in this config) sets the pause between recovery attempts.
We can start with the XML config (jms-config.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

    <!-- a pool based JMS provider -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="MQcachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory" />
        <property name="reconnectOnException" value="true" />
        <property name="exceptionListener" ref="exceptionListener" />
        <property name="sessionCacheSize" value="1" />
    </bean>

    <bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">
        <property name="cachingConnectionFactory" ref="MQcachedConnectionFactory" />
    </bean>

    <!-- Spring JMS Template -->
    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref local="MQcachedConnectionFactory" />
        </property>
        <property name="defaultDestination" ref="defaultDestination" />
    </bean>

    <!-- ActiveMQ destination to use by default -->
    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="test_queue" />
    </bean>

    <bean id="springJMSListenTest" class="com.amq.test.SpringJMSListenTest">
    </bean>

    <bean id="messageListenerTest" class="com.amq.test.MessageListenerTest" />

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="MQcachedConnectionFactory" />
        <property name="destination" ref="defaultDestination" />
        <property name="messageListener" ref="messageListenerTest" />
        <property name="concurrentConsumers" value="6" />
        <property name="acceptMessagesWhileStopping" value="false" />
        <property name="recoveryInterval" value="10000" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" />
    </bean>

    <!-- Publisher -->
    <bean id="jmsSendTest" class="com.amq.test.SpringJMSSendTest">
        <property name="jmsTemplate" ref="myJmsTemplate"/>
    </bean>

</beans>
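The brokerURL in the config points at tcp://localhost:61616, so nothing will connect until a broker is listening there. As a rough pre-flight check, a sketch like the one below could tell you whether anything accepts TCP connections on that host and port. The class name BrokerCheck and the method isListening are made up for this example and are not part of Spring or ActiveMQ; the sketch only handles plain tcp://host:port URLs (not failover: URLs), and an open port does not prove that the process behind it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper for illustration only; not part of Spring or ActiveMQ.
final class BrokerCheck {

    // Returns true if a TCP connection to the host and port of a plain
    // "tcp://host:port" URL can be opened within timeoutMillis.
    static boolean isListening(String brokerUrl, int timeoutMillis) {
        URI uri = URI.create(brokerUrl);
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMillis);
            return true;
        } catch (IOException ex) {
            // Connection refused, timed out, or host not reachable
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:61616"; // same value as brokerURL in jms-config.xml
        System.out.println(url + " listening: " + isListening(url, 2000));
    }
}
```

Because the listener side recovers on its own, a check like this is mostly useful before running the sender, which does not retry.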
********************************* Now the Java part ************************
ExceptionListener - you can live without it, but if you want to be able to process exceptions, it is a good idea to create one. ExceptionListener is referenced in XML in:
<bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">
package com.amq.test;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

import org.springframework.jms.connection.CachingConnectionFactory;

// import com.sssw.jms.api.JMQConnectionLostException;

public class ExceptionListenerTest implements ExceptionListener {

    CachingConnectionFactory cachingConnectionFactory;

    public void onException(JMSException arg0) {
        System.err.println("Exception occurred " + arg0);
        cachingConnectionFactory.onException(arg0);
    }

    public CachingConnectionFactory getCachingConnectionFactory() {
        return cachingConnectionFactory;
    }

    public void setCachingConnectionFactory(CachingConnectionFactory cachingConnectionFactory) {
        this.cachingConnectionFactory = cachingConnectionFactory;
    }
}
MessageListener - This is the place where the messages we receive are being processed.
package com.amq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MessageListenerTest implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println("Received Message:[" + ((TextMessage) message).getText() + "]");
            } catch (JMSException ex) {
                // getText() is the only call here that can fail
                System.out.println("Exception in onMessage " + ex.toString());
                // Print the real stack trace; concatenating getStackTrace() only prints an array reference
                ex.printStackTrace();
            }
        } else {
            System.out.println("Message must be of type TextMessage");
        }
    }
}
SpringJMSListenTest - use it if you do not want to publish anything and only want to listen.
package com.amq.test;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class SpringJMSListenTest {

    public SpringJMSListenTest() {
    }

    public static void main(String[] args) {
        try {
            System.out.println("trying to get spring context");
            ApplicationContext ctx = new FileSystemXmlApplicationContext("config/jms-config.xml");
            System.out.println("got spring context");
            SpringJMSListenTest springJMSTest = (SpringJMSListenTest) ctx.getBean("springJMSListenTest");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
SpringJMSSendTest - use it to start the software if you need to send a test message to the queue and receive it.
package com.amq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SpringJMSSendTest {

    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate MQjmsTemplate) {
        this.jmsTemplate = MQjmsTemplate;
    }

    public void send(final String txt) {
        if (jmsTemplate == null) {
            // Fail fast with a clear message instead of a NullPointerException further down
            throw new IllegalStateException("Template is null!!");
        }
        System.out.println("Sending Message:[" + txt + "]");
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(txt);
            }
        });
        System.out.println("Message Sent");
    }

    public static void main(String[] args) {
        try {
            System.out.println("trying to get spring context");
            ApplicationContext ctx = new FileSystemXmlApplicationContext("config/jms-config.xml");
            System.out.println("got spring context");
            SpringJMSSendTest sender = (SpringJMSSendTest) ctx.getBean("jmsSendTest");
            sender.send("Test Message 12345");
        } catch (Exception ex) {
            System.out.println("Exception: " + ex.toString());
            ex.printStackTrace();
        }
    }
}
NOTES
- The configuration XML assumes that the ActiveMQ server is running locally (tcp://localhost:61616).
- The publisher will not try to reconnect automatically; only the listeners do that.
- The code above was tested in Eclipse with Java 1.6.
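Since the publisher does not retry on its own, one option is to wrap the send call in a small retry loop. The following is only a sketch under assumptions: RetryingSender, SendAction and sendWithRetry are hypothetical names invented for this example, not Spring or ActiveMQ API, and it assumes that a later attempt is able to obtain a working connection once the broker is back.

```java
// Hypothetical helper for illustration only; not part of Spring or ActiveMQ.
final class RetryingSender {

    // One send attempt; in this post it would wrap sender.send(...)
    interface SendAction {
        void run() throws Exception;
    }

    // Runs the action until it succeeds or maxAttempts is reached.
    // Returns the number of attempts used; rethrows the last failure otherwise.
    static int sendWithRetry(SendAction action, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    // Fixed pause between attempts, similar in spirit to recoveryInterval
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

In the main method of SpringJMSSendTest, the call sender.send("Test Message 12345") could then be placed inside a SendAction and passed to sendWithRetry with, for example, 5 attempts and a 10000 ms delay. Unlike the listener, this gives up after a bounded number of attempts, which is usually what you want for a one-off test sender.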
Wow, what a post .. excellent one, dude!! But I had a small question: I have a profiler running on my app, and it shows all the consumers running concurrently, but all of them are in a waiting state. All the messages I posted (around 500) are consumed by only a single consumer ...