使用 Netty 实现群聊功能的步骤和注意事项

文章目录

  • 前言
  • 声明
  • 功能说明
  • 实现步骤
    • WebSocket 服务启动
    • Channel 初始化
    • HTTP 请求处理
    • HTTP 页面内容
    • WebSocket 请求处理
  • 效果展示
  • 总结

前言

通过之前的文章介绍,我们可以深刻认识到Netty在网络编程领域的卓越表现和强大实力。这篇文章将介绍如何利用 Netty 框架开发一个 WebSocket 服务端,从而实现一个简单的在线聊天功能。

声明

文章中所提供的代码仅供参考,旨在帮助无 Netty 经验的开发人员快速上手。请注意,这些代码并不适用于实际应用中

功能说明

聊天页面

  • 用户进入页面后,会看到一个简单的文本框,可以用来发送消息。
  • 页面下方会显示聊天的消息内容。

服务端主要有以下三个功能:

  • 响应聊天页面:用来接收和响应聊天页面的请求。
  • 处理消息:对接收到的消息进行处理。
  • 实现群聊功能:提供群聊的功能,使多个用户能够在同一个聊天室中进行交流。

功能很简单,但是可以通过这个示例实现更多复杂的场景。

实现步骤

创建一个简单的 Maven 项目,直接引入 netty-all 包即可编码。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.28.Final</version>
</dependency>

实现该功能共有五个类,如下:

├── MakeIndexPage.java
├── ProcessWsIndexPageHandler.java
├── ProcesssWsFrameHandler.java
├── WebSocketServer.java
└── WebSocketServerInitializer.java

下面对实现该功能所涉及的五个类的代码进行详细说明

WebSocket 服务启动

这个类是一个基于 Netty 启动的常规服务端。它包含了一些配置项,包括 Reactor 模式、IO 类型以及消息处理配置,大部分都是这样。代码如下:

/**
 * 类说明:
 */
public final class WebSocketServer {

    /*创建 DefaultChannelGroup,用来保存所
    有已经连接的 WebSocket Channel,群发和一对一功能可以用上*/
    private final static ChannelGroup channelGroup =
            new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

    static final boolean SSL = false;//是否启用ssl
    /*通过ssl访问端口为8443,否则为8080*/
    static final int PORT
            = Integer.parseInt(
                    System.getProperty("port", SSL? "8443" : "80"));

    public static void main(String[] args) throws Exception {
        /*SSL配置*/
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(),
                    ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new WebSocketServerInitializer(sslCtx,channelGroup));

            Channel ch = b.bind(PORT).sync().channel();

            System.out.println("打开浏览器访问: " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Channel 初始化

这个类的主要功能是创建了一个 ChannelInitializer,用于初始化 ChannelPipeline,并添加了一些通道处理器。这些处理器包括由Netty提供的处理SSL协议、处理HTTP协议和支持WebSocket协议的功能,还有一些由业务自定义的处理器,用于处理页面展示和处理WebSocket数据。代码如下:

/**
 * 类说明:增加handler
 */
public class WebSocketServerInitializer
        extends ChannelInitializer<SocketChannel> {

    private final ChannelGroup group;

    /*websocket访问路径*/
    private static final String WEBSOCKET_PATH = "/chat";

    private final SslContext sslCtx;

    public WebSocketServerInitializer(SslContext sslCtx,ChannelGroup group) {
        this.sslCtx = sslCtx;
        this.group = group;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
        /*增加对http的支持*/
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));

        /*Netty提供,支持WebSocket应答数据压缩传输*/
        pipeline.addLast(new WebSocketServerCompressionHandler());
        /*Netty提供,对整个websocket的通信进行了初始化(发现http报文中有升级为websocket的请求)
        ,包括握手,以及以后的一些通信控制*/
        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,
                null, true));

        /*浏览器访问时展示index页面*/
        pipeline.addLast(new ProcessWsIndexPageHandler(WEBSOCKET_PATH));

        /*对websocket的数据进行处理*/
        pipeline.addLast(new ProcesssWsFrameHandler(group));
    }
}

HTTP 请求处理

这个类的主要功能是在收到 HTTP 请求时,当 URI 为“/”或“/index.html”时,会返回一个聊天界面作为响应。代码如下:

/**
 * 类说明:对http请求,将index的页面返回给前端
 */
