spring - "Dispatcher has no subscribers" during startup -


i facing problem happens , hard reproduce. therefore have problems creating compelling test case.

our setup looks this:

  • spring integration spring boot
  • rabbitmq listener
  • custom bus manages transaction aware messages

the setup not newest anymore , lot of code replaced more idiomatic spring wiring.

though exception getting when service started:

 dispatcher has no subscribers, failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}], failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}]     @ org.springframework.integration.channel.abstractsubscribablechannel.dosend(abstractsubscribablechannel.java:93)     @ org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel.java:423)     @ org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel.java:373)     @ org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate.java:115)     @ org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate.java:45)     @ org.springframework.messaging.core.abstractmessagesendingtemplate.send(abstractmessagesendingtemplate.java:105)     @ org.springframework.messaging.core.abstractmessagesendingtemplate.convertandsend(abstractmessagesendingtemplate.java:143)     @ org.springframework.messaging.core.abstractmessagesendingtemplate.convertandsend(abstractmessagesendingtemplate.java:135)     @ org.springframework.integration.gateway.messaginggatewaysupport.send(messaginggatewaysupport.java:392)     @ org.springframework.integration.gateway.gatewayproxyfactorybean.invokegatewaymethod(gatewayproxyfactorybean.java:477)     @ org.springframework.integration.gateway.gatewayproxyfactorybean.doinvoke(gatewayproxyfactorybean.java:429)     @ org.springframework.integration.gateway.gatewayproxyfactorybean.invoke(gatewayproxyfactorybean.java:420)     @ org.springframework.aop.framework.reflectivemethodinvocation.proceed(reflectivemethodinvocation.java:179)     @ org.springframework.aop.framework.jdkdynamicaopproxy.invoke(jdkdynamicaopproxy.java:213)     @ com.sun.proxy.$proxy223.send(unknown source)     @ com.foobar.library.messaging.bus.transactionawarebus.aftercommit(transactionawarebus.java:50)     @ org.springframework.transaction.support.transactionsynchronizationutils.invokeaftercommit(transactionsynchronizationutils.java:133)     @ org.springframework.transaction.support.transactionsynchronizationutils.triggeraftercommit(transactionsynchronizationutils.java:121)     @ org.springframework.transaction.support.abstractplatformtransactionmanager.triggeraftercommit(abstractplatformtransactionmanager.java:958)     @ org.springframework.transaction.support.abstractplatformtransactionmanager.processcommit(abstractplatformtransactionmanager.java:803)     @ org.springframework.transaction.support.abstractplatformtransactionmanager.commit(abstractplatformtransactionmanager.java:730)     @ org.springframework.transaction.interceptor.transactionaspectsupport.committransactionafterreturning(transactionaspectsupport.java:504)     @ org.springframework.transaction.interceptor.transactionaspectsupport.invokewithintransaction(transactionaspectsupport.java:292)     @ org.springframework.transaction.interceptor.transactioninterceptor.invoke(transactioninterceptor.java:96)     @ org.springframework.aop.framework.reflectivemethodinvocation.proceed(reflectivemethodinvocation.java:179)     @ org.springframework.aop.framework.cglibaopproxy$dynamicadvisedinterceptor.intercept(cglibaopproxy.java:673)     @ com.foobar.service.greencheckout.silo.domain.modifyingservice.transitionservice$$enhancerbyspringcglib$$de36730c.execute(<generated>)     @ com.foobar.service.greencheckout.silo.domain.statemachine.orderfsm.invokecreationalcommandmethod(orderfsm.java:380)     @ com.foobar.service.greencheckout.silo.domain.statemachine.orderfsm.lambda$allowcreational$1(orderfsm.java:345)     @ akka.japi.pf.fsmstatefunctionbuilder$2.apply(fsmstatefunctionbuilder.java:80)     @ akka.japi.pf.fsmstatefunctionbuilder$2.apply(fsmstatefunctionbuilder.java:77)     @ akka.japi.pf.casestatement.apply(casestatements.scala:18)     @ scala.partialfunction$class.applyorelse(partialfunction.scala:123)     @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13)     @ scala.partialfunction$orelse.apply(partialfunction.scala:167)     @ scala.partialfunction$class.applyorelse(partialfunction.scala:123)     @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13)     @ scala.partialfunction$orelse.apply(partialfunction.scala:167)     @ scala.partialfunction$class.applyorelse(partialfunction.scala:123)     @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13)     @ scala.partialfunction$orelse.apply(partialfunction.scala:167)     @ scala.partialfunction$class.applyorelse(partialfunction.scala:123)     @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13)     @ scala.partialfunction$orelse.apply(partialfunction.scala:167)     @ akka.actor.fsm$class.processevent(fsm.scala:663)     @ akka.actor.abstractfsm.processevent(abstractfsm.scala:36)     @ akka.actor.fsm$class.akka$actor$fsm$$processmsg(fsm.scala:657)     @ akka.actor.fsm$$anonfun$receive$1.applyorelse(fsm.scala:651)     @ akka.actor.actor$class.aroundreceive(actor.scala:497)     @ akka.actor.abstractfsm.aroundreceive(abstractfsm.scala:36)     @ akka.actor.actorcell.receivemessage(actorcell.scala:526)     @ akka.actor.actorcell.invoke(actorcell.scala:495)     @ akka.dispatch.mailbox.processmailbox(mailbox.scala:257)     @ akka.dispatch.mailbox.run(mailbox.scala:224)     @ akka.dispatch.mailbox.exec(mailbox.scala:234)     @ scala.concurrent.forkjoin.forkjointask.doexec(forkjointask.java:260)     @ scala.concurrent.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool.java:1339)     @ scala.concurrent.forkjoin.forkjoinpool.runworker(forkjoinpool.java:1979)     @ scala.concurrent.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread.java:107) caused by: org.springframework.integration.messagedispatchingexception: dispatcher has no subscribers, failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}]     @ org.springframework.integration.dispatcher.unicastingdispatcher.dodispatch(unicastingdispatcher.java:154)     @ org.springframework.integration.dispatcher.unicastingdispatcher.dispatch(unicastingdispatcher.java:121)     @ org.springframework.integration.channel.abstractsubscribablechannel.dosend(abstractsubscribablechannel.java:89)     ... 58 more 

