Springboot项目使用原生Websocket

目录

  • 1.启用Websocket功能
  • 2.封装操作websocket session的工具
  • 3.保存websocket session的接口
  • 4.保存websocket session的类
  • 5.定义websocket 端点
  • 6.创建定时任务 ping websocket 客户端

1.启用Websocket功能

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }

}

2.封装操作websocket session的工具

package com.xxx.robot.websocket.util;

import java.util.Map;

import javax.websocket.Session;

import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;

import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;

public final class WebSocketSessionUtils {

    private WebSocketSessionUtils() {}

	public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;
    
    /**
     * websocket block 发送超时 毫秒
     */
    public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;

	/**
	 * 从 websocket session 中找到登录用户
	 * 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User
	 * LoginUser、User 从业务层自定义的类
	 * 项目中使用了spring security框架
	 */
    public static User findUser (Session session) {
        UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();
        MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();
        LoginUser loginUser = (LoginUser) userDetails.getUserData();
        return (User) loginUser.getAdditionalInfo();
    }
    
    /**
     * 给 websocket session 设置参数
     */
    public static void setProperties(Session session) {
    	//设置websocket文本消息的长度为8M,默认为8k
        session.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
        //设置websocket二进制消息的长度为8M,默认为8k
        session.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
        Map<String, Object> userProperties = session.getUserProperties();
        //设置websocket发送消息的超时时长为10秒,默认为20秒
        userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);
    }
}

3.保存websocket session的接口

package com.xxx.robot.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import javax.websocket.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface WebSocketSessionManager {
    Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);
    
    String PING = "ping";
    String PONG = "pong";
    
    Session get (String key);
    
    List<String> keys();

    void add (String key, Session session);
    
    Session remove (String key);
    
    /**
     * ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法
     */
    default void pingBatch () {
        List<String> keyList = keys();
        log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e1) {
                        }
                    } catch (Exception e) {
                        log.error("WebSocket-ping异常", e);
                    }
                }
            }
        }
    }
    
    /**
     * 消除所有websocket客户端
     */
    default void clearAllSession () {
        List<String> keyList = keys();
        int i = 0;
        for (String key : keyList) {
            if (key != null) {
                Session session = get(key);
                if (session != null) {
                    try {
                        remove(key);
                        i++;
                        session.close();
                    } catch (IOException e1) {
                        log.error("WebSocket-移除并关闭session异常", e1);
                    }
                    if (i % 10 == 0) {
                        try {
                            Thread.sleep(0);
                        } catch (InterruptedException e1) {
                        }
                    }
                }
            }
        }
        log.info("WebSocket-移除并关闭session数量为:{}", i);
    }
}

4.保存websocket session的类

package com.xxx.robot.websocket.robot.manager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import javax.websocket.Session;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.xxx.robot.websocket.WebSocketSessionManager;

/**
 * 机器人模块WebSocket Session管理器
 */
@Component
public class RobotSessionManager implements WebSocketSessionManager {
    
    /**
     * key = userId + '-' + managerId
     * userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端
     * 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap
     */
    private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();
    
    public static final String joinKey (String userId, String managerId) {
        return userId + '-' + managerId;
    }

    public static final String joinKey (Long userId, String managerId) {
        return userId.toString() + '-' + managerId;
    }
    
    public static final String[] splitKey (String key) {
        return StringUtils.split(key, '-');
    }

    @Override
    public Session get(String key) {
        return SESSION_POOL.get(key);
    }
    
    /**
     * 根据用户ID查询所有websocket session的key
     * @param userId
     * @param excludeManagerId 排除的key, 可为空
     * @return
     */
    public List<String> keysByUserId(String userId, String excludeManagerId) {
    	//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图
        ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');
        NavigableSet<String> keySet = subMap.navigableKeySet();
        List<String> list = new ArrayList<>();
        if (StringUtils.isBlank(excludeManagerId)) {
            for (String key : keySet) {
                if (key != null) {
                    list.add(key);
                }
            }
        } else {
            for (String key : keySet) {
                if (key != null && !key.equals(excludeManagerId)) {
                    list.add(key);
                }
            }
        }
        return list;
    }

