基于SpringBoot实现WebSocket实时通讯的服务端和客户端

实现功能

服务端注册的客户端的列表;服务端向客户端发送广播消息;服务端向指定客户端发送消息;服务端向多个客户端发送消息;客户端给服务端发送消息;
效果:
请添加图片描述在这里插入图片描述

环境

jdk:1.8
SpringBoot:2.4.17

服务端

1.引入依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.在启动类上加上开启WebSocket的注解

@EnableWebSocket

3.配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * date created : Created in 2024/3/18 16:57
 * description  : WebSocketConfig 主要解决使用了@ServerEndpoint注解的websocket endpoint不被springboot扫描到的问题
 * class name   : WebSocketConfig
 */
@Configuration
public class WebSocketConfig {

    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }


}

4.服务端实现

/**
 * date created : Created in 2024/3/18 16:31
 * description  : 服务端实现,方法的封装
 * class name   : WebSocketServer
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{applicationName}")
public class WebSocketServer {
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    // 应用名称
    private String applicationName;

    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    // 用来存在线连接用户信息
    private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();


    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "applicationName") String applicationName) {
        try {
            this.session = session;
            this.applicationName = applicationName;
            webSockets.add(this);
            sessionPool.put(applicationName, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
            log.info("【当前客户端列表】:"+ sessionPool.keySet());
        } catch (Exception e) {
        }
    }

    /**
     * description : 有连接断开之后的处理方法
     * method name : onClose
     * param       : []
     * return      : void
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.applicationName);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
            log.info("【当前客户端列表】:"+ sessionPool.keySet());
        } catch (Exception e) {
        }
    }

    /**
     * description : 收到客户端消息的处理方法
     * method name : onMessage
     * param       : [message]
     * return      : void
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】收到客户端消息:" + message);
    }

    /**
     * description : 错误处理
     * method name : onError
     * param       : [session, error]
     * return      : void
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }


    /**
     * description : 广播消息 给所有注册的客户端发送消息
     * method name : sendBroadcastMessage
     * param       : [message]
     * return      : void
     */
    public void sendBroadcastMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        for (WebSocketServer webSocket : webSockets) {
            try {
                if (webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * description : 给指定的客户端发送消息
     * method name : sendApplicationMessage
     * param       : [applicationName 客户端的应用名称, message 要发送的消息]
     * return      : void
     */
    public void sendApplicationMessage(String applicationName, String message) {
        Session session = sessionPool.get(applicationName);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * description : 给多个客户端发送消息
     * method name : sendMassApplicationMessage
     * param       : [applicationNames 注册的客户端的应用名称, message 要发送的消息]
     * return      : void
     */
    public void sendMassApplicationMessage(String[] applicationNames, String message) {
        for (String userId : applicationNames) {
            Session session = sessionPool.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

客户端

1.客户端配置

yaml文件的末尾添加

# websocket的配置
websocket:
  host: localhost
  port: 19022
  prefix: websocket

2.客户端配置类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * date created : Created in 2024/3/19 14:36
 * description  : 注入配置文件中的参数 并生成服务端的对应的url
 * class name   : WebSocketProperties
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "websocket")
@Configuration
public class WebSocketProperties {
    @Value("${spring.application.name}")
    String appName;
    String host;
    String port;
    String prefix;
    public String getUrl() {
        return String.format("ws://%s:%s/%s/%s", host, port, prefix,appName);
    }
}

3.客户端实现

import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;

import javax.websocket.ClientEndpoint;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * date created : Created in 2024/3/18 16:36
 * description  : 客户端接收服务端的实时消息、发送消息等方法的封装
 * class name   : WebSocketClient
 */


@ClientEndpoint
@AutoConfigureBefore(WebSocketProperties.class)
@Component
@Import(WebSocketProperties.class)
@Configuration
public class WebSocketClient {


    private Session session;

    public WebSocketClient() {
        try {
            WebSocketProperties webSocketProperties = SpringUtils.getBean(WebSocketProperties.class);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, new URI(webSocketProperties.getUrl()));
        } catch (DeploymentException | URISyntaxException | IOException e) {
            e.printStackTrace();
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("Connected to server");
    }

    @OnMessage
    public String onMessage(String message) {
        System.out.println("来自WebSocket的消息: " + message);
        return message;
    }

    @OnClose
    public void onClose() {
        System.out.println("Disconnected from server");
    }

    public void register() {
        try {
            session.getBasicRemote().sendText("register");
            System.out.println("Registered with server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void unregister() {
        try {
            session.getBasicRemote().sendText("unregister");
            System.out.println("Unregistered from server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }



}

使用@Autowired注入配置类无法注入,使用工具类获取,工具类:

* Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtils implements ApplicationContextAware {
	private static ApplicationContext context;

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		context = applicationContext;
	}

	public static Object getBean(String name) {
		return context.getBean(name);
	}

	public static <T> T getBean(Class<T> clazz) {
		return context.getBean(clazz);
	}

	public static <T> T getBean(String name, Class<T> clazz) {
		return context.getBean(name, clazz);
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/473762.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

some/ip CAN CANFD

关于SOME/IP的理解 在CAN总线的车载网络中&#xff0c;通信过程是面向信号的 当ECU的信号的值发生了改变&#xff0c;或者发送周期到了&#xff0c;就会发送消息&#xff0c;而不考虑接收者是否需要&#xff0c;这样就会造成总线上出现不必要的信息&#xff0c;占用了带宽 …

get_local_ip.bat:快速获取IPv4地址

批处理脚本&#xff0c;用于在Windows命令提示符下获取本地计算机的IPv4地址。 echo off ipconfig | findstr IPv4 pause - echo off&#xff1a;这会关闭命令提示符窗口中的命令回显&#xff0c;使得在运行脚本时不会显示每条命令的执行结果。 - ipconfig&#xff1a;这是一…

流畅的 Python 第二版(GPT 重译)(十三)

第二十四章&#xff1a;类元编程 每个人都知道调试比一开始编写程序要困难两倍。所以如果你在编写时尽可能聪明&#xff0c;那么你将如何调试呢&#xff1f; Brian W. Kernighan 和 P. J. Plauger&#xff0c;《编程风格的要素》 类元编程是在运行时创建或自定义类的艺术。在 P…

元素定位之xpath和css

元素定位 xpath绝对路径相对路径案例xpath策略&#xff08;路径&#xff09;案例xpath策略&#xff08;层级、扩展&#xff09;属性层级与属性层级与属性拓展层级与属性综合 csscss选择器&#xff08;id、类、标签、属性&#xff09;id选择器类选择器标签选择器属性选择器案例-…

按面积筛选填充二值图中的孔洞-python源码

目录 &#x1f64b;&#x1f64b;需求 &#x1f345;&#x1f345;解决方案 &#x1f64b;&#x1f64b;需求 前提条件是二值图中0是背景&#xff0c;255是前景。 二值化后的影像中有很多小孔洞&#xff0c;现在需要按孔洞面积进行筛选&#xff0c;填充面积小于阈值的孔洞&…

何恺明重提十年之争——模型表现好是源于能力提升还是捕获数据集偏见

2011年,知名学者Antonio Torralba和Alyosha Efros提出了“数据集偏差之战”&#xff0c;他们发现机器学习模型很容易“过拟合”到特定的数据集上&#xff0c;导致在其他数据集上表现不佳。过去十年&#xff0c;随着深度学习革命的到来&#xff0c;建立多样化、大规模、全面且尽…

三级数据库技术考点(详解!!)

1、 答疑:【解析】分布式数据库系统按不同层次提供的分布透明性有:分片透明性;②位置透明性;③局部映像透明性&#xff0c;位置透明性是指数据分片的分配位置对用户是透明的&#xff0c;用户编写程序时只需 要考虑数据分片情况&#xff0c;不需要了解各分片在各个场地的分配情…

GitHub配置SSH Key(详细版本)

GitHub配置SSH Key的目的是为了帮助我们在通过git提交代码是&#xff0c;不需要繁琐的验证过程&#xff0c;简化操作流程。比如新建的仓库可以下载, 但是提交需要账号密码。 步骤 一、设置git的user name和email 如果你是第一次使用&#xff0c;或者还没有配置过的话需要操作…

zookeeper底层细节

zk 临时节点和watch机制实现注册中心自动注册和发现&#xff0c;数据都在内存&#xff0c;nio 多线程模型&#xff1b; cp注重一致性&#xff0c;数据不一致时集群不可用 事务请求处理方式 1.all事务由唯一服务器处理 2.将客户端事务请求转成proposal分发follower 3.等待半…

基于Jenkins + Argo 实现多集群的持续交付

作者&#xff1a;周靖峰&#xff0c;青云科技容器顾问&#xff0c;云原生爱好者&#xff0c;目前专注于 DevOps&#xff0c;云原生领域技术涉及 Kubernetes、KubeSphere、Argo。 前文概述 前面我们已经掌握了如何通过 Jenkins Argo CD 的方式实现单集群的持续交付&#xff0c…

Maven,pom.xml,查找 子jar包

在IDEA打开pom.xml&#xff0c;会看到这里&#xff1a; 然后如果有需要&#xff0c;把相关的 子jar包 去掉 <dependency><groupId>XXX</groupId><artifactId>XXX</artifactId><exclusions><exclusion><artifactId>xxx</a…

Node.js核心命令与工具:提升开发效率的实用指南

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

HarmonyOS NEXT应用开发之Web组件预览PDF文件实现案例

介绍 本案例通过Web组件实现预览本地PDF文件和预览网络PDF文件&#xff0c;代码为Tabs容器组件包含了两个独立的TabContent子组件&#xff0c;分别标示为预览本地PDF文件和预览网络PDF文件。每个子组件内部构建一个Web组件。第一个Web组件利用resource协议关联本地PDF文件路径…

Docker系列

目录 练习&#xff1a;去DockerHub搜索并拉取一个Redis镜像 练习&#xff1a;去DockerHub搜索并拉取一个Redis镜像 目标&#xff1a; 1&#xff09;去DockerHub搜索Redis镜像 2&#xff09;查看Redis镜像的名称和版本 3&#xff09;利用docker pull命令拉取镜像 查看是否…

计算机网络简答题:复试+期末

文章目录 1.计算机网络的功能:2.计算机网络的分类:3.主机间的通信方式:4.电报交换、报文交换、分组交换的区别:5.计算机网络的性能指标:6.0SI模型和TCP/IP模型:7.通信信通的方式:8.端到端的通信与点到点通信的区别:9.同步通信和异步通信:10.频分复用、时分复用、波分复用和码分…

【Qt】常用控件

目录 一、控件概述 二、QWidget 三、Buttons类控件 3.1 QPushButton 3.2 QRadioButton 3.3 QCheckBox 3.4 QToolButton 四、Display Widgets&#xff08;显示类控件&#xff09; 4.1 QLabel 4.2 QLCDNumber 4.3 QProgressBar 4.4 QCalendarWidget 五、Input Widge…

深度学习 | 神经网络

一、神经网络原理 1、神经元模型 虽然叫个神经元&#xff0c;但骨子里还是线性模型。 2、神经网络结构 顾名思义就是由很多个神经元结点前后相连组成的一个网络。虽然长相上是个网络&#xff0c;但是本质上是多个线性模型的模块化组合。 在早期也被称为 多层感知机 Multi-Layer…

【java】java环境变量分类

测试代码&#xff1a; public class TestSys {public static void main(String[] args) {/*** 获取所有的系统环境变量*/Map<String, String> map System.getenv();map.forEach((key, value) -> System.out.printf("env&#xff1a;key:%s->value:%s%n"…

AQS源码分析

前言 AbstractQueuedSynchronizer是抽象同步队列&#xff0c;其是实现同步机器的基础组件&#xff0c;并发包中的锁的底层就是使用AQS实现的。AQS中 维护了一个volatile int state&#xff08;代表共享资源&#xff09;和一个FIFO线程等待队列&#xff08;多线程争用资源被阻塞…

监控系统prometheus+grafana+发送告警信息

1、基础环境准备两台或更多的主机 2、关闭selinux vi /etc/selinux/config&#xff0c;修改SELINUX的值为disabled 3、关闭防火墙 systemctl disable firewalld systemctl stop firewalld 4、prometheus官网下载 https://prometheus.io/download/ 5、grafana官网下载 https…
最新文章