解决websocket集群的session共享问题

在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储到redis这些中间存储里面,因此这里我们只能把session存储在本地的内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件,实现消息的发布与订阅,也就是每一个服务端实例都订阅某个消息队列的topic,根据对应的sessionid来判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示:

 这里的图来自于网上,网上大多都是基于redis做发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程:

1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8
2、我们发送一条消息a1到消息队列的topic:test8
3、此时A,B,C,D四个websocket服务端都会收到这条消息a1
4、A根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
5、B根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
6、C根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
7、D根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。
8、客户端就收到了对应的消息。

一、创建一个公共的map,用来存放session

package com.websocket.utils;

import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.Session;

import org.springframework.stereotype.Component;

@Component
public class OnlineSessionCache {

	private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>();

	public void setUserSession(Integer userId, Session session) {
		onlines.put(userId, session);
	}

	public Session getUserSession(Integer userId) {
		return onlines.get(userId);
	}

	public void removeUserSession(Integer userId) {
		onlines.remove(userId);
	}
	
	public ConcurrentHashMap<Integer, Session> getAllSession() {
		return this.onlines;
	}

}

二、在websocket连接和关闭的时候,把session关闭掉

@OnOpen
	public void onOpen(Session session,EndpointConfig config) {

		this.session = session;
		log.info("当前session id : {}  登录进来了", session.getId());
		OnlineCalUtils.addOnlineCount();
		onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session);
		log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size());
		log.info("有新连接加入!当前在线人数为 :{} ", getOnlineCount());
	}
@OnClose
	public void onClose() {
		OnlineCalUtils.subOnlineCount();
		log.info("有一连接关闭!当前在线人数为: {}", getOnlineCount());
		onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId()));
		log.info("当前session id : {}  退出去了");
	}

三、编写一个接口,用来给指定的用户发送消息

package com.websocket.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.websocket.model.ChatModel;
import com.websocket.producer.RocketProducer;
import com.websocket.utils.ChatModelUtils;

import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
public class ChatMsgController {

	@Autowired
	private RocketProducer rocketProducer;
	
	@RequestMapping("/sendToSimpleUser")
	public String sendToSimpleUser(Integer fromUserId,Integer toUserId) {
		
		ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息");
		rocketProducer.sendDirectMessage(model);
		
		return "成功";
	}
	
	
}

这里我们是把消息直接发送给了rocketmq里面,发送者代码如下:

package com.websocket.producer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;

@Component
public class RocketProducer {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	public void sendDirectMessage(ChatModel message) {
		String msg = JSON.toJSONString(message);
        rocketMQTemplate.syncSend("test8", msg);
	}
	
}

四、编写消费者,获取mq的消息,并且发送消息给对应的session

package com.websocket.producer;

import javax.websocket.Session;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;
import com.websocket.product.SocketServerProduct;
import com.websocket.utils.OnlineSessionCache;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}")
public class RocketConsumer implements RocketMQListener<String>{

	@Autowired
	private OnlineSessionCache onlineSessionCache;

	@Autowired
	private SocketServerProduct socketServerProduct;
	
	@Value("${chat.group.groupname}")
	private String groupName;
	
	@Override
	public void onMessage(String message) {
		log.info("监听到的topic是:{}  groupname是:{}","test8",groupName);
		ChatModel model = JSON.parseObject(message, ChatModel.class);
		Integer userId = model.getToUserId();
		Session session = onlineSessionCache.getUserSession(userId);
		if (null != session) {
			log.info("找到了对应的session,准备回复消息");
			socketServerProduct.sendMessage(session, model.getMessage());
		}else {
			log.info("没有找到对应的session,准备丢弃");
		}
	}
}

以上就是一个完整的关于websocket服务端集群关于session共享的解决方案。

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)_websocket心跳机制-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/215273.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

COMP4121Advanced Algorithms

COMP4121Advanced Algorithms WeChat&#xff1a;yj4399_ Sina Visitor System

