前言
介绍Netty的概述和基础用法。
- 组件
- 简单实现
概述
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务端和客户端。这里的异步并没有使用AIO,只是使用了多线程。Netty在网络应用框架中的地位,相当于Spring框架在javaEE开发中的地位。
Netty的优势:
简单实现
| 12
 3
 4
 5
 
 | <dependency><groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.50.Final</version>
 </dependency>
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 
 | public class HelloServer {public static void main(String[] args) {
 
 new ServerBootstrap()
 
 .group(new NioEventLoopGroup())
 
 .channel(NioServerSocketChannel.class)
 // 4. boss负责连接,child(即worker)负责读写,这里添加worker事件触发的具体处理(handler)
 .childHandler(
 // 5. 和客户端连接Channel的初始化器,负责增强这个channel
 new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 nioSocketChannel.pipeline().addLast(new StringDecoder());
 
 nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 System.out.println(msg);
 }
 });
 }
 })
 
 .bind(8080);
 }
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 
 | public class HelloClient {public static void main(String[] args) throws InterruptedException {
 new Bootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<NioSocketChannel>() {
 
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 nioSocketChannel.pipeline().addLast(new StringEncoder());
 }
 })
 .connect("localhost", 8080)
 .sync()
 .channel()
 .writeAndFlush("hello world");
 }
 }
 
 | 
理解
- Channel理解成数据的传输通道
- 把Msg理解成channel中流动的数据,最开始是ByteBuf,通过pipeline流水线进行加工,最终变成其他的样子,最后又通过ByteBuf输出
- 把handler理解成数据处理的工序,处理工序合在一起就是pipeline(ChannelPipeline),这个流水线可以在channel的各种事件的前后切点处织入方法(方法可以自定义)
- handler分为Inbound和outbound(数据入栈和出栈)
 
- eventLoop理解成工人(selector + 单线程线程池)
- 工人管理多个channel的io操作,并且会和这些channel绑定(selector绑定, IO绑定)
- 工人既可以执行io操作,也可以进行数据逻辑处理(数据的处理和selector就没关系了),每个工人有任务队列(阻塞),任务可以分为定时任务和普通任务
- 工人根据pipeline的顺序,去处理数据,每道工序都可以指派给不同的工人(离谱),但最后还是由这个工人来执行最后的IO。
- 换句话讲,就是第一个handle负责绑定EventLoop,尾部handle负责IO,尾部必须使用首部绑定的EventLoop
 
 
组件
Eventloop
事件循环对象。
Eventloop本质是一个单线程执行器(同时维护了一个selector),里面有run方法执行源源不断的IO事件。
继承关系(复杂)
- SchelduledExecutorService:包含了线程池中所有方法
- 自己的OrderedEventExecutor
- boolean inEventloop(Thread thread):判断线程是否属于这个loop
- parent()查看自己属于哪个Group
 
EventloopGroup
事件循环组。
一组Eventloop,channel会调用group中的register方法绑定其中的一个Eventloop,后续的IO操作都有这个Eventloop处理。

基本操作样例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | public class TestEventLoop {public static void main(String[] args) {
 
 NioEventLoopGroup group = new NioEventLoopGroup(2);
 
 
 
 EventLoop loop = group.next();
 
 loop.submit(() -> System.out.println(1111));
 
 group.next().scheduleAtFixedRate(() -> System.out.println(2222), 0L, 1L, TimeUnit.SECONDS);
 }
 }
 
 | 
channel绑定案例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 
 | public class EventLoopServer {
 public static void main(String[] args) {
 new ServerBootstrap()
 
 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(4))
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 System.out.println(buf.toString(StandardCharsets.UTF_8));
 }
 });
 }
 })
 .bind(8080);
 }
 }
 
 
 public class EventLoopClient {
 public static void main(String[] args) throws InterruptedException {
 Channel channel = new Bootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel ch) throws Exception {
 ch.pipeline().addLast(new StringEncoder());
 }
 })
 .connect(new InetSocketAddress("localhost", 8080))
 .sync()
 .channel();
 System.out.println(channel);
 System.out.println("");
 }
 }
 
 | 
