0%

Netty基础

前言

介绍Netty的概述和基础用法。

  1. 组件
  2. 简单实现

概述

Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务端和客户端。这里的异步并没有使用AIO,只是使用了多线程。Netty在网络应用框架中的地位,相当于Spring框架在javaEE开发中的地位。

Netty的优势:

  • Netty vs NIO: 工作量大,bug多

    1. 需要自己构建协议
    2. 解决TCP传输的问题,如黏包、半包
    3. epoll空轮询导致CPU 100%
    4. 对API进行增强,使之更加易用,如FastThreadLocal => ThreadLocal, ByteBuf => ByteBuffer
  • Netty vs 其他网络框架:Netty地位是龙头的,文档也优秀

简单实现

  • 依赖
1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class HelloServer {
public static void main(String[] args) {
// 1. 服务器端的启动器,负责组装Netty组件并启动
new ServerBootstrap()
// 2. Boss + Worker(thread, worker) ==> 加入一个组,自动监听accept read
.group(new NioEventLoopGroup())
// 3. 选择服务器的实现(可以是NIO,OIO,BIO等),
.channel(NioServerSocketChannel.class)
// 4. boss负责连接,child(即worker)负责读写,这里添加worker事件触发的具体处理(handler
.childHandler(
// 5. 和客户端连接Channel的初始化器,负责增强这个channel
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 6. 添加解码器 ByteBuf -> String
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 7.添加自定义channel,读事件发生后转换上一步转换的字符串
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
// 8. 绑定监听端口
.bind(8080);
}
}
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
// 连接建立后
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 客户端发送的数据进行编码
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect("localhost", 8080)
.sync()
.channel()
.writeAndFlush("hello world");
}
}

理解

  • Channel理解成数据的传输通道
  • 把Msg理解成channel中流动的数据,最开始是ByteBuf,通过pipeline流水线进行加工,最终变成其他的样子,最后又通过ByteBuf输出
  • 把handler理解成数据处理的工序,处理工序合在一起就是pipeline(ChannelPipeline),这个流水线可以在channel的各种事件的前后切点处织入方法(方法可以自定义)
    • handler分为Inbound和outbound(数据入栈和出栈)
  • eventLoop理解成工人(selector + 单线程线程池)
    • 工人管理多个channel的io操作,并且会和这些channel绑定(selector绑定, IO绑定)
    • 工人既可以执行io操作,也可以进行数据逻辑处理(数据的处理和selector就没关系了),每个工人有任务队列(阻塞),任务可以分为定时任务和普通任务
    • 工人根据pipeline的顺序,去处理数据,每道工序都可以指派给不同的工人(离谱),但最后还是由这个工人来执行最后的IO。
      • 换句话讲,就是第一个handle负责绑定EventLoop,尾部handle负责IO,尾部必须使用首部绑定的EventLoop

组件

Eventloop

事件循环对象。

Eventloop本质是一个单线程执行器(同时维护了一个selector),里面有run方法执行源源不断的IO事件。

继承关系(复杂)

  • SchelduledExecutorService:包含了线程池中所有方法
  • 自己的OrderedEventExecutor
    • boolean inEventloop(Thread thread):判断线程是否属于这个loop
    • parent()查看自己属于哪个Group

EventloopGroup

事件循环组。

一组Eventloop,channel会调用group中的register方法绑定其中的一个Eventloop,后续的IO操作都有这个Eventloop处理。

基本操作样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestEventLoop {
public static void main(String[] args) {
// 1. 创建事件循环组, 默认线程数是cpu核心数的两倍
NioEventLoopGroup group = new NioEventLoopGroup(2); // 可以绑定 io事件,普通任务,定时任务
// DefaultEventLoopGroup group1 = new DefaultEventLoopGroup(); // 普通任务,定时任务

// 2. 获取下一个事件循环对象
EventLoop loop = group.next(); // 可以循环重复获取,数组取模
// 3. 执行普通任务
loop.submit(() -> System.out.println(1111));
// 4. 定时任务
group.next().scheduleAtFixedRate(() -> System.out.println(2222), 0L, 1L, TimeUnit.SECONDS);
}
}