OpenHarmony 实现屏幕横竖屏

前言 OpenHarmony源码版本&#xff1a;4.0release 开发板&#xff1a;DAYU / rk3568 一、修改“abilities”中的“orientation”实现横竖屏 当我们应用的Alility继承的是UIAbility时&#xff0c;对应的module.json5中的属性是abilities&#xff08;标识当前Module中UIAbili…

Stable diffusion ai图像生成本地部署教程

前言 本文将用最干最简单的方式告诉你怎么将Stable Diffusion AI图像生成软件部署到你的本地环境 关于Stable Diffusion的实现原理和训练微调请看我其他文章 部署Stable Diffusion主要分为三个部分 下载模型&#xff08;模型可以认为是被训练好的&#xff0c;生成图像的大脑…

全自动洗衣机什么牌子好?内衣洗衣机便宜好用的牌子

随着内衣洗衣机的流行&#xff0c;很多小伙伴在纠结该不该入手一款内衣洗衣机&#xff0c;专门来洗一些贴身衣物&#xff0c;答案是非常有必要的&#xff0c;因为我们现在市面上的大型洗衣机只能做清洁&#xff0c;无法对我们的贴身衣物进行一个高强度的清洁&#xff0c;而小小…

Unity对接后台和加载图片

1、前言 在unity中与后台对接&#xff0c;用await在web端暂时还不支持&#xff0c;所以&#xff0c;协程成为比较好的通用方式&#xff0c;以下适用除post访问外的所有对接 2、对接后台 2.1、安装插件 首先我们需要用到Newtonsoft.dll&#xff0c;如果没有这个.dll的请跟着我…

无人机语音中继电台 U-ATC118

简介 甚高频无线电中继通讯系统使用经过适航认证的机载电台连接数字网络传输模块&#xff0c;通过网络远程控制无缝实现无人机操作员与塔台直接语音通话。无人机操作员可以从地面控制站远程操作机载电台进行频率切换、静噪开关、PTT按钮&#xff0c;电台虚拟面板与真实面板布局…

zabbix配置snmp trap--使用snmptrapd和Bash接收器(缺zabbix_trap_handler.sh文中自取)--图文教程

1.前言 我的zabbix的版本是5.0版本&#xff0c;5.0的官方文档没有使用bash接收器的示例&#xff0c;6.0的官方文档有使用bash接收器的示例&#xff0c;但是&#xff0c;下载文件的链接失效&#xff1f;&#xff01; 这里讲解zabbix-server端配置和zabbix web端配置 2.zabbix-…

文章解读与仿真程序复现思路——中国电机工程学报EI\CSCD\北大核心《考虑量化储热的多区域电–热综合能源系统优化调度》

标题 "考虑量化储热的多区域电–热综合能源系统优化调度" 可以分解为几个关键词和短语&#xff0c;我们逐步解读&#xff1a; 考虑量化储热&#xff1a; 考虑&#xff1a; 意味着在解决问题或进行研究时&#xff0c;会综合或纳入特定因素。量化&#xff1a; 将抽象的…

路由策略,gRPC 路由如何实现

目录 一、为啥我们要路由策略&#xff1a; 二、基于gRPC 路由策略 一、为啥我们要路由策略&#xff1a; 我们可以重新回到调用方发起 RPC 调用的流程。在 RPC 发起真实请求的时候&#xff0c;有一个步骤就是从服务提供方节点集合里面选择一个合适的节点&#xff08;就是我们…

【Linux】 OpenSSH_9.3p1 升级到 OpenSSH_9.5p1(亲测无问题,建议收藏)

&#x1f468;‍&#x1f393;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; &#x1f40b; 希望大家多多支…

node批量修改文件名称

示例目录 在终端中执行即可 node index.js // 第一步&#xff1a;引入 fs 文件系统模块 let fs require("fs"); // 读取目标文件夹名称 const dirName "./img"; // 文件后缀匹配规则 const reg /(?<[.])[a-z]/; // 统一文件名前缀 const fileNam…