问题:如果一个EventLoop绑定了很多channel,处理某一个channel的handle时间过长,则会阻塞其他channel的事件,这怎么办?
答:做一次细分,创建一个独立的EventLoop来处理handle的任务,将IO和数据逻辑处理分离。
优化
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 
 | @Sl4fjpublic class EventLoopServer {
 public static void main(String[] args) {
 DefaultEventLoopGroup taskGroup = new DefaultEventLoopGroup();
 new ServerBootstrap()
 
 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(4))
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 nioSocketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 log.info(buf.toString(StandardCharsets.UTF_8));
 ctx.fireChannelRead(msg);
 }
 }).addLast(taskGroup, "handler2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 log.info(buf.toString(StandardCharsets.UTF_8));
 }
 });
 }
 })
 .bind(8080);
 }
 }
 
 | 
handle切换线程原理
关键代码:io.netty.channel.AbstractChannelHandlerContext
每个handler都要判断下一个handler是否绑定了当前的EventLoop。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
 EventExecutor executor = next.executor();
 if (executor.inEventLoop()) {
 next.invokeChannelRead(m);
 } else {
 executor.execute(new Runnable() {
 public void run() {
 next.invokeChannelRead(m);
 }
 });
 }
 }
 
 | 
Channel
方法:
- close():用来关闭channel(异步关闭,返回一个ChannelFutrue)
- closeFuture():用来处理close的关闭- 
- sync():同步等待channel关闭
- addListener():异步等待关闭
 
- pipeline():添加处理器
- write():数据写入,可能会先进入缓冲
- wtiteAndFlush():数据写入并刷出
ChannelFuture
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | public class ChannelFutureClient {public static void main(String[] args) throws InterruptedException {
 ChannelFuture channelFuture = new Bootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nc) throws Exception {
 
 }
 })
 .connect("localhost", 8080);
 Channel channel = channelFuture.channel();
 channel.writeAndFlush("1111");
 }
 }
 
 | 
问题:为什么无法发送数据呢?
答:
- connect方法是异步非阻塞的,主线程调用connect,但是由另一个NIO线程去建立连接。
- 主线程去如果不执行sync,连接还未建立,write自然就不生效了。
发送数据
一般带有Future和Promise的类就是为了用来异步处理结果的。
获取异步处理结果,让Channel能发送数据的方法如下:
- sync:阻塞主线程直到NIO成功建立连接
- addListener:异步等待channel连接的结果(给一个回调对象)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | channelFuture.sync();
 Channel channel = channelFuture.channel();
 channel.writeAndFlush("1111");
 
 
 channelFuture.addListener((ChannelFutureListener) future -> {
 Channel channel1 = future.channel();
 channel.writeAndFlush("qqqqqqqq");
 });
 
 | 
关闭连接
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | Channel channel = channelFuture.sync().channel();new Thread(() -> {
 Scanner in = new Scanner(System.in);
 while (true) {
 String next = in.next();
 if ("q".equals(next)) {
 channel.close();
 break;
 }
 channel.writeAndFlush(next);
 }
 },"input").start();
 
 | 
因为channel.close异步,那如何在channel关闭后,处理业务逻辑?
| 12
 3
 4
 5
 6
 7
 
 | ChannelFuture future = channel.closeFuture();
 future.sync();
 
 
 
 future.addListener((f) -> {});
 
 | 
还有问题:channel关闭后,主线程并没有退出,占用资源,如果优雅退出?
- 把EventLoopGroup关闭掉:group.shutdownGracefully()
Future && Promise
- Netty中的Future继承自JDK中的Future接口
- Netty中的Promise继承自Netty的Future接口
功能:
- JDK的Future只能同步等待任务结束才能得到结果
- Netty的Futrue可以通过异步方式获取结果(addListener)
- Netty的Promise可以脱离任务存在,只作为两个线程间传递数据的容器
Promise样例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | @Slf4jpublic class TestNettyPromise {
 public static void main(String[] args) throws ExecutionException, InterruptedException {
 NioEventLoopGroup loopGroup = new NioEventLoopGroup();
 EventLoop loop = loopGroup.next();
 DefaultPromise<Object> promise = new DefaultPromise<>(loop);
 
 new Thread(() -> {
 log.info("...开始计算");
 try {
 Thread.sleep(1000);
 }catch (Exception e) {
 e.printStackTrace();
 }
 promise.setSuccess(800);
 }).start();
 
 log.info("等待结果...");
 log.info("结果:{}", promise.get());
 }
 }
 
 | 
