General J2EE: Why setJMSCorrelationID() not working?

  1. Why setJMSCorrelationID() not working? (7 messages)

    Hi All,
    Thanks for reading my post.
    I am learning JMS and am having a problem with the request/reply pattern. My sender sends a message and then listens on the reply queue. My receiver receives the message and sends a reply to the reply queue. Both work fine, but setJMSCorrelationID() does not seem to have any effect: if someone else puts a message on the reply queue before my receiver sends its reply, my requestor consumes that message instead.

    Please have a look at my programs and let me know where I am going wrong.

    Sender:
    import javax.jms.*;
    import com.ibm.mq.jms.MQQueueConnectionFactory;

    public class RequestReplySender {
        public static void main(String[] args) {
            QueueConnection connection = null;
            QueueSession session = null;
            try {
                MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
                factory.setTransportType(1);
                factory.setQueueManager("QM");
                factory.setHostName("WMQ");
                factory.setPort(1400);
                factory.setChannel("serverConnection");
                connection = factory.createQueueConnection();
                session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                connection.start();
                QueueSender sender = session.createSender(session.createQueue("MSG_REQ"));
                Destination replyDestination = session.createQueue("MSG_REPLY");
                TextMessage requestMessage = session.createTextMessage();
                requestMessage.setText("From RequestReplySender");
                requestMessage.setJMSReplyTo(replyDestination);
                sender.send(requestMessage);
                Queue replyQueue = session.createQueue("MSG_REPLY");
                // No selector here, so receive() returns whatever arrives on MSG_REPLY first.
                QueueReceiver queueReceiver = session.createReceiver(replyQueue);
                TextMessage replyMessage = (TextMessage) queueReceiver.receive();
                System.out.println("\tContents: " + replyMessage.getText());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (session != null) session.close();
                    if (connection != null) connection.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
                System.out.println("Completed");
            }
        }
    }

    Receiver:
    import javax.jms.*;
    import com.ibm.mq.jms.MQQueueConnectionFactory;

    public class RequestReplyReceiver implements MessageListener {
        private QueueConnection queueConnection;
        private QueueSession session;
        private Queue queue;
        private QueueReceiver queueReceiver;

        public RequestReplyReceiver() {
            try {
                MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
                factory.setTransportType(1);
                factory.setQueueManager("QM");
                factory.setHostName("WQM");
                factory.setPort(1400);
                factory.setChannel("serverConnection");
                queueConnection = factory.createQueueConnection();
                session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                queue = session.createQueue("MSG_REQ");
                queueReceiver = session.createReceiver(queue);
                queueReceiver.setMessageListener(this);
                queueConnection.start();
                // Block on stdin so the listener keeps running until a key is pressed.
                char zs = (char) System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void onMessage(Message message) {
            try {
                // Delay the reply by 20 seconds.
                Thread.sleep(20000);
                TextMessage requestMessage = (TextMessage) message;
                String contents = requestMessage.getText();
                Destination replyDestination = message.getJMSReplyTo();
                QueueSender replyProducer = session.createSender((Queue) replyDestination);
                TextMessage replyMessage = session.createTextMessage();
                replyMessage.setText(contents);
                // Echo the request's message ID back as the reply's correlation ID.
                replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
                replyProducer.send(replyMessage);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            try {
                RequestReplyReceiver listener = new RequestReplyReceiver();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    The issue is that while the requestor is waiting for the reply from the receiver, if someone else sends a message to the reply queue, the requestor picks up that message instead of continuing to wait for the receiver's reply.
  2. JMSCorrelationID does not magically hide unwanted messages for you.

    To avoid receiving messages that are not meant for you, each client process should create a temporary queue and use that as the JMSReplyTo address.

    Then only messages sent to a given temporary queue are received by your client. Even in this case, if you have multiple threads or if you time out slow requests, there is still a chance you could receive an old message that is out of step with your last request. So you should use setJMSCorrelationID() on the *request* to give each request a unique value that the client can then use to ensure that responses match up with the request (so you can discard unnecessary messages).

    If you want to see this code in action, try looking at the Lingo project

    http://lingo.codehaus.org/

    Lingo implements Spring Remoting (i.e. a POJO based remoting model) using JMS. It uses a multiplexing request-reply model using multiple asynchronous request-response operations over a single JMS session/producer/consumer with a single temporary queue - using JMSCorrelationID to associate responses with requests & threads.
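
    The client-side check described above (a private reply queue, plus a unique correlation ID per request that is used to discard replies that do not match) might look roughly like the sketch below. It is an in-memory illustration only: a BlockingQueue stands in for the temporary reply queue, and the class and method names (CorrelatingRequestor, Reply, awaitReply) are made up for this example and are not part of JMS or Lingo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: waits for the reply that matches one request.
final class CorrelatingRequestor {

    /** In-memory stand-in for a JMS reply; only the two fields used here. */
    static final class Reply {
        final String correlationId;
        final String text;

        Reply(String correlationId, String text) {
            this.correlationId = correlationId;
            this.text = text;
        }
    }

    /**
     * Polls the client's private reply queue until a reply carrying the
     * expected correlation ID arrives, discarding anything else.
     * Returns null if the timeout elapses or the thread is interrupted.
     */
    static Reply awaitReply(BlockingQueue<Reply> replyQueue, String expectedId, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                Reply reply = replyQueue.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    return null; // timed out with nothing matching
                }
                if (expectedId.equals(reply.correlationId)) {
                    return reply;
                }
                // Stale reply from an earlier request: drop it and keep waiting.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<>();
        replyQueue.add(new Reply("req-1", "late reply to an earlier, timed-out request"));
        replyQueue.add(new Reply("req-2", "reply to the current request"));
        Reply reply = awaitReply(replyQueue, "req-2", 1000);
        System.out.println(reply.text);
    }
}
```

    With a real provider, the queue would be the temporary queue set as JMSReplyTo, and the comparison would be made against the reply's getJMSCorrelationID() value.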

    James
    LogicBlaze
  3. Why setJMSCorrelationID() not working?

    I just had a quick look at your code, and one thing struck me - you don't have a message selector on the receiver.

    Have a look at the main() in your class RequestReplySender, at the line reading:
       QueueReceiver queueReceiver = session.createReceiver(replyQueue);

    Try to change it to something like this:

       QueueReceiver queueReceiver = session.createReceiver( replyQueue, "JMSCorrelationID='" + requestMessage.getJMSMessageID() +"'" );
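
    For reference, the selector string itself can be built separately from the createReceiver() call, which makes it easier to print and check. The helper below is only a sketch: the class name CorrelationSelector and the sample ID are invented for illustration. It relies on two things from the JMS selector syntax: string literals are enclosed in single quotes, and a single quote inside a literal is written as two single quotes. Note also that getJMSMessageID() only returns the provider-assigned ID after send() has returned.

```java
// Hypothetical helper for building a JMSCorrelationID selector string.
final class CorrelationSelector {

    /** Returns a selector that matches messages with the given correlation ID. */
    static String forCorrelationId(String correlationId) {
        // Double any embedded single quote so the literal stays well-formed.
        String escaped = correlationId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        // "ID:414d51" is a made-up value standing in for requestMessage.getJMSMessageID().
        System.out.println(forCorrelationId("ID:414d51"));
        // prints: JMSCorrelationID = 'ID:414d51'
    }
}
```

    The resulting string would then be passed as the second argument to session.createReceiver(replyQueue, selector).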

    Einar Valen
  4. I just had a quick look at your code, and one thing struck me - you don't have a message selector on the receiver.
    Have a look at the main() in your class RequestReplySender, at the line reading:
       QueueReceiver queueReceiver = session.createReceiver(replyQueue);
    Try to change it to something like this:
       QueueReceiver queueReceiver = session.createReceiver( replyQueue, "JMSCorrelationID='" + requestMessage.getJMSMessageID() +"'" );
    Einar Valen

    There's a reason for that :)

    You don't wanna use a selector on a specific message ID or correlation ID. JMS is designed for consumers that are created up front and reused for a long time, not for creating a new consumer and selector for each request.

    So the Lingo code creates a single consumer and then handles the correlation of responses to requests itself. It's the fastest & most efficient way to do it with JMS.
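
    The general shape of that approach (one long-lived consumer that hands each reply to whichever request is waiting for its correlation ID) can be sketched as follows. This is not Lingo's actual code; ReplyMultiplexer and its methods are hypothetical names, and the replies are passed in directly rather than read from a JMS consumer.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: routes replies from a single consumer to waiting requests.
final class ReplyMultiplexer {

    // Requests that have been sent but not yet answered, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by a requesting thread before it sends; returns the future it will wait on. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Called by the single consumer for every reply it receives.
     * Returns false if no request is waiting for this correlation ID,
     * in which case the reply is simply ignored.
     */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyMultiplexer mux = new ReplyMultiplexer();
        CompletableFuture<String> a = mux.register("req-a");
        CompletableFuture<String> b = mux.register("req-b");
        // Replies may arrive in any order; each one reaches the right waiter.
        mux.onReply("req-b", "reply for b");
        mux.onReply("req-a", "reply for a");
        System.out.println(a.join() + " / " + b.join());
    }
}
```

    In a JMS client, onReply() would be driven from the onMessage() callback of the one consumer on the reply queue, using message.getJMSCorrelationID() as the key.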

    James
    LogicBlaze
  5. Hi James,
    Thanks for the reply. I was not able to see the Lingo code.
    Thanks
    Gautham
  6. Hi James,
    Thanks for the reply. I was not able to see the Lingo code.
    Thanks
    Gautham

    It's described here...

    http://lingo.codehaus.org/CVS

    e.g. the Requestor implementation classes are here...

    http://cvs.lingo.codehaus.org/lingo/src/java/org/logicblaze/lingo/jms/impl/

    James
    LogicBlaze
  7. Why setJMSCorrelationID() not working?

    Hi Einar,
    Thanks for the post. I get the following exception when I modify the code as advised by you.
    Exception in thread "main" com.ibm.disthubmq.impl.matching.parser.TokenMgrError: Lexical error at line 0, column 2. Encountered: ":" (58), after : ""
            at com.ibm.disthubmq.impl.matching.parser.MatchParserTokenManager.getNextToken(MatchParserTokenManager.java:1052)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_scan_token(MatchParser.java:2293)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_55(MatchParser.java:2036)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_51(MatchParser.java:2200)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_49(MatchParser.java:2170)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_42(MatchParser.java:1426)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_37(MatchParser.java:1471)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_32(MatchParser.java:1517)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_30(MatchParser.java:1590)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_27(MatchParser.java:1695)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_8(MatchParser.java:1741)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3_1(MatchParser.java:2012)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_2_1(MatchParser.java:1294)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.Predicate(MatchParser.java:303)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_primary(MatchParser.java:292)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_factor(MatchParser.java:282)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_term(MatchParser.java:192)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.BooleanExpr(MatchParser.java:146)
            at com.ibm.disthubmq.impl.matching.parser.MatchParser.QueryExpr(MatchParser.java:139)
            at com.ibm.disthubmq.impl.matching.ExpressionParser.setExpression(ExpressionParser.java:249)
            at com.ibm.mq.jms.MQMessageSelector.setSelector(MQMessageSelector.java:385)
            at com.ibm.mq.jms.MQMessageConsumer.setMessageSelector(MQMessageConsumer.java:175)
            at com.ibm.mq.jms.MQQueueReceiver.<init>(MQQueueReceiver.java:222)
            at com.ibm.mq.jms.MQQueueSession.createReceiver(MQQueueSession.java:338)
            at temp.RequestReplySender.main(RequestReplySender.java:55)
  8. Why setJMSCorrelationID() not working?

    Hi Einar & James,
    It worked, thanks. I made the code change as advised by Einar.
    Thanks
    Vijay