1200字范文,内容丰富有趣,写作的好帮手!
1200字范文 > 【NIO与Netty】Java NIO入门:多线程多Selector模式 零拷贝 同步阻塞 同步非阻塞

【NIO与Netty】Java NIO入门:多线程多Selector模式 零拷贝 同步阻塞 同步非阻塞

时间:2023-07-24 04:24:32

相关推荐

【NIO与Netty】Java NIO入门:多线程多Selector模式 零拷贝 同步阻塞 同步非阻塞

黑马程序员Netty笔记合集

注意:由于章节连贯,此套笔记更适合学习《黑马Netty全套课程》的同学参考、复习使用。

一、概述

1.1 BIO vs NIO

1.1.1 stream vs channel

stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用二者均为全双工,即读写可以同时进行

1.1.2 IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

同步:线程自己去获取结果(一个线程)异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

等待数据阶段复制数据阶段 阻塞 IO 非阻塞 IO 多路复用

信号驱动

异步 IO

阻塞 IO vs 多路复用

🔖 参考

UNIX 网络编程 - 卷 I

1.1.3 零拷贝

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

File f = new File("helloword/data.txt");RandomAccessFile file = new RandomAccessFile(file, "r");byte[] buf = new byte[(int)f.length()];file.read(buf);Socket socket = ...;socket.getOutputStream().write(buf);

内部工作流程是这样的:

java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入socket 缓冲区,cpu 会参与拷贝

接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

用户态与内核态的切换发生了 3 次,这个操作比较重量级数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步 DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列通过专门线程访问引用队列,根据虚引用释放堆外内存 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu数据从内核缓冲区传输到socket 缓冲区,cpu 会参与拷贝最后使用 DMA 将socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

只发生了一次用户态与内核态的切换数据拷贝了 3 次

进一步优化(linux 2.4)

java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu只会将一些 offset 和 length 信息拷入socket 缓冲区,几乎无消耗使用 DMA 将内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

更少的用户态与内核态的切换不利用 cpu 计算,减少 cpu 缓存伪共享零拷贝适合小文件传输

1.1 多线程与单线程

1.1.1 多线程版

⚠️ 多线程版缺点:

内存占用高线程上下文切换成本高只适合连接数少的场景

1.1.2 线程池版

⚠️ 线程池版缺点:

阻塞模式下,线程仅能处理一个 socket 连接仅适合短连接场景

1.1.3 多路复用版

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证 有可连接事件时才去连接有可读事件才去读取有可写事件才去写入 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件 减少线程数量,线程数量可以根据CPU核心数确定非阻塞设计:解决线程需要阻塞处理 socket 连接,直至有可供读取的数据或者数据能够写入,线程得不到释放的问题

1.2 阻塞与非阻塞

accept()、read()不等待

6.1.1 阻塞

阻塞模式下,相关方法都会导致线程暂停 ServerSocketChannel.accept 会在没有连接建立时让线程暂停SocketChannel.read 会在没有数据可读时让线程暂停阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持但多线程下,有新的问题,体现在以下方面 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务器端

// 使用 nio 来理解阻塞模式, 单线程// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建了服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行log.debug("connected... {}", sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据log.debug("before read... {}", channel);channel.read(buffer); // 阻塞方法,线程停止运行buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}}

客户端

SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 8080));System.out.println("waiting...");

6.1.2 非阻塞

非阻塞模式下,相关方法都会不会让线程暂停 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务器端,客户端代码不变

// 使用 nio 来理解非阻塞模式, 单线程// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建了服务器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 非阻塞模式// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是nullif (sc != null) {log.debug("connected... {}", sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0if (read > 0) {buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}}}

6.1.3 多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证 有可连接事件时才去连接有可读事件才去读取有可写事件才去写入 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

二、组件介绍

2.1 Channel

通道类似流,但又有些不同:

Channel 是双向通道,可以异步地读写

可以将 buffer 的数据写入channel,也可以从 channel 将数据读入buffer

NIO 中的 Channel 的主要实现有:FileChannel、DatagramChannel、SocketChannel 和 ServerSocketChannel

2.2 SocketChannel

SocketChannel 就是 NIO 对于非阻塞 socket 操作的支持的组件,被实例化时创建一个对等 socket 对象

其在 socket 上封装了一层,主要是支持了非阻塞的读写,改进了传统的单向流 API,支持同时读写。

要把一个socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞 I/O 和可选择性是紧密相连的,那也正是管理阻塞模式的 API 代码要在 SelectableChannel超级类中定义的原因。

设置阻塞模式:configureBlocking(boolean )、判断阻塞模式:isBlocking( )

public final SelectableChannel configureBlocking(boolean block) throws IOException{synchronized (regLock) {if (!isOpen())throw new ClosedChannelException();if (blocking == block)return this;if (block && haveValidKeys())throw new IllegalBlockingModeException();implConfigureBlocking(block);blocking = block;}return this;}

2.3 Buffer

缓冲区作为应用程序文件、网络之间的桥梁本质上是一块可以写入数据、读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。

2.4 Selector

调用 selector 的 select() 会查询并阻塞直到 【channel 发生了注册过的读写就绪事件**(SelectionKey)**】,将返回的事件交给 thread 来处理

可读 : SelectionKey.OP_READ可写 : SelectionKey.OP_WRITE连接 : SelectionKey.OP_CONNECT,客户端在连接建立后触发接收 : SelectionKey.OP_ACCEPT,服务端在有连接请求时触发

如果 Selector 对通道的多操作类型感兴趣,可以用“位或”操作符来实现:比如:int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;

没有继承 SelectableChannel 的类不能被选择。如 FileChannel

所有 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。

适合连接数特别多,但流量低的场景

三、文件编程

FileChannel

从文件中读写数据。实现常用的 read,write 以及 scatter/gather 操作,同时它也提供了很多专用于文件的新方法。

3.1 获取实例

需要通过使用一个 InputStream、OutputStream 或RandomAccessFile 获取实例,决定是否可读可写