Handler && pipeline
ChannelHandler用来处理Channel上的各种事件,分为入站和出站handler(读入和写出),所有的ChannelHandler组成一串,就是pipeline。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 
 | @Slf4jpublic class TestNettyHandler {
 public static void main(String[] args) {
 new ServerBootstrap()
 .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
 .channel(NioServerSocketChannel.class)
 // head -- h1 -- h2 -- h3 -- h4 -- h5 -- h6 -- tail (双向)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nc) throws Exception {
 nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.info("1");
 super.channelRead(ctx, msg);
 }
 }).addLast("h2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.info("2");
 super.channelRead(ctx, msg);
 }
 }).addLast("h3", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.info("3");
 nc.writeAndFlush("111");
 super.channelRead(ctx, msg);
 }
 }).addLast("h4", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.info("4");
 super.write(ctx, msg, promise);
 }
 }).addLast("h5", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.info("5");
 super.write(ctx, msg, promise);
 }
 }).addLast("h6", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.info("6");
 super.write(ctx, msg, promise);
 }
 });
 }
 })
 .bind(8080);
 
 }
 }
 
 ###### 客户端发送数据后 #####
 服务端:
 15:28:06.481 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 1
 15:28:06.484 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 2
 15:28:06.484 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 3
 15:28:06.484 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 6
 15:28:06.484 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 5
 15:28:06.484 [nioEventLoopGroup-3-1] INFO  c.e.demo.netty.c3.TestNettyHandler - 4
 
 | 
为什么要写Handler? —> 业务处理。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | {...}.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override
 protected void initChannel(NioSocketChannel nc) throws Exception {
 nc.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 String jsonString = buf.toString(StandardCharsets.UTF_8);
 super.channelRead(ctx, jsonString);
 }
 }).addLast("h2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object jsonString) throws Exception {
 String s = (String) jsonString;
 char[] chars = s.toCharArray();
 super.channelRead(ctx, chars);
 }
 }).addLast("h3", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 nc.writeAndFlush(msg);
 }
 }).{.....}
 
 | 
EmbeddedChannel
用来测试的内部Channel,里面可以绑定多个channel,这样就不用模拟一个服务端和客户端了。
- 构造器里面按顺序传handler
- wirteInbound: 模拟入站
- writeOutbound:模拟出站
ByteBuf
创建
| 1
 | ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); 
 | 
- 创建一个默认的ByteBuf(池化基于直接内存的ByteBuf),初始容量为256(默认,也可以自己指定)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | public static void log(ByteBuf buf) {
 int length = buf.readableBytes();
 int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
 StringBuilder sb = new StringBuilder(rows * 80 * 2)
 .append("readIdx: ").append(buf.readerIndex())
 .append(" writeIdx: ").append(buf.writerIndex())
 .append(" capacity: ").append(buf.capacity())
 .append(NEWLINE);
 appendPrettyHexDump(sb, buf);
 System.out.println(sb.toString());
 }
 
 | 
直接内存和堆内存
- 直接内存分配慢,但读取快,要手动释放或归还,默认直接内存 
- 堆内存分配快,读取慢,容易受垃圾回收影响 
- 初始化堆内存 | 1
 | ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
 |  
 
- 初始化直接内存 | 1
 | ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();
 |  
 
池化和非池化
池化的最大意义在ByteBuf可以重用,优点是:
- 没有池化,每次都要创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会加重GC负担
- 有池化,重用池中的ByteBuf实例,采用了与jemalloc类似的内存分配算法提高分配效率
- 高并发时,池化功能更加节约内存,减少内存溢出的可能性
池化功能是否开启,可以 通过下面的系统环境变量来设置。
| 1
 | -Dio.netty.allocator.type={unpooled|pooled}
 | 