    @Override
    public List<String> keys() {
        NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();
        List<String> list = new ArrayList<>();
        for (String key : keySet) {
            if (key != null) {
                list.add(key);
            }
        }
        return list;
    }

    @Override
    public synchronized void add(String key, Session session) {
        removeAndClose(key);
        SESSION_POOL.put(key, session);
    }

    @Override
    public synchronized Session remove(String key) {
        return SESSION_POOL.remove(key);
    }
    
    /**
     * 必须key和value都匹配才能删除
     */
    public synchronized void remove(String key, Session session) {
        SESSION_POOL.remove(key, session);
    }
    
    private void removeAndClose (String key) {
        Session session = remove(key);
        if (session != null) {
            try {
                session.close();
            } catch (IOException e) {
            }
        }
    }

}

5.定义websocket 端点

package com.xxx.robot.websocket.robot.endpoint;

import java.util.Map;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * 机器人模块WebSocket接口
 * 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的
 * 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean
 */
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {
    
    private volatile User user;
    
    private volatile String id;
    
    private volatile Session session;
    
    private volatile Map<String, RobotCoreService> robotCoreServiceMap;

    /**
     * 所有初始化操作都写在@OnOpen注释的方法中
     * 连接成功
     * @param session
     */
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) {
        WebSocketSessionUtils.setProperties(session);
        this.user = WebSocketSessionUtils.findUser(session);
        this.id = id;
        this.session = session;
        log.info("连接成功:{}, {}", id, this.user.getUserCode());
        //使用BeanUtils代替@Autowired获取bean, 
        //RobotCoreService为业务类,不必关心
        robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //保存websocket session
        robotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);
    }

    /**
     * 连接关闭
     * @param session
     */
    @OnClose
    public void onClose() {
        log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //连接关闭时,使用两个参数的remove方法,多线程下安全删除
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }
    
    @OnError
    public void onError(Throwable error) {
        log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);
        RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);
        //websocket异常时,使用两个参数的remove方法,多线程下安全删除
        //比如ping客户端超时,触发此方法,删除该客户端
        robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);
    }

    /**
     * 接收到消息
     * @param message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);
        if (WebSocketSessionManager.PING.equals(message)) {
        	//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口
            this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);
            return;
        }
        //用 try...catch 包裹防止抛出异常导致websocket关闭
        try {
        	//业务层,使用jackson反序列化json,不必关心具体的业务
            JsonNode root = BaseJsonUtils.readTree(message);
            String apiType = root.at("/apiType").asText();
            //业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭
            robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);
        } catch (Exception e) {
            log.error("处理消息错误", e);
        }
    }
    
}

在这里插入图片描述

6.创建定时任务 ping websocket 客户端

package com.xxx.robot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 启用定时任务功能
 * 因为websocket session是有状态的,只能保存在各自的服务端,
 * 所以只能使用单机式的定时任务,而不能使用分布式定时任务,
 * 因此 springboot自带的定时任务功能成为了首选
 * springboot定时任务线程池
 */
@Configuration
@EnableScheduling
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("scheduler-executor-");
        return executor;
    }

}
package com.xxx.robot.websocket;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author Sunzhihua
 */
@Slf4j
@Component
public class WebSocketSchedulerTask {
    
    /**
     * 注入所有的 websocket session 管理器
     */
    @Autowired
    private List<WebSocketSessionManager> webSocketSessionManagers;