channel绑定案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 服务端
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
// 第一个eventgroup只处理accept,另一个是worker,处理read等
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(4))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}

// 客户端
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}
}

问题:如果一个EventLoop绑定了很多channel,处理某一个channel的handle时间过长,则会阻塞其他channel的事件,这怎么办?

答:做一次细分,创建一个独立的EventLoop来处理handle的任务,将IO和数据逻辑处理分离。

优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Sl4fj
public class EventLoopServer {
public static void main(String[] args) {
DefaultEventLoopGroup taskGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
// 第一个eventgroup只处理accept,另一个是worker,处理read等
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(4))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg); // 将消息传递给下一个handler(自定义handler需要加)
}
}).addLast(taskGroup, "handler2", new ChannelInboundHandlerAdapter() { // 指定这个数据处理用另一个线程
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}

handle切换线程原理

关键代码:io.netty.channel.AbstractChannelHandlerContext

每个handler都要判断下一个handler是否绑定了当前的EventLoop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// next表示下一个事件处理器(handler),当前handler会在这个函数判断下一个handler的状态
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor(); // 下一个handler的线程
if (executor.inEventLoop()) { // 如果下一个handler的线程也在这个eventloop里面(共用一个线程)
next.invokeChannelRead(m); // 直接在当前线程处理事件
} else {
executor.execute(new Runnable() { // 用下 一个线程触发事件
public void run() {
next.invokeChannelRead(m);
}
});
}
}

Channel

方法:

  • close():用来关闭channel(异步关闭,返回一个ChannelFutrue)
  • closeFuture():用来处理close的关闭
    • sync():同步等待channel关闭
    • addListener():异步等待关闭
  • pipeline():添加处理器
  • write():数据写入,可能会先进入缓冲
  • wtiteAndFlush():数据写入并刷出

ChannelFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ChannelFutureClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nc) throws Exception {
//...
}
})
.connect("localhost", 8080);
Channel channel = channelFuture.channel();
channel.writeAndFlush("1111"); // 服务端无法接受数据
}
}

问题:为什么无法发送数据呢?

答:

  • connect方法是异步非阻塞的,主线程调用connect,但是由另一个NIO线程去建立连接。
    • 一般连接都比较慢,需要1s去连接
  • 主线程去如果不执行sync,连接还未建立,write自然就不生效了。

发送数据

一般带有Future和Promise的类就是为了用来异步处理结果的。

获取异步处理结果,让Channel能发送数据的方法如下:

  1. sync:阻塞主线程直到NIO成功建立连接
  2. addListener:异步等待channel连接的结果(给一个回调对象)
1
2
3
4
5
6
7
8
9
10
// 方法1
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush("1111");

// 方法2
channelFuture.addListener((ChannelFutureListener) future -> {
Channel channel1 = future.channel();
channel.writeAndFlush("qqqqqqqq");
});

关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner in = new Scanner(System.in);
while (true) {
String next = in.next();
if ("q".equals(next)) {
channel.close(); // 异步关闭
break;
}
channel.writeAndFlush(next);
}
},"input").start();

因为channel.close异步,那如何在channel关闭后,处理业务逻辑?

1
2
3
4
5
6
7
ChannelFuture future = channel.closeFuture();
// 方法1
future.sync();
// + 业务逻辑...

// 异步处理
future.addListener((f) -> {});

还有问题:channel关闭后,主线程并没有退出,占用资源,如果优雅退出?

  • 把EventLoopGroup关闭掉:group.shutdownGracefully()

Future && Promise

  • Netty中的Future继承自JDK中的Future接口
  • Netty中的Promise继承自Netty的Future接口

