0%

NIO基础

前言

目前互联网环境下,分布式系统十分重要,而分布式的根基在于网络编程。Netty是java领域网络编程的佼佼者。而要学好Netty,首先就必须要会NIO编程。本笔记旨在学习NIO的概念、使用及原理,并附带一些代码实现。

  1. NIO三大组件
  2. 文件编程
  3. 阻塞和非阻塞的NIO网络编程
  4. 非阻塞多线程解决方案

概念

NIO:non-blocking io 非阻塞IO。NIO有三大组件:Channel,Bugger, Selector

三大组件

Channel && Buffer

Channel相当与一个stream,它就是读写数据的双向通道,channel可以把数据写入buffer,也可以把数据从buffer读出。

1
2
3
graph LR;
channel-->buffer
buffer-->channel

常见的channel和buffer

  • FileChannel

  • DatagramChannel

  • SocketChannel

  • ServerSocketChannel

常见的buffer:

  • ByteBuffer(最常用)
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • ……
  • CharBuffer

Selector

以下是常用的几种多线程实现:

多线程版

1
2
3
4
5
6
graph TD
subgraph 多线程版
thread1-->socket1
thread2-->socket2
thread3-->socket3
end
**缺点:**
  • 内存占用高
  • 线程上下文切换成本高(因为CPU并发线程并没有那么多)
  • 只适合连接数较小的情况

线程池

1
2
3
4
5
6
7
graph TD
subgraph 线程池版
thread1-->socket1
thread1-.->socket3
thread2-->socket2
thread2-.->socket4
end
**缺点:**
  • 阻塞模式下,线程仅能处理一个socket连接
  • 仅适用于短连接的场景

Selector

1
2
3
4
5
6
7
graph TD
subgraph Selector版
thread-->selector
selector-->channel1
selector-->channel2
selector-->channel3
end

Selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模下,线程不会吊死在一个channel上。适合连接数多,但是流量低的场景(low traffic)。

调用selector的select()会阻塞,直到channel发生了读写就绪事件,一但这些事件发生,select方法就会赶回这些事件交给thread处理。

ByteBuffer

ByteBuffer可以理解成一个byte数组,然后封装了一些参数和API。

基本参数

  • position:指针,通过指针位置来进行读写,因此同一个ByteBuffer读写模式下,position位置会不一样。
  • limit:可读或可写的最大索引位置。
  • capacity:ByteBuffer中数组的容量。

基本API

分配

  • ByteBuffer buff = ByteBuffer.allocate(int capacity):获取一个ByteBuffer ==> HeapByteBuffer,使用java堆内存
    • 读写效率较低
    • 受垃圾回收影响,会改变内存位置
  • ByteBuffer buff = ByteBuffer.allocateDirect(int capacity)
    • 直接内存,操作系统内存
    • 分配效率差
    • 操作不当容易内存泄漏

读取

  • byte b = buff.get():获取缓存中一个字节(pos后移)
  • buff.get(int idx):获取指定位置的字节(pos不变)
  • int writeBytes = channel.write(buff):从buff中读数据,通过channel写入其他
    • 返回读入的字节数(channel一次可能读不完)

写入

  • buff.put(byte b) || buff.put(byte[] bs):存入字节或字节数组(pos变化)
  • int readBytes = channel.read(buff):把channel中的数据写入buff
    • 返回写入的字节数,会受到buff容量的限制
    • 返回-1表示没有东西可以写了

其他

  • buff.flip():切换读模式(pos变0)
  • buff.rewind():读模式下,pos置0,即从头开始读
  • buff.clear():切换写模式(pos归零)
  • buff.compact():切换写模式(未读内容左移到0,pos变成空白buff的起始位置)
  • buff.mark():标记当前pos的位置
  • buff.reset():pos回到mark的位置,还是读模式
  • buff.hasRemaining:buff是否读完

图解

  • ByteBuffer buff = ByteBuffer.allocate(int capacity)
pos capacity + limit
  • buff.put(...)