	/**
	 * initialDelay 表示 延迟60秒初始化
	 * fixedDelay 表示 上一次任务结束后,再延迟30秒执行
	 */
    @Scheduled(initialDelay = 60000, fixedDelay = 30000)
    public void clearInvalidSession() {
        try {
            log.info("pingBatch 开始。。。");
            for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {
                webSocketSessionManager.pingBatch();
            }
            log.info("pingBatch 完成。。。");
        } catch (Exception e) {
            log.error("pingBatch异常", e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/30887.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

NSSCTF MOBILE [SWPU 2019]easyapp 详细题解

文章目录 一. 前言二. 安装安卓SDK三. 安装安卓模拟器(推荐夜神模拟器)四. 安装frida和objection五. 解题过程六. 总结 一. 前言 题目地址:[SWPU 2019]easyapp大佬题解[SWPU 2019]easyapp pwjcw的WriteUp 大佬的题解很简单,直接hook就可以看到返回值,但是我看了半天没看明白是…

Redis简介

Redis是基于内存&#xff0c;也可以基于磁盘持久化nosql数据库&#xff0c;使用c语言开发。 数据存储结构&#xff1a;key-value 安装环境准备 Redis使用c语言开发&#xff0c;需要使用gcc编译程序进行编译。 1&#xff09; 安装gcc a) 从磁盘镜像中进行安装&#xff1a;&…

决策树分类算法

#CSDN AI写作助手创作测评 目录 ID3算法 1.算法原理 2.代码实现 3.ID3算法的优缺点分析 C4.5算法 1.原理 2.优缺点 心得感受 决策树表示方法是应用最广泛的逻辑方法之一&#xff0c;它从一组无次序、无规则的事例中推理出决策树表示形式的分类规则。在决策树的内部…

Java集合详解

集合详解 1、集合&#xff0c;也可以说是容器。由两大接口派生而来&#xff0c;一个是collection&#xff0c;主要用于存储单一元素&#xff1b;另一个是map接口&#xff0c;主要用于存储键值对。 Collection接口 Map接口 2、集合和数组 在之前我们保存多个数据可以使用数组…

Tcl常用命令备忘录-正则命令篇

正则表达式是一种用于匹配、查找、替换文本中特定模式的工具。在Tcl脚本中&#xff0c;可以使用正则表达式对字符串进行匹配、查找和替换。 regexp 语法&#xff1a; regexp ?选项? 正则表达式 字符串 ?变量1 变量2 ...? 其中&#xff0c;?选项?为可选项&#xff0c;…

Spring Security OAuth2授权原理、流程与源码解读

文章目录 前言AuthorizationServerConfigurerAdapter(身份认证服务配置适配器)OAuth2AuthorizationServerConfiguration(OAuth2授权服务配置) EnableAuthorizationServer(开启身份认证服务)AuthorizationServerEndpointsConfigurations身份认证服务站点配置类AuthorizationEndp…

Qt编写精美输入法(历时十年迭代/可换肤/支持Qt4/5/6/win/linux/mac/嵌入式等)

一、前言 大概是从2012年就开始研究用Qt写输入法&#xff0c;因为项目需要&#xff0c;嵌入式板子上&#xff0c;没有对应的输入法&#xff0c;当初使用过很多NVR&#xff0c;里面也是鼠标按下弹出输入法面板进行输入&#xff0c;可以切换数字和字母及中文&#xff0c;于是借鉴…

jmeter如何进行http压力测试

目录 前言&#xff1a; 1、添加线程组&#xff1a; 2、添加采样器&#xff1a; 3、添加监视器 压力测试知识说明 前言&#xff1a; JMeter是一个基于Java的开源压力测试工具&#xff0c;可用于评估Web应用程序的性能&#xff0c;包括HTTP、HTTPS、FTP、SOAP、Restful、JD…

Oracle-高版本SQL优化分析(bind mismatch)

背景: 接到用户报障说一套Oracle19c数据库近期出现insert语句执行变慢的情况&#xff0c;执行一次数据插入需要1秒的时间&#xff0c;而且问题发生的数据库是跑在一体机上面&#xff0c;数据插入正常不应该这么慢&#xff0c;需要分析插入慢的原因 问题: 数据库近期出现insert…

StarRocks 文章收集

StarRocks在58的实践 StarRocks在58的实践 - 墨天轮StarRocks在58的实践 --2022-06-08https://www.modb.pro/db/639611 StarRocks之系统架构 StarRocks之系统架构 - 墨天轮https://www.modb.pro/db/610300 StarRocks小规模集群部署最佳实践(1/2) 0016.S StarRocks小规模集…

2自由度并联仿生腿的制作

1. 运动功能说明 本文实例将实现2自由度并联仿生腿模组运动起来&#xff0c;模拟实现狗腿行走的动作。 2. 结构说明 2自由度并联仿生腿模组是由两个舵机驱动的&#xff0c;它的所有动作都将在两个舵机的配合运动下实现。 3. 运动原理说明 2自由度并联仿生腿模组运动的点位如下…

数据结构-各种树(二叉树、二叉查找树、平衡二叉树、红黑树、B树、B+树)

文章目录 二叉树二叉查找树平衡二叉树红黑树B树B树 二叉树 概念&#xff1a;二叉树&#xff08;binary tree&#xff09;是指树中节点的度不大于2的有序树&#xff0c;它是一种最简单且最重要的树。二叉树的递归定义为&#xff1a;二叉树是一棵空树&#xff0c;或者是一棵由一…

2023 年6月开发者调查统计结果——最流行的技术(1)

2023 年6月开发者调查统计结果——最流行的技术&#xff08;1&#xff09; 本文目录&#xff1a; 一、编程、脚本和标记语言 二、数据库 三、云平台 四、网络框架和技术 五、其他框架和库 六、其他工具 七、集成开发环境 八、异步工具 九、同步工具 ​十、操作系统 …

端午出行电脑没网怎么办?无线网卡解决网络问题

无线网卡是一种可以让电脑或其他设备通过无线信号连接网络的硬件设备&#xff0c;无线网卡有多种类型和接口&#xff0c;例如USB无线网卡&#xff0c;PCI-E无线网卡&#xff0c;PCMCIA无线网卡等。端午出行在即&#xff0c;不妨看看驱动人生准备的无线网卡攻略&#xff0c;让大…

基于Python的招聘信息可视化系统,附源码

文章目录 1 简介2 技术栈3 总体设计3.1 系统结构3.2 数据库设计3.2.1 数据库实体3.2.2 数据库表设计 4 运行设计4.1 招聘热门行业分析4.2热门岗位分析界面4.3招聘岗位学历分析界面4.4岗位分布分析界面 5 源码下载 1 简介 基于Python的招聘信息可视化系统,通过对招聘数据进行分…

MFC扩展库BCGControlBar Pro v33.5亮点 - Ribbon Bar等全新升级

BCGControlBar库拥有500多个经过全面设计、测试和充分记录的MFC扩展类。 我们的组件可以轻松地集成到您的应用程序中&#xff0c;并为您节省数百个开发和调试时间。 BCGControlBar专业版 v33.5已正式发布了&#xff0c;此版本包含了Ribbon&#xff08;功能区&#xff09;自定义…

Linux国产操作系统,UCA-系统工程师学习必备技能,使用dpkg管理软件包、apt命令、内网获取依赖包及源码安装

目录 ​编辑 1.使用dpkg管理软件包 2.apt命令 3.内网获取依赖包 4.源码安装 1.使用dpkg管理软件包 第一种方法当然可以上网搜索软件安装包&#xff0c;下载然后解压成软件。 第二种也就是我接下来要介绍的&#xff0c;dpkg 命令&#xff0c;dpkg 全称叫做debian package…

步长(stride) | 填充(padding) | 扩长(dilation)

这几个名词中文真的好难翻译&#xff0c;不是大佬就不要造名词了&#xff0c;后面还是老老实实用英文吧&#xff01;&#xff08;标题是机翻的 。&#xff09; stride stride 很好理解&#xff0c;stride 就是卷积核移动的步长。 如下图&#xff1a; stride1 stride2 paddi…

技术新动向 | 谷歌云大举扩展安全 AI 生态系统

【本文由 Cloud Ace 整理发布&#xff0c; Cloud Ace 是谷歌云全球战略合作伙伴&#xff0c;拥有 300 多名工程师&#xff0c;也是谷歌最高级别合作伙伴&#xff0c;多次获得 Google Cloud 合作伙伴奖。作为谷歌托管服务商&#xff0c;我们提供谷歌云、谷歌地图、谷歌办公套件…

【设计模式】SpringBoot优雅使用策略模式

文章目录 1.概述1.1.简述策略模式 2.实现方法2.1.实现思路2.2.实现代码2.3.策略拓展2.4.执行调用 3.总结 1.概述 本篇文章主要会描述SpringBoot与策略模式的结合使用&#xff0c;因为不涉及到理论部分&#xff0c;所以在阅读本篇之前&#xff0c;需要对策略模式的理论已经有了…
最新文章