功能:

  • JDK的Future只能同步等待任务结束才能得到结果
  • Netty的Futrue可以通过异步方式获取结果(addListener)
  • Netty的Promise可以脱离任务存在,只作为两个线程间传递数据的容器

Promise样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
EventLoop loop = loopGroup.next();
DefaultPromise<Object> promise = new DefaultPromise<>(loop);
// 填充计算结果
new Thread(() -> {
log.info("...开始计算");
try {
Thread.sleep(1000); // 计算
}catch (Exception e) {
e.printStackTrace();
}
promise.setSuccess(800);
}).start();

log.info("等待结果...");
log.info("结果:{}", promise.get()); // 阻塞直到有结果
}
}

Handler && pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站和出站handler(读入和写出),所有的ChannelHandler组成一串,就是pipeline。

  • 入站处理器通常继承ChannelInboundHandlerAdapter类,主要用来读取客户端数据,返回结果
  • 出站处理器通常继承ChannelOutboundHandlerAdapter类,主要对写回结果进行加工

  • 入站执行按找pipeline顺序(head —> tail),出站按照pipeline反序(tail —-> head)

    • 要有写事件,才会触发出站
    • 注意代码里的write注释,如果用ctx,则会从ctx所在handler向前找出站处理器,如果用nc,则是从tail开始找。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Slf4j
public class TestNettyHandler {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
// head -- h1 -- h2 -- h3 -- h4 -- h5 -- h6 -- tail (双向)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nc) throws Exception {
nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("1");
super.channelRead(ctx, msg); // 必须调用,找到下一个处理器,不然链就断了
}
}).addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("2");
super.channelRead(ctx, msg);
}
}).addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("3");
nc.writeAndFlush("111"); // 如果调用ctx.write, 则只会从h3向前找出站处理器,导致啥也不打印
super.channelRead(ctx, msg);
}
}).addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("4");
super.write(ctx, msg, promise);
}
}).addLast("h5", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("5");
super.write(ctx, msg, promise);
}
}).addLast("h6", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);

}
}

###### 客户端发送数据后 #####
服务端:
15:28:06.481 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 1
15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 2
15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 3
15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 6
15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 5
15:28:06.484 [nioEventLoopGroup-3-1] INFO c.e.demo.netty.c3.TestNettyHandler - 4

为什么要写Handler? —> 业务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{...}.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nc) throws Exception {
nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String jsonString = buf.toString(StandardCharsets.UTF_8);
super.channelRead(ctx, jsonString); // 找下一个handler
}
}).addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object jsonString) throws Exception {
String s = (String) jsonString;
char[] chars = s.toCharArray();
super.channelRead(ctx, chars);
}
}).addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
nc.writeAndFlush(msg);
}
}).{.....}

EmbeddedChannel

用来测试的内部Channel,里面可以绑定多个channel,这样就不用模拟一个服务端和客户端了。

  • 构造器里面按顺序传handler
  • wirteInbound: 模拟入站
  • writeOutbound:模拟出站

ByteBuf

创建

1
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); // 默认256,自动扩容
  • 创建一个默认的ByteBuf(池化基于直接内存的ByteBuf),初始容量为256(默认,也可以自己指定)
1
2
3
4
5
6
7
8
9
10
11
12
// 打印工具
public static void log(ByteBuf buf) {
int length = buf.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder sb = new StringBuilder(rows * 80 * 2)
.append("readIdx: ").append(buf.readerIndex())
.append(" writeIdx: ").append(buf.writerIndex())
.append(" capacity: ").append(buf.capacity())
.append(NEWLINE);
appendPrettyHexDump(sb, buf);
System.out.println(sb.toString());
}

直接内存和堆内存

  1. 直接内存分配慢,但读取快,要手动释放或归还,默认直接内存

  2. 堆内存分配快,读取慢,容易受垃圾回收影响

  • 初始化堆内存

    1
    ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
  • 初始化直接内存

    1
    ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();

池化和非池化