pos capacity + limit
a b c
  • buff.flip()
pos limit capacity
a b c
  • buffer.get() * 2:pos改变
pos limit capacity
a b c
  • buffer.get(2):pos不变

  • buffer.compact():已读部分删除,留下未读

pos capacity+limit
c
  • buffer.clear():没读,但也清零
pos capacity + limit
  • buffer.put(new byte[]{'a','b','c','d'})
pos capacity + limit
a b c d
  • buffer.flip()
  • buff.get() * 2
  • buff.mark():记录pos到c的位置
pos limit capacity
a b c d
  • buff.get() * 2
pos+limit capacity
a b c d
  • buff.reset():重置到c,还是读模式
pos limit capacity
a b c d

ByteBuffer和字符串互转

字符串转ByteBuffer

1
2
3
4
5
6
7
8
9
// 1. put() ==> 写模式
ByteBuffer buff1 = ByteBuffer.allocate(16);
buff.put("hello".getBytes());

// 2. Charset ==> 读模式
ByteBuffer buff2 = StandardCharsets.UTF_8.encode("hello");

// 3. warp === Charset方法 ==> 读模式
ByteBuffer buff3 = ByteBuffer.warp("hello".getBytes());

ByteBuffer转字符串

1
2
buff.flip();
StandardCharsets.UTF_8.decode(buff).toString();

注意:Charset转字符串是根据pos位置来转的,因此必须要在读模式

分散读 && 集中写

分散读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* data.txt
* onetwothree
*/
try(RandAccessFile file = new RandAccessFile(path, "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{b1, b2, b3});
b1.flip(); // one
b2.flip(); // two
b3.flip(); // three
}catch(IOException e) {
...
}

集中写

1
2
3
4
5
6
7
8
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好!");
try(FileChannel channel = new RandAccessFile(path, "rw").getChannel()) {
channel.write(new ByteBuffer[]{b1, b2, b3});
}catch(IOException e) {
...
}

ByteBuffer小练习

需求描述(粘包半包)

多条数据,组合成一条消息发送给服务端,通过字符c来进行分割每条数据,服务端可能会接收到多个包(ByteBuffer[]),写一段代码,将每条数据进行打印。

解法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static List<String> parseMessage(ByteBuffer[] buffers, char c) {
List<String> res = new ArrayList<>();
int capacity = 0;
for (ByteBuffer byteBuffer : buffers) capacity += byteBuffer.capacity();
ByteBuffer source = ByteBuffer.allocate(capacity); // 用一个source来接受所有的包
for (ByteBuffer byteBuffer : buffers) {
source.put(byteBuffer);
source.flip();
for (int j = 0; j < source.limit(); j++) {
if ((char) source.get(j) == c) {
int length = j - source.position();
ByteBuffer targetData = ByteBuffer.allocate(length);
for (int k = 0; k < length; k++) {
targetData.put(source.get()); // source读,并pos移动位置
}
source.get(); // pos跳过分割字符
targetData.flip(); // 必须切换成读模式
res.add(StandardCharsets.UTF_8.decode(targetData).toString());
}
}
source.compact();
}
return res;
}

文件编程

FileChannel

注意:只能工作在阻塞模式下

获取

不能直接打开FileChannel,必须通过FileInputStream, FileOutputStream或者RandomAccessFile来获取,通过getChannel()方法

FileInputStream获取的Channel只读

FileOutputStream获取的Channel只写

RandomAccessFile,可以控制读写权限

读取

1
2
// 返回读到的字节数,如果读到-1,则表示没有内容可读了
int readBytes = channel.read(byteBuffer);

写入

正确姿势如下:(虽然FileChannel不存在这个问题,但是SocketChannel并不能一次性写入所有数据,存在写能力的上限 ===> 必须用while来判断buffer里面是否还有数据)

1
2
3
4
5
6
ByteBuffer buffer = ...;
buffer,put(...); // 存输入
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}

关闭

