Friday, December 3, 2010

JMS MQ Series Publisher using Spring

Here is a simple program that allows to connect to MQ Series server, read a text file with messages (one message per line) and publish these messages to MQ.

This publisher gets allthe connection parameters from an XML file mq-service-FL.xml
that is looked up in the class path.

The name of the file with data is passed as a parameter in the command line.
If a message in the data file is shorter than 100 bytes, the message is skipped.

Code in Java is below
/**************************************************************
** Class MQAsyncSender
***************************************************************/
package com.mq.sendmsg;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;


import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class MQAsyncSender {

  private JmsTemplate jmsTemplate;

  /**
   * @return
   */
   public JmsTemplate getJmsTemplate() {
       return jmsTemplate;
   }

  /**
   * 
   * @param MQjmsTemplate
   */
   public void setJmsTemplate(JmsTemplate MQjmsTemplate) {
     this.jmsTemplate = MQjmsTemplate;
   }

  /**
   * 
   * @param txt
   */
   public void send(final String txt){
     if(jmsTemplate==null){
        System.out.println("Template is null!!");
     }
     jmsTemplate.send(new MessageCreator() {
          public Message createMessage(Session session) throws JMSException 
          {
            return session.createTextMessage(txt);
          }
     });
     System.out.println("Message Sent:["+txt+"]");
   }
}
/*
* End of MQAsyncSender class definition
*/

/*********************************************************************
** Main class
**********************************************************************/
package com.mq.sendmsg;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.connection.*;

import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.jms.*;

public class MQSimpleFileReaderSend {

  public static void main(String args[]){
    String         line           = null;
    StringBuffer   msgTextBuffer  = null;
    BufferedReader in             = null;
    String         msgText        = null;
    String         dataFileName = null;
    String         actionCode     = "A";
    int            msgcount       = 0;
    int            totalMsgs      = -1;
    int            skipMsgs       = 0;
    String         headerRecord   = "";

    try{
      if(args.length <1 ){
         throw new Exception("Incorrect arguments.Need atleast 2" +
            "arguments");        
      }
      dataFileName = args[0];    
   }
   catch(Exception ex){
       ex.printStackTrace();
       usage();
   }

   System.out.println("Starting for config mq-service-FL.xml");    
   ClassPathXmlApplicationContext ctx = 
        new ClassPathXmlApplicationContext("mq-service-FL.xml"); 
   MQAsyncSender sender=(MQAsyncSender)ctx.getBean("jmssend");     
   System.out.println("sender: Messages will be sent to \n"+               
      getSenderConnectionParameters(sender));

   try{
     in = new  BufferedReader(new FileReader(dataFileName));
     while (( line= in.readLine()) != null) {
         msgcount++;
         msgTextBuffer=new StringBuffer(line);          

         if(msgTextBuffer.length()<100) {
            System.out.println("Message "+msgcount + 
                   " less than 100 chars...Hence skipping");            
            skipMsgs++;
            continue;
         }          
         msgText=addHeaderForActionH(msgTextBuffer,actionCode,headerRecord);        
         sender.send(msgText);
 
         if(msgcount==totalMsgs){
            break;
         }
    }
    System.out.println("Number of msgs sent:"+ (msgcount-skipMsgs));     
    
    if(skipMsgs>0){
      System.out.println("Number of Messages skipped:"+skipMsgs);
    }
    in.close();
   }  
   catch(Exception e){
    e.printStackTrace();
   }
  }

 /**
  * 
  */
  private static void usage()
  {
    System.err.println("\nUsage: startpublisher.sh  ");    
    System.exit(0);  
  }
    
 /**
  *
  */
  private static String getSenderConnectionParameters(
     MQAsyncSender sender){

      String strOut="";
      strOut=strOut + "baseQueueName:"+        
        sender.getJmsTemplate().getDefaultDestination().toString()+"\n";
      return strOut;
  }    
 /**
  * addHeaderForActionH (Modify if you need 
  * to add standard header to all records)
  * @param msgText
  * @param actionCode
  * @return
  */
  public static String addHeaderForActionH(StringBuffer msgText,
                                          String actionCode, String header){      
   return msgText.toString();  
  }
}      


List of needed JARs

com.ibm.mq.jar
com.ibm.mq.jmqi.jar
com.ibm.mqjms.jar
commons-logging-1.1.1.jar
dhbcore.jar
j2ee-1.4.jar
spring-2.5.jar
spring-beans.jar
spring-context.jar
spring-core.jar
spring-jms.jar


Below is XML that allows to create JMS connection to MQ Series using Spring

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:lang="http://www.springframework.org/schema/lang"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-2.0.xsd     
     http://www.springframework.org/schema/lang
     http://www.springframework.org/schema/lang/spring-lang-2.0.xsd">

  <bean id="MQjmsqueuetemplate" 
      class="org.springframework.jms.core.JmsTemplate">
       <property name="connectionFactory" 
           ref="MQcredentialsconnectionFactory" />
       <property name="defaultDestination" 
           ref="MQdestination" />

  </bean>
 
  <bean id="MQconnectionFactoryparams" 
      class="com.ibm.mq.jms.MQQueueConnectionFactory">
       <property name="transportType">
          <value>1</value>
       </property>
       <property name="queueManager">
          <value>QMGR1</value>
       </property>
       <property name="hostName">
          <value>server1.mydomain.net</value>
       </property>
       <property name="port">
            <value>1111</value>
        </property>
        <property name="channel">
            <value>BUSINESS.CHANL1</value>
        </property> 
  </bean>

  <bean id="MQdestination" class="com.ibm.mq.jms.MQQueue">
        <property name="baseQueueName">
            <value>QUEUE.BUSINESS.IN.TEST</value>
        </property>
  </bean>

  <bean id="MQcredentialsconnectionFactory" class=
"org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" 
           ref="MQconnectionFactoryparams"/>
        <property name="username" value=""/>
        <property name="password" value=""/>   
  </bean>

  <bean id="jmssend" class="com.mq.sendmsg.MQAsyncSender">
        <property name="jmsTemplate" ref="MQjmsqueuetemplate"/>
  </bean>
   
</beans>

No comments:

Post a Comment