the code set looks like:

import org.springframework.amqp.core.amqpadmin; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.exchange; import org.springframework.amqp.core.queue; import org.springframework.beans.factory.beanfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.context.annotation.dependson; import org.springframework.context.annotation.scope; import org.springframework.context.annotation.scopedproxymode; import org.springframework.expression.expressionparser; import org.springframework.integration.amqp.outbound.amqpoutboundendpoint; import org.springframework.integration.amqp.support.defaultamqpheadermapper; import org.springframework.integration.channel.directchannel; import org.springframework.integration.channel.executorchannel; import org.springframework.integration.channel.nullchannel; import org.springframework.integration.endpoint.eventdrivenconsumer; import org.springframework.integration.gateway.gatewayproxyfactorybean; import org.springframework.messaging.subscribablechannel;  import java.util.concurrent.executors;  @configuration public class messagingoutboundconfiguration {     @autowired     private amqptemplate amqptemplate;      @autowired     private exchange publisherexchange;      @autowired     private amqpadmin amqpadmin;      @autowired     private exchange errorexchange;      @autowired     private queue errorqueue;      @autowired     private beanfactory beanfactory;      @autowired     private expressionparser expressionparser;      @autowired     private messagingsettings messagingsettings;      @bean     @dependson({"connectionfactory", "consumer"})     public asynchronousbus asyncbus(subscribablechannel amqpoutboundchannel) throws exception     {         gatewayproxyfactorybean factorybean = new gatewayproxyfactorybean(asynchronousbus.class);         factorybean.setbeanfactory(beanfactory);         factorybean.setdefaultrequestchannel(amqpoutboundchannel);         factorybean.afterpropertiesset();         return (asynchronousbus) factorybean.getobject();     }      @bean     @scope(value = "request", proxymode = scopedproxymode.target_class)     public transactionawarebus transactionawarebus()     {         return new transactionawarebus();     }      /**      * channel message bus outbound channel adapter      */     @bean     public subscribablechannel amqpoutboundchannel(headerchannelinterceptor headerchannelinterceptor)     {         directchannel channel = new directchannel();         channel.setcomponentname("amqp-outbound-channel");         channel.addinterceptor(headerchannelinterceptor);         return channel;     }      /**      * outbound channel adapter      */     @bean     public amqpoutboundendpoint endpoint(subscribablechannel confirmationchannel)     {         defaultamqpheadermapper headermapper = defaultamqpheadermapper.outboundmapper();         string[] allowedheaders = new string[]{"*"};         headermapper.setrequestheadernames(allowedheaders);         headermapper.setreplyheadernames(allowedheaders);          amqpoutboundendpoint endpoint = new phasedamqpoutboundendpoint(amqptemplate);         endpoint.setheadermapper(headermapper);         endpoint.setexchangename(publisherexchange.getname());         endpoint.setroutingkeyexpression(expressionparser.parseexpression("headers.type"));         endpoint.setconfirmcorrelationexpression(expressionparser.parseexpression("#this"));          if (messagingsettings.getpublisherconfirmations()) {             endpoint.setconfirmnackchannel(confirmationchannel);             endpoint.setconfirmackchannel(new nullchannel());         }          return endpoint;     }      @bean     public headerchannelinterceptor headerchannelinterceptor()     {         return new headerchannelinterceptor();     }      /**      * shovels messages channel outbound channel adapter      */     @bean     public eventdrivenconsumer consumer(subscribablechannel amqpoutboundchannel, amqpoutboundendpoint endpoint)     {         final eventdrivenconsumer consumer = new eventdrivenconsumer(amqpoutboundchannel, endpoint);         consumer.setbeanname("amqp-outbound-consumer");          return consumer;     }      /**      * outbound channel adapter      */     @bean     public amqpoutboundendpoint errorendpoint()     {         amqpadmin.declarebinding(bindingbuilder.bind(errorqueue).to(errorexchange).with("#").noargs());          defaultamqpheadermapper headermapper = defaultamqpheadermapper.outboundmapper();         string[] allowedheaders = new string[1];         allowedheaders[0] = "*";         headermapper.setrequestheadernames(allowedheaders);          amqpoutboundendpoint endpoint = new phasedamqpoutboundendpoint(amqptemplate);         endpoint.setheadermapper(headermapper);         endpoint.setexchangename(errorexchange.getname());         endpoint.setroutingkeyexpression(expressionparser.parseexpression("headers.routing"));         return endpoint;     }      @bean     public eventdrivenconsumer errorconsumer(subscribablechannel errorchannel, amqpoutboundendpoint errorendpoint)     {         final eventdrivenconsumer consumer = new eventdrivenconsumer(errorchannel, errorendpoint);         consumer.setbeanname("amqp-error-consumer");          return consumer;     }      @bean     public subscribablechannel errorchannel()     {         directchannel channel = new directchannel();         channel.setcomponentname("amqp-error-channel");         return channel;     }      @bean     public subscribablechannel confirmationchannel()     {         executorchannel channel = new executorchannel(executors.newsinglethreadexecutor());         channel.setcomponentname("amqp-confirmation-channel");         return channel;     }  } 