channel必须关闭,不过调用了FileInputStream, FileOutputStream或者RandomAccessFile的close方法,会间接调用channel的close。推荐使用twr包裹,自动close。

位置

  • 获取当前位置:long pos = channel.potision()
  • 设置当前位置:channel.position(int newPos)
    • 如果新位置设置在文件末尾,读取会返回-1
    • 写入会进行追加,如果position超过了文件末尾,再写入新的内容和原末尾之间会有空值(00)

大小

使用size()可以 获得文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用force(true)方法将文件内容和原数据(文件的权限等)立刻写入磁盘。

两个FileChannel之间传输数据

  • transferTo:返回实际传输的字节数,一次最多只能传输2GB,但效率非常高
1
2
3
4
5
6
7
8
9
10
11
12
13
try (
FileChannel from = new FileInputstream(path).getChannel();
FileChannel to = new FileOutputStream(path).getChannel();
) {
// 效率高,底层使用了操作系统的零拷贝进行了优化
long size = from.size();
for(long left = size; left > 0;) {
left -= from.transferTo((size - left), left, to);
}
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}

Path

JDK7引入的Path和Paths

  • Path表示文件路径
  • Paths是工具类,用来获取Path实例
1
2
3
4
Path source = Paths.get("cxy.txt");
Path source = Paths.get("d:\\cxy.txt");
Path source = Paths.get("d:/cxy.txt");
Path source = Paths.get("d:\\data", "cxy.txt");
  • 支持:./../来表示文件层级
  • path.normaliaze()可以把文件路径格式化
1
2
3
Path source = Paths.get("d:/data/test/../cxy.txt");
print(source); // d:/data/test/../cxy.txt
print(source.nomalize()) // d:/data/cxy.txt

Files

JDK7引入

基础API

  • 检查文件是否存在:Files.exists(Path path)
  • 创建一级目录:Files.createDirectory(Path path)
    • 如果目录已经存在,会抛异常
    • 不能创建多级
  • 创建多级目录:Files.createDirectories(Path path)
  • 拷贝文件Files.copy(Path source, Path target)
    • 如果文件已经存在,会报异常
    • 如果要覆盖原文件: Files.copy(source, target, StrandardCopyoption.REPLACE_EXISITING)
    • transforTo性能差不多
  • 移动文件:Files.move(Path source, Path target, StandardCopyOption.ATOMIC_MOVE)
    • 保证原子性
  • 删除文件:Files.delete(Path target)
    • 文件不存在,会抛异常
  • 删除目录:Files.delete(Path target)
    • 如果目录内有文件,会抛异常,无法删除
    • 需要先删文件再删目录

进阶API

遍历文件夹:Files.walkFileTree(Path start, FileVisitor<? super Path> visitor)

  • FileVisitor是一个接口,有4个方法,一般用new SimpleFileVisitor<Path>来写(可以不用全部实现,重写就行)
    • preVisitDirectory:进入文件夹前触发
    • postVisitDirectory:离开文件夹前触发
    • visitFile:访问文件时触发
    • visitFileFailed:访问文件失败时触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 简单用法
public static void main(String[] args) throws IOException {
final AtomicInteger fileCount = new AtomicInteger();
final AtomicInteger directorCount = new AtomicInteger();
Files.walkFileTree(Paths.get("/Users/memoforward/Blog/MemoForward"), new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
directorCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
});
System.out.println("files: " + fileCount.get());
System.out.println("directors: " + directorCount.get());
}

需求1:如何删除多级目录?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws IOException {
Files.walkFileTree(Paths.get("~"), new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
}

Walk

返回一个stream流。

1
2
3
4
5
6
7
8
Files.walk(Paths.get("~")).forEach(path -> {
String targetName = path.toString().replace(source, target);
if (Files.isDirectory()) {
Files.createDirectories(Paths.get(tartetName));
} else if(Files.isRegularFile(path)) {
Files.copy(path, Paths.get(targetName));
} catch (Exception e) {...}
})

网络编程

阻塞式(NIO实现)

单线程处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws IOException {
//0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8888));

List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 3. accept建立与客户端的连接,SocketChannel用来与客户端之间通信
SocketChannel sc = ssc.accept();
channels.add(sc);
for (SocketChannel channel : channels) {
channel.read(buffer);
buffer.flip();
// 4. 逻辑处理
// ....
buffer.clear();
}
}
}