public class ProcessWsIndexPageHandler
        extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final String websocketPath;

    public ProcessWsIndexPageHandler(String websocketPath) {
        this.websocketPath = websocketPath;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                FullHttpRequest req) throws Exception {
        // 处理错误或者无法解析的http请求
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        //只允许Get请求
        if (req.method() != GET) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }

        // 发送index页面的内容
        if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) {
            //生成WebSocket的访问地址,写入index页面中
            String webSocketLocation
                    = getWebSocketLocation(ctx.pipeline(), req,
                    websocketPath);
            System.out.println("WebSocketLocation:["+webSocketLocation+"]");
            //生成index页面的具体内容,并送往浏览器
            ByteBuf content
                    = MakeIndexPage.getContent(
                            webSocketLocation);
            FullHttpResponse res = new DefaultFullHttpResponse(
                    HTTP_1_1, OK, content);

            res.headers().set(HttpHeaderNames.CONTENT_TYPE,
                    "text/html; charset=UTF-8");
            HttpUtil.setContentLength(res, content.readableBytes());

            sendHttpResponse(ctx, req, res);
        } else {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /*发送应答*/
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req,
                                         FullHttpResponse res) {
        // 错误的请求进行处理 (code<>200).
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
                    CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        // 发送应答.
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        //对于不是长连接或者错误的请求直接关闭连接
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /*根据用户的访问,告诉用户的浏览器,WebSocket的访问地址*/
    private static String getWebSocketLocation(ChannelPipeline cp,
                                               HttpRequest req,
                                               String path) {
        String protocol = "ws";
        if (cp.get(SslHandler.class) != null) {
            protocol = "wss";
        }
        return protocol + "://" + req.headers().get(HttpHeaderNames.HOST)
                + path;
    }
}

HTTP 页面内容

这个类的主要目的是生成一个包含消息发送框和内容展示功能的HTML页面,并实现WebSocket的相关功能,包括建立连接、向服务端发送消息以及接收服务端的响应。当然,也可以单独写一个HTML文件。代码如下:

/**
 * 类说明:生成index页面的内容
 */
public final class MakeIndexPage {

    private static final String NEWLINE = "\r\n";

    public static ByteBuf getContent(String webSocketLocation) {
        return Unpooled.copiedBuffer(
                "<html><head><title>Web Socket Test</title><meta charset=\"utf-8\" /></head>"
                        + NEWLINE +
                "<body>" + NEWLINE +
                "<script type=\"text/javascript\">" + NEWLINE +
                "var socket;" + NEWLINE +
                "if (!window.WebSocket) {" + NEWLINE +
                "  window.WebSocket = window.MozWebSocket;" + NEWLINE +
                '}' + NEWLINE +
                "if (window.WebSocket) {" + NEWLINE +
                "  socket = new WebSocket(\"" + webSocketLocation + "\");"
                        + NEWLINE +
                "  socket.onmessage = function(event) {" + NEWLINE +
                "    var ta = document.getElementById('responseText');"
                        + NEWLINE +
                "    ta.value = ta.value + '\\n' + event.data" + NEWLINE +
                "  };" + NEWLINE +
                "  socket.onopen = function(event) {" + NEWLINE +
                "    var ta = document.getElementById('responseText');"
                        + NEWLINE +
                "    ta.value = \"Web Socket opened!\";" + NEWLINE +
                "  };" + NEWLINE +
                "  socket.onclose = function(event) {" + NEWLINE +
                "    var ta = document.getElementById('responseText');"
                        + NEWLINE +
                "    ta.value = ta.value + \"Web Socket closed\"; "
                        + NEWLINE +
                "  };" + NEWLINE +
                "} else {" + NEWLINE +
                "  alert(\"Your browser does not support Web Socket.\");"
                        + NEWLINE +
                '}' + NEWLINE +
                NEWLINE +
                "function send(message) {" + NEWLINE +
                "  if (!window.WebSocket) { return; }" + NEWLINE +
                "  if (socket.readyState == WebSocket.OPEN) {" + NEWLINE +
                "    socket.send(message);" + NEWLINE +
                "  } else {" + NEWLINE +
                "    alert(\"The socket is not open.\");" + NEWLINE +
                "  }" + NEWLINE +
                '}' + NEWLINE +
                "</script>" + NEWLINE +
                "<form οnsubmit=\"return false;\">" + NEWLINE +
                "<input type=\"text\" name=\"message\" " +
                        "value=\"Hi, 你好啊\"/>" +
                "<input type=\"button\" value=\"发送\""
                        + NEWLINE +
                "       οnclick=\"send(this.form.message.value)\" />"
                        + NEWLINE +
                "<h3>消息内容</h3>" + NEWLINE +
                "<textarea id=\"responseText\" " +
                        "style=\"width:500px;height:300px;\"></textarea>"
                        + NEWLINE +
                "</form>" + NEWLINE +
                "</body>" + NEWLINE +
                "</html>" + NEWLINE, CharsetUtil.UTF_8);
    }

}

WebSocket 请求处理