- 4.1 以后池化是默认开启
- 直接内存和堆内存都有池化功能
组成

- 灰色是已读部分
- 绿箭头是读指针,绿色是未读已写
- 蓝箭头是写指针,蓝色是可写部分 
- 橙色区域是可扩容容量。容量按需扩容。 
写入
- writeBoolean(boolean bool):写入一个字节(0:false;1:true)
- writeByte(int val)
- writeInt(int value):小端写入(先写低位字节)
- writeIntE(int value)::大端写入(先写高位字节),网络编程一般采用大端
- ….
- writeBytes(byte[] bytes)
- writeBytes(ByteBuf buf)
- int writeCharSequence(CharSequence chars, Charset charset):写入字符串
注意:
- 除了最后一个,其余方法都是返回ByteBuf
- 网络编程一般都是大端写入
扩容
- 写入的数据大小未超过512字节,按照16的整数倍扩容(写入33,扩容成16 * 3 = 48)
- 写入的数据大小超过512字节,按照2的n次方扩容(写入513,扩容成2的10次方=1024)
读取
- readByte() :每次读取一个字节,读过的就废弃了,不能再读了
- markReadIndex():标记
- reset():读指针重置到标记
retain && release
Netty中有堆外内存的的ByteBuf实现,堆外内存最好手动释放,而不是等GC回收。
- UnpooledHeapByteBuf使用JVM内存,等GC回收内存就好
- UnpooledDirectByteBuf使用的就是直接内存,可以等GC间接回收,但不建议,需要特殊方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更负责的机制来回收内存
回收内存的源码实现,下列方法的不同实现
abstract void deallocate()
Netty使用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象初始计数为1
- 调用release方法,计数减1,计数为0则回收ByteBuf
- 调用retain方法,计数加1,免得其他handler调用release影响到我正在使用的buf
- 当计数为0,底层内存内回收,即使ByteBuf对象还在,也不能使用了
谁来负责release?
谁最后负责ByteBuf,谁负责ByteBuf的释放。
- 如果ByteBuf传递到header和tail,它们会处理
- 如果ByteBuf没到最后(可能是已经转成了字符串),那么就不会处理
Slice
【零拷贝】的体现之一。对原始的ByteBuf进行切片,切片后的ByteBuf没有进行内存复制,还是沿用原始的ByteBuf内存,切片后的ByteBuf维护独立的read,write指针。

- slice(int idx, int len):切片过程中没有发生数据复制(非常牛逼!!!)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | public class TestByteBufSlice {public static void main(String[] args) {
 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
 buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'});
 
 ByteBuf buf1 = buf.slice(0, 4);
 ByteBuf buf2 = buf.slice(4, 6);
 log(buf1);
 buf2.setByte(2, 'x');
 log(buf2);
 log(buf);
 }
 }
 
 | 
问题:切片后要是想加额外的字节,岂不是乱套了?
答:不允许添加, 要加限制,原始内存不能释放。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | public class TestByteBufSlice {public static void main(String[] args) {
 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
 buf.writeBytes(new byte[]{'1','2','3','4','5','6','7','8','9','a'});
 
 ByteBuf buf1 = buf.slice(0, 4);
 buf1.retain();
 ByteBuf buf2 = buf.slice(4, 6);
 buf.retain();
 log(buf1);
 buf1.release();
 buf2.setByte(2, 'x');
 log(buf2);
 buf2.release();
 log(buf);
 }
 }
 
 | 
duplicate && copy
composite
零拷贝,把ByteBuf整合。和writeBytes深拷贝对应。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | public class TestByteComposite {public static void main(String[] args) {
 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
 buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
 buf1.retain();
 ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
 buf2.retain();
 buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
 ByteBuf buf3 = ByteBufAllocator.DEFAULT.buffer();
 buf3.retain();
 buf3.writeBytes(new byte[]{11, 12, 13, 14, 15});
 
 CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
 compositeByteBuf.addComponents(true, buf1, buf2);
 buf1.release();buf2.release();
 compositeByteBuf.addComponent(buf3);
 
 compositeByteBuf.addComponent(true, buf3);
 buf3.release();
 log(compositeByteBuf);
 }
 }
 
 | 
