上一篇文章中,没有使用Selector,实习服务端的读取多个客户端的数据;本文先使用Selector实现读取多个客户单数据的功能,然后做些扩展。
一、基于NIO Selector读取多个客户的数据
1.服务端:基于Selector处理客户端的连接事件:OP_READ,处理客户端的数据具备事件:OP_READ
2.客户端:和上一篇一样,基于BIO实现连接和发送数据
服务端代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
*
*/
class NIOSelectorServer {
Selector selector;
public static void main(String[] args) throws IOException {
NIOSelectorServer server = new NIOSelectorServer();
server.start(); // 开启监听和事件处理
}
public void start() {
initServer();
// selector非阻塞轮询有哪些感兴趣的事件到了
doService();
}
private void doService() {
if (selector == null) {
System.out.println("server init failed, without doing read/write");
return;
}
try {
while (true) {
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 删除当前元素,防止重复处理
// 下面根据事件进行分别处理
if (key.isAcceptable()) {
// 客户端连接事件
acceptHandler(key);
} else if (key.isReadable()) {
// 读取客户端数据
readHandler(key);
}
}
}
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
private void initServer() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
// 此时在selector上注册感兴趣的事件
// 这里先注册OP_ACCEPT: 客户端连接事件
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server init success");
} catch (IOException exception) {
exception.printStackTrace();
System.out.println("server init failied");
}
}
public void acceptHandler(SelectionKey key) {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
try {
SocketChannel client = server.accept();
client.configureBlocking(false); // 设置client非阻塞
System.out.println("server receive a client :" + client);
// 注册OP_READ事件,用于从客户端读取数据
// 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
client.register(key.selector(), SelectionKey.OP_READ, buffer);
} catch (IOException exception) {
exception.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
System.out.println("read handler");
SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
buffer.clear(); // 使用前clear
// 防止数据分包,需要while循环读取
try {
while (true) {
int readLen = client.read(buffer);
if (readLen > 0) {
// 读取到数据了
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println("server read data from " + client + ", data is :" + new String(data));
} else if (readLen == 0) {
// 没读到数据
System.out.println(client + " : no data");
break;
} else if (readLen == -1) {
// client 关闭连接
System.out.println(client + " close");
break;
}
}
} catch (IOException exception) {
// exception.printStackTrace();
// client 关闭连接
System.out.println(client + " disconnect");
// todo:disconnect 导致一直有read事件,怎么办?
}
}
}
客户端代码:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
/**
* 基于BIO的TCP网络通信的客户端,接收控制台输入的数据,然后通过字节流发送给服务端
*
* @author freddy
*/
class ChatClient {
public static void main(String[] args) throws IOException {
// 连接server
Socket serverSocket = new Socket("localhost", 9090);
System.out.println("client connected to server");
// 读取用户在控制台上的输入,并发送给服务器
new Thread(new ClientThread(serverSocket)).start();
// 接收服务端发送过来的数据
try (InputStream serverSocketInputStream = serverSocket.getInputStream();) {
byte[] buffer = new byte[1024];
int len;
while ((len = serverSocketInputStream.read(buffer)) != -1) {
String data = new String(buffer, 0, len);
System.out.println(
"client receive data from server" + serverSocketInputStream + " data size:" + len + ": " + data);
}
}
}
}
class ClientThread implements Runnable {
private Socket serverSocket;
public ClientThread(Socket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
// 读取用户在控制台上的输入,并发送给服务器
InputStream in = System.in;
byte[] buffer = new byte[1024];
int len;
try (OutputStream outputStream = serverSocket.getOutputStream();) {
// read操作阻塞,直到有数据可读,由于后面还要接收服务端转发过来的数据,这两个操作都是阻塞的,所以需要两个线程
while ((len = in.read(buffer)) != -1) {
String data = new String(buffer, 0, len);
System.out.println("client receive data from console" + in + " : " + new String(buffer, 0, len));
if ("exit\n".equals(data)) {
// 模拟客户端关闭连接
System.out.println("client close :" + serverSocket);
// 这里跳出循环后,try-with-resources 会自动关闭outputStream
break;
}
// 发送数据给服务器端
outputStream.write(new String(buffer, 0, len).getBytes()); // 此时buffer中是有换行符
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
测试:
先启动服务端,再启动2个客户端,客户端发送数据
server init success
server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53568]
server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53584]
read handler
server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53584], data is :client1
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53584] : no data
read handler
server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53568], data is :client2
java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:53568] : no data
客户端1,exit 关闭连接
客户端2,异常关闭
会导致一直有read事件,这个要看看为啥