stomp over websocket协议原理与实现
文章目录
本文你将学习到
- websocket协议原理.
- stomp协议原理.
- websocket协议的缺点, 为什么websocket协议需要stomp协议来补充.
- spring websocket架构与实现.
- spring websocket的性能优化.
- 如果使用java编写stomp over websocket协议的client.
stomp over websocket协议原理
http协议
http协议是单工的, 只能由client发请求再由server返回请求结果, 在http协议中server是不能主动发请求到client的.
单工, 半双工, 全双工区别 参考: http://blog.csdn.net/erwangshi/article/details/44940069
大多数情况下http协议都是适用的, 但当遇到在线聊天, 股票行情这样需要实时获取取服务端信息的应用时, client需要频繁轮询server 过程如下:
- client 建立连接
- client 问server有没有新的消息, 并根据返回结果进行处理
- client 关闭连接
- client 建立连接
- client 问server有没有新的消息, 并根据返回结果进行处理
- client 关闭连接
- …
为了拿到最新信息, client一直这样循环下去 server如果一直没有新的消息, client的大多请求都是无效的, 效率低下.
为了提高效率需要有一种协议可以让server主动发消息给client. 这样就不需要client频繁轮询, 只要server有新消息就会主动推送给client.
这种协议就是websocket协议.
websocket协议
webSocket协议是基于TCP的一种网络协议。它实现了浏览器与服务器全双工
(full-duplex)通信——允许服务器主动发送信息给客户端。
在webscoket协议中, client利用http来建立tcp连接, 建立tcp连接之后, client与server就可以基于tcp连接来愉快的进行通信了.
那么webscoket如何利用http建立连接的呢?
建立连接
client与server是利用http的一次request, response来建立连接的.
其中http request消息体如下:
GET /echo HTTP/1.1
Host:jingxu.test.com:8100
Origin:http://jingxu.test.com:8099
Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits
Sec-WebSocket-Key:rVX0XFeQzA9QVXXEfjm0yw==
Sec-WebSocket-Version:13
Upgrade:websocket
其中前三个header:Get, Host, Origin都是http协议之前就有, 不多做解释, 主要解释一下后面这几个header Sec-WebSocket-Extensions: 用于对websocket协议进行扩展. 比如websocket协议本身不支持压缩, 但可以通过Sec-WebSocket-Extensions中的permessage-deflate来协商压缩. Sec-WebSocket-Key:client随机生成的一段key. 详情之后response中Sec-WebSocket-Accept的解释. Sec-WebSocket-Version:协议的版本号 Upgrade:通过http的Upgrade对协议进行切换. 告诉server, 建立连接后用websocket协议.
http response消息体如下:
Connection:upgrade
Date:Mon, 04 Dec 2017 10:05:18 GMT
Sec-WebSocket-Accept:q3dUKg4lRGCqqRhGIvmE7sH8Yuc=
Sec-WebSocket-Extensions:permessage-deflate;client_max_window_bits=15
Upgrade:websocket
Connection与Date都是http协议之前就有的header, 主要解释一下后面的. Sec-WebSocket-Accept:对应于request中的Sec-WebSocket-Key. server会根据request中的Sec-WebSocket-Key的值来生成response中的Sec-WebSocket-Accept的值. 具体的算法是根据Sec-WebSocket-Key与协议中已定义的一个guid “258EAFA5-E914-47DA-95CA-C5AB0DC85B11”进行拼接 再对结果进行sha1, 再对sha1的结果进行base64, 最后得到Sec-WebSocket-Accept的值.
client通过验证server返回的Sec-WebSocket-Accept的值, 来确定两件事情:
- server理解websocket协议. 如果server不理解, 那么server不会返回正确的Sec-WebSocket-Accept. 如果server没有返回正确的Sec-WebSocket-Accept, 那么建立websocket连接失败.
- server返回的response是对于client的此次reuqest的响应而不是之前的缓存. 主要是防止有些缓存服务器返回缓存的response.
发送数据
现在websocket连接已经建立, 由于websocket没有规范payload的格式, 所以应用需要自己去定义payload的格式.
websocket的payload可以是文本也可以是二进制. 应用一般会选择用文本. 这个文本是什么格式websocket协议本身并没有规定, 由应用自己来定.
比如我要请求发送消息这个接口, 那么我的payload可以写成:
/send | params=我是消息
这里我自己定义了一个格式, 中坚线之前的是要调用的地址, 中竖线之后是参数.
由于格式是自己定义的, 所以在服务端我也需要自己写代码来解析这个格式.
把/send
路由到相应的处理方法.
那有没有一种统一的协议呢? 统一的标准呢? 因为这样就会有相应的已经实现的库来解析路由, 而不用自己去写, 自己去定义格式.
这个统一的协议就是stomp协议.
stomp协议
stomp是一个用于client之间进行异步消息传输的简单文本协议, 全称是Simple Text Oriented Messaging Protocol. > 对于stomp协议来说, client分为消费者client与生产者client两种. server是指broker, 也就是消息队列的管理者.
stomp协议并不是为websocket设计的, 它是属于消息队列的一种协议, 和amqp, jms平级. 只不过由于它的简单性恰巧可以用于定义websocket的消息体格式. stomp协议很多mq都已支持, 比如rabbitmq, activemq. 很多语言也都有stomp协议的解析client库.
可以这么理解, websocket结合stomp相当于一个面向公网对用户比较友好的一种消息队列.
stomp协议中的client分为两角色:
- 生产者: 通过
SEND
命令给某个目的地址(destination)发送消息. - 消费者: 通过
SUBSCRIBE
命令订阅某个目的地址(destination), 当生产者发送消息到目的地址后, 订阅此目的地址的消费者会即时收到消息.
stomp协议的结构与http结构相似, 结构如下:
COMMAND
header1:value1
header2:value2
Body^@
其中^@代表null结尾.
与http相似有三部分组成: 命令, header, 消息体. 命令与header使用utf-8格式, body可以是二进制也可以是文本.
命令有SEND, SUBSCRIBE, MESSAGE, CONNECT, CONNECTED
等.
header类似http有content-length, content-type等. 消息体类似http可以是二进制也可以是文本.
下面例举一些主要命令
建立连接
和http, websocket类似, 首先要确认双方都懂stomp这个协议, 通过建立连接来确认. 由于我们已经建立了webscoket连接, 接下来我只需要在webscoket连接的基础上建立stomp连接.
将以下内容写到websocket的payload中, 来发送建立stomp连接请求
CONNECT
accept-version:1.2
host:stomp.github.org
^@
stomp协议并不与websocket协议耦合, 比如双方建立了tcp连接, 那么完成可以在tcp连接上建立stomp连接, 也就是将上述内容写到tcp的payload中.
server收到后回复, 同样以下内容是在server回复的websocket的消息体中
CONNECTED
version:1.2
^@
这样一个stomp连接就建立了, 协议版本为1.2.
心跳
由于stomp连接是一个长连接, stomp协议定义了发送心跳来监测stomp连接是否存活.
在CONNECT命令消息中加入heart-beat心跳header来建立连接就开启了心跳:
CONNECT
accept-version:1.2
heart-beat:<cx>,<cy>
host:stomp.github.org
^@
server回复CONNECTED命令消息
CONNECTED
version:1.2
heart-beat:<sx>,<sy>
^@
其中<cx>, <cy>, <sx>, <sy>
分别代表一个以毫秒为单位的数字.
client发送的CONNECT命令消息中的<cx>,<cy>
分别代表:
<cx>
:client能保证的发送心跳的最小间隔, 如果是0代表client不发送心跳.<cy>
:client希望收到server心跳的间隔, 如果是0代表client不希望收到server的心跳. 与client类似, server发送的CONNECTED命令消息中的<sx>,<sy>
分别代表:<sx>
:server能保证的发送心跳的最小间隔, 如果是0代表server不发送心跳.<sy>
:server希望收到client心跳的间隔, 如果是0代表server不希望收到client的心跳.
如果在建立连接时没有心跳header, 默认当作heart-beat:0,0
. 也就是不发心跳, 也不希望对方发心跳.
加入心跳header进行连接后, 最终协商得出发送心跳的频率的逻辑如下:
对于client来说, 取<cx>
与<sy>
的最大值, 也就是说client会取client最小能发送的间隔与server希望client发送间隔的最大值来发送心跳.
如果<cx>
或<sy>
中任何一个为0, client都不发送心跳.
类似, 对于server来说, 取<sx>
与<cy>
的最大值, 也就是说server取server最小能发送的间隔与client希望server发送间隔的最大值来发送心跳.
如果<sx>
或<cy>
中任何一个为0, server都不发送心跳.
举个例子, 如下建立连接
CONNECT
accept-version:1.2
heart-beat:1000,2000
host:stomp.github.org
^@
CONNECTED
version:1.2
heart-beat:3000,4000
^@
上述, client发送CONNECT命令消息中heart-beat为1000, 2000. 解释为client最小能保证发送心跳间隔为1000毫秒, 希望server发送心跳间隔为2000毫秒
server回复的CONNECTED命令消息中heart-beat为3000,4000 解释为server最小能保证发送心跳间隔为3000毫秒, 希望client发送心跳间隔为4000毫秒
最终的协商结果, client取自己能保证的最小心跳间隔1000毫秒与server希望client发送心跳间隔4000毫秒的最大值 得出client会每4000毫秒发一次心跳,
同理, server取自己能保证的最小心跳间隔3000毫秒与client希望server发送心跳间隔2000毫秒的最大值 得出server会每3000毫秒发一次心跳.
client和server根据心跳来判定对方已经挂掉了的逻辑如下:
以server为例, 假设经过协商, client每10秒发送一个心跳.
- client必须在10秒以内给server至少发送一次数据, 不管是心跳还是正常数据.
- 如果在10秒内client未发送数据, 那么server认为与client的stomp连接已经挂掉.
现在连接已经建立, 接下来准备发送stomp消息.
发送消息
发送消息使用SEND这个COMMAND, 如下:
SEND
destination:/topic/a
content-type:text/plain
hello world
^@
其中destination这个header的值为发送消息的目的地址.
上述SEND命令消息的意思为, 给/topic/a
这个目的地址发送一条类型为text/plain
, 内容是hello world的消息.
所有订阅/topic/a
这个目的地址的消费者client都会收到hello world这条消息.
stomp协议并没有规定destination的格式, 这个是由使用stomp协议的应用自己来定义. 比如,
/topic/a, /queue/a, queue.a, topic.a, topic-a, queue-a
对于stomp协议来说都是正确的. 应用可以自己规定不同的格式以及此格式代表的含义. 比如, 应用自己可以定义以/topic打头的为发布订阅模式, 消息会被所有消费者client收到, 以/queue打头的为负载平衡模式, 只会被一个消费都client收到.
client发送SEND命令消息如何确保server收到了这条消息呢?
协议规定, 可以在SEND命令消息中加入receipt header. receipt header的值唯一确定一次send. server收到有receipt header的SEND命令消息后, 需要回复一个RECEIPT命令消息, 里面会包含receipt-id header, receipt-id的值就是SEND命令消息中receipt header的值. 这样当client收到了这条RECEIPT命令消息后, 就能确定server已收到SEND命令消息. 例如:
SEND
destination:/queue/a
receipt:message-12345
hello queue a^@
RECEIPT
receipt-id:message-12345
^@
接下来看一下消费者client如何订阅消息.
订阅消息
订阅消息用SUBSCRIBE命令, 如下:
SUBSCRIBE
id:0
destination:/topic/foo
ack:client
^@
上述代表client订阅/topic/foo
这个目的地址.
其中多了两个新的header: id与ack.
订阅中的id header 此id能唯一确定一个订阅. 一个client对于一个server可以订阅多次, 甚至对于同一个目的地址都可以订阅多次. 为了唯一确定一次订阅, 协议规定必须包含id header, 此id要求在同一连接中唯一.
订阅中的ack header
ack header告诉server, server如何确认client已经收到消息.
有三个值: auto, client, client-individual
auto
表示当server发出消息后就立即确认client收到了消息. 也就是说当client收到消息后不会对server进行确认.client
表示只有当server收到client的ack后才确认client收到了消息, 也就是说client需要对server发ack进行确认. 这个确认是累积的, 意思是说收到某条消息的ack, 那么这条消息之前的所有的消息, server都认为client已收到.client-individual
与client类似. 只不过不是累积的. 每收到一条消息都需要给server回复ack来确认.
有订阅消息, 那肯定有取消订阅消息.
取消订阅
取消订阅用UNSUBSCRIBE这个命令
UNSUBSCRIBE
id:0
^@
取消订阅相对来说比较简单只需要传一个id header. 这个id header的值来自订阅时id header值. 这样server才能唯一确定到底要取消哪个订阅.
当有生产者client给目的地址发消息后, 首先server会收到消息, server收到消息后会把消息发送给所有订阅这个目的地址的client, 那么server是如何发送这个消息到消费都client的呢?
server发送消息
server发送消息用MESSAGE这个命令来给client发送消息, 如下
MESSAGE
subscription:0
message-id:007
destination:/queue/a
content-type:text/plain
hello queue a^@
message-id这个header的值能唯一确定一条消息 subscription的值就是订阅时SUBSCRIBE命令中id header的值, 表示这条消息属于哪个订阅.
到此, 介绍了一些stomp常用的命令, 还有一些其他命令, 有兴趣可以查看stomp协议文档: https://stomp.github.io/stomp-specification-1.2.html
总结
由于http是一个单工的协议, server不能主动发送消息给client, 导致http在处理实时性要求高的应用时效率不高.
为了提高效率, 我们使用了全双工的websocket协议, 可以让server主动推送消息.
又由于websocket协议是个低层协议, 不是应用层协议, 未对payload的格式进行规范, 导致我们需要自己定义消息体格式, 自己解析消息体, 成本高, 扩展性也不好, 所以我们引入了已被很多库和消息队列厂商实现的stomp协议, 将websocket协议与stomp协议结合.
我们再总结一下websocket与stomp的优点 websocket相对于http的优点:
- 全双工. 相对于http协议只能由client发送消息. 全双工的websocket协议, server与client都可以发送消息.
- 消息体更轻量. http的一个请求比websocket的请求大不少. 主要因为http的每次请求都要加很多的header.
stomp over websocket相对于websocket的优点:
- 不需要自己去规定消息的格式, 以及对消息的格式做解析.
- 由于stomp是一个统一的标准, 有很多库与厂商都对stomp协议进行了支持. 拿来用就可以. 成本低, 扩展好.
理论到此为止, 接下来我们一起了解下spring websocket是如何实现stomp over websocket协议的.
使用spring实现stomp over websocket
先说说spring websocket的架构
架构
上面的图是spring websocket的架构图. 其中由以下几个角色:
- 生产者client: 发送send命令到某个目的地址(destination)的client.
- 消费者client: 订阅某个目的地址(destination), 并接收此目的地址所推送过来的消息的client.
- request channel: 一组用来接收生产者client所推送过来的消息的线程池.
- response channel: 一组用来推送消息给消费者client的线程池.
- broker: 消息队列管理者. 简单讲就是记录哪些client订阅了哪个目的地址(destination).
- 应用目的地址(图中的”/app”): 发送到这类目的地址的消息在到达broker之前, 会先路由到由应用写的某个方法. 相当于对进入broker的消息进行一次拦截, 目的是针对消息做一些业务处理.
- 非应用目的地址(图中的”/topic”): 发送到这类目的地址的消息会直接转到broker. 不会被应用拦截.
- SimAnnotatonMethod: 发送到应用目的地址的消息在到达broker之前, 先路由到的方法. 这部分代码是由应用控制的.
一个消息从生产者发出到消费者消费, 流程如下:
- 生产者通过发送一条SEND命令消息到某个目的地址(destination)
- 服务端request channel接受到这条SEND命令消息
- 如果目的地址是应用目的地址则转到相应的由应用自己写的业务方法做处理, 再转到broker.
- 如果目的地址是非应用目的地址则直接转到broker.
- broker通过SEND命令消息来构建MESSAGE命令消息, 再通过response channel推送MESSAGE命令消息到所有订阅此目的地址的消费者.
一些demo
demo网上已经有很多了, 可以依照这些demo来搭建一个spring stomp over websocket应用, 比如:
- 简单的聊天应用:https://spring.io/guides/gs/messaging-stomp-websocket/
- 简单的股票行情应用:https://github.com/rstoyanchev/spring-websocket-portfolio
这里主要介绍一些其他需要注意的点.
配置broker
继承AbstractWebSocketMessageBrokerConfigurer重写configureMessageBroker方法
配置简单broker
spring自带了一个基于于内存的broker实现—SimpleBrokerMessageHandler. 简单broker只是为了让开发快速搭建一个stomp over websocket应用. 简单broker有以下缺点:
- 对于任何形式的目的地址(destination)都只能是发布订阅模式(topic), 没有负载均衡模式(queue)
- 只能单点.也就是如果启动两个实例, 他们之间的简单broker是不共享数据的.
配置简单broker代码如下:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
//配置前缀, 有这些前缀的会路由到broker
config.enableSimpleBroker("/topic", "/queue")
//配置stomp协议里, server返回的心跳
.setHeartbeatValue(new long[]{10000l, 10000l})
//配置发送心跳的scheduler
.setTaskScheduler(new DefaultManagedTaskScheduler());
//配置前缀, 有这些前缀的会被到有@SubscribeMapping与@MessageMapping的业务方法拦截
config.setApplicationDestinationPrefixes("/app", "/topic", "/queue");
}
解读上述配置:
- setHeartbeatValue所配置的是server返回client的CONNECTED命令消息中heart-beat header的值. 就像理论中stomp部分说的一样, 第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值代码server希望client发的心跳间隔毫秒数. 返回10000, 10000并不代表client与server都是10秒发心跳. 最终client与server到底以什么频率发心跳还需要结合client发送的CONNECT命令消息中的heart-beat header中的值.
- 如果目的地址的前缀在enableSimpleBroker的配置中, 那么会被路由到简单broker. 也就是说订阅不再enableSimpleBroker配置中的目的地址, broker将不接收, 订阅不会起作用.
- 如果目的地址以setApplicationDestinationPrefixes中配置的前缀打头则首先会被路由到@SubscribeMapping与@MessageMapping的业务方法中, 进行处理, 之后再到broker.
比如上述配置前缀/topic
与/queue
即在setApplicationDestinationPrefixes中配置中, 又在enableSimpleBroker配置中, 再结合架构那节所讲的, 那么当一个SUBSCRIBE命令订阅以/topic/chat
为前缀的目的地址时, 流程如下:
* 先路由到controller中以`@SubscribeMapping("/chat")`所标注的方法. (在这个业务方法中可以直接返回数据, 返回数据会同步返回给client)
* 然后再中路由到broker, 订阅`/topic/chat`目的地址
配置外部broker
由于简单broker有上节讲的缺点, 一般不会在生产环境使用. 如果要在生产环境使用, 则需要配置外部broker, 比如, rabbitmq或activemq. 以下以rabbitmq为例, 首先需要安装rabbitmq并开启stomp, 如下我是在centos6.5环境执行的命令:
安装rabbitmq
创建文件/etc/yum.repos.d/rabbitmq-erlang.repo
# In /etc/yum.repos.d/rabbitmq-erlang.repo
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/6
gpgcheck=1
gpgkey=https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1
安装erlang
sudo yum install erlang
下载rabbitmq rpm包 wget https://www.rabbitmq.com/install-rpm.html
安装
sudo rpm -ivh rabbitmq-server-3.6.14-1.el6.noarch.rpm
启用stomp扩展
sudo rabbitmq-plugins enable rabbitmq_stomp
启用rabbitmq
/etc/init.d/rabbitmq-server start
spring配置
简单broker使用enableSimpleBroker, 而外部broker使用enableStompBrokerRelay, 如下
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("rabbitmq地址")
.setRelayHost("stomp端口");
//配置前缀, 有这些前缀的会被到有@SubscribeMapping与@MessageMapping的业务方法拦截
config.setApplicationDestinationPrefixes("/app", "/topic", "/queue");
}
配置endpoints
endpoints就是连接点, websocket根据哪个url来进行webscoket连接. 同样, 继承AbstractWebSocketMessageBrokerConfigurer重写registerStompEndpoints方法:
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp/echo")
.setAllowedOrigins("*")
.withSockJS();
registry.addEndpoint("/stomp/echo");
}
上面我配置了两个连接点, 同为”/stomp/echo”. 第一个连接点配置是为了sockjs可以进行连接并允许跨域. 这个连接点是为了h5应用进行连接准备的连接点. 第二个连接点配置是为了普通的stomp client进行连接. 这个连接点是为了原生应用进行连接准备的连接点.
性能优化
在架构那节, 我们知道spring wesocket在处理消息时有两个线程池: 一个为request channel线程池接收并处理来自client的请求 一个为response channel线程池发送消息给client.
性能优化主要对这两个线程池进行配置. 默认这两个线程池核心线程数为系统核数*2.
如果业务没大量io操作, client与server网络情况情况良好, 则默认配置就可以.
这里的业务方法就是指在到达broker之前被业务拦截的那些方法.
如果业务方法有大量io操作, 那应当适当加大request channel的线程数, 以充分利用cpu. 如果client与server之间网络连接不可控, 比如通过外网连接手机上的客户端, 则应该当适当加大response channel的线程数.
同样, 继承AbstractWebSocketMessageBrokerConfigurer重写configureClientInboundChannel与configureClientOutboundChannel方法 来配置request channel与response channel线程池. 如下:
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(new StompChannelInterceptor());
registration.taskExecutor()
.corePoolSize(32)
.maxPoolSize(200);
.queueCapacity(10000);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.setInterceptors(new StompChannelInterceptor());
registration.taskExecutor()
.corePoolSize(100)
.maxPoolSize(400);
.queueCapacity(20000);
}
其中corePoolSize为核心线程数, maxPoolSize最大线程数, queueCapacity队列容积. 这里需要注意一点, queueCapacity的默认配置是无限大, 如果是无限大, 那么线程数则永远是核心线程数. 只能当队列容积不够用时, 实际线程数才会大于核心线程数.
由于server到client的网络状况以及client的处理能力很难预测, 所以合理配置response channel线程数相对比较困难. 为此spring websocket提供了两个额外的配置来进行优化. 继承AbstractWebSocketMessageBrokerConfigurer重写configureWebSocketTransport方法, 如下:
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setSendTimeLimit(15 * 1000)
.setSendBufferSizeLimit(512 * 1024);
}
其中setSendTimeLimit限制了一条消息发送到client的时间, 如果在配置时间内发送不到client则会尝试关闭session. setSendBufferSizeLimit设置了缓存大小, 发送不到client的消息都会先缓存, 如果缓存满则会尝试关闭session.
注意只有当消息累积时上述配置才启作用. 比如如果只发一条消息到client, 而这条消息被阻塞, 上述配置并不启作用. 只能当第二条消息进入队列并阻塞, 上述配置才启作用.
client编写
这里只介绍如何用java写stomp client以方便做单元测试. > js client可以参考demo小节两个例子中的scokjs用法.
以下代码是连接在配置endpoints小节中配置的第二个连接点, 并订阅以及发送消息.
import com.jingxu.web.vo.Message;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.util.MimeType;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import java.lang.reflect.Type;
public class StompClientDemo {
public static void main(String[] args) {
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient webSocketStompClient = new WebSocketStompClient(webSocketClient);
//配置convert, 这里配置了对象与json互转
webSocketStompClient.setMessageConverter(new MappingJackson2MessageConverter());
//配置心跳
webSocketStompClient.setDefaultHeartbeat(new long[]{10000l, 10000l});
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
webSocketStompClient.setTaskScheduler(taskScheduler);
//连接
webSocketStompClient.connect("ws://地址/stomp/echo", new MyHandler());
try {
//hang信进程, 如果不hang, 进程线程websocket连接也就都关闭了
Thread.sleep(100000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static class MyHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
stompSession.subscribe("/topic/chat", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return Message.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("收到消息:" + payload);
}
});
StompHeaders sendStompHeaders = new StompHeaders();
stompHeaders.setDestination("/app/send");
stompHeaders.setContentType(MimeType.valueOf("application/json;charset=utf-8"));
//这个Message是我自己随便定义的一个结构体, 这个可以自己定义
Message message = new Message();
message.setType(1);
message.setContent("hello world!");
stompSession.send(sendStompHeaders, message);
}
}
}
如果想连接配置endpoints小节中配置的第一个连接点, 也就是模仿sockjs进行连接, 则连接代码变为如下:
WebSocketClient webSocketClient = new StandardWebSocketClient();
//创建sockJsClient
List<Transport> transports = new ArrayList<>();
transports.add(new WebSocketTransport(webSocketClient));
SockJsClient sockJsClient = new SockJsClient(transports);
//构造函数中传入sockJsClient而不是原来的webSocketClient
WebSocketStompClient webSocketStompClient = new WebSocketStompClient(sockJsClient);
//配置convert, 这里配置了对象与json互转
webSocketStompClient.setMessageConverter(new MappingJackson2MessageConverter());
//配置心跳
webSocketStompClient.setDefaultHeartbeat(new long[]{10000l, 10000l});
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
webSocketStompClient.setTaskScheduler(taskScheduler);
//连接
webSocketStompClient.connect("ws://地址/stomp/echo", new MyHandler());
相对于连接第一个连接点, 主要修改了3到9行.
性能测试注意事项
性能测试可以自己写一个client建立大量连接, 并发送消息来测试 就把上述client例子改一改就可以用了
需要注意的是, 由于要建立大量连接, 需要把系统中ulimit调大, 临时调大的方法通过ulimit -n 100000
来设置;
临时调大只影响当前session, 所以调大后在同一session中重启应用并运行测试程序.
我在测试中遇到这么一个报错No messages received after xxx closing xxx.
这是由于在建立websocket连接之后一分钟内未建立stomp连接导致的.
总结
以上就是我在使用spring websocket搭建stomp over websocket应用时的一些经验. 可见用spring实现一个完备的websocket应用是非常简单的.