SVG-椭圆弧-参数转换-计算公式-标准解读

文章目录 1.简介2.基本参数2.1.椭圆的表达2.2.参数变换2.3.注意事项 3.参考资料4.总结 1.简介 为了与其他路径段表示法保持一致&#xff0c; SVG 路径中的圆弧是根据曲线上的起点和终点定义的。椭圆弧的这种端点参数化。优点是它允许与其它路径一致的语法&#xff0c;其中所有…

22款奔驰GLE450升级香氛负离子 车载香薰功能

相信大家都知道&#xff0c;奔驰自从研发出香氛负离子系统后&#xff0c;一直都受广大奔驰车主的追捧&#xff0c;香氛负离子不仅可以散发出清香淡雅的香气外&#xff0c;还可以对车内的空气进行过滤&#xff0c;使车内的有害气味通过负离子进行过滤&#xff0c;达到车内保持清…

ASP.NET-BS结构的城市酒店入住信息管理系统的设计

2 理论基础 2.1 数据库技术 数据库技术应用中&#xff0c;经常用到的基本概念有&#xff1a;数据库&#xff08;DB&#xff09;、数据库管理系统&#xff08;DBMS&#xff09;、数据库系统&#xff08;DBS&#xff09;、数据库技术及数据模型。 数据库技术是研究数据库的结构、…

第二证券:科创板股票代码?

跟着中国经济的持续展开&#xff0c;出资商场也更加兴盛。科技立异是推动经济展开的基石&#xff0c;因此科技股在近年来的A股商场上备受瞩目。尤其是2019年7月22日&#xff0c;中国科创板正式开板&#xff0c;在A股商场上增添了一份新的亮点。本文将从多个视点深度解析科创板股…

终端安全管理软件是监控软件吗

有些人在后台问&#xff0c;终端安全管理软件是监控软件吗&#xff1f; 先回答&#xff0c;是监控软件。 因为它具有监控的功能&#xff0c;在很大程度上&#xff0c;是可以用来当做监控软件来用的。 终端安全管理软件是一种集中管理终端设备的软件工具&#xff0c;可以在企业…

线程中出现异常的处理

目录 前言 正文 1.线程出现异常的默认行为 2.使用 setUncaughtExceptionHandler() 方法进行异常处理 3.使用 setDefaultUncaughtExceptionHandler() 方法进行异常处理 4.线程组内处理异常 5.线程异常处理的优先性 总结 前言 在紧密交织的多线程环境中&#xff0c;异…

BearPi Std 板从入门到放弃 - 引气入体篇(2)(按键触发外部中断控制LED亮灭)

简介 基于 第一篇文章 的介绍&#xff0c; 我们新增按键的中断控制&#xff1b; 开发板 &#xff1a; Bearpi Std(小熊派标准板) 主芯片: STM32L431RCT6 LED : PC13 \ 推挽输出即可 \ 高电平点亮 KEY1 : PB2 \ 上拉 \ 按下下降沿触发(一次)/上下沿触发(两次&#xff0c;实现…

git常用命令小记

&#xff08;文章正在持续更新中&#xff09; git init - 在当前目录下初始化一个新的 Git 仓库。 git clone [url] - 克隆远程仓库到本地。 git add [file] - 将文件添加到暂存区。 git commit -m "commit message" - 将添加到暂存区的文件提交到本地仓库。 git pus…

python毕业设计论文选题管理系统b615y

毕业论文管理方式效率低下&#xff0c;为了提高效率&#xff0c;特开发了本毕业论文管理系统。本毕业论文管理系统主要实现的功能模块包括学生模块、导师模块和管理员模块三大部分&#xff0c;具体功能分析如下&#xff1a; &#xff08;1&#xff09;导师功能模块&#xff1a;…
最新文章