Spring amqp delay messaging with rabbitMQ -


i struggling hard find out way scheduled/delaying messages in spring amqp/rabbit mq , found solution in here.but still prolem spring amqp/rabbit mq can not received message.

my source following:

@configuration public class amqpconfig {

@bean   public connectionfactory connectionfactory() {       cachingconnectionfactory connectionfactory = new cachingconnectionfactory();       connectionfactory.setaddresses("172.16.101.14:5672");       connectionfactory.setusername("admin");       connectionfactory.setpassword("admin");      connectionfactory.setpublisherconfirms(true);      return connectionfactory;   }     @bean   @scope("prototype")  public rabbittemplate rabbittemplate() {       rabbittemplate template = new rabbittemplate(connectionfactory());       return template;   }    @bean customexchange delayexchange() {     map<string, object> args = new hashmap<string, object>();     args.put("x-delayed-type", "direct");     return new customexchange("my-exchange", "x-delayed-message", true, false, args); }  @bean   public queue queue() {       return new queue("spring-boot-queue", true);     }    @bean binding binding(queue queue, exchange delayexchange) {     return bindingbuilder.bind(queue).to(delayexchange).with("spring-boot-queue").noargs(); }  @bean   public simplemessagelistenercontainer messagecontainer() {       simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory());       container.setqueues(queue());       container.setexposelistenerchannel(true);       container.setmaxconcurrentconsumers(1);       container.setconcurrentconsumers(1);       container.setacknowledgemode(acknowledgemode.manual);       container.setmessagelistener(new channelawaremessagelistener() {            public void onmessage(message message, channel channel) throws exception {               byte[] body = message.getbody();               system.err.println("receive msg : " + new string(body));               channel.basicack(message.getmessageproperties().getdeliverytag(), false); //确认消息成功消费            }       });        return container;   }   

}

@component public class send implements rabbittemplate.confirmcallback{

private rabbittemplate rabbittemplate;    @autowired   public send(rabbittemplate rabbittemplate) {       this.rabbittemplate = rabbittemplate;      this.rabbittemplate.setconfirmcallback(this);     rabbittemplate.setmandatory(true); }    public void sendmsg(string content) {        correlationdata correlationid = new correlationdata(uuid.randomuuid().tostring());       rabbittemplate.convertandsend("my-exchange", "", content, new messagepostprocessor() {         @override         public message postprocessmessage(message message) throws amqpexception {             message.getmessageproperties().setheader("x-delay", 6000);             return message;         }     },correlationid);      system.err.println("delay message send ................");  }  /**    * 回调    */   @override   public void confirm(correlationdata correlationdata, boolean ack, string cause) {        system.err.println(" callback id :" + correlationdata);        if (ack) {           system.err.println("ok");       } else {           system.err.println("fail:" + cause);       }   }   

}

is there give help.

thanks all.

delay messaging nothing spring amqp, it's library reside code, library can't hold message such. there 2 approaches can try:

old approach: set ttl(time live) header in each message/queue(policy) , introduce dlq handle it. once ttl expired messages move dlq main queue listener can process it.

latest approach: rabbitmq came rabbitmq delayed message plugin , using can achieve same , plugin support available since rabbitmq-3.5.8.

you can declare exchange type x-delayed-message , publish messages custom header x-delay expressing in milliseconds delay time message. message delivered respective queues after x-delay milliseconds

more here: git


Comments

Popular posts from this blog

Command prompt result in label. Python 2.7 -

javascript - How do I use URL parameters to change link href on page? -

amazon web services - AWS Route53 Trying To Get Site To Resolve To www -