问题

以上代码,有两处地方阻塞accept()和read()。

  • accept():等待客户端连接而导致的阻塞(没有线程连接就阻塞)
  • read():等待客户端写入数据而导致的阻塞(没有数据写入就阻塞)

这就导致了:单线程处理连接,如果同时有两个线程进行连接和写入,不同线程的accpet()read()会相互阻塞,如果一个客户端连接后,一直不写,这个线程就永远无法与其他客户端连接。

结论

阻塞式单线程处理,无法进行处理多个连接的情况。

非阻塞式(NIO Channel实现)

单线程处理:

1
2
3
4
// 将服务端的channel设置成非阻塞模式
ssc.configureBlocking(false);
// 将与客户端通信的SocketChannel也设置成非阻塞
ssc.configureBlocking(false);

改进版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws IOException {
//0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式

// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8889));

List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 3. accept建立与客户端的连接,SocketChannel用来与客户端之间通信
SocketChannel sc = ssc.accept();
if (sc != null) {
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
int read = channel.read(buffer); // 没有读到数据,返回0
if(read > 0) {
buffer.flip();
// 4. 逻辑处理
// ....
buffer.clear();
}
}
}
}

问题

  • 非阻塞模式可以让线程不阻塞,但很消耗资源,相当于线程一直空转

非阻塞式改进(Selector)

Selector监听事件,没事件就阻塞,有事件就分配线程去运行。提高线程使用效率。

Selector

  • 创建:Selector sel = Selector.open()
  • channel注册:Selectionkey selectorKey = channel.register(sel, int ops, Object attention)
    • sel:绑定的Selector
    • ops:绑定的事件(SelectionKey.XXXX)
      • 0:不关注任何事件
      • accept:服务端事件,有连接请求时产生的事件(SelectionKey.OP_ACCEPT)
      • connect:客户端事件,连接建立后产生的事件(SelectionKey.OP_CONNECT)
      • read:服务端事件,读事件(SelectionKey.OP_READ)
      • write:客户端时间,写事件(SelectionKey.OP_WRITE)
      • 可以用两个事件的值相加来绑定多个事件(SelectionKey.OP_ACCEPT + SelectionKey.OP_READ)
    • SelectorKey:channel和selector绑定的关系具象,可以通过key获得对应的channel和绑定的事件
    • attention:绑定在SelectorKey上的对象,用来做拓展用
  • 也可以手动绑定事件:selectionKey.interestOps(SelectionKey.OP_ACCEPT)
  • 开启监听:sel.select() —>没事件是阻塞的

SelectedKeys

概念:Selector里的keys集合,用来实时处理事件

如果有事件发生,Selector就会把事件放到SelectedKeys集合里面,我们可以通过key来获取事件,并针对事件做出相应的逻辑处理。

事件处理后,理论上需要把事件移出集合(不移除,每次SelectedKeys都会遍历到这个过期事件),但是Selector仅做了逻辑删除,所以我们最好要手动移除,而在遍历集合中要做到移除元素,需要用迭代器。

简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws IOException {
// 创建Selector,管理多个channel
Selector selector = Selector.open();

ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 2. 将channel注册到selector
// SelectionKey是事件发生后,可以知道对应channel发生的具体事件
SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
// selectionKey.interestOps(SelectionKey.OP_READ);
ssc.bind(new InetSocketAddress(8787));
while(true) {
// 3. select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
selector.select();
// 4. 处理事件, keys里面包含了所有的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
}
}
}

注意:事件发生,必须要处理,如果不处理,集合内的事件甚至不会被逻辑删除,这就导致select永远不会阻塞,消耗cpu资源。

  • 调用key.cancel()方法,会把key逻辑删除,导致不监听任何方法(除非重新注册,可以理解成物理删除),如果不处理,或发生异常来不及处理,就一定要cancel

