Hi All,
Thanks for reading my post.
I am learning JMS and having problem with Request/Reply Pattern. In my sender I am sending a message and listening to reply queue. In my receiver I am receiving the message and sending a reply to the reply queue. Both work fine but setJMSCorrelationID() is not working, meaning before my sender sends a message onto reply queue if someone else sends a message to the reply queue, my requestor will consume that message.
Please Have a look at my programs and let me know where I am going wrong.
Sender:
public class RequestReplySender {
public static void main (String args[]){
QueueConnection connection = null;
QueueSession session = null;
try {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory ();
factory.setTransportType (1);
factory.setQueueManager ("QM");
factory.setHostName ("WMQ");
factory.setPort (1400);
factory.setChannel ("serverConnection");
connection = factory.createQueueConnection ();
session = connection.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
connection.start ();
QueueSender sender = session.createSender (session.createQueue ("MSG_REQ"));
Destination replyDestination = session.createQueue ("MSG_REPLY");
TextMessage requestMessage = session.createTextMessage();
requestMessage.setText("From RequestReplySender");
requestMessage.setJMSReplyTo(replyDestination);
sender.send (requestMessage);
Queue replyQueue = session.createQueue ("MSG_REPLY");
QueueReceiver queueReceiver = session.createReceiver(replyQueue);
TextMessage replyMessage = (TextMessage)queueReceiver.receive();
System.out.println("\tContents: " + replyMessage.getText());
} catch (Exception e) {
e.printStackTrace ();
} finally {try {if(session!=null)session.close ();if(connection!=null)connection.close ();} catch (Exception e1) {e1.printStackTrace ();}
System.out.println("Completed");
}
}
}
Receiver:
public class RequestReplyReceiver implements MessageListener {
private QueueConnection queueConnection;
private QueueSession session;
private Queue queue;
private QueueReceiver queueReceiver;
public RequestReplyReceiver(){
try{
MQQueueConnectionFactory factory = new MQQueueConnectionFactory ();
factory.setTransportType (1);
factory.setQueueManager ("QM");
factory.setHostName ("WQM");
factory.setPort (1400);
factory.setChannel ("serverConnection");
queueConnection = factory.createQueueConnection ();
session = queueConnection.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue ("MSG_REQ");
queueReceiver = session.createReceiver(queue);
queueReceiver.setMessageListener(this);
queueConnection.start();
char zs = (char)System.in.read();
} catch (Exception e) {
e.printStackTrace ();
}
}
public void onMessage(Message message) {
try{
Thread.sleep(20000);
TextMessage requestMessage = (TextMessage) message;
String contents = requestMessage.getText();
Destination replyDestination = message.getJMSReplyTo();
QueueSender replyProducer = session.createSender((Queue)replyDestination);
TextMessage replyMessage = session.createTextMessage();
replyMessage.setText(contents);
replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
replyProducer.send(replyMessage);
}catch (Exception e) {
e.printStackTrace ();
}
}
public static void main(String args[]){
try {
RequestReplyReceiver listener = new RequestReplyReceiver();
} catch (Exception e){
e.printStackTrace();
}
}
}
The issue is Requestor while waiting for reply from Receiver, if someone else sends message onto Reply queue then Requestor will pickup that message instead continue to wait for reply from Receiver.
-
Why setJMSCorrelationID() not working? (7 messages)
- Posted by: Gautham GV
- Posted on: October 03 2005 17:21 EDT
Threaded Messages (7)
- Why setJMSCorrelationID() not working? by James Strachan on October 04 2005 01:23 EDT
- Why setJMSCorrelationID() not working? by Einar Valen on October 04 2005 04:05 EDT
- Why setJMSCorrelationID() not working? by James Strachan on October 04 2005 13:16 EDT
-
Why setJMSCorrelationID() not working? by Gautham GV on October 07 2005 12:40 EDT
- Why setJMSCorrelationID() not working? by James Strachan on October 10 2005 12:25 EDT
-
Why setJMSCorrelationID() not working? by Gautham GV on October 07 2005 12:40 EDT
- Why setJMSCorrelationID() not working? by Gautham GV on October 07 2005 00:42 EDT
- Why setJMSCorrelationID() not working? by Gautham GV on October 07 2005 03:08 EDT
- Why setJMSCorrelationID() not working? by James Strachan on October 04 2005 13:16 EDT
-
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: James Strachan
- Posted on: October 04 2005 01:23 EDT
- in response to Gautham GV
JMSCorrelationID does not magically hide unwanted messages for you.
To avoid receiving messsages that are not meant for you, each client process should create a temporary queue and use that on the JMSReplyTo address.
Then only messages sent to a given temporary queue are received by your client. Even in this case, if you have multiple threads or if you time out slow requests there is still the chance you could receive an old message out of step with your last request. So you should use the setJMSCorrelationID() on the *request* to give each request a unique value that the client can then use to ensure that responses match up with the request (so you can discard unnecesary messages).
If you want to see this code in action, try looking at the Lingo project
http://lingo.codehaus.org/
Lingo implements Spring Remoting (i.e. a POJO based remoting model) using JMS. It uses a multiplexing request-reply model using multiple asynchronous request-response operations over a single JMS session/producer/consumer with a single temporary queue - using JMSCorrelationID to associate responses with requests & threads.
James
LogicBlaze -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: Einar Valen
- Posted on: October 04 2005 04:05 EDT
- in response to Gautham GV
I just had a quick look at your code, and one thing struck me - you don't have a message selector on the receiver.
Have a look at the main() in your class RequestReplySender, at the line reading:
QueueReceiver queueReceiver = session.createReceiver(replyQueue);
Try to change it to something like this:
QueueReceiver queueReceiver = session.createReceiver( replyQueue, "JMSCorrelationID='" + requestMessage.getJMSMessageID() +"'" );
Einar Valen -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: James Strachan
- Posted on: October 04 2005 13:16 EDT
- in response to Einar Valen
I just had a quick look at your code, and one thing struck me - you don't have a message selector on the receiver.Have a look at the main() in your class RequestReplySender, at the line reading: QueueReceiver queueReceiver = session.createReceiver(replyQueue);Try to change it to something like this: QueueReceiver queueReceiver = session.createReceiver( replyQueue, "JMSCorrelationID='" + requestMessage.getJMSMessageID() +"'" );Einar Valen
There's a reason for that :)
You don't wanna use a selector on a specific message ID or correlationID. JMS is designed to create consumers up front and reuse then for a long time; not create a consumer & selector for each message request.
So the Lingo code creates a single consumer and then handles the correlation of responses to requests itself. Its the fastest & most efficient way to do it with JMS
James
LogicBlaze -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: Gautham GV
- Posted on: October 07 2005 00:40 EDT
- in response to James Strachan
Hi James,
Thanks for the reply. I was not able to see the lingo code.
Thanks
Gautham -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: James Strachan
- Posted on: October 10 2005 00:25 EDT
- in response to Gautham GV
Hi James,Thanks for the reply. I was not able to see the lingo code.ThanksGautham
Its described here...
http://lingo.codehaus.org/CVS
e.g. the Requestor implementation classes are here...
http://cvs.lingo.codehaus.org/lingo/src/java/org/logicblaze/lingo/jms/impl/
James
LogicBlaze -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: Gautham GV
- Posted on: October 07 2005 00:42 EDT
- in response to Einar Valen
Hi Einar,
Thanks for the post. I get the following exception when I modify the code as advised by you.
Exception in thread "main" com.ibm.disthubmq.impl.matching.parser.TokenMgrError: Lexical error at line 0, column 2. Enc
ountered: ":" (58), after : ""
at com.ibm.disthubmq.impl.matching.parser.MatchParserTokenManager.getNextToken(MatchParserTokenManager.java:1052
)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_scan_token(MatchParser.java:2293)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_55(MatchParser.java:2036)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_51(MatchParser.java:2200)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_49(MatchParser.java:2170)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_42(MatchParser.java:1426)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_37(MatchParser.java:1471)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_32(MatchParser.java:1517)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_30(MatchParser.java:1590)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_27(MatchParser.java:1695)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3R_8(MatchParser.java:1741)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_3_1(MatchParser.java:2012)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.jj_2_1(MatchParser.java:1294)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.Predicate(MatchParser.java:303)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_primary(MatchParser.java:292)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_factor(MatchParser.java:282)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.Boolean_term(MatchParser.java:192)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.BooleanExpr(MatchParser.java:146)
at com.ibm.disthubmq.impl.matching.parser.MatchParser.QueryExpr(MatchParser.java:139)
at com.ibm.disthubmq.impl.matching.ExpressionParser.setExpression(ExpressionParser.java:249)
at com.ibm.mq.jms.MQMessageSelector.setSelector(MQMessageSelector.java:385)
at com.ibm.mq.jms.MQMessageConsumer.setMessageSelector(MQMessageConsumer.java:175)
at com.ibm.mq.jms.MQQueueReceiver.<init>(MQQueueReceiver.java:222)
at com.ibm.mq.jms.MQQueueSession.createReceiver(MQQueueSession.java:338)
at temp.RequestReplySender.main(RequestReplySender.java:55) -
Why setJMSCorrelationID() not working?[ Go to top ]
- Posted by: Gautham GV
- Posted on: October 07 2005 15:08 EDT
- in response to Gautham GV
Hi Einar & james,
It worked thanks, made code change as advised by Einar.
Thanks
Vijay