The code below sends a text message (from the main method of SpringJMSSendTest: sender.send("Test Message 12345");). Once the message is sent, the same program receives it.
The listener reconnects on exception: if ActiveMQ is stopped, or has not been started yet, the program keeps trying to reconnect until it succeeds. This behavior is enabled by a property of CachingConnectionFactory, <property name="reconnectOnException" value="true" />. The listener container retries at the interval set by its recoveryInterval property (10000 ms in this configuration).
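The retry behavior can be pictured with a plain-Java sketch. This is not Spring's implementation: `ReconnectSketch`, `connectWithRetry`, and the simulated broker that only accepts the third attempt are hypothetical names used for illustration. The loop keeps trying to connect and sleeps for a fixed interval after each failure, which is the role recoveryInterval plays for the listener container.

```java
import java.util.concurrent.Callable;

public class ReconnectSketch {

    /**
     * Calls connect until it returns true, sleeping intervalMillis after each
     * failed attempt. Returns the number of attempts that were needed.
     */
    public static int connectWithRetry(Callable<Boolean> connect, long intervalMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                if (connect.call()) {
                    return attempts;
                }
            } catch (Exception ex) {
                // Broker not reachable yet: report it and fall through to the sleep.
                System.err.println("Connect attempt " + attempts + " failed: " + ex.getMessage());
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to reconnect", ie);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated broker (an assumption for this demo) that refuses the first two attempts.
        Callable<Boolean> simulatedBroker = new Callable<Boolean>() {
            private int calls = 0;

            public Boolean call() {
                if (++calls < 3) {
                    throw new IllegalStateException("broker not started");
                }
                return Boolean.TRUE;
            }
        };
        int attempts = connectWithRetry(simulatedBroker, 10L);
        System.out.println("Connected after " + attempts + " attempts");
    }
}
```

The demo uses a 10 ms pause so it finishes quickly; a real recovery interval would be measured in seconds.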
We can start with the XML config (jms-config.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
    <!-- a pool based JMS provider -->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
    </bean>
    <bean id="MQcachedConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory" />
        <property name="reconnectOnException" value="true" />
        <property name="exceptionListener" ref="exceptionListener" />
        <property name="sessionCacheSize" value="1" />
    </bean>
    <bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">
        <property name="cachingConnectionFactory"
                  ref="MQcachedConnectionFactory"
        />
    </bean>
    <!-- Spring JMS Template -->
    <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref local="MQcachedConnectionFactory" />
        </property>
        <property name="defaultDestination" ref="defaultDestination" />
    </bean>
    <!-- ActiveMQ destination to use by default -->
    <bean id="defaultDestination"
          class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="test_queue" />
    </bean>
    <bean id="springJMSListenTest" class="com.amq.test.SpringJMSListenTest">
    </bean>
    <bean id="messageListenerTest" class="com.amq.test.MessageListenerTest" />
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="MQcachedConnectionFactory" />
        <property name="destination" ref="defaultDestination" />
        <property name="messageListener" ref="messageListenerTest" />
        <property name="concurrentConsumers" value="6" />
        <property name="acceptMessagesWhileStopping" value="false" />
        <property name="recoveryInterval" value="10000" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" />
    </bean>
    <!-- Publisher -->
    <bean id="jmsSendTest" class="com.amq.test.SpringJMSSendTest">
        <property name="jmsTemplate" ref="myJmsTemplate"/>
    </bean>
</beans>
********************************* Now the Java part ************************
ExceptionListener - you can live without it, but if you want to handle connection exceptions yourself, it is a good idea to create one. The ExceptionListener is referenced in the XML by:
<bean id="exceptionListener" class="com.amq.test.ExceptionListenerTest">
package com.amq.test;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.springframework.jms.connection.CachingConnectionFactory;

public class ExceptionListenerTest implements ExceptionListener
{
    // Injected by Spring (see the exceptionListener bean in jms-config.xml).
    private CachingConnectionFactory cachingConnectionFactory;

    public void onException(JMSException arg0) {
        System.err.println("Exception occurred "+arg0);
        // Pass the exception on to the factory, which resets its cached connection.
        cachingConnectionFactory.onException(arg0);
    }

    public CachingConnectionFactory getCachingConnectionFactory() {
        return cachingConnectionFactory;
    }

    public void setCachingConnectionFactory(
            CachingConnectionFactory cachingConnectionFactory) {
        this.cachingConnectionFactory = cachingConnectionFactory;
    }
}
MessageListener - this is where the messages we receive are processed.
package com.amq.test;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MessageListenerTest implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println("Received Message:["+
                        ((TextMessage) message).getText()+"]");
            }
            catch (Exception ex) {
                // Print the exception, then its full stack trace. Concatenating
                // ex.getStackTrace() would only print the array reference.
                System.out.println("Exception in onMessage " + ex.toString());
                ex.printStackTrace(System.out);
            }
        }
        else {
            System.out.println("Message must be of type TextMessage");
        }
    }
}
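A note on logging exceptions inside a listener: concatenating the result of getStackTrace() into a string prints only the array's type and hash, not the frames. Below is a minimal sketch of rendering the full trace as text using only the JDK. `StackTraceSketch` and `stackTraceToString` are hypothetical names and are not part of the listener class.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraceSketch {

    /** Renders the same text that printStackTrace() writes, but into a String. */
    public static String stackTraceToString(Throwable ex) {
        StringWriter buffer = new StringWriter();
        PrintWriter writer = new PrintWriter(buffer);
        ex.printStackTrace(writer);
        writer.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        Exception ex = new IllegalStateException("simulated listener failure");
        // Prints a reference of the form [Ljava.lang.StackTraceElement;@<hash>
        System.out.println("Array reference: " + ex.getStackTrace());
        // Prints the exception class and message, then one "at ..." line per frame.
        System.out.println("Full trace:\n" + stackTraceToString(ex));
    }
}
```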
SpringJMSListenTest - use it if you do not want to publish anything and only want to listen.
package com.amq.test;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class SpringJMSListenTest {

    public SpringJMSListenTest() {
    }

    public static void main(String[] args) {
        try {
            System.out.println("trying to get spring context");
            // Loading the context also starts the listener container defined in the XML.
            ApplicationContext ctx =
                new FileSystemXmlApplicationContext("config/jms-config.xml");
            System.out.println("got spring context");
            SpringJMSListenTest springJMSTest =
                (SpringJMSListenTest)ctx.getBean("springJMSListenTest");
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
SpringJMSSendTest - use it to start the program if you need to send a test message to the queue and receive it.
package com.amq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SpringJMSSendTest {

    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate MQjmsTemplate) {
        this.jmsTemplate = MQjmsTemplate;
    }

    public void send(final String txt){
        if(jmsTemplate==null){
            // Fail here with a clear message instead of a NullPointerException below.
            throw new IllegalStateException("Template is null!!");
        }
        System.out.println("Sending Message:["+txt+"]");
        // Sends to the template's default destination (test_queue in the XML).
        jmsTemplate.send(
            new MessageCreator() {
                public Message createMessage(Session session)
                        throws JMSException {
                    return session.createTextMessage(txt);
                }
            }
        );
        System.out.println("Message Sent");
    }

    public static void main(String[] args){
        try {
            System.out.println("trying to get spring context");
            ApplicationContext ctx =
                new FileSystemXmlApplicationContext("config/jms-config.xml");
            System.out.println("got spring context");
            SpringJMSSendTest sender = (SpringJMSSendTest)ctx.getBean("jmsSendTest");
            sender.send("Test Message 12345");
        }
        catch (Exception ex) {
            System.out.println("Exception: "+ex.toString());
            ex.printStackTrace();
        }
    }
}
NOTES
- The configuration XML assumes that the ActiveMQ server is running locally.
- The publisher will not try to reconnect automatically; only the listeners do that.
- The code above was tested in Eclipse with Java 1.6.
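Since the publisher does not reconnect on its own, one option is to retry a failed send a limited number of times. The sketch below is plain Java and makes no Spring or ActiveMQ calls: `RetryingSenderSketch`, `sendWithRetry`, and the Runnable standing in for sender.send(...) are hypothetical. It catches RuntimeException because JmsTemplate reports failures as unchecked exceptions; a real application would need to decide which failures are worth retrying.

```java
public class RetryingSenderSketch {

    /**
     * Runs sendAction up to maxAttempts times, pausing pauseMillis after each
     * failure except the last. Returns true as soon as one attempt succeeds.
     */
    public static boolean sendWithRetry(Runnable sendAction, int maxAttempts, long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sendAction.run();
                return true;
            } catch (RuntimeException ex) {
                System.err.println("Send attempt " + attempt + " failed: " + ex.getMessage());
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and give up.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for sender.send(...): fails twice, then succeeds (demo assumption).
        Runnable simulatedSend = new Runnable() {
            private int calls = 0;

            public void run() {
                if (++calls < 3) {
                    throw new IllegalStateException("broker unavailable");
                }
                System.out.println("Message Sent");
            }
        };
        boolean delivered = sendWithRetry(simulatedSend, 5, 10L);
        System.out.println("Delivered: " + delivered);
    }
}
```

Unlike the listener, this gives up after a fixed number of attempts, so the caller can decide what to do with a message that could not be sent.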
Wow, what a post, excellent one! But I had a small question: I have a profiler running on my app, and it shows all the consumers running concurrently, but all of them are in a waiting state. All the messages I posted (around 500) are consumed by only a single consumer ...