前言
目前互联网环境下,分布式系统十分重要,而分布式的根基在于网络编程。Netty是java领域网络编程的佼佼者。而要学好Netty,首先就必须要会NIO编程。本笔记旨在学习NIO的概念、使用及原理,并附带一些代码实现。
- NIO三大组件
- 文件编程
- 阻塞和非阻塞的NIO网络编程
- 非阻塞多线程解决方案
概念
NIO:non-blocking io 非阻塞IO。NIO有三大组件:Channel,Bugger, Selector。
三大组件
Channel && Buffer
Channel相当与一个stream,它就是读写数据的双向通道,channel可以把数据写入buffer,也可以把数据从buffer读出。
1 2 3
| graph LR; channel-->buffer buffer-->channel
|
常见的channel和buffer
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
常见的buffer:
- ByteBuffer(最常用)
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- ……
- CharBuffer
Selector
以下是常用的几种多线程实现:
多线程版
1 2 3 4 5 6
| graph TD subgraph 多线程版 thread1-->socket1 thread2-->socket2 thread3-->socket3 end
|
**缺点:**
- 内存占用高
- 线程上下文切换成本高(因为CPU并发线程并没有那么多)
- 只适合连接数较小的情况
线程池
1 2 3 4 5 6 7
| graph TD subgraph 线程池版 thread1-->socket1 thread1-.->socket3 thread2-->socket2 thread2-.->socket4 end
|
**缺点:**
- 阻塞模式下,线程仅能处理一个socket连接
- 仅适用于短连接的场景
Selector
1 2 3 4 5 6 7
| graph TD subgraph Selector版 thread-->selector selector-->channel1 selector-->channel2 selector-->channel3 end
|
Selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模下,线程不会吊死在一个channel上。适合连接数多,但是流量低的场景(low traffic)。
调用selector的select()会阻塞,直到channel发生了读写就绪事件,一但这些事件发生,select方法就会赶回这些事件交给thread处理。
ByteBuffer
ByteBuffer可以理解成一个byte数组,然后封装了一些参数和API。
基本参数
- position:指针,通过指针位置来进行读写,因此同一个ByteBuffer读写模式下,position位置会不一样。
- limit:可读或可写的最大索引位置。
- capacity:ByteBuffer中数组的容量。
基本API
分配
ByteBuffer buff = ByteBuffer.allocate(int capacity)
:获取一个ByteBuffer ==> HeapByteBuffer
,使用java堆内存
ByteBuffer buff = ByteBuffer.allocateDirect(int capacity)
- 直接内存,操作系统内存
- 分配效率差
- 操作不当容易内存泄漏
读取
byte b = buff.get()
:获取缓存中一个字节(pos后移)
buff.get(int idx)
:获取指定位置的字节(pos不变)
int writeBytes = channel.write(buff)
:从buff中读数据,通过channel写入其他
写入
buff.put(byte b) || buff.put(byte[] bs)
:存入字节或字节数组(pos变化)
int readBytes = channel.read(buff)
:把channel中的数据写入buff
- 返回写入的字节数,会受到buff容量的限制
- 返回-1表示没有东西可以写了
其他
buff.flip()
:切换读模式(pos变0)
buff.rewind()
:读模式下,pos置0,即从头开始读
buff.clear()
:切换写模式(pos归零)
buff.compact()
:切换写模式(未读内容左移到0,pos变成空白buff的起始位置)
buff.mark()
:标记当前pos的位置
buff.reset()
:pos回到mark的位置,还是读模式
buff.hasRemaining
:buff是否读完
图解
ByteBuffer buff = ByteBuffer.allocate(int capacity)
|
|
|
pos |
|
|
capacity + limit |
a |
b |
c |
|
|
|
buffer.put(new byte[]{'a','b','c','d'})
|
|
|
|
pos |
|
capacity + limit |
a |
b |
c |
d |
|
|
buffer.flip()
buff.get() * 2
buff.mark()
:记录pos到c的位置
|
|
pos |
|
limit |
|
capacity |
a |
b |
c |
d |
|
|
|
|
|
|
pos+limit |
|
capacity |
a |
b |
c |
d |
|
|
|
|
pos |
|
limit |
|
capacity |
a |
b |
c |
d |
|
|
ByteBuffer和字符串互转
字符串转ByteBuffer
1 2 3 4 5 6 7 8 9
| ByteBuffer buff1 = ByteBuffer.allocate(16); buff.put("hello".getBytes());
ByteBuffer buff2 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer buff3 = ByteBuffer.warp("hello".getBytes());
|
ByteBuffer转字符串
1 2
| buff.flip(); StandardCharsets.UTF_8.decode(buff).toString();
|
注意:Charset转字符串是根据pos位置来转的,因此必须要在读模式!
分散读 && 集中写
分散读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
try(RandAccessFile file = new RandAccessFile(path, "rw")) { FileChannel channel = file.getChannel(); ByteBuffer b1 = ByteBuffer.allocate(3); ByteBuffer b2 = ByteBuffer.allocate(3); ByteBuffer b3 = ByteBuffer.allocate(5); channel.read(new ByteBuffer[]{b1, b2, b3}); b1.flip(); b2.flip(); b3.flip(); }catch(IOException e) { ... }
|
集中写
1 2 3 4 5 6 7 8
| ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello"); ByteBuffer b2 = StandardCharsets.UTF_8.encode("world"); ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好!"); try(FileChannel channel = new RandAccessFile(path, "rw").getChannel()) { channel.write(new ByteBuffer[]{b1, b2, b3}); }catch(IOException e) { ... }
|
ByteBuffer小练习
需求描述(粘包半包)
多条数据,组合成一条消息发送给服务端,通过字符c来进行分割每条数据,服务端可能会接收到多个包(ByteBuffer[]),写一段代码,将每条数据进行打印。
解法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static List<String> parseMessage(ByteBuffer[] buffers, char c) { List<String> res = new ArrayList<>(); int capacity = 0; for (ByteBuffer byteBuffer : buffers) capacity += byteBuffer.capacity(); ByteBuffer source = ByteBuffer.allocate(capacity); for (ByteBuffer byteBuffer : buffers) { source.put(byteBuffer); source.flip(); for (int j = 0; j < source.limit(); j++) { if ((char) source.get(j) == c) { int length = j - source.position(); ByteBuffer targetData = ByteBuffer.allocate(length); for (int k = 0; k < length; k++) { targetData.put(source.get()); } source.get(); targetData.flip(); res.add(StandardCharsets.UTF_8.decode(targetData).toString()); } } source.compact(); } return res; }
|
文件编程
FileChannel
注意:只能工作在阻塞模式下
获取
不能直接打开FileChannel,必须通过FileInputStream, FileOutputStream
或者RandomAccessFile
来获取,通过getChannel()
方法
FileInputStream获取的Channel只读
FileOutputStream获取的Channel只写
RandomAccessFile,可以控制读写权限
读取
1 2
| int readBytes = channel.read(byteBuffer);
|
写入
正确姿势如下:(虽然FileChannel不存在这个问题,但是SocketChannel并不能一次性写入所有数据,存在写能力的上限 ===> 必须用while来判断buffer里面是否还有数据)
1 2 3 4 5 6
| ByteBuffer buffer = ...; buffer,put(...); buffer.flip(); while (buffer.hasRemaining()) { channel.write(buffer); }
|
关闭
channel必须关闭,不过调用了FileInputStream, FileOutputStream或者RandomAccessFile
的close方法,会间接调用channel的close。推荐使用twr包裹,自动close。
位置
- 获取当前位置:
long pos = channel.potision()
- 设置当前位置:
channel.position(int newPos)
- 如果新位置设置在文件末尾,读取会返回-1
- 写入会进行追加,如果position超过了文件末尾,再写入新的内容和原末尾之间会有空值(00)
大小
使用size()
可以 获得文件的大小
强制写入
操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用force(true)
方法将文件内容和原数据(文件的权限等)立刻写入磁盘。
两个FileChannel之间传输数据
transferTo
:返回实际传输的字节数,一次最多只能传输2GB,但效率非常高
1 2 3 4 5 6 7 8 9 10 11 12 13
| try ( FileChannel from = new FileInputstream(path).getChannel(); FileChannel to = new FileOutputStream(path).getChannel(); ) { long size = from.size(); for(long left = size; left > 0;) { left -= from.transferTo((size - left), left, to); } from.transferTo(0, from.size(), to); } catch (IOException e) { e.printStackTrace(); }
|
Path
JDK7引入的Path和Paths
- Path表示文件路径
- Paths是工具类,用来获取Path实例
1 2 3 4
| Path source = Paths.get("cxy.txt"); Path source = Paths.get("d:\\cxy.txt"); Path source = Paths.get("d:/cxy.txt"); Path source = Paths.get("d:\\data", "cxy.txt");
|
- 支持:
./
和../
来表示文件层级
path.normaliaze()
可以把文件路径格式化
1 2 3
| Path source = Paths.get("d:/data/test/../cxy.txt"); print(source); print(source.nomalize())
|
Files
JDK7引入
基础API
- 检查文件是否存在:
Files.exists(Path path)
- 创建一级目录:
Files.createDirectory(Path path)
- 创建多级目录:
Files.createDirectories(Path path)
- 拷贝文件:
Files.copy(Path source, Path target)
- 如果文件已经存在,会报异常
- 如果要覆盖原文件:
Files.copy(source, target, StrandardCopyoption.REPLACE_EXISITING)
- 和
transforTo
性能差不多
- 移动文件:
Files.move(Path source, Path target, StandardCopyOption.ATOMIC_MOVE)
- 删除文件:
Files.delete(Path target)
- 删除目录:
Files.delete(Path target)
- 如果目录内有文件,会抛异常,无法删除
- 需要先删文件再删目录
进阶API
遍历文件夹:Files.walkFileTree(Path start, FileVisitor<? super Path> visitor)
FileVisitor
是一个接口,有4个方法,一般用new SimpleFileVisitor<Path>
来写(可以不用全部实现,重写就行)
preVisitDirectory
:进入文件夹前触发
postVisitDirectory
:离开文件夹前触发
visitFile
:访问文件时触发
visitFileFailed
:访问文件失败时触发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws IOException { final AtomicInteger fileCount = new AtomicInteger(); final AtomicInteger directorCount = new AtomicInteger(); Files.walkFileTree(Paths.get("/Users/memoforward/Blog/MemoForward"), new SimpleFileVisitor<Path>(){ @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { directorCount.incrementAndGet(); return super.preVisitDirectory(dir, attrs); }
@Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { fileCount.incrementAndGet(); return super.visitFile(file, attrs); } }); System.out.println("files: " + fileCount.get()); System.out.println("directors: " + directorCount.get()); }
|
需求1:如何删除多级目录?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) throws IOException { Files.walkFileTree(Paths.get("~"), new SimpleFileVisitor<Path>(){ @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return super.visitFile(file, attrs); }
@Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { Files.delete(dir); return super.postVisitDirectory(dir, exc); } }); }
|
Walk
返回一个stream流。
1 2 3 4 5 6 7 8
| Files.walk(Paths.get("~")).forEach(path -> { String targetName = path.toString().replace(source, target); if (Files.isDirectory()) { Files.createDirectories(Paths.get(tartetName)); } else if(Files.isRegularFile(path)) { Files.copy(path, Paths.get(targetName)); } catch (Exception e) {...} })
|
网络编程
阻塞式(NIO实现)
单线程处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public static void main(String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(16); ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8888));
List<SocketChannel> channels = new ArrayList<>(); while (true) { SocketChannel sc = ssc.accept(); channels.add(sc); for (SocketChannel channel : channels) { channel.read(buffer); buffer.flip(); buffer.clear(); } } }
|
问题
以上代码,有两处地方阻塞:accept()和read()。
accept()
:等待客户端连接而导致的阻塞(没有线程连接就阻塞)
read()
:等待客户端写入数据而导致的阻塞(没有数据写入就阻塞)
这就导致了:单线程处理连接,如果同时有两个线程进行连接和写入,不同线程的accpet()
和read()
会相互阻塞,如果一个客户端连接后,一直不写,这个线程就永远无法与其他客户端连接。
结论
阻塞式单线程处理,无法进行处理多个连接的情况。
非阻塞式(NIO Channel实现)
单线程处理:
1 2 3 4
| ssc.configureBlocking(false);
ssc.configureBlocking(false);
|
改进版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static void main(String[] args) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(16); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8889));
List<SocketChannel> channels = new ArrayList<>(); while (true) { SocketChannel sc = ssc.accept(); if (sc != null) { sc.configureBlocking(false); channels.add(sc); } for (SocketChannel channel : channels) { int read = channel.read(buffer); if(read > 0) { buffer.flip(); buffer.clear(); } } } }
|
问题
- 非阻塞模式可以让线程不阻塞,但很消耗资源,相当于线程一直空转
非阻塞式改进(Selector)
由Selector
监听事件,没事件就阻塞,有事件就分配线程去运行。提高线程使用效率。
Selector
- 创建:
Selector sel = Selector.open()
- channel注册:
Selectionkey selectorKey = channel.register(sel, int ops, Object attention)
sel
:绑定的Selector
ops
:绑定的事件(SelectionKey.XXXX)
- 0:不关注任何事件
- accept:服务端事件,有连接请求时产生的事件
(SelectionKey.OP_ACCEPT)
- connect:客户端事件,连接建立后产生的事件
(SelectionKey.OP_CONNECT)
- read:服务端事件,读事件
(SelectionKey.OP_READ)
- write:客户端时间,写事件
(SelectionKey.OP_WRITE)
- 可以用两个事件的值相加来绑定多个事件
(SelectionKey.OP_ACCEPT + SelectionKey.OP_READ)
SelectorKey
:channel和selector绑定的关系具象,可以通过key获得对应的channel和绑定的事件
attention
:绑定在SelectorKey上的对象,用来做拓展用
- 也可以手动绑定事件:
selectionKey.interestOps(SelectionKey.OP_ACCEPT)
- 开启监听:
sel.select()
—>没事件是阻塞的
SelectedKeys
概念:Selector里的keys集合,用来实时处理事件
如果有事件发生,Selector就会把事件放到SelectedKeys集合里面,我们可以通过key来获取事件,并针对事件做出相应的逻辑处理。
事件处理后,理论上需要把事件移出集合(不移除,每次SelectedKeys都会遍历到这个过期事件),但是Selector仅做了逻辑删除,所以我们最好要手动移除,而在遍历集合中要做到移除元素,需要用迭代器。
简单实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) throws IOException { Selector selector = Selector.open();
ByteBuffer buffer = ByteBuffer.allocate(16); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null); ssc.bind(new InetSocketAddress(8787)); while(true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept(); } } }
|
注意:事件发生,必须要处理,如果不处理,集合内的事件甚至不会被逻辑删除,这就导致select永远不会阻塞,消耗cpu资源。
- 调用
key.cancel()
方法,会把key逻辑删除,导致不监听任何方法(除非重新注册,可以理解成物理删除),如果不处理,或发生异常来不及处理,就一定要cancel
。
问题: 好像也只是单线程处理一个事件,如何根据不同的channel执行不同的事件呢?
- 给accept获得的channel也注册到selector中
- SelectedKey可以识别自身发生的事件类型
key.isAcceptable()
key.isReadable()
- …..
改进版本一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public static void main(String[] args) throws IOException { Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false);
SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null); ssc.bind(new InetSocketAddress(8787)); while(true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept(); sc.register(selector, SelectionKey.OP_READ, null); continue; } if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel sc = (SocketChannel) key.channel(); sc.read(buffer); buffer.flip(); }
} } }
|
问题:这个代码有一个不明显的问题,就是上文提到的remove机制。
- ==前提知识==:selector中会有两个集合,一个是注册的channel集合(包含了channel和监听的事件类型,这个集合就是用来阻塞监听的),另一个是selectedKeys集合。当有事件发生时,selector会从channel集合中把对应的元素拿出来放到selectedKeys集合中,供用户处理,也就是说,这两个集合放的元素都是一样,都是key,而且一个key包含了channel + 事件。
- Set\ publicKeys: 注册进来的key
- Set\ publicSelectedKeys: 事件发生后,加入的key
- 问题原因:事件处理后,selectedKeys里面仅做了逻辑删除(具体源码未知),这个删除不会让select()监测到。
- 问题复现:虽然逻辑删除的事件不会取消select()的阻塞,但是会在遍历集合的时候遍历到这个channel,而且
isAcceptable()
同样也会被触发,因此当其他事件触发时,ssc又会去accept一个channel,但这个channel是null,因此会导致空指针异常。
- 问题解决:将以上代码的注释放开就行,手动remove。
- 复盘:这个更像是NIO的一个bug。
改进版本二
上述代码有两个问题:
- 客户端异常关闭,会导致一个read事件。
- 客户端正常关闭,也会导致一个read事件。
- read事件不会得到处理(因为这个channel没法read了),会导致selectedKeys集合每次循环都会增加这个事件,因为
publicKeys
里面这个事件未被处理,(所以remove了也没用,因为remove是selectedKeys集合里面的内容)。
解决:
- 用catch捕获异常,并且在异常中cancel。
- 如果
channel.read() == -1, 则
cancel`。
1 2 3 4 5 6 7 8 9 10 11 12 13
| if (key.isReadable()) { try { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel sc = (SocketChannel) key.channel(); int read = sc.read(buffer); if (read == -1) key.cancel(); buffer.flip(); } catch (Exception e) { e.printStackTrace(); key.cancel(); } }
|
复盘:这个也像是NIO的一个bug。
消息边界问题
NIO处理黏包半包问题。
解决思路:
- 约定Buffer容量,缺点是浪费资源
- 定义包间分割符号,利用符号进行包的提取,缺点是拆包效率低
- 约定一个报文头,表明后面的消息长度【[4byte][xxxx]】。TLV模式,Type + Length + Value。根据解析的结果分配Buffer,缺点是buffer需要提前分配,如果内容过大,影响server吞吐。
- HTTP1.1是TLV格式
- HTTP2.0 是LTV格式
第二种方法的实现就是前面的小练习的增强版。
实现(使用扩容):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| if (key.isAcceptable()) { sc.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4)); } if (key.isReadable()) { try { ByteBuffer buffer = (ByteBuffer)key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int read = sc.read(buffer); if (read == -1) key.cancel(); else { split(buffer, '\n'); if (buffer.limit() == buffer.position()){ ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newBuffer.put(buffer); key.attach(newBuffer); } }catch(Exception e) {...} } }
public static void split(ByteBuffer source, char c) { source.flip(); for (int j = 0; j < source.limit(); j++) { if ((char) source.get(j) == c) { int length = j - source.position(); ByteBuffer targetData = ByteBuffer.allocate(length); for (int k = 0; k < length; k++) { targetData.put(source.get()); } source.get(); targetData.flip(); } } source.compact(); }
|
处理写事件
写事件的监听不太好理解,这里单独用一个样例表示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class WriterServer { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while(true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); StringBuffer sb = new StringBuffer(); for (int i = 0; i< 3000000; i++) sb.append("a"); ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); while (buffer.hasRemaining()) { int write = sc.write(buffer); System.out.println(write); } } } }
} }
Console log: 261676 998124 1521840 0 0 0 0 0 73728 144632
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class WriteClient { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8080)); int count = 0; while(true) { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += sc.read(buffer); System.out.println(count); buffer.clear(); } } }
Console log: 130656 1161444 2200476 3000000
|
我们会发现,在非阻塞模式下, 一次写不完的数据会分多次写入,但是,在while循环里,如果buffer里的数据没有被完全传递(buffer满着),就会write一个0,这不合理,我们想把写0的时候,释放线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class WriterServer { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); while (true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ); StringBuffer sb = new StringBuffer(); for (int i = 0; i < 3000000; i++) sb.append("a"); ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); if (buffer.hasRemaining()) { scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE); scKey.attach(buffer); } continue; } if (key.isWritable()) { ByteBuffer buffer = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int write = sc.write(buffer); System.out.println(write); if (!buffer.hasRemaining()) { key.attach(null); key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); } } } }
} }
|
小结
多线程优化
一个线程处理多个channel,没法释放cpu所有的核心,也没有办法进行业务区分。
解决方法:分两组选择器。
- 单线程配一个选择器(boss),专门处理accpet事件
- 创建cpu核数的selector,轮流处理read
单Worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); ssc.register(boss, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); Worker worker = new Worker("worker-0"); while(true) { boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); ServerSocketChannel channel = (ServerSocketChannel)key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false); worker.register(sc); } } }
static class Worker implements Runnable{ private Thread thread; private Selector selector; private String name; private volatile boolean start = false; private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) { this.name = name; }
public void register(SocketChannel sc) throws IOException { if (!start) { this.thread = new Thread(this, name); this.selector = Selector.open(); thread.start(); start = true; } queue.add(() -> { try { sc.register(selector, SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup(); }
@Override public void run() { while (true) { try { selector.select(); Runnable task = queue.poll(); if (task != null) task.run(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); } } } catch (IOException e){ e.printStackTrace(); } } } } }
|
多worker
轮询
1 2 3 4 5 6 7 8 9 10 11 12
|
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker("worker-" + i); } AtomicInteger workIdx = new AtomicInteger();
workers[workIdx.getAndIncrement() % workers.length].register(sc);
|
概念解析
NIO vs BIO
stream和channel
- stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区,接受缓冲区(更为底层)
- stream仅支持阻塞API,channel同时支持阻塞和非阻塞,网络channel可以通过Selector实现多路复用
- 二者都是全双工的,读写可以同时
IO模型
同步阻塞,同步非阻塞,多路复用,异步阻塞(不存在),异步非阻塞。建议参考书籍《UNIX网络编程》。
当调用一次channel.read或stream.read中,会切换值操作系统内核态来完成真正的数据读取,而读取又分为两个阶段:
然后在切换回用户态。
- 阻塞IO:用户线程会等待操作系统操作,一直阻塞
- 非阻塞IO:用户线程在等待数据时不阻塞,等复制数据时才会阻塞
- 多路复用:用一个线程在等待数据时阻塞,等复制数据时来处理,乍一看和阻塞IO一样,但是这个线程可以处理多个channel,这些channel都在复制数据时才会处理,理论上不是纯串行的,更加高效
同步和异步
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)
上述讲的三种IO模型分别是:同步阻塞,同步非阻塞,(同步)多路复用。
上面的例子worker就是异步的,但是一般异步需要指定回调方法来处理异步结果。
异步不可能阻塞,一定是非阻塞的。
异步模式(AIO)
AIO用来解决数据复制阶段的阻塞问题。
异步模型需要操作系统提供支持。这里不举例啦。有空上网找找。
零拷贝
传统拷贝
实现一个文件读,像socket写,内部流程如下:
java本身不具备IO读写的能力,因此read调用后,会从用户态切换成内核态,去调用操作系统(kernel)的读能力,将数据读入内核缓冲区。这个期间,用户的线程是阻塞的,操作系统使用DMA(Direct Memory Access)进行文件读,期间不会使用cpu。
DMA理解成硬件单元,用来解放cpu完成IO
从内核态切换成用户态,将数据从内核缓冲区读入用户缓冲区(即byte[]),这期间cpu会参与拷贝,无法利用DMA
- 调用write方法,这时候将数据从缓冲区(byte[])写入socket缓冲区,cpu会参与拷贝
- 将socket缓冲区写入网卡,这意味着又要从用户态切换为内核态,使用DMA,cpu不参与。
可以看到中间环节比较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层的读写是操作系统完成的。
- 用户态和内核态切换了3次,很重量级
- 参与了4次数据拷贝
NIO优化
使用DirectByteBuffer
,将用户缓冲区和内核缓冲区合二为一。
可以看到,避免了一次数据拷贝。
零拷贝优化
通过tansferTo
,进行零拷贝,使用操作系统内存。
可以看到,不通过内核缓冲区了,这就避免了两次次内核切换和两次数据复制。
- 整个复制过程中,只有一次用户态和内核态的切换
- 参与了2次数据拷贝
- 适合小文件的传输