apache - Lost messages when stopping and restarting embedded ActiveMQ -
Hi, I am trying to write a test case that exercises failover support in ActiveMQ.
Here is the code:
// Embedded-broker failover test: start a broker, connect through the failover
// transport, stop and restart the broker part-way through a 500-iteration send
// loop, then count how many messages can still be read back.
//
// NOTE(review): this snippet was pasted as a single all-lowercase line. Library
// identifiers and URL option names have been restored to their case-sensitive
// spellings; the two helpers `createqueuesession` and `createqueuereceiver` are
// defined outside this snippet, so their names are left exactly as pasted.

val brokerA = createBroker("a")
brokerA.start()

// The vm:// name must match the broker name built in createBroker ("broker" + "a").
// Failover option names are written in camelCase; the transport is understood to
// match them against its property names (see the failover transport reference).
val failoverUrl = "failover:(vm://brokera?create=false)" +
  "?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true"
val cFactory = new ActiveMQConnectionFactory(failoverUrl)
val qConnection = getQueueConnection
val session = createqueuesession(qConnection)

/** Drains the receiver and returns how many messages were read.
  *
  * Stops at the first `receive(2000)` call that returns null, i.e. when no
  * message arrives within the 2000 ms timeout.
  */
private def totalReadMessagesCount(queueReceiver: QueueReceiver) = {
  val messages = Iterator
    .continually(Option(queueReceiver.receive(2000)))
    .takeWhile(_.isDefined)
    .flatten
    .toSeq
  messages.size
}

/** Opens and starts a fresh queue connection and returns a receiver created on a
  * non-transacted, auto-acknowledge session, using the broker's name as the
  * second argument to `createqueuereceiver`.
  */
private def getReceiver = {
  val queueConnection = getQueueConnection
  queueConnection.start()
  // Must be the javax.jms.Session constant, not the local `session` value above.
  val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
  createqueuereceiver(queueSession, brokerA.getBrokerName)
}

/** Creates a new queue connection through the failover factory as user "admin"
  * with an empty password.
  */
def getQueueConnection = cFactory.createQueueConnection("admin", "")

/** Builds (but does not start) a persistent embedded broker named "broker" + name,
  * with one connector on [[getBrokerUrl]], JMX disabled and no shutdown hook.
  */
def createBroker(name: String) = {
  val broker = new BrokerService()
  // NOTE(review): this adapter is created but never attached to the broker (no
  // setPersistenceAdapter call), so it has no effect here. Either configure it with
  // a data directory and attach it, or delete it.
  val adaptor = new KahaDBPersistenceAdapter()
  broker.setBrokerName("broker" + name)
  broker.addConnector(getBrokerUrl)
  broker.setPersistent(true)
  broker.setUseJmx(false)
  broker.setUseShutdownHook(false)
  broker
}

/** Connector URI for the embedded broker. */
def getBrokerUrl = "tcp://localhost:0"

val queueReceiver: QueueReceiver = getReceiver
val messageCount = 500

// foreach rather than map: the loop body is executed purely for its side effects.
(1 to messageCount).foreach { count =>
  // call the method that sends a message to ActiveMQ here
  if (count == 200) {
    // Restart the broker in the middle of the run.
    brokerA.stop()
    brokerA.waitUntilStopped()
    brokerA.start(true) // force = true
  }
}

val totalCount = totalReadMessagesCount(queueReceiver)
println(s"read ${totalCount} messages")
assert(totalCount == messageCount)
I am able to reconnect to ActiveMQ after the restart, but the total count
displayed is 300 instead of 500. It seems the messages sent before the restart are lost. When I run the same scenario in non-embedded mode, I am able to receive all the messages.
Please tell me how I can prevent losing messages while restarting embedded ActiveMQ.
You have to set persistent to true. I don't know Scala, so here is the Java code:
public brokerservice broker() throws exception { final brokerservice broker = new brokerservice(); //broker.addconnector("tcp://localhost:61616"); broker.addconnector("stomp://localhost:61613"); broker.addconnector("vm://localhost"); persistenceadapter persistenceadapter = new kahadbpersistenceadapter(); file dir = new file(system.getproperty("user.home") + file.separator + "kaha"); if (!dir.exists()) { dir.mkdirs(); } persistenceadapter.setdirectory(dir); broker.setpersistenceadapter(persistenceadapter); broker.setpersistent(true); return broker; }
Comments
Post a Comment