RabbitMQ - convertSendAndReceive is not working when the queue is shoveled
FYI, I'm a newbie to RabbitMQ.
I have a use case in my application for which I'm trying to use RabbitMQ:
- The producer sends a message to a queue.
- The consumer processes it and sends a reply.
- Based on the reply, the producer acts on it.
For the above scenario, I used convertSendAndReceive, which works like a charm when both the producer and the consumer are on the same RabbitMQ server. The same does not work when the queue is shoveled.
Please let me know if I am using the wrong method/configuration w.r.t. RabbitMQ.
Thanks in advance.
Adding the code.
Consumer:

    public static void main(String[] args) throws InterruptedException {
        ConnectionFactory cf = new CachingConnectionFactory("10.223.19.89");

        // set up the queue, exchange, binding on the broker
        RabbitAdmin admin = new RabbitAdmin(cf);
        Queue queue = new Queue("myqueue");
        Queue queueReply = new Queue("myqueue_reply");
        admin.declareQueue(queue);
        admin.declareQueue(queueReply);
        TopicExchange exchange = new TopicExchange("myexchange");
        admin.declareExchange(exchange);
        admin.declareBinding(
            BindingBuilder.bind(queue).to(exchange).with("foo.*"));
        admin.declareBinding(
            BindingBuilder.bind(queueReply).to(exchange).with("foo.*"));

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        Object listener = new Object() {
            public String handleMessage(String foo) {
                return foo + "test";
            }
        };
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        container.setMessageListener(adapter);
        container.setQueueNames("myqueue");
        container.start();
    }
Producer:

    public void run() {
        Thread t = Thread.currentThread();
        ConnectionFactory cf = new CachingConnectionFactory("10.223.19.93");
        RabbitTemplate template = new RabbitTemplate(cf);
        template.setExchange("myexchange");
        template.setRoutingKey("foo.bar");
        Queue queueReply = new Queue("myqueue_reply");
        template.setReplyQueue(queueReply);
        Object test = template.convertSendAndReceive("hello world");
        System.out.println(test.toString());
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new SendReceiveThread());
            t.setName("thread # " + i);
            t.start();
            Thread.sleep(100);
        }
    }
My best guess is that you need to use a named reply queue, and shovel that as well.
You need a <reply-listener/> for that scenario.
Prior to RabbitMQ 3.4, a temporary queue was used for replies; direct reply-to is used with 3.4 and above, but I would guess that Rabbit doesn't shovel the pseudo queue created for that.
EDIT:
When using a fixed reply queue and creating the RabbitTemplate programmatically, you have to wire up the listener container yourself and set the template as its listener. See the documentation:
If you define your RabbitTemplate as a <bean/>, or use an @Configuration class to define it as a @Bean, or create the template programmatically, you will need to define and wire up the reply listener container yourself. If you fail to do this, the template will never receive the replies; it will eventually time out and return null as the reply to a call to a sendAndReceive method.
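As a rough illustration of the wiring described above, the producer from the question could be set up along the following lines. This is only a sketch under stated assumptions, not a verified fix. It reuses the host, exchange, routing key and reply queue name from the question. The class name ProducerWithReplyListener and the 10-second reply timeout are made up for illustration. It assumes a Spring AMQP version that still offers setReplyQueue(Queue), as used in the question (later versions use setReplyAddress(String) instead). It also assumes that myqueue_reply exists on the producer's broker and that replies are shoveled back to it, which has not been tested here.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

// Hypothetical class name, used only for this sketch.
public class ProducerWithReplyListener {

    public static void main(String[] args) {
        // Producer-side broker, taken from the question.
        ConnectionFactory cf = new CachingConnectionFactory("10.223.19.93");

        RabbitTemplate template = new RabbitTemplate(cf);
        template.setExchange("myexchange");
        template.setRoutingKey("foo.bar");

        // Fixed (named) reply queue, as in the question.
        // Assumption: an older Spring AMQP version; newer ones use
        // template.setReplyAddress("myqueue_reply") instead.
        Queue replyQueue = new Queue("myqueue_reply");
        template.setReplyQueue(replyQueue);

        // Illustrative timeout in milliseconds; pick a value that suits the shovel delay.
        template.setReplyTimeout(10000);

        // The reply listener container: it consumes from the reply queue and
        // hands each reply to the template, which matches it to the waiting caller.
        SimpleMessageListenerContainer replyContainer =
                new SimpleMessageListenerContainer(cf);
        replyContainer.setQueueNames("myqueue_reply");
        replyContainer.setMessageListener(template);
        replyContainer.start();

        try {
            Object reply = template.convertSendAndReceive("hello world");
            // A null reply means the template timed out waiting for a response.
            System.out.println(reply == null ? "no reply (timed out)" : reply.toString());
        } finally {
            replyContainer.stop();
        }
    }
}
```

Note that the null check matters: the producer in the question calls test.toString() directly, which would throw a NullPointerException whenever the reply times out.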