注意
使用零拷贝,最好都要retain和release。
UDP的简单实现(纯原创,欢迎交流)
上面都是TCP,咱们参照github,写一段UDP的链接,Netty-github官网。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 
 | @Slf4jpublic class UDPServer {
 public static void main(String[] args) {
 NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
 try {
 Bootstrap b = new Bootstrap();
 b.group(eventLoopGroup)
 .channel(NioDatagramChannel.class)
 .handler(new UDPServerHandler());
 Channel udpChannel = b.bind(8888).sync().channel();
 log.info("开启线程....");
 udpChannel.closeFuture().await();
 } catch (Exception e) {
 log.error(e.getMessage());
 } finally {
 log.error("完蛋了,服务端挂了.....");
 eventLoopGroup.shutdownGracefully();
 }
 }
 }
 
 public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
 
 protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket udpPack) throws Exception {
 log.info(udpPack.toString());
 ByteBuf content = udpPack.content();
 ByteBuf xxlx_buf = content.slice(0, 4);
 xxlx_buf.retain();
 int xxlx = xxlx_buf.readInt();
 log.info(String.valueOf(xxlx));
 xxlx_buf.release();
 ByteBuf msg_buf = content.slice(4, content.writerIndex());
 msg_buf.retain();
 log.info(msg_buf.toString(CharsetUtil.UTF_8));
 msg_buf.release();
 
 ctx.write(
 new DatagramPacket(
 Unpooled.wrappedBuffer("success".getBytes(CharsetUtil.UTF_8))
 , udpPack.sender()
 )
 );
 }
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 ctx.flush();
 }
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 
 | @Slf4jpublic class UDPClient {
 public static <T> void send(String ip, int port, T obj) {
 send(ip, port, obj, (data) -> {
 
 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(128);
 buf.writeInt(5514);
 int count = 0;
 while (count++ < 124) {
 buf.writeByte('x');
 }
 return buf;
 });
 }
 
 private static <T> void send(String ip, int port, T obj, PackageFunc<T> func) {
 InetSocketAddress address = new InetSocketAddress(ip, port);
 Bootstrap client = new Bootstrap();
 NioEventLoopGroup group = new NioEventLoopGroup();
 EventLoop watcher = group.next();
 DefaultPromise<Object> promise = new DefaultPromise<>(watcher);
 try {
 client.group(group)
 .channel(NioDatagramChannel.class)
 .option(ChannelOption.SO_BROADCAST, false)
 .handler(new ChannelInitializer<NioDatagramChannel>() {
 @Override
 protected void initChannel(NioDatagramChannel ch) throws Exception {
 
 ch.pipeline().addLast("watcher", new SimpleChannelInboundHandler<DatagramPacket>() {
 @Override
 protected void channelRead0(ChannelHandlerContext cxt, DatagramPacket udpPack) throws Exception {
 ByteBuf content = udpPack.content();
 String s = content.toString(CharsetUtil.UTF_8);
 log.info(s);
 if ("success".equals(s)) {
 promise.setSuccess("success");
 }
 }
 });
 }
 });
 Channel channel = client.bind(0).sync().channel();
 channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync();
 
 
 AtomicInteger count = new AtomicInteger(0);
 while (count.getAndIncrement() < 3) {
 
 boolean await = promise.await(7, TimeUnit.SECONDS);
 if (await) {
 log.info("发送成功!");
 break;
 }
 else {
 log.error("第{}次发送失败,正在重试...", count.get());
 channel.writeAndFlush(new DatagramPacket(func.doPack(obj), address)).sync();
 }
 }
 if (count.get() > 3) log.error("草拟吗连不上啊!!");
 channel.close();
 }catch (Exception e) {
 log.error(e.getMessage());
 } finally {
 group.shutdownGracefully();
 watcher.shutdownGracefully();
 }
 }
 
 public static void main(String[] args) {
 send("localhost", 8888, null);
 }
 }
 
 |