Spring AMQP delay messaging with RabbitMQ
I have been struggling to find a way to schedule/delay messages in Spring AMQP/RabbitMQ, and found a solution here. But there is still a problem: Spring AMQP/RabbitMQ does not receive the message.
My source is as follows:
@Configuration
public class AmqpConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("172.16.101.14:5672");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("spring-boot-queue", true);
    }

    @Bean
    Binding binding(Queue queue, Exchange delayExchange) {
        return BindingBuilder.bind(queue).to(delayExchange).with("spring-boot-queue").noargs();
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.err.println("receive msg : " + new String(body));
                // Acknowledge that the message was consumed successfully
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }
}
@Component
public class Send implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("my-exchange", "", content, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 6000);
                return message;
            }
        }, correlationId);
        System.err.println("delay message send ................");
    }

    /**
     * Callback
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.err.println(" callback id :" + correlationData);
        if (ack) {
            System.err.println("ok");
        } else {
            System.err.println("fail:" + cause);
        }
    }
}
Can anyone help?
Thanks all.
Delayed messaging has nothing to do with Spring AMQP itself: it is a library that resides in your code, and a library cannot hold a message as such. There are 2 approaches you can try:
Old approach: set a TTL (time to live) on each message or on the queue (via a policy), and introduce a DLQ to handle it. The messages wait in a queue that has no consumer; once the TTL expires they are dead-lettered to the main queue, so your listener can process them.
Latest approach: RabbitMQ came up with the RabbitMQ Delayed Message Plugin, with which you can achieve the same. Support for this plugin is available since RabbitMQ 3.5.8.
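For the old approach (TTL plus dead-lettering), the setup comes down to a few queue arguments. The following is only a minimal sketch in plain Java that builds such an argument map. The class and method names and the exchange name `main.exchange` are hypothetical; the 6000 ms delay and the `spring-boot-queue` routing key are borrowed from the question. `x-message-ttl`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` are RabbitMQ's standard queue arguments. With Spring AMQP, a map like this would typically be passed as the arguments of the `Queue` constructor for the holding queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring AMQP or the RabbitMQ client.
class TtlDelayQueueArgs {

    /**
     * Builds the arguments for a holding queue that has no consumers.
     * Messages sit there until the TTL expires and are then dead-lettered
     * to the given exchange with the given routing key.
     */
    static Map<String, Object> waitQueueArgs(int ttlMillis,
                                             String deadLetterExchange,
                                             String deadLetterRoutingKey) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                         // delay in milliseconds
        args.put("x-dead-letter-exchange", deadLetterExchange);       // where expired messages go
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);  // key used when re-routing
        return args;
    }

    public static void main(String[] unused) {
        // "main.exchange" is an assumed name for the exchange the real queue is bound to.
        Map<String, Object> args = waitQueueArgs(6000, "main.exchange", "spring-boot-queue");
        System.out.println(args);
    }
}
```

The listener stays on the main queue only; nothing should consume from the holding queue, otherwise the messages never reach their TTL.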
You can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay, which expresses the delay time for the message in milliseconds. The message is delivered to the respective queues after x-delay milliseconds.
More here: git
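As a rough illustration of the plugin approach, the sketch below builds the exchange argument and message header described above in plain Java. It also imitates how a direct-type exchange matches keys, which may be worth checking against the question's code: the binding there uses the key `spring-boot-queue`, while the message is published with an empty routing key. The class and its methods are hypothetical helpers, and `directMatches` is only a stand-in for direct-exchange routing, not a RabbitMQ or Spring AMQP API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class DelayedExchangeSketch {

    /** Argument for an exchange of type "x-delayed-message": how it routes once the delay is over. */
    static Map<String, Object> exchangeArgs(String delayedType) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", delayedType);
        return args;
    }

    /** Per-message header that tells the plugin how long to hold the message. */
    static Map<String, Object> delayHeaders(int delayMillis) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis);
        return headers;
    }

    /** Direct routing: a message reaches a queue only if its routing key equals the binding key. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    public static void main(String[] unused) {
        System.out.println(exchangeArgs("direct"));
        System.out.println(delayHeaders(6000));
        // Keys as they appear in the question: binding "spring-boot-queue", publish with "".
        System.out.println(directMatches("spring-boot-queue", ""));                  // prints false
        System.out.println(directMatches("spring-boot-queue", "spring-boot-queue")); // prints true
    }
}
```

If that mismatch is indeed the cause, publishing with the routing key `spring-boot-queue` instead of the empty string would be the first thing to try.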