spring - "Dispatcher has no subscribers" during startup -
i facing problem happens , hard reproduce. therefore have problems creating compelling test case.
our setup looks this:
- spring integration spring boot
- rabbitmq listener
- custom bus manages transaction aware messages
the setup not newest anymore , lot of code replaced more idiomatic spring wiring.
though exception getting when service started:
dispatcher has no subscribers, failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}], failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}] @ org.springframework.integration.channel.abstractsubscribablechannel.dosend(abstractsubscribablechannel.java:93) @ org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel.java:423) @ org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel.java:373) @ org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate.java:115) @ org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate.java:45) @ org.springframework.messaging.core.abstractmessagesendingtemplate.send(abstractmessagesendingtemplate.java:105) @ org.springframework.messaging.core.abstractmessagesendingtemplate.convertandsend(abstractmessagesendingtemplate.java:143) @ org.springframework.messaging.core.abstractmessagesendingtemplate.convertandsend(abstractmessagesendingtemplate.java:135) @ org.springframework.integration.gateway.messaginggatewaysupport.send(messaginggatewaysupport.java:392) @ org.springframework.integration.gateway.gatewayproxyfactorybean.invokegatewaymethod(gatewayproxyfactorybean.java:477) @ org.springframework.integration.gateway.gatewayproxyfactorybean.doinvoke(gatewayproxyfactorybean.java:429) @ org.springframework.integration.gateway.gatewayproxyfactorybean.invoke(gatewayproxyfactorybean.java:420) @ org.springframework.aop.framework.reflectivemethodinvocation.proceed(reflectivemethodinvocation.java:179) @ org.springframework.aop.framework.jdkdynamicaopproxy.invoke(jdkdynamicaopproxy.java:213) @ com.sun.proxy.$proxy223.send(unknown source) @ com.foobar.library.messaging.bus.transactionawarebus.aftercommit(transactionawarebus.java:50) @ org.springframework.transaction.support.transactionsynchronizationutils.invokeaftercommit(transactionsynchronizationutils.java:133) @ org.springframework.transaction.support.transactionsynchronizationutils.triggeraftercommit(transactionsynchronizationutils.java:121) @ org.springframework.transaction.support.abstractplatformtransactionmanager.triggeraftercommit(abstractplatformtransactionmanager.java:958) @ org.springframework.transaction.support.abstractplatformtransactionmanager.processcommit(abstractplatformtransactionmanager.java:803) @ org.springframework.transaction.support.abstractplatformtransactionmanager.commit(abstractplatformtransactionmanager.java:730) @ org.springframework.transaction.interceptor.transactionaspectsupport.committransactionafterreturning(transactionaspectsupport.java:504) @ org.springframework.transaction.interceptor.transactionaspectsupport.invokewithintransaction(transactionaspectsupport.java:292) @ org.springframework.transaction.interceptor.transactioninterceptor.invoke(transactioninterceptor.java:96) @ org.springframework.aop.framework.reflectivemethodinvocation.proceed(reflectivemethodinvocation.java:179) @ org.springframework.aop.framework.cglibaopproxy$dynamicadvisedinterceptor.intercept(cglibaopproxy.java:673) @ com.foobar.service.greencheckout.silo.domain.modifyingservice.transitionservice$$enhancerbyspringcglib$$de36730c.execute(<generated>) @ com.foobar.service.greencheckout.silo.domain.statemachine.orderfsm.invokecreationalcommandmethod(orderfsm.java:380) @ com.foobar.service.greencheckout.silo.domain.statemachine.orderfsm.lambda$allowcreational$1(orderfsm.java:345) @ akka.japi.pf.fsmstatefunctionbuilder$2.apply(fsmstatefunctionbuilder.java:80) @ akka.japi.pf.fsmstatefunctionbuilder$2.apply(fsmstatefunctionbuilder.java:77) @ akka.japi.pf.casestatement.apply(casestatements.scala:18) @ scala.partialfunction$class.applyorelse(partialfunction.scala:123) @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13) @ scala.partialfunction$orelse.apply(partialfunction.scala:167) @ scala.partialfunction$class.applyorelse(partialfunction.scala:123) @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13) @ scala.partialfunction$orelse.apply(partialfunction.scala:167) @ scala.partialfunction$class.applyorelse(partialfunction.scala:123) @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13) @ scala.partialfunction$orelse.apply(partialfunction.scala:167) @ scala.partialfunction$class.applyorelse(partialfunction.scala:123) @ akka.japi.pf.casestatement.applyorelse(casestatements.scala:13) @ scala.partialfunction$orelse.apply(partialfunction.scala:167) @ akka.actor.fsm$class.processevent(fsm.scala:663) @ akka.actor.abstractfsm.processevent(abstractfsm.scala:36) @ akka.actor.fsm$class.akka$actor$fsm$$processmsg(fsm.scala:657) @ akka.actor.fsm$$anonfun$receive$1.applyorelse(fsm.scala:651) @ akka.actor.actor$class.aroundreceive(actor.scala:497) @ akka.actor.abstractfsm.aroundreceive(abstractfsm.scala:36) @ akka.actor.actorcell.receivemessage(actorcell.scala:526) @ akka.actor.actorcell.invoke(actorcell.scala:495) @ akka.dispatch.mailbox.processmailbox(mailbox.scala:257) @ akka.dispatch.mailbox.run(mailbox.scala:224) @ akka.dispatch.mailbox.exec(mailbox.scala:234) @ scala.concurrent.forkjoin.forkjointask.doexec(forkjointask.java:260) @ scala.concurrent.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool.java:1339) @ scala.concurrent.forkjoin.forkjoinpool.runworker(forkjoinpool.java:1979) @ scala.concurrent.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread.java:107) caused by: org.springframework.integration.messagedispatchingexception: dispatcher has no subscribers, failedmessage=genericmessage [payload=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage@2ba0efb2, headers={id=f750a792-6b01-16d3-8206-6e553a03f8fa, type=com.foobar.service.greencheckout.message.greenorderpropertyaddedmessage, amqp_deliverymode=persistent, timestamp=1505797680967}] @ org.springframework.integration.dispatcher.unicastingdispatcher.dodispatch(unicastingdispatcher.java:154) @ org.springframework.integration.dispatcher.unicastingdispatcher.dispatch(unicastingdispatcher.java:121) @ org.springframework.integration.channel.abstractsubscribablechannel.dosend(abstractsubscribablechannel.java:89) ... 58 more
the code set looks like:
import org.springframework.amqp.core.amqpadmin; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.exchange; import org.springframework.amqp.core.queue; import org.springframework.beans.factory.beanfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.context.annotation.dependson; import org.springframework.context.annotation.scope; import org.springframework.context.annotation.scopedproxymode; import org.springframework.expression.expressionparser; import org.springframework.integration.amqp.outbound.amqpoutboundendpoint; import org.springframework.integration.amqp.support.defaultamqpheadermapper; import org.springframework.integration.channel.directchannel; import org.springframework.integration.channel.executorchannel; import org.springframework.integration.channel.nullchannel; import org.springframework.integration.endpoint.eventdrivenconsumer; import org.springframework.integration.gateway.gatewayproxyfactorybean; import org.springframework.messaging.subscribablechannel; import java.util.concurrent.executors; @configuration public class messagingoutboundconfiguration { @autowired private amqptemplate amqptemplate; @autowired private exchange publisherexchange; @autowired private amqpadmin amqpadmin; @autowired private exchange errorexchange; @autowired private queue errorqueue; @autowired private beanfactory beanfactory; @autowired private expressionparser expressionparser; @autowired private messagingsettings messagingsettings; @bean @dependson({"connectionfactory", "consumer"}) public asynchronousbus asyncbus(subscribablechannel amqpoutboundchannel) throws exception { gatewayproxyfactorybean factorybean = new gatewayproxyfactorybean(asynchronousbus.class); factorybean.setbeanfactory(beanfactory); factorybean.setdefaultrequestchannel(amqpoutboundchannel); factorybean.afterpropertiesset(); return (asynchronousbus) factorybean.getobject(); } @bean @scope(value = "request", proxymode = scopedproxymode.target_class) public transactionawarebus transactionawarebus() { return new transactionawarebus(); } /** * channel message bus outbound channel adapter */ @bean public subscribablechannel amqpoutboundchannel(headerchannelinterceptor headerchannelinterceptor) { directchannel channel = new directchannel(); channel.setcomponentname("amqp-outbound-channel"); channel.addinterceptor(headerchannelinterceptor); return channel; } /** * outbound channel adapter */ @bean public amqpoutboundendpoint endpoint(subscribablechannel confirmationchannel) { defaultamqpheadermapper headermapper = defaultamqpheadermapper.outboundmapper(); string[] allowedheaders = new string[]{"*"}; headermapper.setrequestheadernames(allowedheaders); headermapper.setreplyheadernames(allowedheaders); amqpoutboundendpoint endpoint = new phasedamqpoutboundendpoint(amqptemplate); endpoint.setheadermapper(headermapper); endpoint.setexchangename(publisherexchange.getname()); endpoint.setroutingkeyexpression(expressionparser.parseexpression("headers.type")); endpoint.setconfirmcorrelationexpression(expressionparser.parseexpression("#this")); if (messagingsettings.getpublisherconfirmations()) { endpoint.setconfirmnackchannel(confirmationchannel); endpoint.setconfirmackchannel(new nullchannel()); } return endpoint; } @bean public headerchannelinterceptor headerchannelinterceptor() { return new headerchannelinterceptor(); } /** * shovels messages channel outbound channel adapter */ @bean public eventdrivenconsumer consumer(subscribablechannel amqpoutboundchannel, amqpoutboundendpoint endpoint) { final eventdrivenconsumer consumer = new eventdrivenconsumer(amqpoutboundchannel, endpoint); consumer.setbeanname("amqp-outbound-consumer"); return consumer; } /** * outbound channel adapter */ @bean public amqpoutboundendpoint errorendpoint() { amqpadmin.declarebinding(bindingbuilder.bind(errorqueue).to(errorexchange).with("#").noargs()); defaultamqpheadermapper headermapper = defaultamqpheadermapper.outboundmapper(); string[] allowedheaders = new string[1]; allowedheaders[0] = "*"; headermapper.setrequestheadernames(allowedheaders); amqpoutboundendpoint endpoint = new phasedamqpoutboundendpoint(amqptemplate); endpoint.setheadermapper(headermapper); endpoint.setexchangename(errorexchange.getname()); endpoint.setroutingkeyexpression(expressionparser.parseexpression("headers.routing")); return endpoint; } @bean public eventdrivenconsumer errorconsumer(subscribablechannel errorchannel, amqpoutboundendpoint errorendpoint) { final eventdrivenconsumer consumer = new eventdrivenconsumer(errorchannel, errorendpoint); consumer.setbeanname("amqp-error-consumer"); return consumer; } @bean public subscribablechannel errorchannel() { directchannel channel = new directchannel(); channel.setcomponentname("amqp-error-channel"); return channel; } @bean public subscribablechannel confirmationchannel() { executorchannel channel = new executorchannel(executors.newsinglethreadexecutor()); channel.setcomponentname("amqp-confirmation-channel"); return channel; } }
from understanding:
this message means (especially in shutdown case) beans not correctly wired or context not know best order stopping them.
what not understand: what's wrong setup? :(
update:
as can see code called inside actor. actor system configured way:
@bean @dependson({"asyncbus", "prototypedtransactionawarebus", "transactionawarebus", "syncbus"}) public springactorsystem actorsystem() throws exception { string profile = environment.getactiveprofiles()[0]; actorsystem system = actorsystem.create(akkasettings.getsystemname(), akkaconfiguration(profile)); if ("testing".equals(profile)) { cluster.get(system).joinseednodes(lists.newarraylist(cluster.get(system).selfaddress())); } if ("kubernetes".equals(profile)) { joinkubernetesseednodes(system); } springextension.springextprovider.get(system).initialize(context); return new springactorsystem(system); }
and springactorsystem:
public class springactorsystem implements applicationlistener<contextclosedevent> { private static final logger logger = loggerfactory.getlogger(springactorsystem.class); private final actorsystem actorsystem; public springactorsystem(actorsystem actorsystem) { this.actorsystem = actorsystem; } public actorsystem system() { return actorsystem; } @override public void onapplicationevent(contextclosedevent event) { final cluster cluster = cluster.get(this.actorsystem); cluster.leave(cluster.selfaddress()); logger.info("springactorsystem shutdown initiated"); this.actorsystem.terminate(); try { await.result(actorsystem.whenterminated(), duration.create(10, timeunit.seconds)); } catch (exception e) { logger.info("exception while waiting termination", e); } logger.info("springactorsystem shutdown finished"); } }
well, think
springextension.springextprovider.get(system).initialize(context);
in initializing phase early.
consider implement smartlifecycle
springactorsystem
mover initialize(context)
start()
method.
Comments
Post a Comment