How to send messages in Spring Boot via a WebSocket broker in the order they were submitted

Question:

I want to configure a Spring Boot WebSocket message broker such that it dispatches messages in the order they were submitted.

Based on answers to similar questions I have tried to set the pool size of the dispatching task executors to 1, but I still get messages dispatched in the wrong order.

For debugging purposes I have added pre- and post-send channel interceptors that log the threads on which the messages are being dispatched, and I can see that the thread IDs vary.

What am I doing wrong?

Code (Kotlin):

Websocket configuration:

package foo.bar

import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.simp.config.ChannelRegistration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.messaging.support.ChannelInterceptor
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer

@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
    companion object {
        private val LOGGER = LoggerFactory.getLogger(WebSocketConfig::class.java)
    }
    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        config.enableSimpleBroker("/topic")
        config.setApplicationDestinationPrefixes("/app")
        // Limit the broker channel's executor to a single thread
        config.configureBrokerChannel().taskExecutor().corePoolSize(1).maxPoolSize(1)
        val channelInterceptor: ChannelInterceptor = object: ChannelInterceptor {
            override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> {
                LOGGER.debug("Message broker sending message on Thread " + Thread.currentThread().id)
                return message
            }

            override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
                LOGGER.debug("Message broker sent message on Thread " + Thread.currentThread().id)
            }
        }
        config.configureBrokerChannel().interceptors(channelInterceptor)
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws")
                .withSockJS()
    }

    override fun configureClientOutboundChannel(registration: ChannelRegistration) {
        registration.taskExecutor().corePoolSize(1)
        registration.taskExecutor().maxPoolSize(1)
    }

    override fun configureClientInboundChannel(registration: ChannelRegistration) {
        registration.taskExecutor().corePoolSize(1)
        registration.taskExecutor().maxPoolSize(1)
    }
}

(Stripped) code for sending the message:

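import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Controller

@Controller
class StateController @Autowired constructor(
    private val template: SimpMessagingTemplate
) {

....

    // Publishes msg to the topic for the given topicId
    fun publishMsg(topicId: String, msg: MyMessageType) {
        template.convertAndSend("/topic/msg/$topicId", msg)
    }
}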

Here is some example logging. As you can see, the executor is using more than one thread, or rather: there seems to be more than one executor. The thread IDs also jump back and forth, which to me looks like clear confirmation that the dispatch is not as single-threaded as I would expect. Logging:

09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.preSend             :  32  Message broker sending message on Thread 60
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.preSend             :  32  Message broker sending message on Thread 47
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.postSend            :  38  Message broker sent message on Thread 47
09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.postSend            :  38  Message broker sent message on Thread 60
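
One possible reading of a log like this can be reproduced without Spring. The sketch below uses only java.util.concurrent and is an illustration, not the broker's actual implementation: a single-threaded executor stands in for a channel with pool size 1, and a pool of four threads stands in for the submitting code. The names runDemo, submitterThreads and workerThreads are made up for this example. Anything recorded at hand-over time sees the submitting threads, while the work itself always runs on the one dispatcher thread.

```kotlin
import java.util.Collections
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Names of the threads that handed tasks over, and of the threads that ran them.
val submitterThreads: MutableSet<String> = Collections.synchronizedSet(mutableSetOf())
val workerThreads: MutableSet<String> = Collections.synchronizedSet(mutableSetOf())

fun runDemo() {
    val dispatcher = Executors.newSingleThreadExecutor() // stands in for a pool of size 1
    val submitters = Executors.newFixedThreadPool(4)     // stands in for the submitting code
    val done = CountDownLatch(8)
    repeat(8) {
        submitters.execute {
            // Recorded at hand-over time: this is whichever thread submits the task.
            submitterThreads += Thread.currentThread().name
            dispatcher.execute {
                // Recorded while the task runs: always the single dispatcher thread.
                workerThreads += Thread.currentThread().name
                done.countDown()
            }
        }
    }
    done.await(5, TimeUnit.SECONDS)
    submitters.shutdown()
    dispatcher.shutdown()
}

fun main() {
    runDemo()
    println("submitting threads:  $submitterThreads")
    println("dispatching threads: $workerThreads")
}
```

If the logging is done at hand-over time, varying thread IDs do not by themselves show that the executor has more than one thread.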

Answer 1:

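After further debugging I found my mistake. The different threads belong to the submitting code: a channel interceptor's preSend and postSend run on the thread that calls send on the channel, not on the channel's executor thread. I should have added the channel interceptor to the client outbound channel, not the broker channel: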

override fun configureClientOutboundChannel(registration: ChannelRegistration) {
    val channelInterceptor: ChannelInterceptor = object : ChannelInterceptor {
        override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> {
            LOGGER.debug("Message broker sending message on Thread " + Thread.currentThread().id)
            return message
        }

        override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
            LOGGER.debug("Message broker sent message on Thread " + Thread.currentThread().id)
        }
    }
    registration.interceptors(channelInterceptor)
}
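
For anyone who wants to log the thread that actually handles a message, here is a minimal sketch of one alternative, not the configuration used above. It assumes Spring Framework 5.1 or later, where MessageBrokerRegistry offers setPreservePublishOrder and the interceptor interfaces have default methods. The class name OrderedWebSocketConfig is made up for this example and would replace, not accompany, the original WebSocketConfig. ExecutorChannelInterceptor.beforeHandle is invoked on the executor thread just before the handler runs, so its thread ID reflects the dispatching thread rather than the submitting one.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.simp.config.ChannelRegistration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.messaging.support.ExecutorChannelInterceptor
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer

// Hypothetical replacement for WebSocketConfig; the name is illustrative only.
@Configuration
@EnableWebSocketMessageBroker
class OrderedWebSocketConfig : WebSocketMessageBrokerConfigurer {
    companion object {
        private val LOGGER = LoggerFactory.getLogger(OrderedWebSocketConfig::class.java)
    }

    override fun configureMessageBroker(config: MessageBrokerRegistry) {
        config.enableSimpleBroker("/topic")
        config.setApplicationDestinationPrefixes("/app")
        // Assumption: Spring Framework 5.1+. Asks the framework to keep
        // messages to a client in the order in which they were published.
        config.setPreservePublishOrder(true)
    }

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws").withSockJS()
    }

    override fun configureClientOutboundChannel(registration: ChannelRegistration) {
        // Same single-thread pool as in the question.
        registration.taskExecutor().corePoolSize(1).maxPoolSize(1)
        registration.interceptors(object : ExecutorChannelInterceptor {
            // Runs on the channel's executor thread, right before the handler.
            override fun beforeHandle(
                message: Message<*>,
                channel: MessageChannel,
                handler: MessageHandler
            ): Message<*> {
                LOGGER.debug("Handling outbound message on Thread " + Thread.currentThread().id)
                return message
            }
        })
    }
}
```

Whether the single-thread pool is still needed once setPreservePublishOrder is enabled depends on the setup, so treat the combination as a starting point to test rather than a recommendation.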
  • Posted on 2019-01-09 23:54