这个类的主要功能是处理与 Channel 相关的事件。例如,当一个 Channel 连接成功时,会将该 Channel 添加到一个 ChannelGroup 中。当接收到该 Channel 的数据时,可以通过向 ChannelGroup 写入数据来实现群聊效果。代码如下

/**
 * 类说明:对websocket的数据进行处理
 */
public class ProcesssWsFrameHandler
        extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final ChannelGroup group;

    public ProcesssWsFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    private static final Logger logger
            = LoggerFactory.getLogger(ProcesssWsFrameHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                WebSocketFrame frame) throws Exception {
        //判断是否为文本帧,目前只处理文本帧
        if (frame instanceof TextWebSocketFrame) {
            // Send the uppercase string back.
            String request = ((TextWebSocketFrame) frame).text();
            logger.info("{} received {}", ctx.channel(), request);
//            ctx.channel().writeAndFlush(
//                    new TextWebSocketFrame(request.toUpperCase(Locale.CHINA)));
            /*群发实现:一对一道理一样*/
            group.writeAndFlush(new TextWebSocketFrame(
                    ctx.channel().remoteAddress() + " :" + request.toUpperCase(Locale.CHINA)));
        } else {
            String message = "unsupported frame type: "
                    + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }
    }

    /*重写 userEventTriggered()方法以处理自定义事件*/
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx,
                                   Object evt) throws Exception {
        /*检测事件,如果是握手成功事件,做点业务处理*/
        if (evt == WebSocketServerProtocolHandler
                .ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

            //通知所有已经连接的 WebSocket 客户端新的客户端已经连接上了
            group.writeAndFlush(new TextWebSocketFrame(
                    "Client " + ctx.channel().remoteAddress() + " joined"));

            //将新的 WebSocket Channel 添加到 ChannelGroup 中,
            // 以便它可以接收到所有的消息
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

效果展示

服务端启动

在这里插入图片描述

聊天页面1

在这里插入图片描述

聊天页面2

在这里插入图片描述

总结

总的来说,基于 Netty 实现一个 WebSocket 功能是非常方便且高效的,但是我们需要知其所以然,要理解 Websocket 协议,也要懂的在 Netty 中,通过添加 ChannelHandler 来处理各种异常情况,例如握手失败、连接关闭等,当然,还要考虑安全性问题,例如处理跨站脚本攻击(XSS)、防止恶意数据传输等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/96790.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

报错sql_mode=only_full_group_by

首发博客地址 https://blog.zysicyj.top/ 报错内容 ### The error may exist in file[D:\code\cppCode20221025\leader-system\target\classes\mapper\system\TJsonDataMapper.xml] ### The error may involve defaultParameterMap ### The error occurred while…

Haproxy+Keepalive 整合rabbitmq实现高可用负载均衡

Haproxy 实现负载均衡 HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理&#xff0c;支持虚拟主机&#xff0c;它是免费、快速并且可靠的一种解决方案&#xff0c;包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种…

康希诺的再估值:市场到底,行业向上

生物医药是整个二级市场弹性数一数二&#xff0c;但拐点难以揣摩的行业。这一点&#xff0c;美港A三大市场都曾经有过足够多的暴涨暴跌案例可用于佐证。 但很多时候&#xff0c;这种片面的表现又掩盖了生物医药自身的永续价值&#xff1a;在绝大多数细分赛道上&#xff0c;任何…

加密的PDF文件,如何解密?

PDF文件带有打开密码、限制编辑&#xff0c;这两种密码设置了之后如何解密&#xff1f; 不管是打开密码或者是限制编辑&#xff0c;在知道密码的情况下&#xff0c;解密PDF密码&#xff0c;我们只需要在PDF编辑器中打开文件 – 属性 – 安全&#xff0c;将权限状态修改为无保护…

基于金枪鱼群算法优化的BP神经网络(预测应用) - 附代码

基于金枪鱼群算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码 文章目录 基于金枪鱼群算法优化的BP神经网络&#xff08;预测应用&#xff09; - 附代码1.数据介绍2.金枪鱼群优化BP神经网络2.1 BP神经网络参数设置2.2 金枪鱼群算法应用 4.测试结果&#xff1a;5…

链表(详解)

一、链表 1.1、什么是链表 1、链表是物理存储单元上非连续的、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表的指针地址实现&#xff0c;有一系列结点&#xff08;地址&#xff09;组成&#xff0c;结点可动态的生成。 2、结点包括两个部分&#xff1a;&#x…

Fedora Linux 的家族(一):官方版本

导读本文将对 Fedora Linux 官方版本进行更详细的介绍。共有五个 版本&#xff1a; Fedora Workstation、Fedora Server、Fedora IoT、Fedora CoreOS 和 Fedora Silverblue。Fedora Linux 下载页面目前显示其中三个为 官方 版本&#xff0c;另外两个为 新兴 版本。本文将涵盖所…

js的this指向问题

代码一&#xff1a; 这段代码定义了run函数、obj对象&#xff0c;然后我们把run函数作为obj的方法。 function run(){console.log(this);}let obj{a:1,b:2};obj.runrun;obj.run(); 那么我们调用obj的run方法&#xff0c;那么这个方法打印的this指向obj。 分析&#xff1a;即…

【javaweb】学习日记Day4 - Maven 依赖管理 Web入门

目录 一、Maven入门 - 管理和构建java项目的工具 1、IDEA如何构建Maven项目 2、Maven 坐标 &#xff08;1&#xff09;定义 &#xff08;2&#xff09;主要组成 3、IDEA如何导入和删除项目 二、Maven - 依赖管理 1、依赖配置 2、依赖传递 &#xff08;1&#xff09;查…

11. 盛最多水的容器(c++题解)

11. 盛最多水的容器&#xff08;c题解&#xff09; 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大…

分享一种针对uni-app相对通用的抓包方案

PART1&#xff0c;前言 近年来混合开发APP逐渐成为主流的开发模式&#xff0c;与传统的开发模式相比混合开发极大的提升了开发效率&#xff0c;同时跨平台的特性也降低了开发成本&#xff0c;一直以来混合开发被诟病的性能问题随着技术的发展也得到改善。技术的发展往往是一把…

HPC是如何助力AI推理加速的?

高性能计算&#xff08;High-Performance Computing&#xff0c;HPC&#xff09;通过提供强大的计算能力、存储资源和网络互联&#xff0c;可以显著地辅助人工智能&#xff08;AI&#xff09;应用更快地进行训练和推断。那么&#xff0c;HPC是如何助力AI推理加速的&#xff1f;…

多线程学习之线程池

线程状态 线程状态具体含义NEW一个尚未启动的线程的状态。也称之为初始、开始状态。线程刚被创建&#xff0c;但是并未启动。还没调用start方法。MyThread t new MyThread()只有线程对象&#xff0c;没有线程特征。RUNNABLE当我们调用线程对象的start方法&#xff0c;那么此时…

Java线程 - 详解(2)

一&#xff0c;线程安全问题 有些代码在单个线程的环境下运行&#xff0c;完全正确&#xff0c;但是同样的代码&#xff0c;让多个线程去执行&#xff0c;此时就可能出现BUG&#xff0c;这就是所谓的 "线程安全问题"。举一个例子&#xff1a; public class Demo {s…

python的可哈希对象

一、介绍 在Python中&#xff0c;可哈希&#xff08;hashable&#xff09;是指一种对象类型&#xff0c;该类型的对象可以用作字典的键&#xff08;keys&#xff09;或集合&#xff08;sets&#xff09;的元素。可哈希的对象具有以下特点&#xff1a; 不可变性&#xff08;Imm…

使用Linux部署Kafka教程

目录 一、部署Zookeeper 1 拉取Zookeeper镜像 2 运行Zookeeper 二、部署Kafka 1 拉取Kafka镜像 2 运行Kafka 三、验证是否部署成功 1 进入到kafka容器中 2 创建topic 生产者 3 生产者发送消息 4 消费者消费消息 四、搭建kafka管理平台 五、SpringBoot整合Kafka 1…

natApp内网穿透工作原理

如图所示&#xff0c;用户启动内网穿透工具会将token传入natapp服务器与我们自己的主机建立一个类似于websocket的长链接&#xff0c;当从外网访问我们主机的接口时&#xff0c;会进行一个本地接口地址的截取&#xff0c;然后进行拼接成我们主机应用的真实地址。然后将数据返回…

k-近邻算法概述,k-means与k-NN的区别对比

目录 k-近邻算法概述 k-近邻算法细节 k值的选取 分类器的决策 k-means与k-NN的区别对比 k-近邻算法概述 k近邻&#xff08;k-nearest neighbor, k-NN&#xff09;算法由 Cover 和 Hart 于1968年提出&#xff0c;是一种简单的分类方法。通俗来说&#xff0c;就是给定一个…

《异常检测——从经典算法到深度学习》22 Kontrast: 通过自监督对比学习识别软件变更中的错误

《异常检测——从经典算法到深度学习》 0 概论1 基于隔离森林的异常检测算法 2 基于LOF的异常检测算法3 基于One-Class SVM的异常检测算法4 基于高斯概率密度异常检测算法5 Opprentice——异常检测经典算法最终篇6 基于重构概率的 VAE 异常检测7 基于条件VAE异常检测8 Donut: …
最新文章