from understanding:

this message means (especially in shutdown case) beans not correctly wired or context not know best order stopping them.

what not understand: what's wrong setup? :(

update:

as can see code called inside actor. actor system configured way:

@bean @dependson({"asyncbus", "prototypedtransactionawarebus", "transactionawarebus", "syncbus"}) public springactorsystem actorsystem() throws exception {     string profile = environment.getactiveprofiles()[0];      actorsystem system = actorsystem.create(akkasettings.getsystemname(), akkaconfiguration(profile));      if ("testing".equals(profile)) {         cluster.get(system).joinseednodes(lists.newarraylist(cluster.get(system).selfaddress()));     }     if ("kubernetes".equals(profile)) {         joinkubernetesseednodes(system);     }      springextension.springextprovider.get(system).initialize(context);      return new springactorsystem(system); } 

and springactorsystem:

public class springactorsystem implements applicationlistener<contextclosedevent> {     private static final logger logger = loggerfactory.getlogger(springactorsystem.class);      private final actorsystem actorsystem;      public springactorsystem(actorsystem actorsystem)     {         this.actorsystem = actorsystem;     }      public actorsystem system()     {         return actorsystem;     }      @override     public void onapplicationevent(contextclosedevent event)     {         final cluster cluster = cluster.get(this.actorsystem);         cluster.leave(cluster.selfaddress());          logger.info("springactorsystem shutdown initiated");          this.actorsystem.terminate();         try {             await.result(actorsystem.whenterminated(), duration.create(10, timeunit.seconds));         } catch (exception e) {             logger.info("exception while waiting termination", e);         }          logger.info("springactorsystem shutdown finished");     } } 

well, think

springextension.springextprovider.get(system).initialize(context);

in initializing phase early.

consider implement smartlifecycle springactorsystem mover initialize(context) start() method.


Comments

Popular posts from this blog

c# - Binding a comma separated list to a List<int> in asp.net web api -

how to prompt save As Box in Excel Interlop c# MVC 4 -