问题: 好像也只是单线程处理一个事件,如何根据不同的channel执行不同的事件呢?

  • 给accept获得的channel也注册到selector中
  • SelectedKey可以识别自身发生的事件类型
    • key.isAcceptable()
    • key.isReadable()
    • …..

改进版本一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void main(String[] args) throws IOException {
// 创建Selector,管理多个channel
Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

// 2. 将channel注册到selector
// SelectionKey是事件发生后,可以知道对应channel发生的具体事件
SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
// selectionKey.interestOps(SelectionKey.OP_READ);
ssc.bind(new InetSocketAddress(8787));
while(true) {
// 3. select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
selector.select();
// 4. 处理事件, keys里面包含了所有的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// iterator.remove(); 从集合中移除key
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
sc.register(selector, SelectionKey.OP_READ, null);

continue;
}
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
sc.read(buffer);
buffer.flip();
// ...业务逻辑
}

}
}
}

问题:这个代码有一个不明显的问题,就是上文提到的remove机制。

  • ==前提知识==:selector中会有两个集合,一个是注册的channel集合(包含了channel和监听的事件类型,这个集合就是用来阻塞监听的),另一个是selectedKeys集合。当有事件发生时,selector会从channel集合中把对应的元素拿出来放到selectedKeys集合中,供用户处理,也就是说,这两个集合放的元素都是一样,都是key,而且一个key包含了channel + 事件。
    • Set\ publicKeys: 注册进来的key
    • Set\ publicSelectedKeys: 事件发生后,加入的key
  • 问题原因:事件处理后,selectedKeys里面仅做了逻辑删除(具体源码未知),这个删除不会让select()监测到。
  • 问题复现:虽然逻辑删除的事件不会取消select()的阻塞,但是会在遍历集合的时候遍历到这个channel,而且isAcceptable()同样也会被触发,因此当其他事件触发时,ssc又会去accept一个channel,但这个channel是null,因此会导致空指针异常。
  • 问题解决:将以上代码的注释放开就行,手动remove。
  • 复盘:这个更像是NIO的一个bug。

改进版本二

上述代码有两个问题:

  • 客户端异常关闭,会导致一个read事件。
  • 客户端正常关闭,也会导致一个read事件。
  • read事件不会得到处理(因为这个channel没法read了),会导致selectedKeys集合每次循环都会增加这个事件,因为publicKeys里面这个事件未被处理,(所以remove了也没用,因为remove是selectedKeys集合里面的内容)。