池化的最大意义在ByteBuf可以重用,优点是:

  • 没有池化,每次都要创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会加重GC负担
  • 有池化,重用池中的ByteBuf实例,采用了与jemalloc类似的内存分配算法提高分配效率
  • 高并发时,池化功能更加节约内存,减少内存溢出的可能性

池化功能是否开启,可以 通过下面的系统环境变量来设置。

1
-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以后池化是默认开启
  • 直接内存和堆内存都有池化功能

组成

  • 灰色是已读部分
  • 绿箭头是读指针,绿色是未读已写
  • 蓝箭头是写指针,蓝色是可写部分

  • 橙色区域是可扩容容量。容量按需扩容。

写入

  • writeBoolean(boolean bool):写入一个字节(0:false;1:true)
  • writeByte(int val)
  • writeInt(int value):小端写入(先写低位字节)
  • writeIntE(int value)::大端写入(先写高位字节),网络编程一般采用大端
  • ….
  • writeBytes(byte[] bytes)
  • writeBytes(ByteBuf buf)
  • int writeCharSequence(CharSequence chars, Charset charset):写入字符串

注意:

  1. 除了最后一个,其余方法都是返回ByteBuf
  2. 网络编程一般都是大端写入

扩容

  1. 写入的数据大小未超过512字节,按照16的整数倍扩容(写入33,扩容成16 * 3 = 48)
  2. 写入的数据大小超过512字节,按照2的n次方扩容(写入513,扩容成2的10次方=1024)

读取

  • readByte() :每次读取一个字节,读过的就废弃了,不能再读了
  • markReadIndex():标记
  • reset():读指针重置到标记

retain && release

Netty中有堆外内存的的ByteBuf实现,堆外内存最好手动释放,而不是等GC回收。

  • UnpooledHeapByteBuf使用JVM内存,等GC回收内存就好
  • UnpooledDirectByteBuf使用的就是直接内存,可以等GC间接回收,但不建议,需要特殊方法来回收内存
  • PooledByteBuf和它的子类使用了池化机制,需要更负责的机制来回收内存

回收内存的源码实现,下列方法的不同实现

abstract void deallocate()

Netty使用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

  • 每个ByteBuf对象初始计数为1
  • 调用release方法,计数减1,计数为0则回收ByteBuf
  • 调用retain方法,计数加1,免得其他handler调用release影响到我正在使用的buf
  • 当计数为0,底层内存内回收,即使ByteBuf对象还在,也不能使用了

谁来负责release?

谁最后负责ByteBuf,谁负责ByteBuf的释放。

  • 如果ByteBuf传递到header和tail,它们会处理
  • 如果ByteBuf没到最后(可能是已经转成了字符串),那么就不会处理

Slice

【零拷贝】的体现之一。对原始的ByteBuf进行切片,切片后的ByteBuf没有进行内存复制,还是沿用原始的ByteBuf内存,切片后的ByteBuf维护独立的read,write指针。

  • slice(int idx, int len):切片过程中没有发生数据复制(非常牛逼!!!
1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestByteBufSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'});
// log(buf);
ByteBuf buf1 = buf.slice(0, 4);
ByteBuf buf2 = buf.slice(4, 6);
log(buf1);
buf2.setByte(2, 'x'); // 修改buf2
log(buf2);
log(buf); // 可以看到原始的buf也被修改了,也就是说内存没比那
}
}

问题:切片后要是想加额外的字节,岂不是乱套了?

答:不允许添加, 要加限制,原始内存不能释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestByteBufSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'});
// log(buf);
ByteBuf buf1 = buf.slice(0, 4);
buf1.retain(); // 计数 + 1
ByteBuf buf2 = buf.slice(4, 6);
buf.retain();
log(buf1);
buf1.release(); // 使用完,计数 - 1
buf2.setByte(2, 'x');
log(buf2);
buf2.release(); // 使用完,计数 - 1
log(buf);
}
}