public class FileChannelTest {@Testpublic void test1() throws Exception{//通过文件连接流获取通道RandomAccessFile raf=new RandomAccessFile("hello.txt","r");FileChannel channel = raf.getChannel();}@Testpublic void test2() throws Exception{//直接打开文件通道FileChannel channel = FileChannel.open(Paths.get("a.jpg"), StandardOpenOption.WRITE, StandardOpenOption.APPEND);}}

3.2 文件复制

方式一:使用缓冲区

/*** 1.read(ByteBuffer):读取Channel的数据到ByteBuffer。返回读取的字节数,文件末尾返回-1* 2.read(ByteBuffer[]):将Channel中的数据“分散”到ByteBuffer[]* 3.write(ByteBuffer):写入ByteBuffer数据到Channel*注意:文件存在则失败* 4.write(ByteBuffer[]):将ByteBuffer[]中的数据“聚集”到Channel* 5.close():关闭通道*/public class FileChannelTest {@Testpublic void test1() throws Exception{//通过文件连接流获取通道RandomAccessFile raf1=new RandomAccessFile("hello.txt","r");RandomAccessFile raf2=new RandomAccessFile("hello1.txt","rw");FileChannel channel1 = raf1.getChannel();FileChannel channel2 = raf2.getChannel();//创建缓冲区ByteBuffer buffer=ByteBuffer.allocate(20);//1.read(ByteBuffer)while(channel1.read(buffer)!=-1){buffer.flip();//3.write(ByteBuffer)channel2.write(buffer);buffer.clear();}//5.close():关闭通道raf1.close();raf2.close();channel1.close();channel2.close();}}

方式二:直接管理传输

transferXxx():一次最多2g 方法的输入参数 position 表示从 position 处开始向目标文件写入数据,size 表示最多传输的字节数。如果源通道的剩余空间小于 size 个字节,则所传输的字节数要小于请求的字节数。此外要注意,在 SoketChannel 的实现中,SocketChannel 只会传输此刻准备好的数据(可能不足 size 字节)。因此,SocketChannel 可能不会将请

求的所有数据(size 个字节)全部传输到 FileChannel 中。

/*** 1.size():获取通道连接的文件的大小* 2.transferFrom(FileChannel,long,long):获取数据从自FileChannel* 3.transferTo(long,long,FileChannel):传输数据到FileChannel*/public class FileChannelTest2 {@Testpublic void test() throws Exception{//创建管道RandomAccessFile raf1=new RandomAccessFile("a.jpg","r");RandomAccessFile raf2=new RandomAccessFile("b.jpg","rw");RandomAccessFile raf3=new RandomAccessFile("c.jpg","rw");FileChannel channel1 = raf1.getChannel();FileChannel channel2 = raf2.getChannel();FileChannel channel3 = raf3.getChannel();//1.size()Long size=channel1.size();//2.transferFrom(FileChannel,long,long)channel2.transferFrom(channel1,0,size);//3.transferTo(long,long,FileChannel):传输大于2g的文件// left 变量代表还剩余多少字节for (long left = size; left > 0; ) {left -= from.transferTo((size - left), left, to);}//关闭资源raf1.close();raf2.close();channel1.close();channel2.close();}}

3.3 其他操作

/***1.truncate(long):截取文件,指定长度后的被删除*注意:源文件被修改*2.position():获取 FileChannel 的当前位置*3.position(Long):设置 FileChannel 的当前位置* 4.force(boolean):将缓存在内存中的通道数据强制写入磁盘* 5.size():获取文件大小*/public class FileChannelTest3 {@Testpublic void test() throws Exception{//通过文件连接流获取通道RandomAccessFile raf1=new RandomAccessFile("hello.txt","rw");RandomAccessFile raf2=new RandomAccessFile("hello2.txt","rw");FileChannel channel1 = raf1.getChannel();FileChannel channel2 = raf2.getChannel();//1.truncate(long):中国梦!北京欢迎您!哈哈哈哈哈! ————> 中国梦!北京欢迎您!channel1.truncate(30l);//创建缓冲区ByteBuffer buffer=ByteBuffer.allocate(20);//2.position(Long)channel1.position(12l);// 中国梦!北京欢迎您! ————> 北京欢迎您!while(channel1.read(buffer)!=-1){buffer.flip();channel2.write(buffer);buffer.clear();}//关闭资源:二选一raf1.close();raf2.close();channel1.close();channel2.close();}}

3.4 分散读、聚集写

分散读:Scattering Reads

buffer先插入到数组,每个buffer在数组中被按顺序写满不适用于动态消息(译者注:消息大小不固定)

/*** 分散(scatter):将 Channel 中读取的数据写入多个 buffer*/public class readTest {@Testpublic void test() throws Exception{RandomAccessFile raf=new RandomAccessFile("hello.txt","r");FileChannel channel = raf.getChannel();//1.buffer先插入到数组ByteBuffer head=ByteBuffer.allocate(12);ByteBuffer tail=ByteBuffer.allocate(18);ByteBuffer[] buffers={head,tail};//2.在数组中按顺序将每个buffer写满channel.read(buffers);System.out.println(new String(head.array())); //中国梦!System.out.println("======================================================");System.out.println(new String(tail.array())); //北京欢迎您!}}

聚集写:Gathering Writes

注意:每个 Buffer 中只有 position 和 limit 之间的数据才会被写入因此与 Scattering Reads 相反,Gathering Writes 能较好的处理动态消息。

/*** 聚集(gather):将多个 buffer 的数据写入同一个 Channel*/public class WriteTest {@Testpublic void test() throws Exception{RandomAccessFile raf=new RandomAccessFile("hello3.txt","rw");FileChannel channel = raf.getChannel();//1.buffer先插入到数组ByteBuffer head=ByteBuffer.allocate(12);ByteBuffer tail=ByteBuffer.allocate(18);head.put("中国梦!".getBytes());tail.put("北京欢迎您!".getBytes());ByteBuffer[] buffers={head,tail};head.flip();tail.flip();//2.按顺序读完数组中每个 bufferchannel.write(buffers); //中国梦!北京欢迎您!}}

3.5 Path

类似 java.io.File 类。java.nio.file.Path 表示文件系统中的文件或目录许多情况下,使用 Path 接口来替换 File 类的使用Paths 是工具类,用来获取 Path 实例

/*** 1.Paths.get(URI):完整路径。绝对或相对* 2.Paths.get(String,String):拼接路径。绝对或相对* 3.normalize():路径标准化*/public class PathTest {@Testpublic void test(){//1.完整路径。绝对或相对Path path = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\a.jpg");//2.拼接路径。绝对或相对Path path1 = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest", "a.jpg");//3.路径标准化Path path2 = Paths.get("F:\\software\\IDEA\\JavaProjects\\JavaseTest\\..\\NioTest", "a.jpg");System.out.println(path2);Path path3 = path2.normalize();System.out.println(path3);}}

3.6 Files

提供了操作文件系统中文件的方法与 java.nio.file.Path 实例一起工作遍历目录树:访问者模式walkFileTree()参数:Path 、 FileVisitorFileVisitor 是一个接口,子类 SimpleFileVisitor 默认实现了 接口中所有方法FileVisitor 的所有方法都返回 FileVisitResult 枚举实例 CONTINUE:继续TERMINATE:终止SKIP_SIBLING:跳过同级SKIP_SUBTREE:跳过子级

/*** 1.createFile(Path):创建文件。文件已经存在抛异常 * 2.createDirectory(Path):创建目录。目录已经存在、父目录不存在抛异常* 复制*3.copy(Path,Path):复制文件。目标已经存在、目录不存在抛异常*4.copy(Path,Path,CopyOption):复制文件。目标已经存在则覆盖原有文件* 移动*5.move(Path,Path):移动文件。文件存在抛异常*6.move(Path,Path,CopyOption):移动文件。文件存在则覆盖* 7.delete(Path):文件不存在则抛异常* 8.walkFileTree():遍历目录树*/public class FilesTest {@Testpublic void test1() throws Exception{Path path = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\path.txt");Path directory = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\directory");Path source = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\a.jpg");Path dest = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\b.jpg");Path rename = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest\\bb.jpg");//1.创建文件Files.createFile(path);//2.创建目录Files.createDirectory(directory);//4.复制文件Files.copy(source,dest, StandardCopyOption.REPLACE_EXISTING);//5.移动文件Files.move(dest,rename);//7.删除文件Files.delete(directory);}@Testpublic void test2() throws Exception{Path path = Paths.get("F:\\software\\IDEA\\JavaProjects\\NioTest");String fileToFind= "a.jpg";//8.遍历目录树Files.walkFileTree(path,new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {String s = file.toAbsolutePath().toString();if(s.endsWith(fileToFind)){System.out.println(s);return FileVisitResult.TERMINATE;}return FileVisitResult.CONTINUE;}});}}

3.7 练习

遍历目录文件

public static void main(String[] args) throws IOException {Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");AtomicInteger dirCount = new AtomicInteger();AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println(dir);dirCount.incrementAndGet();return super.preVisitDirectory(dir, attrs);}@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println(file);fileCount.incrementAndGet();return super.visitFile(file, attrs);}});System.out.println(dirCount); // 133System.out.println(fileCount); // 1479}

统计 jar 的数目

Path path = Paths.get("C:\\Program Files\\Java\\jdk1.8.0_91");AtomicInteger fileCount = new AtomicInteger();Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if (file.toFile().getName().endsWith(".jar")) {fileCount.incrementAndGet();}return super.visitFile(file, attrs);}});System.out.println(fileCount); // 724

删除多级目录

删除是危险操作,确保要递归删除的文件夹没有重要内容

Path path = Paths.get("d:\\a");Files.walkFileTree(path, new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return super.visitFile(file, attrs);}@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {Files.delete(dir);return super.postVisitDirectory(dir, exc);}});

拷贝多级目录

long start = System.currentTimeMillis();String source = "D:\\Snipaste-1.16.2-x64";String target = "D:\\Snipaste-1.16.2-x64aaa";Files.walk(Paths.get(source)).forEach(path -> {try {String targetName = path.toString().replace(source, target);// 是目录if (Files.isDirectory(path)) {Files.createDirectory(Paths.get(targetName));}// 是普通文件else if (Files.isRegularFile(path)) {Files.copy(path, Paths.get(targetName));}} catch (IOException e) {e.printStackTrace();}});long end = System.currentTimeMillis();System.out.println(end - start);

四、网络编程

4.1 ServerSocketChannel

类似 TCP:ServerSocket ,通过 TCP 读写网络中的数据

ServerSocketChannel监听新进来的 TCP 连接,像 Web 服务器那样。对每一个新进来的连接都会创建一个 SocketChannel。可以在非阻塞模式下运行

/*** 会创建一个对等 Serversocket 对象** 1.open():获取实例* 2.accept():监听通道* 3.close():关闭通道* 4.configureBlocking(boolean):设置是否阻塞*/public class ServerSocketChannelTest {@Testpublic void test() throws Exception{//1.获取实例ServerSocketChannel channel = ServerSocketChannel.open();channel.bind(new InetSocketAddress(8080));//4.设置非阻塞channel.configureBlocking(false);ByteBuffer buffer=ByteBuffer.allocate(20);buffer.put("北京欢迎您!".getBytes());buffer.flip();while(true){System.out.println("非阻塞监听");//2.监听通道SocketChannel accept = channel.accept();if (accept!=null){System.out.println("Incoming connection from: " +accept.getRemoteAddress());accept.write(buffer);//关闭通道accept.close();break;}else {System.out.println("null");Thread.sleep(2000);}}//3.关闭通道channel.close();}}

4.2 SocketChannel

类似 TCP:Socket ,通过 TCP 读写网络中的数据

用来连接 Socket 套接字。支持阻塞式和非阻塞式实现了可选择通道,可以被多路复用的对于已经存在的 socket 不能创建 SocketChannel支持异步关闭: 如果 SocketChannel 在一个线程上 read 阻塞,另一个线程对该 SocketChannel 调用 shutdownInput,则读阻塞的线程将返回-1 表示没有

读取任何数据;如果 SocketChannel 在一个线程上 write 阻塞,另一个线程对该SocketChannel 调用 shutdownWrite,则写阻塞的线程将抛出

AsynchronousCloseException 支持设定参数

/***会创建一个对等 socket 对象** 1.open():创建实例,没有进行实质的 tcp 连接。需配合connect(InetSocketAddress)使用*注意:未进行连接的 SocketChannle 执行 I/O 操作时,会抛出NotYetConnectedException* 2.connect(InetSocketAddress):进行 tcp 连接* 3.open(InetSocketAddress):创建实例,有实质性的 tcp 连接* 4.连接检验*- isOpen():是否为 open 状态*- isConnected():是否已经被连接*- isConnectionPending():是否正在进行连接*- finishConnect():是否已经完成连接* 5.configureBlocking(boolean):设置是否阻塞* 6.read(ByteBuffer):读,支持阻塞式和非阻塞式。*- 读写方式与 FileChannel 相同* 7.setOption(StandardSocketOptions):设置参数* 8.getOption(StandardSocketOptions):获取参数*- SO_SNDBUF 套接字发送缓冲区大小*- SO_RCVBUF 套接字接收缓冲区大小*- SO_KEEPALIVE 保活连接*- O_REUSEADDR 复用地址*- SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用)*- TCP_NODELAY 禁用 Nagle 算法*/public class SocketChannelTest {@Testpublic void test() throws Exception{//1.创建实例SocketChannel channel = SocketChannel.open();//2.进行 tcp 连接channel.connect(new InetSocketAddress("127.0.0.1",8080));//4.连接检验System.out.println(channel.isOpen());System.out.println(channel.isConnected());System.out.println(channel.isConnectionPending());System.out.println(channel.finishConnect());//5.设置阻塞模式channel.configureBlocking(true);//7.设置参数:保活连接、禁用 Nagle 算法channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE).setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);//8.获取参数System.out.println(channel.getOption(StandardSocketOptions.SO_KEEPALIVE));System.out.println(channel.getOption(StandardSocketOptions.TCP_NODELAY));//6.读数据ByteBuffer buffer=ByteBuffer.allocate(20);int length = channel.read(buffer);System.out.println("获取信息完毕:"+new String(buffer.array(),0,length));}}

4.3 DatagramChannel

类似 UDP:DatagramSocket,通过 UDP 读写网络中的数据

会创建一个对等 DatagramSocket 对象是无连接的。可以发送单独的数据报给不同的目的地址,也可以接收来自任意地址的数据包。

/*** 1.open():实例化* 2.bind(InetSocketAddress):绑定监听的端口* 3.receive(ByteBuffer)* 4.send(ByteBuffer,InetSocketAddress):发送数据包* 5.connect(InetSocketAddress):客户端指定连接的socket,连接后只有这个socket能进行读写操作*- 其他socket的操作将抛出异常*/public class DatagramChannelTest {@Testpublic void send() throws Exception{//1.实例化DatagramChannel client = DatagramChannel.open();//5.指定连接的socketclient.connect(new InetSocketAddress("127.0.0.1",8080));//4.发送数据包client.send(ByteBuffer.wrap("你好服务器!".getBytes()),new InetSocketAddress("127.0.0.1",8080));client.close();}@Testpublic void receive() throws Exception{DatagramChannel server = DatagramChannel.open();//2.绑定监听的端口server.bind(new InetSocketAddress(8080));ByteBuffer buffer = ByteBuffer.allocate(20);//设置非阻塞server.configureBlocking(false);//3.接收数据包while(true){System.out.println("非阻塞循环");if(server.receive(buffer)!=null){buffer.flip();System.out.println(new String(buffer.array(),0, buffer.limit()));buffer.clear();break;}Thread.sleep(3000);}server.close();}}

4.4 分散读、聚集写

参考3.4

五、Buffer

5.1 Buffer结构

position():设置定位limit():设置限制

写模式

capacity:容量。一旦 Buffer 满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。position:表示写入数据的当前位置,初始值为0。最大可为 capacity – 1。limit:表示最多可写入多少个数据。limit = capacity。

读模式

capacity:容量position:表示读出数据的当前位置。通过 flip()切换到读模式时 position 会被重置为 0limit:表示有多少可读数据(not null 的数据)。一般情况下,这个值就是写模式下的 position

5.2 使用步骤

写入数据到 Buffer调用 flip()方法,切换到读模式从 Buffer 中读取数据清空缓冲区:一旦读完 Buffer 中的数据,需要让 Buffer 准备好再次被写入 clear():position=0,limit=capacity。数据并未清除compact():未读的数据拷贝到 Buffer 起始处,position 设到最后一个未读元素正后面。limit=capacity。 重复以上步骤

5.3 常见命令

与字符串的转换

/***字符串 转 ByteBuffer*1.put("hello".getBytes()):不可直接读*2.StandardCharsets.UTF_8.encode("hello"):可直接读*3.ByteBuffer.warp("hello".getBytes()):可直接读*ByteBuffer 转 字符串*4.StandardCharsets.UTF_8.decode(buffer).toString()*5.new String(buffer.array())*/

基本使用

/***- position():设置定位*- limit():设置限制** 创建缓冲区:*1.allocate(int):实例化指定大小的缓冲区。- java 堆内存,读写效率较低,受GC影响。*1.allocateDirect(int):实例化指定大小的缓冲区。- 系统内存,读写效率高(少一次拷贝),不受GC影响,分配效率低。* 写数据:*2.put(int/int[]/IntBuffer/int,int/int[],int,int):写入数据*3.channel.read(ByteBuffer):从 Channel 写入** 读数据:*4.flip():切换到读模式。将 position 设为0,limit 表示之前写入的元素个数或未读元素个数*5.rewind():重读。在读模式下将 position 设为0,limit 保持不变*6.hasRemaining():是否还有可读字节*7.get()/get(int):获取一个字节/获取对应索引的字节,不改变指针。*8.channel.write(ByteBuffer):从 Channel 读入* 转换为写模式*9.clear():从0开始写入*pact():未读数据拷贝到 Buffer 起始处,从未读数据后面写入* 标记:rewind()的增强,从指定位置开始重读*11.mark():标记一个特定的 position*12.reset():恢复到标记的 position*/public class BufferTest {@Testpublic void test1() throws Exception{//1.实例化IntBuffer buffer=IntBuffer.allocate(20);//2.直接写入for (int i = 0; i < 20; i++) {buffer.put(i);}//4.切换到读模式buffer.flip();//6.是否还有可读字节while (buffer.hasRemaining()){//7.获取一个字节System.out.print(buffer.get()+" ");}//5.重读buffer.rewind();System.out.println();while (buffer.hasRemaining()){System.out.print(buffer.get()+" ");}}@Testpublic void test2() throws Exception{RandomAccessFile raf=new RandomAccessFile("hello.txt","rw");FileChannel channel = raf.getChannel();//1.实例化ByteBuffer buffer=ByteBuffer.allocate(20);int length;ByteArrayOutputStream baos=new ByteArrayOutputStream();//3.从 Channel 写入while((length=channel.read(buffer))!=-1){//4.切换到读模式buffer.flip();//8.从 Channel 读入baos.write(buffer.array(),0,length);//9.从0开始写入buffer.clear();}System.out.println(baos);channel.close();}}

5.4 缓冲区操作

5.4.1 缓冲区分片

创建一个子缓冲区,在底层数组层面上是数据共享的

/*** 分片:*1.position(int):定位分片读数据的开始位置*2.limit(int):限制分片读数据的个数*3.slice():分片*/public class SliceTest {@Testpublic void test(){ByteBuffer buffer = ByteBuffer.allocate(10);for (int i = 0; i < buffer.capacity(); ++i) {buffer.put((byte) i);}// 创建子缓冲区//1.定位分片读数据的开始位置buffer.position(3);//2.限制分片读数据的个数buffer.limit(7);//3.分片ByteBuffer slice = buffer.slice();// 改变子缓冲区的内容for (int i = 0; i < slice.capacity(); ++i) {byte b = slice.get(i);b *= 10;slice.put(i, b);}buffer.position(0);buffer.limit(10);while (buffer.remaining() > 0) {System.out.print(buffer.get()+" "); //0 1 2 30 40 50 60 7 8 9 }}}

5.4.2 只读缓冲区

如果尝试修改只读缓冲区的内容,则会报 ReadOnlyBufferException 异常

/*** 只读缓冲区:*1.asReadOnlyBuffer():创建只读缓冲区*/public class ReadOnlyTest {@Testpublic void test(){ByteBuffer buffer = ByteBuffer.allocate(10);for (int i = 0; i < buffer.capacity(); ++i) {buffer.put((byte) i);}//1.创建只读缓冲区ByteBuffer readonly = buffer.asReadOnlyBuffer();// 改变原缓冲区的内容for (int i = 0; i < buffer.capacity(); ++i) {byte b = buffer.get(i);b *= 10;buffer.put(i, b);}readonly.position(0);readonly.limit(buffer.capacity());while (readonly.remaining() > 0) {System.out.print(readonly.get()+" "); //0 10 20 30 40 50 60 70 80 90}}}

5.4.3 直接缓冲区

加快 I/O 速度,使用一种特殊方式为其分配内存的缓冲区JDK 文档描述:给定一个直接字节缓冲区,Java 虚拟机将尽最大努力直接对它执行本机I/O 操作

/*** 直接缓冲区*1.allocateDirect(int):创建直接缓冲区。系统内存,读写效率高(少一次拷贝),不受GC影响,分配效率低。*/public class DirectTest {@Testpublic void test() throws Exception{RandomAccessFile raf1=new RandomAccessFile("a.jpg","r");RandomAccessFile raf2=new RandomAccessFile("d.jpg","rw");FileChannel channel1 = raf1.getChannel();FileChannel channel2 = raf2.getChannel();//1.创建直接缓冲区ByteBuffer buffer = ByteBuffer.allocateDirect(20);while (channel1.read(buffer)!=-1){buffer.flip();channel2.write(buffer);buffer.clear();}raf1.close();raf2.close();channel1.close();channel2.close();}}

5.4.4 内存映射文件I/O

读和写文件数据的方法将文件中实际读取或者写入的部分映射到内存中

/*** 内存映射文件IO*/public class MapTest {@Testpublic void test() throws Exception{RandomAccessFile raf=new RandomAccessFile("hello4.txt","rw");FileChannel channel = raf.getChannel();MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);map.put(0, (byte) 97);map.put(1023, (byte) 122);}}

5.5 处理粘包、半包

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔

但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

Hello,world\nI’m zhangsan\nHow are you?\n

变成了下面的两个 byteBuffer (黏包,半包)

Hello,world\nI’m zhangsan\nHow are you?\n

现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据

public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);// 11 24source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);source.put("w are you?\nhaha!\n".getBytes());split(source);}private static void split(ByteBuffer source) {source.flip();int oldLimit = source.limit();for (int i = 0; i < oldLimit; i++) {if (source.get(i) == '\n') {System.out.println(i);ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());// 0 ~ limitsource.limit(i + 1);target.put(source); // 从source 读,向 target 写debugAll(target);source.limit(oldLimit);}}pact();}

六、Selector

6.1 注意事项

⚠️select 何时不阻塞

事件未处理事件发生时 客户端发起连接请求,会触发 accept 事件客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件channel 可写,会触发 write 事件在 linux 下 nio bug 发生时 调用 selector.wakeup()调用 selector.close()selector 所在线程 interrupt

⚠️只能注册非阻塞通道

Selector注册的通道都必须处于非阻塞模式下,否则将抛出异常IllegalBlockingModeException

即FileChannel 不能与 Selector 一起使用

⚠️为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

第一次触发了 key1上的 accept 事件,没有移除 key1第二次触发了 key2上的 read 事件。但这时 selectedKeys 中还有上次的 key1,再次处理key1上的 accept 事件时因为没有真正的 serverSocket 连上了,就会导致空指针异常

⚠️cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除。 key 后续不会再监听事件

⚠️通道支持的操作

并非所有通道都支持四种操作

比如ServerSocketChannel 支持 Accept 接受操作,而 SocketChannel 客户端通道则不支持。

6.2 基本使用

服务端

/*** 1.open():创建选择器实例* 2.channel.register(selector, int):注册通道* 轮询就绪操作* 3.select():阻塞轮询* 4.select(long timeout):带过期事件的阻塞轮询* 5.selectNow():非阻塞轮询* 6.selectedKeys():获取已选择键集合* 7.wakeup():唤醒选择器的轮询操作*注意:该方法使得选择器上的第一个还没有返回的选择操作立即返回。* 如果当前没有进行中的选择操作,那么下一次对 select()的调用将立即返回。* 8.close:关闭选择器,所有 Channel 被注销。但是 Channel本身并不会关闭* 9.validOps():获取通道支持的操作集合*/public class SelectorTest {@Testpublic void test(){try {//1.创建选择器实例Selector selector = Selector.open();ServerSocketChannel channel=ServerSocketChannel.open();channel.bind(new InetSocketAddress(8080));channel.configureBlocking(false);ByteBuffer readBuffer = ByteBuffer.allocate(20);ByteBuffer writeBuffer = ByteBuffer.allocate(20);writeBuffer.put("你好,中国!".getBytes());writeBuffer.flip();//2.注册服务器的socket:当有客户端连接时匹配OP_ACCEPT操作,即服务器可以接收socket时channel.register(selector, SelectionKey.OP_ACCEPT);while (true){//3.阻塞轮询 if(selector.select()<=0) continue;//6.获取已选择键集合Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey selectionKey = iterator.next();if(selectionKey.isAcceptable()) {System.out.println("isAcceptable");SocketChannel accept = channel.accept();accept.configureBlocking(false);//2.注册客户端的socket为:客户端发送信息时执行OP_READ操作,即服务器可以读取信息accept.register(selector,SelectionKey.OP_READ);} else if (selectionKey.isReadable()) {//更改客户端的行为为:客户端接收信息时匹配OP_WRITE操作,即服务器可以发送信息selectionKey.interestOps(SelectionKey.OP_WRITE);} else if (selectionKey.isWritable()) {}//注意iterator.remove();}}} catch (Exception e) {e.printStackTrace();} finally {}}}

6.3 处理 accept 事件

💡 事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();if(count <= 0) continue;Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 1.处理 accept 事件if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();}// 2.处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}}

6.4 处理 read 事件

public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));System.out.println(channel);Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();if(count <= 0) continue;Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);log.debug("连接已建立: {}", sc);} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);//1.连接关闭,注销与selector的关系//这里省略try-catch捕获关闭异常if(read == -1) {key.cancel();sc.close();} else {buffer.flip();debug(buffer);}}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}}

💡 消息边界的问题

以前有同学写过这样的代码,思考注释中两个问题,以 bio 为例,其实 nio 道理是一样的

public class Server {public static void main(String[] args) throws IOException {ServerSocket ss=new ServerSocket(9000);while (true) {Socket s = ss.accept();InputStream in = s.getInputStream();// 这里这么写,有没有问题byte[] arr = new byte[4];while(true) {int read = in.read(arr);// 这里这么写,有没有问题if(read == -1) {break;}System.out.println(new String(arr, 0, read));}}}}

客户端

public class Client {public static void main(String[] args) throws IOException {Socket max = new Socket("localhost", 9000);OutputStream out = max.getOutputStream();out.write("hello".getBytes());out.write("world".getBytes());out.write("你好".getBytes());max.close();}}

输出

helloworld��好

为什么?

💡 处理消息的边界

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DBZdzDwP-1666592863910)(F:/Java后端学习笔记/2基础-高级衔接/NIO/img/0023.png)]

第一种思路是固定消息长度:数据包大小一样,服务器按预定长度读取,缺点是浪费带宽第二种思路是按分隔符拆分:缺点是效率低第三种思路是TLV 格式:即 Type 类型、Length 长度、Value 数据。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量 Http 1.1 是 TLV 格式Http 2.0 是 LTV 格式

第二种思路:按分隔符拆分

服务器端

private static void split(ByteBuffer source) {source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把这条完整消息存入新的 ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从 source 读,向 target 写for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}pact(); // 0123456789abcdef position 16 limit 16}public static void main(String[] args) throws IOException {// 1. 创建 selector, 管理多个 channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的联系(注册)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);// key 只关注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("sscKey:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理selector.select();// 4. 处理事件, selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题iter.remove();log.debug("key: {}", key);// 5. 区分事件类型if (key.isAcceptable()) {// 如果是 acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); // attachment// 将一个 byteBuffer 作为附件关联到 selectionKey 上SelectionKey scKey = sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) {// 如果是 readtry {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel// 获取 selectionKey 上关联的附件ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 需要扩容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)}}}}}

客户端

SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 8080));SocketAddress address = sc.getLocalAddress();// sc.write(Charset.defaultCharset().encode("hello\nworld\n"));sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));System.in.read();

💡 ByteBuffer 大小分配

每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBufferByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 /java-performance/resizable-array.html另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

6.5 处理 write 事件

💡 一次无法写完例子

非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略 当第一次没有写出完消息时,才将 channel写事件 注册到 selector 上channel 可写事件完成后,如果所有的数据写完了,就取消 可写事件 的注册、清空附件 如果不取消:每次socket缓冲可写均会触发 write 事件,且此时附件已无数据

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);// 1. 拼接发送的内容StringBuilder sb = new StringBuilder();for (int i = 0; i < 3000000; i++) {sb.append("a");}// 2. 将内容放入缓冲区并写出ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());// 3. 写出内容,返回写出的字节数int write = sc.write(buffer);// 4. 没有写完:关注写事件if (buffer.hasRemaining()) {// 在原有关注事件的基础上,多关注 写事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);// 把 buffer 作为附件加入 sckeysckey.attach(buffer);}} else if (key.isWritable()) {// 1. 继续写出内容ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);// 2. 写完了:取消写事件,清空附件if (!buffer.hasRemaining()) {key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);key.attach(null);}}}}}}

客户端

public class WriteClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel sc = SocketChannel.open();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);sc.connect(new InetSocketAddress("localhost", 8080));int count = 0;while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isConnectable()) {System.out.println(sc.finishConnect());} else if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count += sc.read(buffer);buffer.clear();System.out.println(count);}}}}}

💡 write 为何要取消

只要向 channel 发送数据时socket 缓冲可写,这个事件会频繁触发,因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

6.6 多核多线程

Redis使用单线程处理,建议使用时间复杂度低的方法

💡 利用多线程优化

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?

分两组选择器

单线程配一个选择器,专门处理 accept 事件创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

public class ChannelDemo7 {public static void main(String[] args) throws IOException {new BossEventLoop().register();}@Slf4jstatic class BossEventLoop implements Runnable {private Selector boss;private WorkerEventLoop[] workers;private volatile boolean start = false;AtomicInteger index = new AtomicInteger();public void register() throws IOException {if (!start) {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);boss = Selector.open();SelectionKey ssckey = ssc.register(boss, 0, null);ssckey.interestOps(SelectionKey.OP_ACCEPT);workers = initEventLoops();new Thread(this, "boss").start();log.debug("boss start...");start = true;}}public WorkerEventLoop[] initEventLoops() {// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];for (int i = 0; i < workerEventLoops.length; i++) {workerEventLoops[i] = new WorkerEventLoop(i);}return workerEventLoops;}@Overridepublic void run() {while (true) {try {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();SocketChannel sc = c.accept();sc.configureBlocking(false);log.debug("{} connected", sc.getRemoteAddress());workers[index.getAndIncrement() % workers.length].register(sc);}}} catch (IOException e) {e.printStackTrace();}}}}@Slf4jstatic class WorkerEventLoop implements Runnable {private Selector worker;private volatile boolean start = false;private int index;private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();public WorkerEventLoop(int index) {this.index = index;}public void register(SocketChannel sc) throws IOException {if (!start) {worker = Selector.open();new Thread(this, "worker-" + index).start();start = true;}tasks.add(() -> {try {SelectionKey sckey = sc.register(worker, 0, null);sckey.interestOps(SelectionKey.OP_READ);worker.selectNow();} catch (IOException e) {e.printStackTrace();}});worker.wakeup();}@Overridepublic void run() {while (true) {try {worker.select();Runnable task = tasks.poll();if (task != null) {task.run();}Set<SelectionKey> keys = worker.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);try {int read = sc.read(buffer);if (read == -1) {key.cancel();sc.close();} else {buffer.flip();log.debug("{} message:", sc.getRemoteAddress());debugAll(buffer);}} catch (IOException e) {e.printStackTrace();key.cancel();sc.close();}}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}}

💡 如何拿到 cpu 个数

Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

七、Pipe和FileLock

7.1 Pipe

Java NIO 管道是 2 个线程之间的单向数据连接。Pipe 有一个 source 通道和一个 sink通道。数据会被写到 sink 通道,从 source 通道读取。

/*** 1.open():创建管道* 2.sink():获取数据写通道* 3.source():获取数据读通道*/public class PipeTest {@Testpublic void test() throws Exception{//1.创建管道Pipe pipe = Pipe.open();//2.获取数据写通道,写入数据Pipe.SinkChannel sink = pipe.sink();ByteBuffer buffer1 = ByteBuffer.allocate(20);buffer1.put("你好管道!".getBytes());buffer1.flip();sink.write(buffer1);sink.close();//3.获取数据读通道,读取数据Pipe.SourceChannel source = pipe.source();source.configureBlocking(true);ByteBuffer buffer2 = ByteBuffer.allocate(20);int length;while ((length=source.read(buffer2))!=-1){buffer2.flip();System.out.println(new String(buffer2.array(),0,length));buffer2.clear();}source.close();}}

7.2 FileLock

文件锁是进程级别的,不是线程级别的。是不可重入的。文件锁可以解决多个进程并发访问、修改同一个文件的问题,但不能解决多线程并发访问、修改同一文件的问题。使用文件锁时,同一进程内的多个线程,可以同时访问、修改此文件。释放锁: release()关闭对应的 FileChannel 对象当前 JVM 退出 锁分类 排它锁(独占锁):其他进程不能读写此文件,直到该进程释放文件锁共享锁:其他进程可以读此文件,不能写,线程是安全的。写操作抛出异常。 在某些 OS 上,对某个文件加锁后,不能对此文件使用通道映射。

/*** 获取文件锁*1.channel.lock():阻塞加锁,默认为排它锁。*2.channel.lock(long,long,booean):自定义阻塞加锁。*注意:从哪个位置开始多大的范围不能进行读写,是否为共享锁*3.tryLock():尝试加锁,默认为排它锁。获取失败返回null*4.tryLock(long,long,booean):自定义尝试加锁。*注意:从哪个位置开始多大的范围不能进行读写,是否为共享锁。获取失败返回null* 5.isShared():是否是共享锁* 6.isValid():是否还有效* 7.release():释放文件锁*/public class FileLockTest {@Testpublic void test() throws Exception{FileChannel channel = new RandomAccessFile("hi2.txt", "rw").getChannel();//1.阻塞加锁,默认为排它锁。FileLock lock = channel.lock();//5.是否是共享锁System.out.println("是否为共享锁:"+lock.isShared());//6.是否还有效System.out.println("是否有效:"+lock.isValid());ByteBuffer buffer = ByteBuffer.allocate(20);buffer.put("中国欢迎您!".getBytes(),0,18);buffer.flip();channel.write(buffer);//7.释放文件锁lock.release();channel = FileChannel.open(Paths.get("hi2.txt"), StandardOpenOption.READ,StandardOpenOption.WRITE);//再次加锁成功FileLock lock1 = channel.lock();buffer = ByteBuffer.allocate(20);int length;while ((length=channel.read(buffer))!=-1){buffer.flip();System.out.println(new String(buffer.array(),0,length));buffer.clear();}channel.close();}}

八、字符集(Charset)

使用不同于编码时的编码类型进行解码将出现乱码

/*** Charset静态方法:*1.forName():通过编码类型获得 Charset 对象*2.availableCharsets():获得系统支持的所有编码方式*3.defaultCharset():获得虚拟机默认的编码方式*4.isSupported(String):是否支持该编码类型* Charset普通方法:*5.name():获得编码类型(String)*6.newEncoder():获得编码器对象*7.newDecoder():获得解码器对象*8.encode(CharBuffer):编码*9.decode(ByteBuffer):解码**标准字符编码:StandardCharsets.UTF_8.decode(buffer)**/public class CharsetTest {@Testpublic void test1() throws Exception{//1.通过编码类型获得 Charset 对象Charset charset = Charset.forName("UTF-8");//6.获得编码器对象CharsetEncoder encoder = charset.newEncoder();//7.获得解码器对象CharsetDecoder decoder = charset.newDecoder();//获取数据CharBuffer buffer = CharBuffer.allocate(20);buffer.put("你好中国!");buffer.flip();//编码:将字符串编成字节ByteBuffer encode = encoder.encode(buffer);System.out.println("编码后:");for (int i=0;i<encode.limit();i++) {System.out.println(encode.get());}//解码:将字节解析为字符串encode.flip();CharBuffer charBuffer1=decoder.decode(encode);System.out.println("解码后:");System.out.println(charBuffer1);}}

九、群聊案例

该案例代码并未完善细节,请知悉!

9.1 服务端代码

public class ChatServer {public static void main(String[] args) {try {//启动服务器new ChatServer().startServer();} catch (IOException e) {e.printStackTrace();//向客户端广播服务器宕机}}//启动服务器private void startServer() throws IOException {//1.创建SelectorSelector selector = Selector.open();//2.创建ServerSocketChannelServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.bind(new InetSocketAddress(8080));//3.设置ServerSocketChannel非阻塞模式serverChannel.configureBlocking(false);//4.将ServerSocketChannel注册监听事件:有连接接入serverChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务器启动成功!");//5.查询selector的选择键for (;;){int select = selector.select();if (select<=0){continue;}//5.1 循环遍历选择键,根据对应事件进行相关处理Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){SelectionKey selectionKey = iterator.next();iterator.remove();if (selectionKey.isAcceptable()){//处理连接接入handleAccept(serverChannel,selector);}if (selectionKey.isReadable()){//处理信息接收handleRead(selectionKey,selector);}}}}//处理信息接收private void handleRead(SelectionKey selectionKey, Selector selector) throws IOException {//1.获取连接SocketChannel channel = (SocketChannel) selectionKey.channel();//2.读取信息ByteBuffer buffer = ByteBuffer.allocate(1024);String message="";int length;if ((length=channel.read(buffer))!=-1){buffer.flip();message=new String(buffer.array(),0,length);buffer.clear();}//3.判断客户端是否退出聊天室if ("exit".equalsIgnoreCase(message)){channel.close();}channel.register(selector,SelectionKey.OP_READ);if (message.length()>0 && !"exit".equalsIgnoreCase(message)){System.out.println(message);//4.广播信息radioMessage(message,selector,channel);}}//广播信息private void radioMessage(String message, Selector selector, SocketChannel channel) throws IOException {//1.获取所有已经接入 channelSet<SelectionKey> selectionKeys = selector.keys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()){SelectionKey next = iterator.next();//1.获取连接Channel channel1 = next.channel(); //2.判断并给其他所有人广播信息if(channel1 instanceof SocketChannel && channel1!=channel){//2.1 发送信息((SocketChannel)channel1).write(Charset.forName("UTF-8").encode(message));}}}//处理连接接入private void handleAccept(ServerSocketChannel serverChannel, Selector selector) throws IOException {//1.获取连接SocketChannel accept = serverChannel.accept();accept.configureBlocking(false);//2.注册为可读连接accept.register(selector,SelectionKey.OP_READ);//3.发送欢迎信息accept.write(Charset.forName("UTF-8").encode("欢迎进入聊天室!"));}}

9.2 客户端代码

启动类

public class ChatClient {//启动客户端public static void startClient() throws IOException {BufferedReader br=new BufferedReader(new InputStreamReader(System.in));System.out.println("请输入昵称:");String name=br.readLine();//1.创建连接SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));channel.configureBlocking(false);//2.创建选择器并注册Selector selector = Selector.open();channel.register(selector, SelectionKey.OP_READ);//3.创建线程实现异步获取消息new Thread(new AsynReceive(selector)).start();//4.获取控制台输入并发送String message;ByteBuffer buffer = ByteBuffer.allocate(1024);System.out.println("输入消息(exit 退出):");while ((message=br.readLine())!=null && !"exit".equalsIgnoreCase(message)){buffer.put(name.getBytes());buffer.put(":".getBytes());buffer.put(message.getBytes());buffer.flip();channel.write(buffer);buffer.clear();}if ("exit".equalsIgnoreCase(message)){buffer.put("exit".getBytes());buffer.flip();channel.write(buffer);buffer.clear();}System.out.println("退出聊天室");channel.close();br.close();}}

异步接收信息类

public class AsynReceive implements Runnable{private Selector selector;public AsynReceive(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {for (;;){int select = selector.select();if (select<=0){continue;}//循环遍历选择键,根据对应事件进行相关处理Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){SelectionKey selectionKey = iterator.next();if (selectionKey.isReadable()){//处理信息接收handleRead(selectionKey,selector);}iterator.remove();}}} catch (IOException e) {e.printStackTrace();}}//处理信息接收private void handleRead(SelectionKey selectionKey, Selector selector) throws IOException {//1.获取连接SocketChannel channel = (SocketChannel) selectionKey.channel();//2.读取信息ByteBuffer buffer = ByteBuffer.allocate(1024);String message="";int length;if ((length=channel.read(buffer))!=-1){buffer.flip();message=new String(buffer.array(),0,length);buffer.clear();}//3.判断客户端是否退出聊天室if ("exit".equalsIgnoreCase(message)){channel.close();}channel.register(selector,SelectionKey.OP_READ);if (message.length()>0 && !"exit".equalsIgnoreCase(message)){System.out.println(message);}}}

9.3 效果展示

创建客户端A

public class ClientA {public static void main(String[] args) {try {ChatClient.startClient();} catch (IOException e) {e.printStackTrace();}}}

创建客户端B

public class ClinetB {public static void main(String[] args) {try {ChatClient.startClient();} catch (IOException e) {e.printStackTrace();}}}

启动服务器,启动客户端A和客户端B

服务器 客户端A 客户端B

【NIO与Netty】Java NIO入门:多线程多Selector模式 零拷贝 同步阻塞 同步非阻塞 同步多路复用 异步非阻塞的区别

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。