解决:

  • 用catch捕获异常,并且在异常中cancel。
  • 如果channel.read() == -1, 则cancel`。
1
2
3
4
5
6
7
8
9
10
11
12
13
if (key.isReadable()) {
try {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
int read = sc.read(buffer);
if (read == -1) key.cancel();
buffer.flip();
// ...业务逻辑
} catch (Exception e) {
e.printStackTrace();
key.cancel();
}
}

复盘:这个也像是NIO的一个bug。

消息边界问题

NIO处理黏包半包问题。

解决思路:

  1. 约定Buffer容量,缺点是浪费资源
  2. 定义包间分割符号,利用符号进行包的提取,缺点是拆包效率低
  3. 约定一个报文头,表明后面的消息长度【[4byte][xxxx]】。TLV模式,Type + Length + Value。根据解析的结果分配Buffer,缺点是buffer需要提前分配,如果内容过大,影响server吞吐。
    • HTTP1.1是TLV格式
    • HTTP2.0 是LTV格式

第二种方法的实现就是前面的小练习的增强版。

  • 得用channel绑定的自己的ByteBuffer(不能共用ByteBuffer,不然会有线程安全问题)
  • ByteBuffer不够,需要有措施

    • 扩容。缺点是扩容时拷贝会浪费性能,优点是消息是连续处理的。
    • 用数组绑定channel,不够就加新建一个ByteBuffer加进数组。缺点是消息存储不连续,解析 复杂。

    这里引入了channel注册的另一个参数:attachment,表示和channel绑定的一个对象,可以通过key获取。

实现(使用扩容):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
if (key.isAcceptable()) {
// ...
sc.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4));
//...
}
if (key.isReadable()) {
try {
ByteBuffer buffer = (ByteBuffer)key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int read = sc.read(buffer);
if (read == -1) key.cancel();
else {
split(buffer, '\n');
// buffer满
if (buffer.limit() == buffer.position()){
// 扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 注意此时newBuffer的pos在正确的位置
key.attach(newBuffer);
}
}catch(Exception e) {...}
}
}


public static void split(ByteBuffer source, char c) {
source.flip();
for (int j = 0; j < source.limit(); j++) {
if ((char) source.get(j) == c) {
int length = j - source.position();
ByteBuffer targetData = ByteBuffer.allocate(length);
for (int k = 0; k < length; k++) {
targetData.put(source.get()); // source读,并pos移动位置
}
source.get(); // pos跳过分割字符
targetData.flip(); // 必须切换成读模式
// 对target逻辑处理
}
}
source.compact();
}

处理写事件

写事件的监听不太好理解,这里单独用一个样例表示。

  • 服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class WriterServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false); // 注意阻塞模式下,write会一次性写完所有数据
// 给客户端传递大量数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i< 3000000; i++) sb.append("a");
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while (buffer.hasRemaining()) {
int write = sc.write(buffer); // 返回每次写多少
System.out.println(write);
}
}
}
}

}
}

Console log:
261676
998124
1521840
0
0
0
0
0
73728
144632
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
// 接受数据
int count = 0;
while(true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}

Console log:
130656
1161444
2200476
3000000
  • 问题

我们会发现,在非阻塞模式下, 一次写不完的数据会分多次写入,但是,在while循环里,如果buffer里的数据没有被完全传递(buffer满着),就会write一个0,这不合理,我们想把写0的时候,释放线程。

  • 改进服务端(触发写事件,buffer空了再去写)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class WriterServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false); // 注意阻塞模式下,write会一次性写完所有数据
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
// 给客户端传递大量数据
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 3000000; i++) sb.append("a");
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
if (buffer.hasRemaining()) {
scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE); // 添加事件
scKey.attach(buffer); // 将buffer挂上
}
continue;
}
if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
if (!buffer.hasRemaining()) {
key.attach(null); // 清理
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 取消关注
}
}
}
}

}
}

小结

  • 阻塞模式下,acceptread会阻塞线程,线程阻塞不会占用cpu资源,是闲置的
  • 非阻塞模式下,使用Selector实现多路复用,一个线程处理多个事件
    • selector无法绑定FileChannel
  • select()方法是一直阻塞的,返回可触发事件的次数

    • int count = select(long time): 设置超时
    • int count = selectNow(): 不阻塞,返回可触发事件的次数
  • select何时不阻塞?

    • 事件发生
      • 客户端发起连接请求,触发accept事件
      • 客户端发送数据、客户端异常关闭、客户端正常关闭,都会触发read事件,如果发送数据大于buffer缓冲区,会触发多次read
      • channel可写,会触发写事件(基本都可写,所以要监听的时候打开,写完了就移除)
      • linux下nio bug
    • 调用selector.wakeup()
    • 调用selector.close()
    • selector所在线程interrupt

多线程优化

一个线程处理多个channel,没法释放cpu所有的核心,也没有办法进行业务区分。

解决方法:分两组选择器。

  • 单线程配一个选择器(boss),专门处理accpet事件
  • 创建cpu核数的selector,轮流处理read

单Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class MultiThreadServer {

public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
Worker worker = new Worker("worker-0");
while(true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
// 关联selector
worker.register(sc);
}
}
}

static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
// 保证线程执行顺序,非常巧妙
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化任线程和selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
this.thread = new Thread(this, name);
this.selector = Selector.open();
thread.start();
start = true;
}
// 注册需要selector是非阻塞状态
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); // 不阻塞时也可以wakeup,下次循环时触发这次的wakeup
}

@Override
public void run() {
while (true) {
try {
selector.select();
Runnable task = queue.poll();
if (task != null) task.run(); // 让注册channel放入worker线程一起执行,一定等待阻塞,所以要手动唤醒
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
// ....逻辑处理
}
}
} catch (IOException e){
e.printStackTrace();
}
}
}
}
}

多worker

轮询

1
2
3
4
5
6
7
8
9
10
11
12
/* 申领Workers */
// 获取cpu核心线程数
// 如果程序运行在docker中,会获取到物理机上核心数量,jdk10才修复
// 所以尽量手动计算,参考阿姆达定律来设置,一般要大于CPU核心数
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger workIdx = new AtomicInteger();

/* 注册 */
workers[workIdx.getAndIncrement() % workers.length].register(sc);

概念解析

NIO vs BIO

stream和channel

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区,接受缓冲区(更为底层)
  • stream仅支持阻塞API,channel同时支持阻塞和非阻塞,网络channel可以通过Selector实现多路复用
  • 二者都是全双工的,读写可以同时

IO模型

同步阻塞,同步非阻塞,多路复用,异步阻塞(不存在),异步非阻塞。建议参考书籍《UNIX网络编程》。

当调用一次channel.read或stream.read中,会切换值操作系统内核态来完成真正的数据读取,而读取又分为两个阶段:

  • 等待数据
  • 复制数据(从网卡读,复制到内存)

然后在切换回用户态。

  • 阻塞IO:用户线程会等待操作系统操作,一直阻塞
  • 非阻塞IO:用户线程在等待数据时不阻塞,等复制数据时才会阻塞
  • 多路复用:用一个线程在等待数据时阻塞,等复制数据时来处理,乍一看和阻塞IO一样,但是这个线程可以处理多个channel,这些channel都在复制数据时才会处理,理论上不是纯串行的,更加高效

同步和异步

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)

上述讲的三种IO模型分别是:同步阻塞,同步非阻塞,(同步)多路复用。

上面的例子worker就是异步的,但是一般异步需要指定回调方法来处理异步结果。

异步不可能阻塞,一定是非阻塞的。

异步模式(AIO)

AIO用来解决数据复制阶段的阻塞问题。

异步模型需要操作系统提供支持。这里不举例啦。有空上网找找。

零拷贝

传统拷贝

实现一个文件读,像socket写,内部流程如下:

  1. java本身不具备IO读写的能力,因此read调用后,会从用户态切换成内核态,去调用操作系统(kernel)的读能力,将数据读入内核缓冲区。这个期间,用户的线程是阻塞的,操作系统使用DMA(Direct Memory Access)进行文件读,期间不会使用cpu。

    DMA理解成硬件单元,用来解放cpu完成IO

  2. 内核态切换成用户态,将数据从内核缓冲区读入用户缓冲区(即byte[]),这期间cpu会参与拷贝,无法利用DMA

  3. 调用write方法,这时候将数据从缓冲区(byte[])写入socket缓冲区,cpu会参与拷贝
  4. socket缓冲区写入网卡,这意味着又要从用户态切换为内核态,使用DMA,cpu不参与。

可以看到中间环节比较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层的读写是操作系统完成的。

  • 用户态和内核态切换了3次,很重量级
  • 参与了4次数据拷贝

NIO优化

使用DirectByteBuffer,将用户缓冲区和内核缓冲区合二为一。

可以看到,避免了一次数据拷贝。

  • 3次用户态和内核态的切换
  • 3次数据拷贝

零拷贝优化

通过tansferTo,进行零拷贝,使用操作系统内存。

可以看到,不通过内核缓冲区了,这就避免了两次次内核切换和两次数据复制。

  • 整个复制过程中,只有一次用户态和内核态的切换
  • 参与了2次数据拷贝
  • 适合小文件的传输