duplicate && copy

  • Duplicate: 也是零拷贝。可以理解成引用全部,但读写指针独立。

  • Copy:深拷贝。

composite

零拷贝,把ByteBuf整合。和writeBytes深拷贝对应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestByteComposite {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
buf1.retain();
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.retain();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer();
buf3.retain();
buf3.writeBytes(new byte[]{11, 12, 13, 14, 15});

CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true, buf1, buf2); // 加true自动移动写指针
buf1.release();buf2.release();
compositeByteBuf.addComponent(buf3); // 不加true,不会变写指针,只会改变capacity, writeIdx: 10 ,cap: 15
// 可见如果维护写指针,会找上一个写指针的位置往后写,并不是单纯的内存拼接
compositeByteBuf.addComponent(true, buf3); // writeIdx:15, cap: 20
buf3.release();
log(compositeByteBuf);
}
}

注意

使用零拷贝,最好都要retain和release。

UDP的简单实现(纯原创,欢迎交流)

上面都是TCP,咱们参照github,写一段UDP的链接,Netty-github官网

  • 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Slf4j
public class UDPServer {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioDatagramChannel.class)
.handler(new UDPServerHandler());
Channel udpChannel = b.bind(8888).sync().channel();
log.info("开启线程....");
udpChannel.closeFuture().await(); // 阻塞直到连接关闭
} catch (Exception e) {
log.error(e.getMessage());
} finally {
log.error("完蛋了,服务端挂了.....");
eventLoopGroup.shutdownGracefully();
}
}
}

public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket udpPack) throws Exception {
log.info(udpPack.toString());
ByteBuf content = udpPack.content();
ByteBuf xxlx_buf = content.slice(0, 4);
xxlx_buf.retain();
int xxlx = xxlx_buf.readInt();
log.info(String.valueOf(xxlx));
xxlx_buf.release();
ByteBuf msg_buf = content.slice(4, content.writerIndex());
msg_buf.retain();
log.info(msg_buf.toString(CharsetUtil.UTF_8));
msg_buf.release();
// 回执
ctx.write(
new DatagramPacket(
Unpooled.wrappedBuffer("success".getBytes(CharsetUtil.UTF_8))
, udpPack.sender()
)
);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
  • 客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Slf4j
public class UDPClient {
public static <T> void send(String ip, int port, T obj) {
send(ip, port, obj, (data) -> {
// 模拟押包
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(128);
buf.writeInt(5514);
int count = 0;
while (count++ < 124) {
buf.writeByte('x');
}
return buf;
});
}

private static <T> void send(String ip, int port, T obj, PackageFunc<T> func) {
InetSocketAddress address = new InetSocketAddress(ip, port);
Bootstrap client = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop watcher = group.next();
DefaultPromise<Object> promise = new DefaultPromise<>(watcher);
try {
client.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, false)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
// 用另一个线程去监听回执
ch.pipeline().addLast("watcher", new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
protected void channelRead0(ChannelHandlerContext cxt, DatagramPacket udpPack) throws Exception {
ByteBuf content = udpPack.content();
String s = content.toString(CharsetUtil.UTF_8);
log.info(s);
if ("success".equals(s)) {
promise.setSuccess("success");
}
}
});
}
});
Channel channel = client.bind(0).sync().channel();
channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync();

// 超时重传3次
AtomicInteger count = new AtomicInteger(0);
while (count.getAndIncrement() < 3) {
// 主线程等待7秒,检查回执是否过来
boolean await = promise.await(7, TimeUnit.SECONDS);
if (await) {
log.info("发送成功!");
break;
}
else {
log.error("第{}次发送失败,正在重试...", count.get());
channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync();
}
}
if (count.get() > 3) log.error("草拟吗连不上啊!!");
channel.close();
}catch (Exception e) {
log.error(e.getMessage());
} finally {
group.shutdownGracefully();
watcher.shutdownGracefully();
}
}

public static void main(String[] args) {
send("localhost", 8888, null);
}
}